
ConsumerMessage timestamp is inconsistent with clients in other languages when producer using compression #885

Closed
dvsekhvalnov opened this issue May 26, 2017 · 9 comments

dvsekhvalnov (Contributor) commented May 26, 2017

Versions

Sarama Version: 1.12.0
Kafka Version: 0.10.0.1
Go Version: go1.8.1 darwin/amd64

Configuration

Kafka configured to use broker timestamps:
log.message.timestamp.type=LogAppendTime

Kafka producer is configured to use compression (other than none)

Problem Description

Probably this is more of a question, but I noticed that sarama returns the inner message timestamps instead of the timestamp of the outer compression block when using the ConsumePartition() function. Relevant code that populates the ConsumerMessage struct from a FetchResponse: https://github.com/Shopify/sarama/blob/master/consumer.go#L528

For some reason this differs from the results returned by other clients (Java, librdkafka). It only reproduces when the Kafka broker is configured to timestamp incoming messages itself (LogAppendTime).

Below is a test Go snippet that shows the difference:

package main

import (
	"fmt"

	"gopkg.in/Shopify/sarama.v1"
)

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V0_10_0_1

	topic := "local.test"
	broker := sarama.NewBroker("127.0.0.1:9092")

	err := broker.Open(config)
	if err != nil {
		panic(err)
	}
	defer broker.Close()

	// Fetch request version 2 returns message format v1, which carries timestamps.
	fetchRequest := &sarama.FetchRequest{MaxWaitTime: 250, MinBytes: 1}
	fetchRequest.Version = 2
	fetchRequest.AddBlock(topic, 0, 0, 32768)

	fetchResponse, err := broker.Fetch(fetchRequest)
	if err != nil {
		panic(err)
	}

	// Print the outer (wrapper) message of each compressed block, then the
	// inner messages it contains, with timestamps in milliseconds.
	for _, msgBlock := range fetchResponse.GetBlock(topic, 0).MsgSet.Messages {
		fmt.Printf("Block: [%v] %v %v\n", msgBlock.Offset, msgBlock.Msg.Version, msgBlock.Msg.Timestamp.UnixNano()/1000000)

		for _, inner := range msgBlock.Messages() {
			fmt.Printf("Inner: [%v] %v %v\n", inner.Offset, inner.Msg.Version, inner.Msg.Timestamp.UnixNano()/1000000)
		}
	}
}

With my test data it produces:

Block: [0] 1 1495819250370  <- LogAppendTime, set by the broker itself; other clients return this as the timestamp
Inner: [0] 1 1495819250303  <- CreateTime, set by filebeat; this is what sarama returns
Block: [1] 1 1495819250373  <- LogAppendTime, set by the broker itself; returned by other clients
Inner: [0] 1 1495819250303  <- CreateTime, set by filebeat; returned by sarama

There is probably something filebeat-specific here (filebeat uses sarama as well); I wasn't able to reproduce it with a sarama producer myself, but perhaps somebody can shed light on it?

eapache (Contributor) commented May 26, 2017

That is really weird and off the top of my head I have no idea, sorry. What is filebeat?

eapache (Contributor) commented May 26, 2017

Hmm, googled around a bit.

Filebeat will set the timestamp to something specific in some cases: https://github.com/elastic/beats/blob/3865f220cbb5ed3d03ee632aa00f5413dead882e/libbeat/outputs/kafka/client.go#L158-L163. Don't know if that is being hit in your case.

When producing, sarama picks the timestamp of the first inner message, if relevant: https://github.com/Shopify/sarama/blob/c01858abb625b73a3af51d0798e4ad42c8147093/produce_set.go#L88-L107
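
For illustration, a minimal sketch of that producing-side behavior (hypothetical stand-in types, not the actual produce_set.go code): when a batch is compressed, the outer wrapper message inherits the timestamp of the first inner message.

package main

import (
	"fmt"
	"time"
)

// message is a hypothetical stand-in for sarama's Message.
type message struct {
	Timestamp time.Time
	Value     []byte
}

// wrap mimics the linked produce_set.go logic: the compressed wrapper
// message takes the timestamp of the first inner message.
func wrap(inner []message) message {
	wrapper := message{Value: compress(inner)}
	if len(inner) > 0 {
		wrapper.Timestamp = inner[0].Timestamp
	}
	return wrapper
}

func compress(inner []message) []byte { return nil } // compression details elided

func main() {
	first := message{Timestamp: time.Unix(0, 1495819250303e6)}
	outer := wrap([]message{first, {Timestamp: time.Now()}})
	fmt.Println(outer.Timestamp.Equal(first.Timestamp)) // true
}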

dvsekhvalnov (Contributor, Author) commented May 26, 2017

Filebeat sets the timestamp when configured for Kafka v0.10+ (a CreateTime timestamp). This part is fine.

Kafka is configured to ignore the producer timestamp and generate its own (LogAppendTime).

When consuming such nested messages, sarama's behavior differs from other clients. That part is not fine :) All other clients return the outer message timestamp.

It probably has something to do with Kafka itself; I can check 0.10.1 and 0.10.2. Maybe Kafka should recursively update all nested timestamps when LogAppendTime is configured.

dvsekhvalnov (Contributor, Author) commented

How about if I rephrase the issue: can we add a BlockTimestamp field to the ConsumerMessage struct alongside the existing Timestamp?

The semantics will stay backward compatible for existing sarama users, and the new field can be used to provide parity with other clients. A sketch of the proposed shape is below.
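
A sketch of the proposed shape, assuming the field is named BlockTimestamp (existing fields abbreviated for context):

package sarama // sketch only; abbreviated version of the real struct

import "time"

// ConsumerMessage with the proposed BlockTimestamp field added.
type ConsumerMessage struct {
	Key, Value     []byte
	Topic          string
	Partition      int32
	Offset         int64
	Timestamp      time.Time // inner (producer) timestamp; existing semantics unchanged
	BlockTimestamp time.Time // outer (compressed block) timestamp, e.g. broker LogAppendTime
}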

dvsekhvalnov changed the title from "ConsumerMessage timestamp is inconsistent with clients in other languages?" to "ConsumerMessage timestamp is inconsistent with clients in other languages when producer using compression" on May 26, 2017
eapache (Contributor) commented May 26, 2017

rdkafka overrides the inner timestamps at https://github.com/edenhill/librdkafka/blame/6a735dd3b86a93b78f61855a7691439e89864368/src/rdkafka_broker.c#L3779-L3781 because of confluentinc/librdkafka@6aa20e1, but I'm not entirely convinced that logic is correct in all cases. @edenhill, I don't think your intention was to completely blow away the inner message timestamps when both are provided? Or is the fact that both are provided at all a bug?

> How about if I rephrase the issue: can we add a BlockTimestamp field to the ConsumerMessage struct alongside the existing Timestamp?

That will work, but I'd like to get to the bottom of this issue; something seems fishy.

dvsekhvalnov (Contributor, Author) commented

OK, figured it out: when the producer uses compression, sarama picks up the inner message timestamp (which is not updated by Kafka; to be verified with the latest versions).

Other clients respect the outer compressed message's timestamp.
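
In Go terms, what the other clients effectively do on the consumer side is roughly the following (a sketch with a hypothetical stand-in type, mirroring the librdkafka commit linked above):

package main

import (
	"fmt"
	"time"
)

// message is a hypothetical stand-in for a decoded Kafka message.
type message struct{ Timestamp time.Time }

// overrideInner mirrors what other consumers effectively do: every inner
// message of a compressed block gets the outer (wrapper) timestamp, so the
// broker's LogAppendTime wins over the producer's CreateTime.
func overrideInner(outer time.Time, inner []message) {
	for i := range inner {
		inner[i].Timestamp = outer
	}
}

func main() {
	logAppend := time.Unix(0, 1495819250370e6) // broker timestamp on the wrapper
	inner := []message{{Timestamp: time.Unix(0, 1495819250303e6)}}
	overrideInner(logAppend, inner)
	fmt.Println(inner[0].Timestamp.UnixNano() / 1000000) // 1495819250370
}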

dvsekhvalnov (Contributor, Author) commented

@eapache,

Tested the latest v0.10.2.1, same broker behavior. After asking on the kafka-users list, it seems to be expected; check it out: http://search-hadoop.com/m/Kafka/uyzND1IGfjBm6AxA?subj=Re+Kafka+LogAppendTime+compressed+messages

So, do you want a pull request, or would you prefer to fix it yourself?

eapache (Contributor) commented May 31, 2017

OK, I guess it's intentional. I still find it kind of weird though :)

I'm happy to accept a PR for this.
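
Once such a field lands, a consumer wanting parity with other clients could prefer it whenever it is set; a hypothetical usage sketch, assuming the BlockTimestamp field from the proposal above:

package kafkautil

import (
	"time"

	"gopkg.in/Shopify/sarama.v1"
)

// preferBlock returns the timestamp other clients would report: the outer
// block timestamp when it is set, otherwise the inner message timestamp.
func preferBlock(msg *sarama.ConsumerMessage) time.Time {
	if !msg.BlockTimestamp.IsZero() {
		return msg.BlockTimestamp // broker LogAppendTime from the compressed wrapper
	}
	return msg.Timestamp
}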

dvsekhvalnov added a commit to dvsekhvalnov/sarama that referenced this issue Jun 15, 2017
dvsekhvalnov (Contributor, Author) commented

@eapache, here is the pull request. Sorry for the delay.

I wasn't able to figure out how to write a test for this; it looks like there are no tests for timestamps at all.
If you would like me to write one, I'd appreciate some guidance.

eapache added a commit that referenced this issue Jun 15, 2017
#885: added BlockTimestamp to ConsumerMessage
eapache closed this as completed Jun 15, 2017