Skip to content

Commit

Permalink
plog-common, with netty-friendly murmurhash3
Browse files Browse the repository at this point in the history
plog-common should include code that doesn't fit in the API
or distribution, but benefits from being shared between many components.

Copying data out of ByteBufs for checksumming turns out to be expensive,
and Guava doesn't expose any API that lets us checksum Netty buffers directly,
so get started with a naive implementation (hopefully not too naive).
  • Loading branch information
Pierre Carrier committed May 7, 2014
1 parent 18f8db5 commit 90e6c82
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 15 deletions.
2 changes: 2 additions & 0 deletions plog-client/build.gradle
@@ -1,4 +1,6 @@
dependencies {
compile project(':plog-api')
compile project(':plog-common')

testCompile project(':plog-server')
}
@@ -1,9 +1,8 @@
package com.airbnb.plog.client.fragmentation;

import com.airbnb.plog.Message;
import com.airbnb.plog.server.pipeline.ByteBufs;
import com.airbnb.plog.common.Murmur3;
import com.google.common.base.Charsets;
import com.google.common.hash.Hashing;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
Expand All @@ -25,12 +24,15 @@ public Fragmenter(int maxFragmentSize) {
}

public ByteBuf[] fragment(ByteBufAllocator alloc, byte[] payload, Collection<String> tags, int messageIndex) {
final int hash = Hashing.murmur3_32().hashBytes(payload).asInt();
return fragment(alloc, Unpooled.wrappedBuffer(payload), tags, messageIndex, payload.length, hash);
final ByteBuf buf = Unpooled.wrappedBuffer(payload);
final int hash = Murmur3.hash32(buf, 0, payload.length);
return fragment(alloc, buf, tags, messageIndex, payload.length, hash);
}

public ByteBuf[] fragment(ByteBufAllocator alloc, ByteBuf payload, Collection<String> tags, int messageIndex) {
return fragment(alloc, ByteBufs.toByteArray(payload), tags, messageIndex);
final int length = payload.readableBytes();
final int hash = Murmur3.hash32(payload, 0, length);
return fragment(alloc, payload, tags, messageIndex, length, hash);
}

