
Consumers sometimes see message timestamp as -1 #96

Closed
fillest opened this issue Dec 7, 2016 · 25 comments

fillest commented Dec 7, 2016

confluent-kafka==0.9.2, librdkafka 0.9.2, Python 2.7.6, Ubuntu 14.04, Kafka 0.10.1.0

All producers run the same code on similar hosts. The consumer uses kafka-python==1.3.1 (rather than confluent-kafka), and record.timestamp is sometimes correct but quite often it is -1.

edenhill commented Dec 7, 2016

Are you using LogAppendTime (broker-assigned timestamps) or CreateTime (producer-assigned timestamps)?

fillest commented Dec 7, 2016

I use CreateTime (I just grepped for log.message.timestamp.type in the broker log).

fillest commented Dec 7, 2016

Also, I don't use compression.

edenhill commented Dec 7, 2016

Make sure to set "api.version.request": True on your producer to enable message timestamps.

fillest commented Dec 7, 2016

Yep, it's already turned on.

import confluent_kafka

_producer = confluent_kafka.Producer(**{
    'client.id': client_id,
    'bootstrap.servers': bootstrap_servers,
    'log.connection.close': False,
    'api.version.request': True,
    'queue.buffering.max.messages': 100000,
    'queue.buffering.max.kbytes': 4000000,
    'queue.buffering.max.ms': 1000,
    'message.send.max.retries': 9000,
    'batch.num.messages': 10000,
    'delivery.report.only.error': True,
    'default.topic.config': {
        'request.required.acks': -1,
    },
    'error_cb': error_cb,
    'on_delivery': on_delivery,
})

edenhill commented Dec 7, 2016

Have you verified the consumer side with any other client?
Do note that the consumer will need to use the new protocol feature set as well (i.e., the kafka-python equivalent of api.version.request=true).
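
For reference, a minimal kafka-python consumer sketch that surfaces the timestamp fields (untested; the topic name and broker address are placeholders):

from kafka import KafkaConsumer

# api_version is kafka-python's rough analogue of librdkafka's
# api.version.request: pinning it to 0.10+ makes the fetch requests
# carry message timestamps instead of relying on auto-probing.
consumer = KafkaConsumer(
    'my-topic',                          # placeholder topic
    bootstrap_servers='localhost:9092',  # placeholder brokers
    api_version=(0, 10, 1),
)
for record in consumer:
    # timestamp is -1 when the broker has no timestamp for the message;
    # timestamp_type is 0 for CreateTime, 1 for LogAppendTime
    print(record.timestamp, record.timestamp_type)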

fillest commented Dec 7, 2016

Nope. Well, I'm going to try again to switch this producer to confluent-kafka-python then.

edenhill commented Dec 7, 2016

I thought the producer was already on confluent-kafka-python, but the consumer was on kafka-python?

fillest commented Dec 7, 2016

Yes.

fillest commented Dec 7, 2016

Ah, sorry, yeah, I meant "switch this consumer to confluent-kafka-python"

edenhill commented:

Did you try switching the consumer to confluent-kafka-python, and if so, did it fix the receive timestamps?

jeffwidman commented:

@fillest if it is a problem with kafka-python, can you file a bug there? There are a few possible causes, such as a bug in the broker-version determination code, not using the right structs, etc.

But definitely verify first using another client, such as the confluent-kafka-python consumer.

edenhill commented Mar 2, 2017

Closing due to inactivity.
Reopen when there's more info to go on.

shanavas-simelabs commented:

I am facing the same issue. We are using a confluent-kafka 0.11.6 producer and a Kafka Streams consumer; the Kafka broker version is 2.11. It happens intermittently and I am unable to find the root cause. I get the following error:

Input record ConsumerRecord(topic = <topic name>, partition = 0, leaderEpoch = null, offset = 652474, CreateTime = -1, serialized key size = 4, serialized value size = 134, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1003, value = <message data>) has invalid (negative) timestamp. Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.

I temporarily fixed the issue by using a different TimestampExtractor.

edenhill commented:

What's the broker version? (2.11 is the Scala version.)
Also, please provide your producer config.

shanavas-simelabs commented:

2.11 is the Kafka version we use.
The producer config is simply {'bootstrap.servers': <ip>}.

edenhill commented:

So that's Kafka 2.2.0 (built for Scala 2.11).

On repeated connection failures the producer can downgrade to an older protocol feature set (see https://github.com/edenhill/librdkafka/wiki/Broker-version-compatibility).
To circumvent this behaviour, set the following additional config params:
'api.version.fallback.ms': 0, 'broker.version.fallback': '2.2.0'
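
A minimal producer sketch with those params (the broker address is a placeholder; match broker.version.fallback to your actual broker version):

import confluent_kafka

producer = confluent_kafka.Producer({
    'bootstrap.servers': 'localhost:9092',  # placeholder
    'api.version.request': True,
    # Never fall back to an older protocol feature set after a failed
    # ApiVersionRequest; a pre-0.10 fallback silently drops timestamps.
    'api.version.fallback.ms': 0,
    'broker.version.fallback': '2.2.0',
})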


shanavas-simelabs commented May 10, 2019 via email

srini6teen commented:

Hi @edenhill,
We had the same issue, where the confluent-kafka-python producer suddenly starts publishing messages with a timestamp of -1.

We are using the following:
confluent-kafka (producer) - 1.0.0
kafka brokers - 2.12-1.0.0
kafka-clients (Java) - 1.0.0

We are using a Java Kafka Streams consumer. Would the above fix work for us as well? Should it be added both in the producer (Python) and the consumer (Java)? I assume the producer has it as a default already?

edenhill commented:

This configuration does not apply to the Java consumer.

But with confluent-kafka-python it really shouldn't be needed; it already uses those settings by default.
Are you passing a custom timestamp to produce()?
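
For context, a sketch of what passing a custom timestamp looks like (placeholder broker, topic, and payload); omitting the argument lets librdkafka stamp the message itself:

import time
import confluent_kafka

producer = confluent_kafka.Producer({'bootstrap.servers': 'localhost:9092'})  # placeholder

# Explicit CreateTime timestamp in milliseconds since the epoch;
# omit the timestamp argument to let librdkafka assign the current time.
producer.produce('my-topic', value=b'payload',
                 timestamp=int(time.time() * 1000))
producer.flush()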

srini6teen commented:

No, we are not passing a custom timestamp to produce. It works fine most of the time and then suddenly starts publishing messages with negative timestamps.

edenhill commented:

Do you know if the sudden change follows a reconnect to the brokers or any other trackable event?

srini6teen commented:

Possibly. We didn't monitor it very closely. One of the ZooKeeper nodes restarted, and one of the brokers wasn't recognized.

edenhill commented:

The only reason a client will stop sending timestamps is if it downgrades the connection to an older protocol version, and this shouldn't happen with v1.0.0.
If this is reproducible, it'd be fantastic to get logs with debug=broker,feature enabled.
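
Something along these lines would do it (broker address is a placeholder):

import confluent_kafka

producer = confluent_kafka.Producer({
    'bootstrap.servers': 'localhost:9092',  # placeholder
    # The 'broker' and 'feature' debug contexts log protocol negotiation
    # and any feature-set downgrades (to stderr by default).
    'debug': 'broker,feature',
})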

woohan commented Jan 15, 2020

Quoting @fillest: "I use CreateTime (I just grepped for log.message.timestamp.type in the broker log)."

How did you set the timestamp type configuration? log.message.timestamp.type is not one of the topic-level configs.
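
(For what it's worth, the topic-level equivalent drops the log. prefix: message.timestamp.type. A sketch of setting it via confluent-kafka's AdminClient, with placeholder broker and topic:)

from confluent_kafka.admin import AdminClient, ConfigResource

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})  # placeholder

# The topic-level config is message.timestamp.type; the broker-wide
# default is log.message.timestamp.type. Note alter_configs is
# non-incremental: it overwrites other dynamically set topic configs.
resource = ConfigResource(ConfigResource.Type.TOPIC, 'my-topic',  # hypothetical topic
                          set_config={'message.timestamp.type': 'LogAppendTime'})
futures = admin.alter_configs([resource])
futures[resource].result()  # raises on failure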
