Is it a bug with Compression? #32

Closed
cloudaice opened this issue Sep 2, 2013 · 20 comments

Comments
@cloudaice

When I create a producer with Compression set to CompressionGZIP, I can't read the data back from Kafka with a consumer. What's wrong?

@eapache
Contributor

eapache commented Sep 2, 2013

Is the consumer receiving no data, or is it receiving data that is incorrect? Does the producer return any errors? Does the consumer return any errors?

@cloudaice
Author

When I use the consumer to get data, there is no data and no errors are returned. The producer doesn't return any errors either. It seems the messages are not being logged by Kafka, because when I produce without compression and then consume the messages, the offsets continue as expected.

@cloudaice
Author

I have read through some of the source code, but I couldn't find anything. My code is below:

producer

producer, err := sarama.NewProducer(client, "Go-test",
        &sarama.ProducerConfig{
                RequiredAcks: sarama.WaitForLocal,
                // Compression:  sarama.CompressionGZIP,
        })
if err != nil {
        panic(err)
}
defer producer.Close()

for i := 0; i < 10; i++ {
        err = producer.SendMessage(nil, sarama.StringEncoder("hello world"))
        if err != nil {
                panic(err)
        }
}

consumer

consumer, err := sarama.NewConsumer(client, "Go-test", 0, "Go-test-group",
        &sarama.ConsumerConfig{OffsetMethod: sarama.OffsetMethodNewest})
if err != nil {
        panic(err)
}

for event := range consumer.Events() {
        if event.Err != nil {
                panic(event.Err)
        }
        fmt.Println(event.Offset, event.Value)
}

If I uncomment the Compression: sarama.CompressionGZIP line, the consumer won't get any data.

@eapache
Contributor

eapache commented Sep 2, 2013

I can definitely reproduce this - the kafka broker is acking the message but not actually storing it in the log. Not sure if we're sending it something subtly malformed or if this is a Kafka bug...

@cloudaice
Author

I am not familiar with the protocol. Maybe we can get some answers from the kafka-python project.

@eapache eapache closed this as completed in 7d6070c Sep 3, 2013
@eapache
Contributor

eapache commented Sep 3, 2013

My commit was wrong; we are encoding correctly. There must be a bug in the Kafka broker, so I suggest you file a ticket in their bug tracker.

@cloudaice
Author

OK, I will report this in the Kafka bug tracker. Additionally, I think we need a batch-send function soon. I read the source code of the kafka-python project; it has a batch-send function, but it doesn't have compression.

@amalakar

If there is a Kafka JIRA corresponding to this, could you point me to it?

I am facing the same issue. When I produce using CompressionSnappy the consumer doesn't seem to receive any message. Using CompressionNone seems to work fine though. I am using sarama as the producer and a scala consumer.

@eapache
Contributor

eapache commented Oct 29, 2013

I never heard anything back, but I suspect it would be this one: https://issues.apache.org/jira/browse/KAFKA-1039

TL;DR seems to be to make sure that you have the Snappy jar in your classpath or Kafka silently* drops messages :/

*silently with the default log settings at least, apparently you get a message if you do some futzing around with log4j

@amalakar

I did some testing and it appears this isn't a case of a missing jar on the Kafka server side. I modified the ConsoleProducer that comes with Kafka to use SnappyCompressionCodec instead of the default GZIP codec, and I was able to produce/consume messages. Looking at the Kafka log files, I can see that Snappy compression was indeed being used:

% bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/aruptest-0/00000000000000000000.log | tail -2
offset: 15 position: 18763 isvalid: true payloadsize: 52 magic: 0 compresscodec: SnappyCompressionCodec crc: 1602790249
offset: 16 position: 18841 isvalid: true payloadsize: 64 magic: 0 compresscodec: SnappyCompressionCodec crc: 181410682

Given this I am assuming the server is capable of handling snappy codec in my setup. Could it be possible that sarama is not encoding the messages properly?

Has anyone tested sarama with snappy? I am running kafka-0.8.0-beta1 by the way.

@eapache
Contributor

eapache commented Oct 29, 2013

I have verified Sarama's output under snappy against the Kafka protocol spec [1]. I was never able to actually produce messages to a broker with snappy, but I had assumed that it was simply because I didn't have the JAR.

If the spec is wrong, the only way to tell would be to check the actual network traffic using Wireshark [2] (or tcpdump or a similar tool). If you want to take a network capture of encoding the same message when it works (ConsoleProducer) and when it doesn't (Sarama) I can compare the two traces and figure out what Sarama is doing differently.

[1] https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
[2] https://www.wireshark.org/
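For context, the spec's compressed-message layout is a message-in-a-message: the inner (uncompressed) message set is compressed as a whole and becomes the value of a single outer message whose attributes byte records the codec (0 = none, 1 = gzip, 2 = snappy). Below is a minimal Go sketch of that wire layout for gzip, purely as an illustration of the protocol guide rather than Sarama's actual encoder; the helper names are made up for this example.

package main

import (
        "bytes"
        "compress/gzip"
        "encoding/binary"
        "fmt"
        "hash/crc32"
)

// writeBytes writes a Kafka "bytes" field: a big-endian int32 length
// (-1 for null) followed by the raw data.
func writeBytes(buf *bytes.Buffer, b []byte) {
        if b == nil {
                binary.Write(buf, binary.BigEndian, int32(-1))
                return
        }
        binary.Write(buf, binary.BigEndian, int32(len(b)))
        buf.Write(b)
}

// encodeMessage builds one 0.8-era Message:
// Crc(int32) MagicByte(int8) Attributes(int8) Key(bytes) Value(bytes).
// The CRC covers everything after the CRC field itself.
func encodeMessage(attributes byte, key, value []byte) []byte {
        var body bytes.Buffer
        body.WriteByte(0)          // MagicByte = 0
        body.WriteByte(attributes) // low bits select the codec
        writeBytes(&body, key)
        writeBytes(&body, value)

        msg := make([]byte, 4, 4+body.Len())
        binary.BigEndian.PutUint32(msg, crc32.ChecksumIEEE(body.Bytes()))
        return append(msg, body.Bytes()...)
}

// messageSet concatenates [Offset(int64) MessageSize(int32) Message] entries.
// On produce the offsets can be zero; the broker assigns the real ones.
func messageSet(msgs ...[]byte) []byte {
        var buf bytes.Buffer
        for _, m := range msgs {
                binary.Write(&buf, binary.BigEndian, int64(0))
                binary.Write(&buf, binary.BigEndian, int32(len(m)))
                buf.Write(m)
        }
        return buf.Bytes()
}

// gzipWrap compresses an inner message set and embeds it as the value of a
// single outer message whose attributes mark it as gzip (codec 1). That outer
// message then goes into the produce request's message set as usual.
func gzipWrap(inner []byte) []byte {
        var compressed bytes.Buffer
        zw := gzip.NewWriter(&compressed)
        zw.Write(inner)
        zw.Close()
        return encodeMessage(1, nil, compressed.Bytes())
}

func main() {
        inner := messageSet(encodeMessage(0, nil, []byte("anotherone")))
        fmt.Printf("% x\n", gzipWrap(inner))
}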

@eapache
Contributor

eapache commented Oct 29, 2013

Although if Sarama were sending anything different from what Kafka was looking for I would expect an error from Kafka along the lines of "could not parse message" or "corrupt message" or something.

@amalakar

kafka producer:

00:00:00:77:00:00:00:00:00:00:00:08:00:00:00:00:00:00:05:dc:00:00:00:01:00:08:61:72:75:70:74:65:73:74:00:00:00:01:00:00:00:01:00:00:00:4d:00:00:00:00:00:00:00:00:00:00:00:41:03:b8:d7:07:00:02:ff:ff:ff:ff:00:00:00:33:82:53:4e:41:50:50:59:00:00:00:00:01:00:00:00:01:00:00:00:1f:24:00:00:19:01:60:18:3f:98:c2:12:00:00:ff:ff:ff:ff:00:00:00:0a:61:6e:6f:74:68:65:72:6f:6e:65

sarama producer:
00:00:00:65:00:00:00:00:00:00:00:01:00:15:50:69:6e:67:64:2d:6d:74:76:2d:65:6e:67:2d:31:36:34:2e:6d:74:76:00:00:00:00:00:00:00:00:00:01:00:08:61:72:75:70:74:65:73:74:00:00:00:01:00:00:00:00:00:00:00:26:00:00:00:00:00:00:00:00:00:00:00:1a:b0:09:48:89:00:02:ff:ff:ff:ff:00:00:00:0c:0a:24:61:6e:6f:74:68:65:72:6f:6e:65

I can send the capture files your way if you are interested.

The message payload is the same in both cases: "anotherone". The client id is different though. Let me know if this works for you.

I agree that if Kafka is unable to understand the message it should definitely respond with an error. If it is not doing so, then that is a bug on Kafka's side.

@amalakar

CompressionGZIP doesn't seem to work either. This is the code I use to send a message with sarama:
https://gist.github.com/amalakar/7225198

I see that the error is nil in both the gzip and snappy cases:
2013/10/29 17:29:32 Successfully created kafka producer: [localhost:9092]. Topic: aruptest
helloworld
2013/10/29 17:29:35 Request successfully sent: [helloworld]

@eapache
Contributor

eapache commented Oct 30, 2013

Please send the actual capture files my way; I'd much rather let Wireshark tell me where the fields are than dissect it myself :)

It's possible that Kafka is suppressing the error because you're specifying NoResponse. Do you get anything interesting if you specify WaitForAll instead?
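For reference, with the producer config quoted earlier in this thread, that change would look roughly like the sketch below (Snappy shown here since that is the codec being tested; this is just the config shape, not a complete program):

producer, err := sarama.NewProducer(client, "aruptest",
        &sarama.ProducerConfig{
                // WaitForAll makes the broker ack only once all in-sync
                // replicas have the message, so a broker-side problem is
                // more likely to surface as an error than with NoResponse.
                RequiredAcks: sarama.WaitForAll,
                Compression:  sarama.CompressionSnappy,
        })
if err != nil {
        panic(err)
}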

@amalakar

Find the attached pcap files, the packet number is part of the file name.

I tried WaitForAll but no success, it still returns nil error.

Arup Malakar

@eapache
Contributor

eapache commented Oct 30, 2013

So I have no idea what the ConsoleProducer is actually sending in this case. The outer protocol layers in both cases look identical, but if you compare the actual message value:

  • Sarama sends two bytes of snappy header and then "anotherone" (Snappy decides it's too short to be worth compressing, so it stores it as a literal). Pretty straightforward.
  • ConsoleProducer sends 0x82, then the string literal SNAPPY\0, then what appears to be a complete embedded produce request without any compression. This is neither valid snappy nor valid Kafka according to anything I've seen, so I'm pretty confused. It looks almost like an incorrect version of [1], but it's missing several key fields and the case of the identifying string is wrong (see the decoding sketch after this comment).

I think we probably need to ask the Kafka folks to update their protocol spec page, since this definitely does not match what's there.

[1] http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt
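That 0x82 + SNAPPY\0 prefix is snappy-java's "xerial" block framing: an 8-byte magic, two int32 version fields, then repeated [int32 block length][raw snappy block] chunks. The following is a minimal sketch of detecting and unwrapping that framing on the Go side, assuming a snappy-go style Decode(dst, src) function; it is only an illustration of the framing, not what Sarama did at the time of this thread.

package xerial

import (
        "bytes"
        "encoding/binary"
        "fmt"

        "github.com/golang/snappy" // snappy-go; hosted at code.google.com/p/snappy-go back then
)

// xerialHeader is the magic prefix snappy-java puts on its framed streams.
var xerialHeader = []byte{0x82, 'S', 'N', 'A', 'P', 'P', 'Y', 0}

// decode handles both a raw snappy block (what Sarama sends) and the
// snappy-java "xerial" framing seen in the ConsoleProducer capture:
// 8-byte magic, int32 version, int32 compatible version, then repeated
// [int32 block length][raw snappy block].
func decode(src []byte) ([]byte, error) {
        if !bytes.HasPrefix(src, xerialHeader) {
                return snappy.Decode(nil, src)
        }
        var out []byte
        pos := len(xerialHeader) + 8 // skip magic plus the two version fields
        for pos < len(src) {
                if pos+4 > len(src) {
                        return nil, fmt.Errorf("xerial: truncated block header")
                }
                n := int(binary.BigEndian.Uint32(src[pos:]))
                pos += 4
                if pos+n > len(src) {
                        return nil, fmt.Errorf("xerial: truncated block")
                }
                chunk, err := snappy.Decode(nil, src[pos:pos+n])
                if err != nil {
                        return nil, err
                }
                out = append(out, chunk...)
                pos += n
        }
        return out, nil
}

Checking for the magic prefix first keeps plain snappy output (what Sarama produces) decoding unchanged.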

@amalakar

@eapache thanks for taking a look at the tcpdump. I have filed a Kafka JIRA: https://issues.apache.org/jira/browse/KAFKA-1110. Hopefully we will get a response regarding the spec. We could also look at the Kafka source to see what it expects.

One difference between the ConsoleProducer and sarama is that with the ConsoleProducer the client and server both use the same snappy library (snappy-java-1.0.4.1.jar), so even if it strays from the snappy spec it would still work. In our case the messages are compressed using snappy-go, so we may be hit by any incompatibility between snappy-go and snappy-java. I am hoping that is not the case.

Feel free to add more information in the JIRA I have created.

@eapache
Contributor

eapache commented Oct 30, 2013

If either of the snappy libs were that terribly incompatible I would expect they would have their own bug reports by this time, but it's possible. I've filled in a few details on the JIRA, hopefully we get a response.

Last time I tried digging through the Kafka source was not particularly enlightening, but it's worth a shot if the JIRA doesn't produce anything useful.

@joestein

joestein commented Nov 8, 2013

Can you make sure you have log4j in the classpath of the broker and check whether an error comes from the Kafka broker when you try this? From what you describe there should be an error... In general Kafka will not log errors if log4j is not in the classpath (see https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whybrokersdonotreceiveproducersentmessages%3F). Are you sure there is no error?

Also see my last comment in the Kafka JIRA ticket https://issues.apache.org/jira/browse/KAFKA-1110?focusedCommentId=13816991&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13816991

I can't reproduce this myself; unfortunately I do not know Go (yet) and won't have time for a few weeks to pick it up enough to help.
