Skip to content

Commit

Permalink
Add tags support
Browse files Browse the repository at this point in the history
Went with a simpler design than originally planned.

We might could if performance ever turns out insufficient,
but my thinking here is that if a pipeline needs higher perf,
it'd be better to create a dedicated listener and avoid
using tags altogether.

Closes #32.
  • Loading branch information
Pierre Carrier committed May 5, 2014
1 parent 4e6fdd6 commit 2fad58e
Show file tree
Hide file tree
Showing 17 changed files with 103 additions and 39 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,5 +158,7 @@ Note that 1-fragment fragmented messages are perfectly possible.
Needs to increment with each message for hole detection.
- Bytes 12-15: signed, big-endian, 32-bit integer below 2,147,483,647. Total byte length of the message.
- Bytes 16-19: big-endian, 32-bit MurmurHash3 hash of the total message payload.
- Bytes 20-23: zeroes. Reserved, might be used in later revisions.
- Bytes 24-: bytes (UTF-8 by default). Payload. Will only read the payload length.
- Bytes 20-21: unsigned, big-endian, 16-bit integer. `taglength`: Size used to represent tags.
- Bytes 22-23: zeroes. Reserved, might be used in later revisions.
- Bytes 24-(24+taglength): Bytes. List of tags (`\0`-separated UTF-8 strings; can be `\0`-terminated or not).
- Bytes (24+taglength)-: Bytes. Payload. Will only read the payload length.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ subprojects {

dependencies {
compile 'org.projectlombok:lombok:1.12.6'
compile 'com.google.guava:guava:16.0.1'
compile 'com.google.guava:guava:17.0'
compile 'com.typesafe:config:1.2.0'
compile 'com.eclipsesource.minimal-json:minimal-json:0.9.1'
compile "io.netty:netty-handler:${nettyVersion}"
Expand Down
4 changes: 1 addition & 3 deletions plog-api/src/main/java/com/airbnb/plog/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import io.netty.buffer.ByteBufHolder;

public interface Message extends ByteBufHolder {
public interface Message extends ByteBufHolder, Tagged {
byte[] asBytes();

byte[][] getTags();
}
8 changes: 5 additions & 3 deletions plog-api/src/main/java/com/airbnb/plog/MessageImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,22 @@
import lombok.EqualsAndHashCode;
import lombok.Getter;

import java.util.Collection;

@Data
@EqualsAndHashCode(callSuper = false)
public class MessageImpl extends DefaultByteBufHolder implements Message {
private final byte[][] tags;
private final Collection<String> tags;

@Getter(AccessLevel.NONE)
private byte[] memoizedBytes;

public MessageImpl(ByteBuf data, byte[][] tags) {
public MessageImpl(ByteBuf data, Collection<String> tags) {
super(data);
this.tags = tags;
}

public static Message fromBytes(ByteBufAllocator alloc, byte[] bytes, byte[][] tags) {
public static Message fromBytes(ByteBufAllocator alloc, byte[] bytes, Collection<String> tags) {
final ByteBuf data = alloc.buffer(bytes.length, bytes.length);
data.writeBytes(bytes);
return new MessageImpl(data, tags);
Expand Down
7 changes: 7 additions & 0 deletions plog-api/src/main/java/com/airbnb/plog/Tagged.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.airbnb.plog;

import java.util.Collection;

public interface Tagged {
Collection<String> getTags();
}
16 changes: 13 additions & 3 deletions plog-api/src/test/groovy/com/airbnb/plog/MessageImplTest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,18 @@ package com.airbnb.plog
import io.netty.buffer.ByteBufAllocator

class MessageImplTest extends GroovyTestCase {
void testCreateAndRead() {
def content = 'foo'.bytes
assert MessageImpl.fromBytes(ByteBufAllocator.DEFAULT, content).asBytes() == content
final payload = 'foo'.bytes
final tags = ['bar', 'baz']
final simpleMsg = MessageImpl.fromBytes(ByteBufAllocator.DEFAULT, payload, null)
final taggedMsg = MessageImpl.fromBytes(ByteBufAllocator.DEFAULT, payload, tags)

void testRead() {
assert simpleMsg.asBytes() == payload
assert taggedMsg.asBytes() == payload
}

void testToString() {
assert simpleMsg.toString() == 'foo'
assert taggedMsg.toString() == '[bar,baz] foo'
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ public class ConsoleOutputHandler extends SimpleChannelInboundHandler<Message> i

@Override
protected final void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
final byte[][] tags = msg.getTags();
target.println(msg.toString());
logged.incrementAndGet();
}
Expand Down
2 changes: 0 additions & 2 deletions plog-server/src/main/java/com/airbnb/plog/PlogServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
import com.typesafe.config.ConfigFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import lombok.extern.slf4j.Slf4j;

import java.net.UnknownHostException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import lombok.extern.slf4j.Slf4j;

import java.util.BitSet;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -94,22 +95,23 @@ protected void decode(ChannelHandlerContext ctx, Fragment fragment, List<Object>
if (fragment.isAlone()) {
if (detector != null)
detector.reportNewMessage(fragment.getMsgId());
pushPayloadIfValid(fragment.content(), fragment.getMsgHash(), 1, out);
pushPayloadIfValid(fragment.content(), fragment.getMsgHash(), 1, fragment.getTags(), out);
} else {
message = ingestIntoIncompleteMessage(fragment);
if (message.isComplete())
pushPayloadIfValid(message.getPayload(), message.getChecksum(), message.getFragmentCount(), out);
pushPayloadIfValid(message.getPayload(), message.getChecksum(), message.getFragmentCount(), message.getTags(), out);
}
}

private void pushPayloadIfValid(final ByteBuf payload,
final int expectedHash,
final int fragmentCount,
Collection<String> tags,
List<Object> out) {
final byte[] bytes = ByteBufs.toByteArray(payload);
final int computedHash = Hashing.murmur3_32().hashBytes(bytes).asInt();
if (computedHash == expectedHash) {
out.add(new MessageImpl(payload, null));
out.add(new MessageImpl(payload, tags));
this.stats.receivedV0MultipartMessage();
} else {
log.warn("Client sent hash {}, not matching computed hash {} for bytes {} (fragment count {})",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
package com.airbnb.plog.fragmentation;

import com.airbnb.plog.Tagged;
import com.airbnb.plog.utils.ByteBufs;
import com.google.common.base.Charsets;
import com.google.common.base.Splitter;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.DefaultByteBufHolder;
import io.netty.channel.socket.DatagramPacket;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.ToString;

import java.nio.ByteOrder;
import java.util.Collection;
import java.util.Collections;

public class Fragment extends DefaultByteBufHolder {
@ToString(exclude = {"tagsBuffer"})
public class Fragment extends DefaultByteBufHolder implements Tagged {
static final int HEADER_SIZE = 24;

@Getter
Expand All @@ -23,14 +32,25 @@ public class Fragment extends DefaultByteBufHolder {
@Getter
private final int msgHash;

public Fragment(int fragmentCount, int fragmentIndex, int fragmentSize, long msgId, int totalLength, int msgHash, ByteBuf data) {
@Getter(AccessLevel.MODULE)
private final ByteBuf tagsBuffer;

public Fragment(int fragmentCount,
int fragmentIndex,
int fragmentSize,
long msgId,
int totalLength,
int msgHash,
ByteBuf data,
ByteBuf tagsBuffer) {
super(data);
this.fragmentCount = fragmentCount;
this.fragmentIndex = fragmentIndex;
this.fragmentSize = fragmentSize;
this.msgId = msgId;
this.totalLength = totalLength;
this.msgHash = msgHash;
this.tagsBuffer = tagsBuffer;
}

public static Fragment fromDatagram(DatagramPacket packet) {
Expand All @@ -56,14 +76,27 @@ public static Fragment fromDatagram(DatagramPacket packet) {
throw new IllegalArgumentException("Cannot support length " + totalLength + " > 2^31");

final int msgHash = content.getInt(16);
final ByteBuf payload = content.slice(HEADER_SIZE, length - HEADER_SIZE);

final int tagsBufferLength = content.getUnsignedShort(20);
final ByteBuf tagsBuffer = tagsBufferLength == 0 ? null : content.slice(HEADER_SIZE, tagsBufferLength);

final ByteBuf payload = content.slice(HEADER_SIZE + tagsBufferLength, length - HEADER_SIZE - tagsBufferLength);

final int port = packet.sender().getPort();
final long msgId = (((long) port) << Integer.SIZE) + idRightPart;
return new Fragment(fragmentCount, fragmentIndex, fragmentSize, msgId, totalLength, msgHash, payload);

return new Fragment(fragmentCount, fragmentIndex, fragmentSize, msgId, totalLength, msgHash, payload, tagsBuffer);
}

boolean isAlone() {
return fragmentCount == 1;
}

@Override
public Collection<String> getTags() {
if (tagsBuffer == null)
return Collections.emptyList();
final String seq = new String(ByteBufs.toByteArray(tagsBuffer), Charsets.UTF_8);
return Splitter.on('\0').omitEmptyStrings().splitToList(seq);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.airbnb.plog.fragmentation;

import com.airbnb.plog.Tagged;
import com.airbnb.plog.stats.StatisticsReporter;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
Expand All @@ -8,10 +9,11 @@
import lombok.extern.slf4j.Slf4j;

import java.util.BitSet;
import java.util.Collection;

@Slf4j
@ToString(exclude = {"payload"})
public class FragmentedMessage {
public class FragmentedMessage implements Tagged {
private final ByteBuf payload;
@Getter
private final BitSet receivedFragments;
Expand All @@ -23,6 +25,8 @@ public class FragmentedMessage {
private final int checksum;
@Getter
private boolean complete = false;
@Getter
private Collection<String> tags;

private FragmentedMessage(ByteBufAllocator alloc,
final int totalLength,
Expand Down Expand Up @@ -55,6 +59,7 @@ public final void ingestFragment(final Fragment fragment, StatisticsReporter sta
final int fragmentIndex = fragment.getFragmentIndex();
final boolean fragmentIsLast = (fragmentIndex == fragmentCount - 1);
final int foffset = fragmentSize * fragmentIndex;
final ByteBuf fragmentTagsBuffer = fragment.getTagsBuffer();

final int lengthOfCurrentFragment = fragmentPayload.capacity();
final boolean validFragmentLength;
Expand All @@ -74,6 +79,9 @@ public final void ingestFragment(final Fragment fragment, StatisticsReporter sta
return;
}

if (fragmentTagsBuffer != null)
this.tags = fragment.getTags();

// valid fragment
synchronized (receivedFragments) {
receivedFragments.set(fragmentIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
import com.typesafe.config.Config;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.airbnb.plog.stats;

import com.airbnb.plog.handlers.Handler;
import com.airbnb.plog.fragmentation.Defragmenter;
import com.airbnb.plog.handlers.Handler;
import com.eclipsesource.json.JsonArray;
import com.eclipsesource.json.JsonObject;
import com.google.common.cache.CacheStats;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class ProtocolDecoderTest extends GroovyTestCase {
void testForwardsFragments() {
runTest { EmbeddedChannel channel, StatisticsReporter stats ->
// I'm so sad fragment count and index are in this order
final payload = (0..1) + (5..2) + (6..30)
final payload = (0..1) + (5..2) + (6..19) + [0, 0, 0, 0] + (24 .. 30)
insert(payload as byte[], channel)
final frag = (Fragment) channel.readInbound()
assert frag.fragmentCount == (5 << 8) + 4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import io.netty.buffer.Unpooled

class FragmentTest extends GroovyTestCase {
void testRejectsTooSmallForHeader() {
final validPayload = (0..1) + (5..2) + (6..23)
final validPayload = (0..1) + (5..2) + (6..19) + [0, 0, 0, 0]
fragmentFromPayload(validPayload)
final truncated = validPayload[0..22]
shouldFail IllegalArgumentException, {
Expand Down Expand Up @@ -42,19 +42,19 @@ class FragmentTest extends GroovyTestCase {
}

void testAlone() {
assert fragmentFromPayload((0..1) + [0, 1, 0, 0] + (6..24)).isAlone()
assert fragmentFromPayload((0..1) + [0, 1, 0, 0] + (6..19) + [0, 0, 0, 0]).isAlone()
}

void testNotAlone() {
assert !fragmentFromPayload((0..1) + [0, 10, 0, 5] + (6..24)).isAlone()
assert !fragmentFromPayload((0..1) + [0, 10, 0, 5] + (6..19) + [0, 0, 0, 0]).isAlone()
}

void testNotAloneButFirst() {
assert !fragmentFromPayload((0..1) + [0, 2, 0, 0] + (6..24)).isAlone()
assert !fragmentFromPayload((0..1) + [0, 2, 0, 0] + (6..19) + [0, 0, 0, 0]).isAlone()
}

void testToString() {
final fragment = fragmentFromPayload((0..1) + (5..2) + (6..24) as byte[])
final fragment = fragmentFromPayload((0..1) + (5..2) + (6..19) + [0, 0, 0, 0] as byte[])
final expected = 'Fragment(fragmentCount=1284, fragmentIndex=770, fragmentSize=1543, msgId=38789515787, totalLength=202182159, msgHash=269554195)'
assert fragment.toString() == expected
}
Expand All @@ -70,4 +70,4 @@ class FragmentTest extends GroovyTestCase {
private static Fragment fragmentFromPayload(Collection payload) {
fragmentFromPayload(payload as byte[])
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ class FragmentedMessageTest extends GroovyTestCase {
private static final StatisticsReporter stats = new SimpleStatisticsReporter()

// Always go with id=0 & hash=0
private static create(int count, int index, int fsize, int length, byte[] payload) {
FragmentedMessage.fromFragment(
new Fragment(count, index, fsize, 0, length, 0, Unpooled.wrappedBuffer(payload)), stats)
private static create(int count, int index, int fsize, int length, byte[] payload, Collection<String> tags = []) {
final tagRep = Unpooled.wrappedBuffer(tags.join('\0').bytes)
FragmentedMessage.fromFragment(new Fragment(count, index, fsize, 0, length, 0, Unpooled.wrappedBuffer(payload), tagRep), stats)
}

private
static ingest(FragmentedMessage msg, int count, int index, int fsize, int length, byte[] payload) {
msg.ingestFragment(new Fragment(count, index, fsize, 0, length, 0, Unpooled.wrappedBuffer(payload)), stats)
static ingest(FragmentedMessage msg, int count, int index, int fsize, int length, byte[] payload, Collection<String> tags = []) {
final tagRep = Unpooled.wrappedBuffer(tags.join('\0').bytes)
msg.ingestFragment(new Fragment(count, index, fsize, 0, length, 0, Unpooled.wrappedBuffer(payload), tagRep), stats)
}

private static
Expand Down Expand Up @@ -119,9 +120,9 @@ class FragmentedMessageTest extends GroovyTestCase {
void testCatchesChecksumInconsistency() {
final long initial = stats.receivedV0InvalidMultipartFragment(2, 5)
final msg = FragmentedMessage.fromFragment(
new Fragment(5, 2, 10, 0, 45, 42, Unpooled.wrappedBuffer('0123456789'.bytes)), stats)
new Fragment(5, 2, 10, 0, 45, 42, Unpooled.wrappedBuffer('0123456789'.bytes), Unpooled.EMPTY_BUFFER), stats)
assert stats.receivedV0InvalidMultipartFragment(2, 5) == initial + 1
msg.ingestFragment(new Fragment(5, 2, 10, 0, 45, 24, Unpooled.wrappedBuffer('0123456789'.bytes)), stats)
msg.ingestFragment(new Fragment(5, 2, 10, 0, 45, 24, Unpooled.wrappedBuffer('0123456789'.bytes), Unpooled.EMPTY_BUFFER), stats)
assert stats.receivedV0InvalidMultipartFragment(2, 5) == initial + 3
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
package com.airbnb.plog.listeners

import com.typesafe.config.ConfigFactory

class UDPListenerTest extends GroovyTestCase {
Expand Down

0 comments on commit 2fad58e

Please sign in to comment.