Skip to content

Commit

Permalink
Implement compact long encoding for V1 message encoder/decoder.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Oct 12, 2018
1 parent 2ab4fa9 commit e51ae3f
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 10 deletions.
Expand Up @@ -69,7 +69,7 @@ private static class Escape extends RuntimeException {
private Address senderAddress;

private ProtocolMessage.Type type;
private int messageId;
private long messageId;
private int contentLength;
private byte[] content;
private int subjectLength;
Expand Down Expand Up @@ -112,7 +112,7 @@ protected void decode(
currentState = DecoderState.READ_MESSAGE_ID;
case READ_MESSAGE_ID:
try {
messageId = readInt(buffer);
messageId = readLong(buffer);
} catch (Escape e) {
return;
}
Expand Down Expand Up @@ -262,6 +262,123 @@ static int readIntSlow(ByteBuf buffer) {
return result;
}

static long readLong(ByteBuf buffer) {
if (buffer.readableBytes() < 9) {
return readLongSlow(buffer);
} else {
return readLongFast(buffer);
}
}

static long readLongFast(ByteBuf buffer) {
int b = buffer.readByte();
long result = b & 0x7F;
if ((b & 0x80) != 0) {
b = buffer.readByte();
result |= (b & 0x7F) << 7;
if ((b & 0x80) != 0) {
b = buffer.readByte();
result |= (b & 0x7F) << 14;
if ((b & 0x80) != 0) {
b = buffer.readByte();
result |= (b & 0x7F) << 21;
if ((b & 0x80) != 0) {
b = buffer.readByte();
result |= (long) (b & 0x7F) << 28;
if ((b & 0x80) != 0) {
b = buffer.readByte();
result |= (long) (b & 0x7F) << 35;
if ((b & 0x80) != 0) {
b = buffer.readByte();
result |= (long) (b & 0x7F) << 42;
if ((b & 0x80) != 0) {
b = buffer.readByte();
result |= (long) (b & 0x7F) << 49;
if ((b & 0x80) != 0) {
b = buffer.readByte();
result |= (long) b << 56;
}
}
}
}
}
}
}
}
return result;
}

static long readLongSlow(ByteBuf buffer) {
buffer.markReaderIndex();
int b = buffer.readByte();
long result = b & 0x7F;
if ((b & 0x80) != 0) {
if (buffer.readableBytes() == 0) {
buffer.resetReaderIndex();
throw ESCAPE;
}
b = buffer.readByte();
result |= (b & 0x7F) << 7;
if ((b & 0x80) != 0) {
if (buffer.readableBytes() == 0) {
buffer.resetReaderIndex();
throw ESCAPE;
}
b = buffer.readByte();
result |= (b & 0x7F) << 14;
if ((b & 0x80) != 0) {
if (buffer.readableBytes() == 0) {
buffer.resetReaderIndex();
throw ESCAPE;
}
b = buffer.readByte();
result |= (b & 0x7F) << 21;
if ((b & 0x80) != 0) {
if (buffer.readableBytes() == 0) {
buffer.resetReaderIndex();
throw ESCAPE;
}
b = buffer.readByte();
result |= (long) (b & 0x7F) << 28;
if ((b & 0x80) != 0) {
if (buffer.readableBytes() == 0) {
buffer.resetReaderIndex();
throw ESCAPE;
}
b = buffer.readByte();
result |= (long) (b & 0x7F) << 35;
if ((b & 0x80) != 0) {
if (buffer.readableBytes() == 0) {
buffer.resetReaderIndex();
throw ESCAPE;
}
b = buffer.readByte();
result |= (long) (b & 0x7F) << 42;
if ((b & 0x80) != 0) {
if (buffer.readableBytes() == 0) {
buffer.resetReaderIndex();
throw ESCAPE;
}
b = buffer.readByte();
result |= (long) (b & 0x7F) << 49;
if ((b & 0x80) != 0) {
if (buffer.readableBytes() == 0) {
buffer.resetReaderIndex();
throw ESCAPE;
}
b = buffer.readByte();
result |= (long) b << 56;
}
}
}
}
}
}
}
}
return result;
}

static String readString(ByteBuf buffer, int length, Charset charset) {
if (buffer.isDirect()) {
final String result = buffer.toString(buffer.readerIndex(), length, charset);
Expand Down
Expand Up @@ -73,7 +73,7 @@ private void encodeMessage(ProtocolMessage message, ByteBuf buffer) {
buffer.writeByte(message.type().id());

// Write the message ID as a variable length integer
writeInt(buffer, message.id());
writeLong(buffer, message.id());

final byte[] payload = message.payload();

Expand Down Expand Up @@ -130,6 +130,64 @@ private void writeInt(ByteBuf buf, int value) {
}
}

private void writeLong(ByteBuf buf, long value) {
if (value >>> 7 == 0) {
buf.writeByte((byte) value);
} else if (value >>> 14 == 0) {
buf.writeByte((byte) ((value & 0x7F) | 0x80));
buf.writeByte((byte) (value >>> 7));
} else if (value >>> 21 == 0) {
buf.writeByte((byte) ((value & 0x7F) | 0x80));
buf.writeByte((byte) (value >>> 7 | 0x80));
buf.writeByte((byte) (value >>> 14));
} else if (value >>> 28 == 0) {
buf.writeByte((byte) ((value & 0x7F) | 0x80));
buf.writeByte((byte) (value >>> 7 | 0x80));
buf.writeByte((byte) (value >>> 14 | 0x80));
buf.writeByte((byte) (value >>> 21));
} else if (value >>> 35 == 0) {
buf.writeByte((byte) ((value & 0x7F) | 0x80));
buf.writeByte((byte) (value >>> 7 | 0x80));
buf.writeByte((byte) (value >>> 14 | 0x80));
buf.writeByte((byte) (value >>> 21 | 0x80));
buf.writeByte((byte) (value >>> 28));
} else if (value >>> 42 == 0) {
buf.writeByte((byte) ((value & 0x7F) | 0x80));
buf.writeByte((byte) (value >>> 7 | 0x80));
buf.writeByte((byte) (value >>> 14 | 0x80));
buf.writeByte((byte) (value >>> 21 | 0x80));
buf.writeByte((byte) (value >>> 28 | 0x80));
buf.writeByte((byte) (value >>> 35));
} else if (value >>> 49 == 0) {
buf.writeByte((byte) ((value & 0x7F) | 0x80));
buf.writeByte((byte) (value >>> 7 | 0x80));
buf.writeByte((byte) (value >>> 14 | 0x80));
buf.writeByte((byte) (value >>> 21 | 0x80));
buf.writeByte((byte) (value >>> 28 | 0x80));
buf.writeByte((byte) (value >>> 35 | 0x80));
buf.writeByte((byte) (value >>> 42));
} else if (value >>> 56 == 0) {
buf.writeByte((byte) ((value & 0x7F) | 0x80));
buf.writeByte((byte) (value >>> 7 | 0x80));
buf.writeByte((byte) (value >>> 14 | 0x80));
buf.writeByte((byte) (value >>> 21 | 0x80));
buf.writeByte((byte) (value >>> 28 | 0x80));
buf.writeByte((byte) (value >>> 35 | 0x80));
buf.writeByte((byte) (value >>> 42 | 0x80));
buf.writeByte((byte) (value >>> 49));
} else {
buf.writeByte((byte) ((value & 0x7F) | 0x80));
buf.writeByte((byte) (value >>> 7 | 0x80));
buf.writeByte((byte) (value >>> 14 | 0x80));
buf.writeByte((byte) (value >>> 21 | 0x80));
buf.writeByte((byte) (value >>> 28 | 0x80));
buf.writeByte((byte) (value >>> 35 | 0x80));
buf.writeByte((byte) (value >>> 42 | 0x80));
buf.writeByte((byte) (value >>> 49 | 0x80));
buf.writeByte((byte) (value >>> 56));
}
}

@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
try {
Expand Down
Expand Up @@ -279,7 +279,7 @@ private void timeoutAllCallbacks() {

@Override
public CompletableFuture<Void> sendAsync(Address address, String type, byte[] payload) {
int messageId = messageIdGenerator.incrementAndGet();
long messageId = messageIdGenerator.incrementAndGet();
ProtocolRequest message = new ProtocolRequest(
messageId,
returnAddress,
Expand All @@ -305,7 +305,7 @@ public CompletableFuture<byte[]> sendAndReceive(Address address, String type, by

@Override
public CompletableFuture<byte[]> sendAndReceive(Address address, String type, byte[] payload, Duration timeout, Executor executor) {
int messageId = messageIdGenerator.incrementAndGet();
long messageId = messageIdGenerator.incrementAndGet();
ProtocolRequest message = new ProtocolRequest(
messageId,
returnAddress,
Expand Down
Expand Up @@ -60,10 +60,10 @@ public static Type forId(int id) {
}
}

private final int id;
private final long id;
private final byte[] payload;

protected ProtocolMessage(int id, byte[] payload) {
protected ProtocolMessage(long id, byte[] payload) {
this.id = id;
this.payload = payload;
}
Expand All @@ -78,7 +78,7 @@ public boolean isReply() {
return type() == Type.REPLY;
}

public int id() {
public long id() {
return id;
}

Expand Down
Expand Up @@ -89,7 +89,7 @@ public static Status forId(int id) {

private final Status status;

public ProtocolReply(int id, byte[] payload, Status status) {
public ProtocolReply(long id, byte[] payload, Status status) {
super(id, payload);
this.status = status;
}
Expand Down
Expand Up @@ -26,7 +26,7 @@ public final class ProtocolRequest extends ProtocolMessage {
private final Address sender;
private final String subject;

public ProtocolRequest(int id, Address sender, String subject, byte[] payload) {
public ProtocolRequest(long id, Address sender, String subject, byte[] payload) {
super(id, payload);
this.sender = sender;
this.subject = subject;
Expand Down

0 comments on commit e51ae3f

Please sign in to comment.