0.7 support #15

ccarollo opened this Issue Apr 24, 2012 · 20 comments


None yet
6 participants

ccarollo commented Apr 24, 2012

Does brod in its current form support 0.7 Kafka? I have it running and it seems fine, but I believe I saw somewhere on the brod github repo where it explicitly said it doesn't support 0.7.

Any details about support current and future would be greatly appreciated.



ormsbee commented Apr 25, 2012

brod doesn't support compression, which was added as an option in 0.7. Otherwise, I believe it's compatible.


ccarollo commented Apr 25, 2012

Thank you David. Are you planning on including compression support at some point or adding future support to brod, inline with advances that take place in Kafka?


ormsbee commented Apr 25, 2012

That's my intention, but I'm afraid I can't give any firm timelines.


ccarollo commented Apr 27, 2012

We are looking at using kafka and brod in our environment. Would you be interested in having us develop the compression piece for brod and submitting as a pull request?


ormsbee commented Apr 27, 2012

Sure thing. :-)


ccarollo commented May 4, 2012

Hi David,

My coworker has been working on compression support for Brod. And he has a first pass that he would like to share with you. It is a work in progress but basic compression (producer) and decompression (consumer) are working for gzip. He has a few questions and we are hoping to work with you on resolving them as they could potentially impact other parts of Brod. Once we have a better understanding of how to proceed we can continue refining compression support for gzip.
My coworkers handle is rasprague.


Hi David,

I've branched this repo into https://github.com/rasprague/brod. This branch adds Kafka 0.7 message header support as well as preliminary gzip compression support.

GZIP compression works, but I'm not very pleased with how I've implemented it. Also, automatic GZIP decompression has been implemented, but this causes the "expected offset" unit test to fail.

I'd be honored if you could take a look at my changes and maybe give me a little guidance in how to better implement auto decompression, as well as correcting the offset calculation for compressed messages.

Please see https://github.com/rasprague/brod/blob/master/notes.txt for more details.


Richard "spike" Sprague


ormsbee commented May 4, 2012

Awesome, thank you. I'll look it over this weekend.


ormsbee commented May 7, 2012

I think your troubles primarily come from problems in our implementation. Carlo and I were kind of discovering Kafka as we went along with this implementation, and we didn't really see the compression bit coming.

Breaking expected offsets with auto-decompression will cause a lot of problems, both for the simple API and the ZKConsumer. I think the solution (and a pretty decent sized refactoring) would be to make MessageSets smarter, and to use them more extensively. So basically, a BaseKafka.fetch() would return a MessageSet, and the MessageSet would encapsulate whether something has compression, would return the appropriate offsets, etc. MessageSets should be constructable given just a byte stream. The proper behavior for iterating a compressed set of offsets/messages should be to have the offset be the offset of the real compressed message boundary, not the individual messages inside. So for instance:

(0, "Hello world"),
(0, "This is compressed."),
(0, "If this read fails, we're going to have to query offset 0 again.")

I have a little free time coming up late this week, so I should be able to take a stab at this. You're more than welcome to dive into this yourself as well. Until this is done though, I think auto-decompression shouldn't happen in brod.

As for the other code -- it looks pretty reasonable to me given the current state of things. I'd encourage you to automatically detect which header you're getting on the fetch/parse side of things (0.6 vs. 0.7 via the magic number). We should only need to use the ugly VERSION_0_7 thing on the producer side of things. You could even make this an optional arg to BaseKafka -- I always thought having that define was an ugly kludge on my part.

Thank you very much for taking an interest in brod. :-)


Great! I can start work on factoring out the VERSION_0_7 boolean from the code, then I'll see about tackling the MessageSet refactoring. I'll keep you posted on my progress.


Hi David,

I've finally been able to put in some time into refactoring the code. I've moved message parsing into MessageSet, and added Message and CompressedMessage objects which get returned by MessageSet's parse method. CompressedMessage has a method to decompress itself and return another MessageSet. I've left automatic decompression out of brod, so as not to mess with the expected offset functionality, but the CompressedMessage objects makes handling compression a little easier. I've also had to modify TestKafkaBlockingWithCompression in test_brod.py to do the offset and message checking in two distinct steps.

I'd like to know what you think, if and when you have time to take a look at my changes.


Hi David,

I don't know if my last post may have fell through the cracks. I've made quite a bit of progress getting message compression support into brod. When you have time, I'd like it if you could take a look at my changes, make sure I'm headed in the right direction, and maybe make some recommendations on how to get compression support merged into the main line.

