Skip to content

Commit

Permalink
ARTEMIS-1573 Improve UTF translation allowing zero copy
Browse files Browse the repository at this point in the history
The UTF translation has been improved by:
- zero copy on array-based buffers
- zero-copy UTF length calculation
- faster array access using Netty PlatformDependent.getByte/putByte
- improved perf tests in UTF8Test
  • Loading branch information
franz1981 committed Jan 8, 2018
1 parent 719adab commit 1954146
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 107 deletions.
Expand Up @@ -19,6 +19,7 @@
import java.lang.ref.SoftReference;

import io.netty.buffer.ByteBuf;
import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.logs.ActiveMQUtilBundle;
Expand All @@ -34,7 +35,7 @@ public final class UTF8Util {

private static final boolean isTrace = ActiveMQUtilLogger.LOGGER.isTraceEnabled();

private static final ThreadLocal<SoftReference<StringUtilBuffer>> currenBuffer = new ThreadLocal<>();
private static final ThreadLocal<SoftReference<StringUtilBuffer>> currentBuffer = new ThreadLocal<>();

private UTF8Util() {
// utility class
Expand All @@ -48,16 +49,20 @@ public static void writeNullableString(ByteBuf buffer, final String val) {
}
}

/**
 * Writes every UTF-16 code unit of {@code val} to {@code buffer} as a 2-byte short,
 * in string order. No length prefix is written by this method.
 *
 * @param buffer destination buffer; one {@code writeShort} call is issued per char
 * @param val    the string whose chars are written
 */
private static void writeAsShorts(final ByteBuf buffer, final String val) {
   final int charCount = val.length();
   int pos = 0;
   while (pos < charCount) {
      final char unit = val.charAt(pos);
      buffer.writeShort((short) unit);
      pos++;
   }
}

public static void writeString(final ByteBuf buffer, final String val) {
int length = val.length();

buffer.writeInt(length);

if (length < 9) {
// If very small it's more performant to store char by char
for (int i = 0; i < val.length(); i++) {
buffer.writeShort((short) val.charAt(i));
}
writeAsShorts(buffer, val);
} else if (length < 0xfff) {
// Store as UTF - this is quicker than char by char for most strings
saveUTF(buffer, val);
Expand All @@ -68,149 +73,213 @@ public static void writeString(final ByteBuf buffer, final String val) {
}

public static void saveUTF(final ByteBuf out, final String str) {
StringUtilBuffer buffer = UTF8Util.getThreadLocalBuffer();

if (str.length() > 0xffff) {
throw ActiveMQUtilBundle.BUNDLE.stringTooLong(str.length());
}

final int len = UTF8Util.calculateUTFSize(str, buffer);
final int len = UTF8Util.calculateUTFSize(str);

if (len > 0xffff) {
throw ActiveMQUtilBundle.BUNDLE.stringTooLong(len);
}

out.writeShort((short) len);

if (len > buffer.byteBuffer.length) {
buffer.resizeByteBuffer(len);
final int stringLength = str.length();

if (UTF8Util.isTrace) {
// This message is too verbose for debug, that's why we are using trace here
ActiveMQUtilLogger.LOGGER.trace("Saving string with utfSize=" + len + " stringSize=" + stringLength);
}

if (len == (long) str.length()) {
for (int byteLocation = 0; byteLocation < len; byteLocation++) {
buffer.byteBuffer[byteLocation] = (byte) buffer.charBuffer[byteLocation];
if (out.hasArray()) {
out.ensureWritable(len);
final byte[] bytes = out.array();
final int writerIndex = out.writerIndex();
final int index = out.arrayOffset() + writerIndex;
if (PlatformDependent.hasUnsafe()) {
unsafeWriteUTF(str, bytes, index, stringLength);
} else {
writeUTF(str, bytes, index, stringLength);
}
out.writeBytes(buffer.byteBuffer, 0, len);
out.writerIndex(writerIndex + len);
} else {
if (UTF8Util.isTrace) {
// This message is too verbose for debug, that's why we are using trace here
ActiveMQUtilLogger.LOGGER.trace("Saving string with utfSize=" + len + " stringSize=" + str.length());
final StringUtilBuffer buffer = UTF8Util.getThreadLocalBuffer();
final byte[] bytes = buffer.borrowByteBuffer(len);
writeUTF(str, bytes, 0, stringLength);
out.writeBytes(bytes, 0, len);
}
}

private static int writeUTF(final CharSequence str, final byte[] bytes, final int index, final int length) {
int charCount = index;

for (int i = 0; i < length; i++) {
char charAtPos = str.charAt(i);
if (charAtPos <= 0x7f) {
bytes[charCount++] = (byte) charAtPos;
} else if (charAtPos >= 0x800) {
bytes[charCount++] = (byte) (0xE0 | charAtPos >> 12 & 0x0F);
bytes[charCount++] = (byte) (0x80 | charAtPos >> 6 & 0x3F);
bytes[charCount++] = (byte) (0x80 | charAtPos >> 0 & 0x3F);
} else {
bytes[charCount++] = (byte) (0xC0 | charAtPos >> 6 & 0x1F);
bytes[charCount++] = (byte) (0x80 | charAtPos >> 0 & 0x3F);
}
}

int stringLength = str.length();

int charCount = 0;

for (int i = 0; i < stringLength; i++) {
char charAtPos = buffer.charBuffer[i];
if (charAtPos <= 0x7f) {
buffer.byteBuffer[charCount++] = (byte) charAtPos;
} else if (charAtPos >= 0x800) {
buffer.byteBuffer[charCount++] = (byte) (0xE0 | charAtPos >> 12 & 0x0F);
buffer.byteBuffer[charCount++] = (byte) (0x80 | charAtPos >> 6 & 0x3F);
buffer.byteBuffer[charCount++] = (byte) (0x80 | charAtPos >> 0 & 0x3F);
} else {
buffer.byteBuffer[charCount++] = (byte) (0xC0 | charAtPos >> 6 & 0x1F);
buffer.byteBuffer[charCount++] = (byte) (0x80 | charAtPos >> 0 & 0x3F);
}
final int writtenBytes = (charCount - index);
return writtenBytes;
}

private static int unsafeWriteUTF(final CharSequence str, final byte[] bytes, final int index, final int length) {
int charCount = index;
for (int i = 0; i < length; i++) {
char charAtPos = str.charAt(i);
if (charAtPos <= 0x7f) {
PlatformDependent.putByte(bytes, charCount++, (byte) charAtPos);
} else if (charAtPos >= 0x800) {
PlatformDependent.putByte(bytes, charCount++, (byte) (0xE0 | charAtPos >> 12 & 0x0F));
PlatformDependent.putByte(bytes, charCount++, (byte) (0x80 | charAtPos >> 6 & 0x3F));
PlatformDependent.putByte(bytes, charCount++, (byte) (0x80 | charAtPos >> 0 & 0x3F));
} else {
PlatformDependent.putByte(bytes, charCount++, (byte) (0xC0 | charAtPos >> 6 & 0x1F));
PlatformDependent.putByte(bytes, charCount++, (byte) (0x80 | charAtPos >> 0 & 0x3F));
}
out.writeBytes(buffer.byteBuffer, 0, len);
}

final int writtenBytes = (charCount - index);
return writtenBytes;
}

public static String readUTF(final ActiveMQBuffer input) {
StringUtilBuffer buffer = UTF8Util.getThreadLocalBuffer();

final int size = input.readUnsignedShort();

if (size > buffer.byteBuffer.length) {
buffer.resizeByteBuffer(size);
}

if (size > buffer.charBuffer.length) {
buffer.resizeCharBuffer(size);
}

if (UTF8Util.isTrace) {
// This message is too verbose for debug, that's why we are using trace here
ActiveMQUtilLogger.LOGGER.trace("Reading string with utfSize=" + size);
}

int count = 0;
final byte[] bytes;
final int index;
if (input.byteBuf() != null && input.byteBuf().hasArray()) {
final ByteBuf byteBuf = input.byteBuf();
bytes = byteBuf.array();
index = byteBuf.arrayOffset() + input.readerIndex();
input.skipBytes(size);
} else {
bytes = buffer.borrowByteBuffer(size);
index = 0;
input.readBytes(bytes, 0, size);
}
final char[] chars = buffer.borrowCharBuffer(size);
if (PlatformDependent.hasUnsafe()) {
return unsafeReadUTF(bytes, index, chars, size);
} else {
return readUTF(bytes, index, chars, size);
}
}

private static String unsafeReadUTF(final byte[] bytes, final int index, final char[] chars, final int size) {
int count = index;
final int limit = index + size;
int byte1, byte2, byte3;
int charCount = 0;

input.readBytes(buffer.byteBuffer, 0, size);

while (count < size) {
byte1 = buffer.byteBuffer[count++];
while (count < limit) {
byte1 = PlatformDependent.getByte(bytes, count++);

if (byte1 >= 0 && byte1 <= 0x7F) {
buffer.charBuffer[charCount++] = (char) byte1;
chars[charCount++] = (char) byte1;
} else {
int c = byte1 & 0xff;
switch (c >> 4) {
case 0xc:
case 0xd:
byte2 = buffer.byteBuffer[count++];
buffer.charBuffer[charCount++] = (char) ((c & 0x1F) << 6 | byte2 & 0x3F);
byte2 = PlatformDependent.getByte(bytes, count++);
chars[charCount++] = (char) ((c & 0x1F) << 6 | byte2 & 0x3F);
break;
case 0xe:
byte2 = buffer.byteBuffer[count++];
byte3 = buffer.byteBuffer[count++];
buffer.charBuffer[charCount++] = (char) ((c & 0x0F) << 12 | (byte2 & 0x3F) << 6 | (byte3 & 0x3F) << 0);
byte2 = PlatformDependent.getByte(bytes, count++);
byte3 = PlatformDependent.getByte(bytes, count++);
chars[charCount++] = (char) ((c & 0x0F) << 12 | (byte2 & 0x3F) << 6 | (byte3 & 0x3F) << 0);
break;
default:
throw new InternalError("unhandled utf8 byte " + c);
}
}
}

return new String(buffer.charBuffer, 0, charCount);
return new String(chars, 0, charCount);
}

/**
 * Decodes {@code size} encoded bytes, starting at {@code bytes[index]}, into a {@link String}
 * using plain array access.
 * <p>
 * One-, two- and three-byte sequences are understood. Continuation bytes are not validated
 * and no check is made that a multi-byte sequence ends before {@code index + size}.
 *
 * @param bytes source array holding the encoded data
 * @param index offset in {@code bytes} of the first encoded byte
 * @param chars scratch array receiving the decoded chars; must be able to hold every decoded char
 * @param size  number of encoded bytes to consume
 * @return a new string built from the decoded chars
 * @throws InternalError if a lead byte is neither single-byte (0x00-0x7F), two-byte (0xC0-0xDF)
 *                       nor three-byte (0xE0-0xEF)
 */
private static String readUTF(final byte[] bytes, final int index, final char[] chars, final int size) {
   final int end = index + size;
   int readPos = index;
   int writePos = 0;

   while (readPos < end) {
      final byte first = bytes[readPos++];

      // A non-negative byte is a plain 7-bit character.
      if (first >= 0) {
         chars[writePos++] = (char) first;
         continue;
      }

      final int lead = first & 0xff;
      final int prefix = lead >> 4;
      if (prefix == 0xc || prefix == 0xd) {
         // 110xxxxx 10xxxxxx
         final int second = bytes[readPos++];
         chars[writePos++] = (char) ((lead & 0x1F) << 6 | (second & 0x3F));
      } else if (prefix == 0xe) {
         // 1110xxxx 10xxxxxx 10xxxxxx
         final int second = bytes[readPos++];
         final int third = bytes[readPos++];
         chars[writePos++] = (char) ((lead & 0x0F) << 12 | (second & 0x3F) << 6 | (third & 0x3F));
      } else {
         throw new InternalError("unhandled utf8 byte " + lead);
      }
   }

   return new String(chars, 0, writePos);
}

public static StringUtilBuffer getThreadLocalBuffer() {
SoftReference<StringUtilBuffer> softReference = UTF8Util.currenBuffer.get();
private static StringUtilBuffer getThreadLocalBuffer() {
SoftReference<StringUtilBuffer> softReference = UTF8Util.currentBuffer.get();
StringUtilBuffer value;
if (softReference == null) {
value = new StringUtilBuffer();
softReference = new SoftReference<>(value);
UTF8Util.currenBuffer.set(softReference);
UTF8Util.currentBuffer.set(softReference);
} else {
value = softReference.get();
}

if (value == null) {
value = new StringUtilBuffer();
softReference = new SoftReference<>(value);
UTF8Util.currenBuffer.set(softReference);
UTF8Util.currentBuffer.set(softReference);
}

return value;
}

public static void clearBuffer() {
SoftReference<StringUtilBuffer> ref = UTF8Util.currenBuffer.get();
if (ref.get() != null) {
SoftReference<StringUtilBuffer> ref = UTF8Util.currentBuffer.get();
if (ref != null && ref.get() != null) {
ref.clear();
}
}

public static int calculateUTFSize(final String str, final StringUtilBuffer stringBuffer) {
public static int calculateUTFSize(final String str) {
int calculatedLen = 0;

int stringLength = str.length();

if (stringLength > stringBuffer.charBuffer.length) {
stringBuffer.resizeCharBuffer(stringLength);
}

str.getChars(0, stringLength, stringBuffer.charBuffer, 0);

for (int i = 0; i < stringLength; i++) {
char c = stringBuffer.charBuffer[i];

for (int i = 0, stringLength = str.length(); i < stringLength; i++) {
final char c = str.charAt(i);
if (c <= 0x7f) {
calculatedLen++;
} else if (c >= 0x800) {
Expand All @@ -222,31 +291,24 @@ public static int calculateUTFSize(final String str, final StringUtilBuffer stri
return calculatedLen;
}

public static class StringUtilBuffer {
private static final class StringUtilBuffer {

public char[] charBuffer;
private char[] charBuffer = null;

public byte[] byteBuffer;
private byte[] byteBuffer = null;

public void resizeCharBuffer(final int newSize) {
if (newSize > charBuffer.length) {
public char[] borrowCharBuffer(final int newSize) {
if (charBuffer == null || newSize > charBuffer.length) {
charBuffer = new char[newSize];
}
return charBuffer;
}

public void resizeByteBuffer(final int newSize) {
if (newSize > byteBuffer.length) {
public byte[] borrowByteBuffer(final int newSize) {
if (byteBuffer == null || newSize > byteBuffer.length) {
byteBuffer = new byte[newSize];
}
}

public StringUtilBuffer() {
this(1024, 1024);
}

public StringUtilBuffer(final int sizeChar, final int sizeByte) {
charBuffer = new char[sizeChar];
byteBuffer = new byte[sizeByte];
return byteBuffer;
}

}
Expand Down

0 comments on commit 1954146

Please sign in to comment.