Skip to content

Commit

Permalink
[FLINK-15171] [serialization] fix performance regression caused by to…
Browse files Browse the repository at this point in the history
…o many buffer allocations on string serialization
  • Loading branch information
shuttie authored and pnowojski committed Dec 19, 2019
1 parent 8ec545d commit 0486ab0
Showing 1 changed file with 39 additions and 103 deletions.
142 changes: 39 additions & 103 deletions flink-core/src/main/java/org/apache/flink/types/StringValue.java
Expand Up @@ -61,8 +61,11 @@ public class StringValue implements NormalizableKey<StringValue>, CharSequence,
private static final int HIGH_BIT2 = 0x1 << 13;

private static final int HIGH_BIT2_MASK = 0x3 << 6;



private static final int SHORT_STRING_MAX_LENGTH = 2048;

private static final ThreadLocal<char[]> charBuffer = ThreadLocal.withInitial(() -> new char[SHORT_STRING_MAX_LENGTH]);

private char[] value; // character value of the string value, not necessarily completely filled

private int len; // length of the string value
Expand Down Expand Up @@ -769,61 +772,28 @@ public static String readString(DataInput in) throws IOException {
// subtract one for the null length
len -= 1;

/* as we have no idea about byte-length of the serialized string, we cannot fully
* read it into memory buffer. But we can do it in an optimistic way:
* 1. In a happy case when the string is an us-ascii one, then byte_len == char_len
* 2. If we spot at least one character with code >= 127, then we reallocate the buffer
* to accommodate for the next characters.
*/

// happily assume that the string is an 7 bit us-ascii one
byte[] buf = new byte[len];
in.readFully(buf);

final char[] data = new char[len];
int charPosition = 0;
int bufSize = len;
int bytePosition = 0;

while (charPosition < len) {
if (bytePosition == bufSize) {
// there is at least `char count - char position` bytes left in case if all the
// remaining characters are 7 bit.
int minRemainingChars = len - charPosition;
// need to refill the buffer as we already reached its end.
// we also reuse the old buffer, as it's capacity must always be >= minRemainingChars.
in.readFully(buf, 0, minRemainingChars);
bytePosition = 0;
bufSize = minRemainingChars;
}
int c = buf[bytePosition++] & 255;
// non 7-bit path
final char[] data;
if (len > SHORT_STRING_MAX_LENGTH) {
data = new char[len];
} else {
data = charBuffer.get();
}

for (int i = 0; i < len; i++) {
int c = in.readUnsignedByte();
if (c >= HIGH_BIT) {
int shift = 7;
int curr;
c = c & 0x7f;
if (bytePosition == bufSize) {
int minRemainingChars = len - charPosition;
in.readFully(buf, 0, minRemainingChars);
bytePosition = 0;
bufSize = minRemainingChars;
}

while ((curr = buf[bytePosition++] & 255) >= HIGH_BIT) {
while ((curr = in.readUnsignedByte()) >= HIGH_BIT) {
c |= (curr & 0x7f) << shift;
shift += 7;
if (bytePosition == bufSize) {
int minRemainingChars = len - charPosition;
// may need to refill the buffer if char bytes are split between the buffers.
in.readFully(buf, 0, minRemainingChars);
bytePosition = 0;
bufSize = minRemainingChars;
}
}
c |= curr << shift;
}
data[charPosition++] = (char) c;
data[i] = (char) c;
}

return new String(data, 0, len);
}

Expand All @@ -837,84 +807,50 @@ public static final void writeString(CharSequence cs, DataOutput out) throws IOE
throw new IllegalArgumentException("CharSequence is too long.");
}


// buffer size needed to contain all the characters
int buflen = 0;
if (strlen < 8) {
// for small strings it's much faster to over-allocate a buffer sized for the worst-case scenario
// when all the chars are encoded as 3-byte sequences, than iterating over the source string.
buflen = strlen * 3;
} else {
for (int i = 0; i < strlen; i++) {
char c = cs.charAt(i);
if ((c & 0xff80) == 0) {
buflen++;
} else if (c > 0x07FF) {
buflen += 3;
} else {
buflen += 2;
}
}
}
byte[] buffer;
int position = 0;
// string is prefixed by it's variable length encoded size, which can take 1-5 bytes.
if (lenToWrite < HIGH_BIT) {
buflen += 1;
buffer = new byte[buflen];
buffer[position++] = (byte) lenToWrite;
out.write((byte) lenToWrite);
} else if (lenToWrite < HIGH_BIT14) {
buflen += 2;
buffer = new byte[buflen];
buffer[position++] = (byte)(lenToWrite | HIGH_BIT);
buffer[position++] = (byte)((lenToWrite >>> 7));
out.write((lenToWrite | HIGH_BIT));
out.write((lenToWrite >>> 7));
} else if (lenToWrite < HIGH_BIT21) {
buflen += 3;
buffer = new byte[buflen];
buffer[position++] = (byte)(lenToWrite | HIGH_BIT);
buffer[position++] = (byte)((lenToWrite >>> 7) | HIGH_BIT);
buffer[position++] = (byte)((lenToWrite >>> 14));
out.write(lenToWrite | HIGH_BIT);
out.write((lenToWrite >>> 7) | HIGH_BIT);
out.write((lenToWrite >>> 14));
} else if (lenToWrite < HIGH_BIT28) {
buflen += 4;
buffer = new byte[buflen];
buffer[position++] = (byte)(lenToWrite | HIGH_BIT);
buffer[position++] = (byte)((lenToWrite >>> 7) | HIGH_BIT);
buffer[position++] = (byte)((lenToWrite >>> 14) | HIGH_BIT);
buffer[position++] = (byte)((lenToWrite >>> 21));
out.write(lenToWrite | HIGH_BIT);
out.write((lenToWrite >>> 7) | HIGH_BIT);
out.write((lenToWrite >>> 14) | HIGH_BIT);
out.write((lenToWrite >>> 21));
} else {
buflen += 5;
buffer = new byte[buflen];
buffer[position++] = (byte)(lenToWrite | HIGH_BIT);
buffer[position++] = (byte)((lenToWrite >>> 7) | HIGH_BIT);
buffer[position++] = (byte)((lenToWrite >>> 14) | HIGH_BIT);
buffer[position++] = (byte)((lenToWrite >>> 21) | HIGH_BIT);
buffer[position++] = (byte)((lenToWrite >>> 28));
out.write(lenToWrite | HIGH_BIT);
out.write((lenToWrite >>> 7) | HIGH_BIT);
out.write((lenToWrite >>> 14) | HIGH_BIT);
out.write((lenToWrite >>> 21) | HIGH_BIT);
out.write((lenToWrite >>> 28));
}


// write the char data, variable length encoded
for (int i = 0; i < strlen; i++) {
int c = cs.charAt(i);

// manual loop unroll, as it performs much better on jdk8
if (c < HIGH_BIT) {
buffer[position++] = (byte)c;
out.write(c);
} else if (c < HIGH_BIT14) {
buffer[position++] = (byte)(c | HIGH_BIT);
buffer[position++] = (byte)((c >>> 7));
out.write(c | HIGH_BIT);
out.write((c >>> 7));
} else {
buffer[position++] = (byte)(c | HIGH_BIT);
buffer[position++] = (byte)((c >>> 7) | HIGH_BIT);
buffer[position++] = (byte)((c >>> 14));
out.write(c | HIGH_BIT);
out.write((c >>> 7) | HIGH_BIT);
out.write((c >>> 14));
}
}
out.write(buffer, 0, position);

} else {
out.write(0);
}
}

public static final void copyString(DataInput in, DataOutput out) throws IOException {
int len = in.readUnsignedByte();
out.writeByte(len);
Expand Down

0 comments on commit 0486ab0

Please sign in to comment.