public ByteBuf[] fragment(ByteBufAllocator alloc, Message msg, int messageIndex) {
Expand Down
65 changes: 65 additions & 0 deletions plog-common/src/main/java/com/airbnb/plog/common/Murmur3.java
@@ -0,0 +1,65 @@
package com.airbnb.plog.common;

import io.netty.buffer.ByteBuf;
import lombok.extern.slf4j.Slf4j;

import java.nio.ByteOrder;

/**
 * 32-bit MurmurHash3 (x86 variant) computed directly over a Netty {@link ByteBuf},
 * so callers do not need to copy the buffer into a {@code byte[]} first.
 *
 * <p>The result is intended to equal Guava's
 * {@code Hashing.murmur3_32(seed).hashBytes(bytes).asInt()} for the same bytes.
 * All reads use absolute-index getters, so the buffer's reader and writer
 * indices are left untouched.
 */
@Slf4j
public class Murmur3 {
    /** First multiplicative constant of the MurmurHash3 32-bit block mix. */
    private static final int C1 = 0xcc9e2d51;
    /** Second multiplicative constant of the MurmurHash3 32-bit block mix. */
    private static final int C2 = 0x1b873593;

    /**
     * Hashes the readable bytes of {@code data} with seed {@code 0}.
     *
     * @param data buffer to hash, from {@code readerIndex()} for {@code readableBytes()} bytes
     * @return the 32-bit hash
     */
    public static int hash32(ByteBuf data) {
        return hash32(data, data.readerIndex(), data.readableBytes(), 0);
    }

    /**
     * Hashes {@code length} bytes of {@code data} starting at absolute index
     * {@code offset}, with seed {@code 0}.
     *
     * @param data   buffer to hash
     * @param offset absolute index of the first byte to hash
     * @param length number of bytes to hash
     * @return the 32-bit hash
     */
    public static int hash32(ByteBuf data, final int offset, final int length) {
        return hash32(data, offset, length, 0);
    }

    /**
     * Hashes {@code length} bytes of {@code data} starting at absolute index
     * {@code offset}, using the given seed.
     *
     * @param data   buffer to hash
     * @param offset absolute index of the first byte to hash
     * @param length number of bytes to hash
     * @param seed   initial hash state
     * @return the 32-bit hash
     */
    public static int hash32(ByteBuf data, final int offset, final int length, final int seed) {
        // MurmurHash3 consumes its 4-byte blocks as little-endian integers.
        final ByteBuf ordered = data.order(ByteOrder.LITTLE_ENDIAN);

        int h = seed;

        // Number of complete 4-byte blocks, and the absolute index just past the last one.
        final int len4 = length >>> 2;
        final int end4 = offset + (len4 << 2);

        for (int i = offset; i < end4; i += 4) {
            h ^= mixBlock(ordered.getInt(i));
            h = Integer.rotateLeft(h, 13);
            h = h * 5 + 0xe6546b64;
        }

        // Tail: the 1-3 bytes left after the last full block. Each byte is masked
        // with 0xFF because getByte() returns a signed byte; without the mask a
        // value >= 0x80 is sign-extended and corrupts the higher bits of k.
        int k = 0;
        switch (length & 3) {
            case 3:
                k = (ordered.getByte(end4 + 2) & 0xFF) << 16;
                // fall through
            case 2:
                k |= (ordered.getByte(end4 + 1) & 0xFF) << 8;
                // fall through
            case 1:
                k |= ordered.getByte(end4) & 0xFF;
                h ^= mixBlock(k);
                break;
            default:
                // No trailing bytes.
                break;
        }

        // Finalization: fold in the length, then avalanche the bits.
        h ^= length;
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;

        return h;
    }

    /** Applies the per-block scramble (multiply, rotate left by 15, multiply) to {@code k}. */
    private static int mixBlock(int k) {
        k *= C1;
        k = Integer.rotateLeft(k, 15);
        k *= C2;
        return k;
    }
}
@@ -0,0 +1,31 @@
package com.airbnb.plog.common

import com.google.common.hash.Hashing
import io.netty.buffer.Unpooled

/**
 * Checks that Murmur3.hash32 over Netty buffers agrees with Guava's murmur3_32
 * over the equivalent byte arrays.
 */
class Murmur3Test extends GroovyTestCase {
    /** Prefixes of the alphabet (lengths 0 through 20) must hash like Guava for several seeds. */
    void testGuavaCompat() {
        final alphabet = 'abcdefghijklmnopqrstuvwxyz'.bytes

        [0, 1, 10].each { seed ->
            (0..20).each { len ->
                final prefix = Arrays.copyOf(alphabet, len)
                final expected = Hashing.murmur3_32(seed).hashBytes(prefix).asInt()
                final actual = Murmur3.hash32(Unpooled.wrappedBuffer(prefix), 0, len, seed)
                assert actual == expected
            }
        }
    }

    /** Every [from, to) window of a shared buffer must hash like Guava on a copy of that window. */
    void testOffsetsAndLength() {
        final letters = 'abcdef'.bytes
        final shared = Unpooled.wrappedBuffer(letters)
        final total = letters.length

        (0..total).each { from ->
            (from..total).each { to ->
                final window = Arrays.copyOfRange(letters, from, to)
                final expected = Hashing.murmur3_32().hashBytes(window).asInt()
                final actual = Murmur3.hash32(shared, from, to - from, 0)
                assert actual == expected
            }
        }
    }
}
1 change: 1 addition & 0 deletions plog-server/build.gradle
Expand Up @@ -3,6 +3,7 @@ apply plugin: 'coveralls'

dependencies {
compile project(':plog-api')
compile project(':plog-common')

testCompile project(':plog-console')
testCompile 'ch.qos.logback:logback-classic:1.1.2'
Expand Down
@@ -1,12 +1,10 @@
package com.airbnb.plog.server.fragmentation;

import com.airbnb.plog.MessageImpl;
import com.airbnb.plog.common.Murmur3;
import com.airbnb.plog.server.packetloss.ListenerHoleDetector;
import com.airbnb.plog.server.stats.StatisticsReporter;
import com.airbnb.plog.server.pipeline.ByteBufs;
import com.google.common.cache.*;
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import com.typesafe.config.Config;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
Expand Down Expand Up @@ -112,15 +110,15 @@ private void pushPayloadIfValid(final ByteBuf payload,
final int fragmentCount,
Collection<String> tags,
List<Object> out) {
final byte[] bytes = ByteBufs.toByteArray(payload);
final int computedHash = Hashing.murmur3_32().hashBytes(bytes).asInt();
if (computedHash == expectedHash) {
final int computedHash = Murmur3.hash32(payload);

if (Murmur3.hash32(payload) == expectedHash) {
payload.retain();
out.add(new MessageImpl(payload, tags));
this.stats.receivedV0MultipartMessage();
} else {
log.warn("Client sent hash {}, not matching computed hash {} for bytes {} (fragment count {})",
expectedHash, computedHash, BaseEncoding.base16().encode(bytes), fragmentCount);
log.warn("Client sent hash {}, not matching computed hash {} (fragment count {})",
expectedHash, computedHash, fragmentCount);
this.stats.receivedV0InvalidChecksum(fragmentCount);
}
}
Expand Down
@@ -1,11 +1,11 @@
package com.airbnb.plog.stress;

import com.airbnb.plog.client.fragmentation.Fragmenter;
import com.airbnb.plog.common.Murmur3;
import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.RateLimiter;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
Expand Down Expand Up @@ -73,7 +73,7 @@ private void run(Config config) {
log.info("Generating {} different hashes", differentSizes);
final int[] precomputedHashes = new int[differentSizes];
for (int i = 0; i < differentSizes; i++)
precomputedHashes[i] = Hashing.murmur3_32().hashBytes(randomBytes, 0, minSize + sizeIncrements * i).asInt();
precomputedHashes[i] = Murmur3.hash32(randomMessage, 0, minSize + sizeIncrements * i, 0);

final ByteBufAllocator allocator = new PooledByteBufAllocator();

Expand Down
1 change: 1 addition & 0 deletions settings.gradle
@@ -1,6 +1,7 @@
rootProject.name = 'plog'
include 'plog-api',
'plog-client',
'plog-common',
'plog-console',
'plog-distro',
'plog-kafka',
Expand Down

0 comments on commit 90e6c82

Please sign in to comment.