-
Notifications
You must be signed in to change notification settings - Fork 3.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Murmur2 partitioner #1468
Murmur2 partitioner #1468
Conversation
Thanks for this, much appreciated and anticipated! I will review this more careful in a while, but from a quick glance there are some low hanging fruit that needs fixing:
|
I will use the endian neutral hash. |
this should probably be the hash function (MurmurHashNeutral2) which will be used by librdkafka, // MurmurHashNeutral2, by Austin Appleby would be lovely if another set of eyes would take a look at that implementation to see if there's anything suspicious. I will test it against the java implementation shortly |
I hope I addressed all of your requests with these commits |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking better.
Still a few issues to sort out, and some massive whitespace diffs that needs to go away.
Also make sure to reading the coding guidelines regarding formatting and indentation.
// MurmurHash2 was written by Austin Appleby, and is placed in the public | ||
// domain. The author hereby disclaims copyright to this source code. | ||
|
||
Permission is hereby granted, free of charge, to any person obtaining a copy |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the following license really apply to public domain?
Did Austin Appleby specify this license for the code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes AppleBy distributed his software under the MIT license.
examples/Makefile
Outdated
@@ -89,4 +89,3 @@ rdkafka_zookeeper_example: ../src/librdkafka.a rdkafka_zookeeper_example.c | |||
|
|||
clean: | |||
rm -f $(EXAMPLES) | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove whitespace diff
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove whitespace diff
examples/rdkafka_simple_producer.c
Outdated
if (argc != 3) { | ||
fprintf(stderr, "%% Usage: %s <broker> <topic>\n", argv[0]); | ||
return 1; | ||
const char *brokers = NULL; /* Argument: broker list */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The rdkafka_simple_producer was added because rdkafka_example.c was getting heavy with options trying to show-case everything.
I'd like the simple producer to stay simple.
We can wait with example code until the config-string-based partitioner support is in place (so you could do conf_set("partitioner", "murmur2", ..)
).
So please remove the patch for this file.
src/MurMurHash2.h
Outdated
extern "C" { | ||
#endif | ||
|
||
void SetAffinity ( int cpu ); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't seem to be used anywhere, remove.
src/rdcrc32.h
Outdated
@@ -30,10 +30,6 @@ | |||
#include <zlib.h> | |||
#endif | |||
|
|||
#ifdef __cplusplus |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should not be removed.
src/rdmurmur2.c
Outdated
#include "rdmurmur2.h" | ||
|
||
int unittest_murmurhashneutral2 (void) { | ||
const char * keysToTest[] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indent: 8 whitespaces.
Format: const char *keysToTest[] ..
(pay attention to *
placement)
src/rdmurmur2.c
Outdated
const char * keysToTest[] = { | ||
"kafka", | ||
"amqp", | ||
"giberish123456789" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the murmur2 code is prone to alignment and word issues I suggest you also add the following tests:
- unaligned access
- word-boundaries (i.e., lengths 7,8,9, 15,16,17)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i have fixed all issues but this. What did you mean by word boundaries, so i allocate pointers and start at an address offset by 7 bits from the aligned address?
this is the only thing blocking me right now
src/rdmurmur2.h
Outdated
#ifndef __RDMURMUR2___H__ | ||
#define __RDMURMUR2___H__ | ||
|
||
#ifdef __cplusplus |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not needed in C code, remove.
|
||
static RD_INLINE rd_murmur2_t rd_murmur2(const char *data, size_t data_len) { | ||
/* | ||
* last bit is set to 0 because the java implementation uses int_32 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't a cast to int32_t sufficient?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no since the java version flips the first bit to make it positive and we need to maintain that behaviour
@@ -155,6 +156,7 @@ | |||
</ItemGroup> | |||
<ItemGroup> | |||
<ClCompile Include="..\src\crc32c.c" /> | |||
<ClCompile Include="..\src\MurMurHash2.c" /> | |||
<ClCompile Include="..\src\rdaddr.c" /> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about rdmurmur2.h?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you also please rebase this on current master and fixup / squash the commits?
src/rdkafka_msg.c
Outdated
int32_t partition_cnt, | ||
void *rkt_opaque, | ||
void *msg_opaque) { | ||
if (keylen == 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking at the Java partitioner I believe this should probably be if (!key)
since there is a difference between no key (null) and an empty key (which is just an arbitrary key).
If so this is also incorrect on the existing partitioners, but we can't change their behaviour.
Sure thing, I will change it.
…On Oct 28, 2017 10:59 AM, "Magnus Edenhill" ***@***.***> wrote:
***@***.**** requested changes on this pull request.
Can you also please rebase this on current master and fixup / squash the
commits?
------------------------------
In src/rdkafka_msg.c
<#1468 (comment)>:
> @@ -570,6 +541,35 @@ int32_t rd_kafka_msg_partitioner_consistent_random (const rd_kafka_topic_t *rkt,
msg_opaque);
}
+int32_t rd_kafka_msg_partitioner_murmur2_consistent (const rd_kafka_topic_t *rkt,
+ const void *key, size_t keylen,
+ int32_t partition_cnt,
+ void *rkt_opaque,
+ void *msg_opaque) {
+ return rd_murmur2(key, keylen) % partition_cnt;
+}
+
+int32_t rd_kafka_msg_partitioner_murmur2_random (const rd_kafka_topic_t *rkt,
+ const void *key, size_t keylen,
+ int32_t partition_cnt,
+ void *rkt_opaque,
+ void *msg_opaque) {
+ if (keylen == 0)
Looking at the Java partitioner I believe this should probably be if
(!key) since there is a difference between no key (null) and an empty key
(which is just an arbitrary key).
If so this is also incorrect on the existing partitioners, but we can't
change their behaviour.
—
You are receiving this because you authored the thread.
Reply to this email directly, view it on GitHub
<#1468 (review)>,
or mute the thread
<https://github.com/notifications/unsubscribe-auth/ACWANHTtwFhXS54IjnMNoW6ohCdWvBHIks5swt7lgaJpZM4P-Rip>
.
|
@edenhill I have implemented most fixes except for the word boundary. Could you give a hint because I did not quite understand the unit test you requested. I would really appreciate it. |
@barrotsteindev I would love to see this PR happen and it looks close to completion. Do you foresee this getting merged? Thanks for your efforts here! |
c51e2fc
to
bcf7d84
Compare
i have rebased this pull request and responded to your requests @edenhill |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, but the whitespace diffs needs to be removed and the other formatting comments need to be addressed as well.
src/rdkafka_msg.c
Outdated
@@ -3,24 +3,24 @@ | |||
* | |||
* Copyright (c) 2012,2013 Magnus Edenhill | |||
* All rights reserved. | |||
* | |||
* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove whitespace diffs
examples/Makefile
Outdated
@@ -89,4 +89,3 @@ rdkafka_zookeeper_example: ../src/librdkafka.a rdkafka_zookeeper_example.c | |||
|
|||
clean: | |||
rm -f $(EXAMPLES) | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove whitespace diff
src/rdkafka_msg.c
Outdated
@@ -222,7 +223,7 @@ int rd_kafka_msg_new (rd_kafka_itopic_t *rkt, int32_t force_partition, | |||
int errnox; | |||
|
|||
/* Create message */ | |||
rkm = rd_kafka_msg_new0(rkt, force_partition, msgflags, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove whitespace diff
src/rdkafka_msg.c
Outdated
void *msg_opaque) { | ||
if (!key) | ||
return rd_kafka_msg_partitioner_random(rkt, | ||
key, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove empty lines
src/rdkafka_msg.c
Outdated
@@ -799,4 +829,3 @@ int64_t rd_kafka_message_latency (const rd_kafka_message_t *rkmessage) { | |||
|
|||
return rd_clock() - rkm->rkm_ts_enq; | |||
} | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove whitespace diff
Whitespace diff line 225
Fixed typo
Hey @edenhill, |
Looks good, thanks! |
@edenhill , I have examined the warnings from the osx build, |
We should probably use the optimized versions if the platform supports it, but we can fix that later. |
Big thanks for all the effort on this! |
Add murmur2 partitioner, currently using the fastest algorithm for murmur2, which is not endian neutral. Also implements an endian neutral algorithm which can be used instead.
Fixes part of issue #1424