Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-7153: Fix parser to handle incomplete Redis packet #5044

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.internal.processors.rest.protocols.tcp.redis;

import org.apache.commons.lang.RandomStringUtils;
import org.junit.Assert;
import redis.clients.jedis.Jedis;

Expand Down Expand Up @@ -70,4 +71,21 @@ public void testSelect() throws Exception {
Assert.assertEquals("v0", jedis.get("k0"));
}
}

//IGNITE-7153
public void testSetBigObject1() throws Exception {
String randomString1 = RandomStringUtils.random(8 << 10, true, true); //8192
String randomString2 = RandomStringUtils.random(10 << 10, true, true); //10240
String randomString3 = RandomStringUtils.random(8 << 11, true, true); //16384
try (Jedis jedis = pool.getResource()) {
jedis.set("b1".getBytes(), randomString1.getBytes());
Assert.assertEquals(randomString1, jedis.get("b1"));

jedis.set("b2".getBytes(), randomString2.getBytes());
Assert.assertEquals(randomString2, jedis.get("b2"));

jedis.set("b3".getBytes(), randomString3.getBytes());
Assert.assertEquals(randomString3, jedis.get("b3"));
}
}
}
Expand Up @@ -156,7 +156,7 @@ public GridTcpRestParser(boolean routerClient, Marshaller marsh) {
break;

case REDIS:
res = GridRedisProtocolParser.readArray(buf);
res = parseRedisPacket(ses, buf, state);

break;

Expand Down Expand Up @@ -246,6 +246,45 @@ else if (msg instanceof GridRouterRequest) {
}
}

/**
* Parses redis protocol message.
*
* @param ses Session.
* @param buf Buffer containing not parsed bytes.
* @param state Current parser state.
* @return Parsed packet.s
* @throws IOException If packet cannot be parsed.
* @throws IgniteCheckedException If deserialization error occurred.
*/
private GridClientMessage parseRedisPacket(GridNioSession ses, ByteBuffer buf, ParserState state)
throws IOException, IgniteCheckedException {
assert state.packetType() == GridClientPacketType.REDIS;

ByteArrayOutputStream tmp = state.buffer();

if(GridRedisProtocolParser.validatePacket(buf)) {
//single parsable packet
return GridRedisProtocolParser.readArray(buf);
} else if(GridRedisProtocolParser.validatePacketFooter(buf)) { //Scenario 2

int fullLength = tmp.size() + buf.limit();
ByteBuffer fullPacket = ByteBuffer.allocate(fullLength);
fullPacket.put(tmp.toByteArray());
fullPacket = fullPacket.put(buf);
fullPacket.flip();

tmp.reset();
return GridRedisProtocolParser.readArray(fullPacket);
} else { //Scenario 1 or 3

byte[] data = new byte[buf.limit()];
buf.get(data);
tmp.write(data);

return null;
}
}

/**
* Parses memcache protocol message.
*
Expand Down
Expand Up @@ -17,10 +17,13 @@

package org.apache.ignite.internal.processors.rest.protocols.tcp.redis;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.rest.protocols.tcp.GridTcpRestParser;

/**
* Parser to decode/encode Redis protocol (RESP) requests.
Expand Down Expand Up @@ -112,6 +115,60 @@ public static String readBulkStr(ByteBuffer buf) throws IgniteCheckedException {
return new String(bulkStr);
}

/*
* A validation method to check packet completeness.
* return true if and only if
* 1. First byte is ARRAY (43)
* 2. Last two bytes are CR(13) LF(10)
*
* Otherwise, return false representing this is an incomplete packet with three possible scenarios:
* 1. A beginning packet with leading ARRAY byte
* 2. A continual packet with ending CRLF bytes.
* 3. A continual packet with neither conditions above.
*/
public static boolean validatePacket(ByteBuffer buf) {
return validatePacketHeader(buf) && validatePacketFooter(buf);
}

public static boolean validatePacketHeader(ByteBuffer buf) {
boolean result = true;

//mark at initial position
buf.mark();

if(buf.get() != ARRAY) {
result = false;
}

//reset to initial position
buf.reset();

return result;
}

public static boolean validatePacketFooter(ByteBuffer buf) {
boolean result = true;

//mark at initial position
buf.mark();


int limit = buf.limit();

assert limit > 2;

//check the final CR(last -2 ) and LF(last -1) byte
if (buf.get(limit - 2) != CR || buf.get(limit - 1) != LF) {
result = false;
}


//reset to initial position
buf.reset();

return result;
}

/**
* Counts elements in buffer.
*
Expand Down