Here's the link to my branch: https://github.com/rasprague/brod.

Thanks a lot for your time, I look forward to hearing from you soon.

  • Richard "spike" Sprague

clofresh commented Jun 21, 2012

Hey Richard,

I've taken over as maintainer of brod and I just saw this thread just now. I'll take a look at your changes and do some testing on my end over the next few days and get back to you. Haven't fully caught up on the thread, but are your changes backwards compatible with 0.6?


Hi Carlo,

Yes, I've made sure that my changes are backwards compatible with 0.6. Message parsing will read the magic byte to automatically determine if it's dealing with a 0.6 or a 0.7 message. As for producing messages, the BaseKafka constructor takes in an optional param to allow for either 0.6 or 0.7 message producing.

.Richard "spike" Sprague

Hi Carlo,

Just wanted to check in to see if you were still interested in the 0.7 compatible branch that I cooked up. Ping me to let me know what's up.



clofresh commented Jul 16, 2012

In one of our internal tests, I'm getting some errors trying to run your brod branch against a 0.6 kafka broker:

Traceback (most recent call last):
  File "prozess/daniels/test.py", line 348, in <module>
  File "prozess/daniels/test.py", line 64, in test_daniels
    offsets1 = get_latest_offsets([metrics_bp, events_bp])
  File "prozess/daniels/test.py", line 313, in get_latest_offsets
    offsets[broker_partition] = kafka.latest_offset(broker_partition.topic, broker_partition.partition)
  File "/Users/carlo/Projects/datadog/brod/brod/base.py", line 489, in latest_offset
    return self.offsets(topic, LATEST_OFFSET, max_offsets=1, partition=partition)[0]
  File "/Users/carlo/Projects/datadog/brod/brod/base.py", line 478, in offsets
    partial(self._read_offset_response, callback)))
  File "/Users/carlo/Projects/datadog/brod/brod/blocking.py", line 104, in _write
    return callback()
  File "/Users/carlo/Projects/datadog/brod/brod/base.py", line 655, in _wrote_request_size
    return self._write(request, partial(self._wrote_request, callback))
  File "/Users/carlo/Projects/datadog/brod/brod/blocking.py", line 104, in _write
    return callback()
  File "/Users/carlo/Projects/datadog/brod/brod/base.py", line 660, in _wrote_request
    partial(self._read_response_size, callback))
  File "/Users/carlo/Projects/datadog/brod/brod/blocking.py", line 55, in _read
    chunk = self._socket.recv(length)
socket.error: [Errno 54] Connection reset by peer

In kafka's logs:

[2012-07-16 15:22:27,122] ERROR kafka.message.InvalidMessageException (kafka.network.Processor)
    at kafka.log.Log$$anonfun$append$1.apply(Log.scala:203)
    at kafka.log.Log$$anonfun$append$1.apply(Log.scala:201)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:29)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
    at kafka.message.MessageSet.foreach(MessageSet.scala:63)
    at kafka.log.Log.append(Log.scala:201)
    at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:57)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:41)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:41)
    at kafka.network.Processor.handle(SocketServer.scala:268)
    at kafka.network.Processor.read(SocketServer.scala:291)
    at kafka.network.Processor.run(SocketServer.scala:202)
    at java.lang.Thread.run(Thread.java:680)

Has querying for the latest offset changed?


clofresh commented Jul 19, 2012

(@rasprague still interested btw, just wanna make sure it doesn't break my existing stuff)

@rasprague have you had success decoding gzipped messages?

I am on Kafka 0.7.2... datadog/brod only retrieves one message I guess because it doesn't understand 0.7 headers. I'm trying your fork and it is able to retrieve a full list of messages at least. But they don't appear to be decoded properly, just gobbledegook:

7021: �-�MK�@�@�?�zj!ٙٝ��*�R�
�]5�!S�eTdd�U#Xu��m�TC0�0��2��{裿�2�z�o�:�/��p   ��Ƌ��ղ�6o���ö�����{Ih�!�6���7�Ss�����p2G�

I put some debugging in and the messages are detected by Brod as gzip compression and CompressedMessage objects are instantiated but the code path that decompresses the message is never reached.

Shame the API is more ugly, but I just found this client works perfectly: https://github.com/mumrah/kafka-python


technovangelist commented Mar 26, 2015

Hi @ccarollo - Thanks for your interest. This repo is no longer maintained and will be removed in 3 months (on or soon after June 30, 2015). If you are still interested in the functionality offered, we suggest looking at https://github.com/mumrah/kafka-python which is being actively maintained.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment