diff --git a/plog-client/build.gradle b/plog-client/build.gradle index d56109e..c9d0742 100644 --- a/plog-client/build.gradle +++ b/plog-client/build.gradle @@ -1,4 +1,6 @@ dependencies { compile project(':plog-api') + compile project(':plog-common') + testCompile project(':plog-server') } diff --git a/plog-client/src/main/java/com/airbnb/plog/client/fragmentation/Fragmenter.java b/plog-client/src/main/java/com/airbnb/plog/client/fragmentation/Fragmenter.java index b57b94f..ccfc8fa 100644 --- a/plog-client/src/main/java/com/airbnb/plog/client/fragmentation/Fragmenter.java +++ b/plog-client/src/main/java/com/airbnb/plog/client/fragmentation/Fragmenter.java @@ -1,9 +1,8 @@ package com.airbnb.plog.client.fragmentation; import com.airbnb.plog.Message; -import com.airbnb.plog.server.pipeline.ByteBufs; +import com.airbnb.plog.common.Murmur3; import com.google.common.base.Charsets; -import com.google.common.hash.Hashing; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; @@ -25,12 +24,15 @@ public Fragmenter(int maxFragmentSize) { } public ByteBuf[] fragment(ByteBufAllocator alloc, byte[] payload, Collection tags, int messageIndex) { - final int hash = Hashing.murmur3_32().hashBytes(payload).asInt(); - return fragment(alloc, Unpooled.wrappedBuffer(payload), tags, messageIndex, payload.length, hash); + final ByteBuf buf = Unpooled.wrappedBuffer(payload); + final int hash = Murmur3.hash32(buf, 0, payload.length); + return fragment(alloc, buf, tags, messageIndex, payload.length, hash); } public ByteBuf[] fragment(ByteBufAllocator alloc, ByteBuf payload, Collection tags, int messageIndex) { - return fragment(alloc, ByteBufs.toByteArray(payload), tags, messageIndex); + final int length = payload.readableBytes(); + final int hash = Murmur3.hash32(payload, 0, length); + return fragment(alloc, payload, tags, messageIndex, length, hash); } public ByteBuf[] fragment(ByteBufAllocator alloc, Message msg, int messageIndex) { diff --git a/plog-common/src/main/java/com/airbnb/plog/common/Murmur3.java b/plog-common/src/main/java/com/airbnb/plog/common/Murmur3.java new file mode 100644 index 0000000..9eed28e --- /dev/null +++ b/plog-common/src/main/java/com/airbnb/plog/common/Murmur3.java @@ -0,0 +1,65 @@ +package com.airbnb.plog.common; + +import io.netty.buffer.ByteBuf; +import lombok.extern.slf4j.Slf4j; + +import java.nio.ByteOrder; + +@Slf4j +public class Murmur3 { + private static final int C1 = 0xcc9e2d51; + private static final int C2 = 0x1b873593; + + public static int hash32(ByteBuf data) { + return hash32(data, data.readerIndex(), data.readableBytes(), 0); + } + + public static int hash32(ByteBuf data, final int offset, final int length) { + return hash32(data, offset, length, 0); + } + + public static int hash32(ByteBuf data, final int offset, final int length, final int seed) { + final ByteBuf ordered = data.order(ByteOrder.LITTLE_ENDIAN); + + int h = seed; + + final int len4 = length >>> 2; + final int end4 = offset + (len4 << 2); + + for (int i = offset; i < end4; i += 4) { + int k = ordered.getInt(i); + + k *= C1; + k = k << 15 | k >>> 17; + k *= C2; + + h ^= k; + h = h << 13 | h >>> 19; + h = h * 5 + 0xe6546b64; + } + + int k = 0; + switch (length & 3) { + case 3: + k = ordered.getByte(end4 + 2) << 16; + case 2: + k |= ordered.getByte(end4 + 1) << 8; + case 1: + k |= ordered.getByte(end4); + + k *= C1; + k = (k << 15) | (k >>> 17); + k *= C2; + h ^= k; + } + + h ^= length; + h ^= h >>> 16; + h *= 0x85ebca6b; + h ^= h >>> 13; + h *= 0xc2b2ae35; + h ^= h >>> 16; + + return h; + } +} diff --git a/plog-common/src/test/groovy/com/airbnb/plog/common/Murmur3Test.groovy b/plog-common/src/test/groovy/com/airbnb/plog/common/Murmur3Test.groovy new file mode 100644 index 0000000..7f8cf68 --- /dev/null +++ b/plog-common/src/test/groovy/com/airbnb/plog/common/Murmur3Test.groovy @@ -0,0 +1,31 @@ +package com.airbnb.plog.common + +import com.google.common.hash.Hashing +import io.netty.buffer.Unpooled + +class Murmur3Test extends GroovyTestCase { + void testGuavaCompat() { + final raw = 'abcdefghijklmnopqrstuvwxyz'.bytes + + for (seed in [0, 1, 10]) + for (len in 0..20) { + final model = Arrays.copyOf(raw, len) + final guavaHash = Hashing.murmur3_32(seed).hashBytes(model).asInt() + final plogHash = Murmur3.hash32(Unpooled.wrappedBuffer(model), 0, len, seed) + assert plogHash == guavaHash + } + } + + void testOffsetsAndLength() { + final raw = 'abcdef'.bytes + final bb = Unpooled.wrappedBuffer(raw) + + for (from in 0..raw.length) + for (to in from..raw.length) { + final model = Arrays.copyOfRange(raw, from, to) + final guavaHash = Hashing.murmur3_32().hashBytes(model).asInt() + final plogHash = Murmur3.hash32(bb, from, to - from, 0) + assert plogHash == guavaHash + } + } +} diff --git a/plog-server/build.gradle b/plog-server/build.gradle index eecf2b7..ad22fcd 100644 --- a/plog-server/build.gradle +++ b/plog-server/build.gradle @@ -3,6 +3,7 @@ apply plugin: 'coveralls' dependencies { compile project(':plog-api') + compile project(':plog-common') testCompile project(':plog-console') testCompile 'ch.qos.logback:logback-classic:1.1.2' diff --git a/plog-server/src/main/java/com/airbnb/plog/server/fragmentation/Defragmenter.java b/plog-server/src/main/java/com/airbnb/plog/server/fragmentation/Defragmenter.java index f65ea03..4342f33 100644 --- a/plog-server/src/main/java/com/airbnb/plog/server/fragmentation/Defragmenter.java +++ b/plog-server/src/main/java/com/airbnb/plog/server/fragmentation/Defragmenter.java @@ -1,12 +1,10 @@ package com.airbnb.plog.server.fragmentation; import com.airbnb.plog.MessageImpl; +import com.airbnb.plog.common.Murmur3; import com.airbnb.plog.server.packetloss.ListenerHoleDetector; import com.airbnb.plog.server.stats.StatisticsReporter; -import com.airbnb.plog.server.pipeline.ByteBufs; import com.google.common.cache.*; -import com.google.common.hash.Hashing; -import com.google.common.io.BaseEncoding; import com.typesafe.config.Config; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; @@ -112,15 +110,15 @@ private void pushPayloadIfValid(final ByteBuf payload, final int fragmentCount, Collection tags, List out) { - final byte[] bytes = ByteBufs.toByteArray(payload); - final int computedHash = Hashing.murmur3_32().hashBytes(bytes).asInt(); - if (computedHash == expectedHash) { + final int computedHash = Murmur3.hash32(payload); + + if (Murmur3.hash32(payload) == expectedHash) { payload.retain(); out.add(new MessageImpl(payload, tags)); this.stats.receivedV0MultipartMessage(); } else { - log.warn("Client sent hash {}, not matching computed hash {} for bytes {} (fragment count {})", - expectedHash, computedHash, BaseEncoding.base16().encode(bytes), fragmentCount); + log.warn("Client sent hash {}, not matching computed hash {} (fragment count {})", + expectedHash, computedHash, fragmentCount); this.stats.receivedV0InvalidChecksum(fragmentCount); } } diff --git a/plog-stress/src/main/java/com/airbnb/plog/stress/PlogStress.java b/plog-stress/src/main/java/com/airbnb/plog/stress/PlogStress.java index f9466ea..0794457 100644 --- a/plog-stress/src/main/java/com/airbnb/plog/stress/PlogStress.java +++ b/plog-stress/src/main/java/com/airbnb/plog/stress/PlogStress.java @@ -1,11 +1,11 @@ package com.airbnb.plog.stress; import com.airbnb.plog.client.fragmentation.Fragmenter; +import com.airbnb.plog.common.Murmur3; import com.codahale.metrics.ConsoleReporter; import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; -import com.google.common.hash.Hashing; import com.google.common.util.concurrent.RateLimiter; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; @@ -73,7 +73,7 @@ private void run(Config config) { log.info("Generating {} different hashes", differentSizes); final int[] precomputedHashes = new int[differentSizes]; for (int i = 0; i < differentSizes; i++) - precomputedHashes[i] = Hashing.murmur3_32().hashBytes(randomBytes, 0, minSize + sizeIncrements * i).asInt(); + precomputedHashes[i] = Murmur3.hash32(randomMessage, 0, minSize + sizeIncrements * i, 0); final ByteBufAllocator allocator = new PooledByteBufAllocator(); diff --git a/settings.gradle b/settings.gradle index dd09d12..53ba437 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,6 +1,7 @@ rootProject.name = 'plog' include 'plog-api', 'plog-client', + 'plog-common', 'plog-console', 'plog-distro', 'plog-kafka',