PR #85: #38 Set message time when publishing

Open · wants to merge 8 commits into master
Conversation

@stevedh commented May 17, 2019

I'd like to get some feedback on this -- the basics seem pretty straightforward and I'd like to see if there's anything I missed. We need to figure out what to do with the throttle time, and see if this breaks the consumer.
@twm (Collaborator) commented May 24, 2019

Sorry it took so long to respond here — I'm trying as best I can to get #37 in before vacation next week.

I don't think that the changes to afkak.common.Message can be accepted because adding a field to the namedtuple is a backwards-incompatible change. It adds a new required parameter and changes the unpacking behavior of the object. The Message object is actually part of the Consumer public API surface (also technically the low-level methods on KafkaClient but I don't think absolute backwards compatibility is required there). More on this below.
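To illustrate the breakage (a minimal sketch; the field names are stand-ins for the real afkak.common.Message):

from collections import namedtuple

# Stand-in for afkak.common.Message (field names are illustrative).
Message = namedtuple('Message', ['magic', 'attributes', 'key', 'value'])

# Existing caller code unpacks positionally:
magic, attributes, key, value = Message(0, 0, None, b'payload')

# Appending a 'timestamp' field would break both patterns above:
# the four-name unpacking would raise ValueError, and
# Message(0, 0, None, b'payload') would raise TypeError because
# the new positional argument is required.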

For reference I think that KIP-32 is the original KIP with background on timestamps.

Producing with Timestamps

If you only need timestamps when producing messages you don't need to change Message at all: you can add a new, private _Message1 type with the additional field, alter Producer, afkak.create_message(), and afkak.create_message_set() to use it internally, and tack the appropriate serialization logic on somewhere (sorry for the mess that is KafkaCodec). I'm not a huge fan of accepting a (ts, payload) tuple. I'd rather define a specific type (not a namedtuple!) because there will be more extensions coming: I plan to eventually add support for the new Record format, which adds key/value headers. Plus who knows what Confluent will add... they tinker endlessly.
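As a sketch of what I have in mind (the fields and the use of attrs are assumptions, not a committed design; _Message1 is the private type mentioned above):

import attr

@attr.s(frozen=True, slots=True)
class _Message1(object):
    """
    Private producer-side message carrying a v1 timestamp.

    Deliberately not a namedtuple: fields (e.g. headers for the
    newer Record format) can be added later without breaking
    positional unpacking.
    """
    key = attr.ib()
    value = attr.ib()
    timestamp = attr.ib(default=None)  # ms since epoch, or None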

Consuming Timestamps

Adding timestamps to the consumer API is more complicated. The existing Message class could be augmented in a backwards-compatible way — remove __slots__ = () so that timestamp can go in __dict__, override __new__ to not pass it to namedtuple.__new__, etc. However, the Consumer API is already a bit of a mess from a user standpoint. The processor callback gets List[SourcedMessage] such that getting at the actual message bytes is pretty awkward:

def processor(messages):
    for sourced_message in messages:
        the_bytes = sourced_message.message.value
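For comparison, the backwards-compatible augmentation described above might look roughly like this (a sketch only; field names are again stand-ins):

from collections import namedtuple

class Message(namedtuple('Message', ['magic', 'attributes', 'key', 'value'])):
    # No __slots__ = (), so each instance gets a __dict__ where a
    # 'timestamp' attribute can live without changing the tuple
    # layout that existing callers unpack.
    def __new__(cls, magic, attributes, key, value, timestamp=None):
        self = super(Message, cls).__new__(cls, magic, attributes, key, value)
        self.timestamp = timestamp
        return self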

I think that we could do with some changes here (again also considering how to add headers), but the best way to do it is probably to add a new consumer API.

Throttling

I think that the throttle time can be handled in a separate PR. I skimmed a few KIPs that talk about it, and it seems that it only comes up when quotas are involved? I'm not sure about that, but we can handle it at quite a different layer, so splitting it up would be reasonable.

@stevedh (Author) commented May 24, 2019

No problem! I agree with you on most of the points, actually; I was just trying to get something quick and dirty out there to see if the project is active. The reason for the patch, by the way, is that if you try to use KSQL with this producer, it results in errors:

Input record ConsumerRecord(topic = READINGS.V1, partition = 1, leaderEpoch = null, offset = 24332535, CreateTime = -1, serialized key size = 36, serialized value size = 108, headers = RecordHeaders(headers = [], isReadOnly = false), key = c9530746-690c-54b4-8c1a-239fa787c1dd, value = [ ‘c9530746-690c-54b4-8c1a-239fa787c1dd’ | 1558048636 | 203 | 786676.5913660526 ]) has invalid (negative) timestamp. Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.

So your choices are to use a TimestampExtractor (which is kind of a pain) or patch the encoding. For the Internet's benefit: you can work around this by setting LOG.MESSAGE_TIMESTAMP_TYPE=LogAppendTime on the Kafka broker, but this has other undesirable side effects... Therefore, from my point of view, it's valuable to just patch the producer side like we do here.

Do you see a need to continue to support MessageSet v0 production? I think some older Kafka servers may require it.

I'll try to find some time to revise this in the next couple of weeks. If you're going to introduce a more structured message-production class, though, perhaps I'll just wait for that.

@twm (Collaborator) commented May 24, 2019

No, no need to support MessageSet v0. We already dropped support for brokers that old in Afkak.
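(For reference, message format v1, i.e. magic byte 1, is the variant that carries the timestamp. A rough sketch of its layout, simplified and not Afkak's actual KafkaCodec code:)

import struct
import time
import zlib

def _bytes_field(b):
    # Kafka encodes a null bytes field as length -1.
    if b is None:
        return struct.pack('>i', -1)
    return struct.pack('>i', len(b)) + b

def encode_message_v1(key, value, timestamp_ms=None):
    # magic=1 inserts an int64 CreateTime between attributes and key.
    if timestamp_ms is None:
        timestamp_ms = int(time.time() * 1000)
    body = struct.pack('>bbq', 1, 0, timestamp_ms)  # magic, attributes, timestamp
    body += _bytes_field(key) + _bytes_field(value)
    crc = zlib.crc32(body) & 0xffffffff  # CRC32 over magic..value
    return struct.pack('>I', crc) + body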

It's best not to wait for me, as I have lots of tasks competing for Afkak time. I will do my best to review promptly, though.

@twm (Collaborator) commented Jun 14, 2019

Just to follow up on throttling: I stumbled across the primary KIP for it, KIP-124 Request rate quotas. It looks like the field is just an FYI (it reports how long the request was throttled), so we don't need to take any particular action client-side.
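At most a client might surface it for operators, something like this (hypothetical; the response attribute name is an assumption, not current Afkak API):

import logging

log = logging.getLogger(__name__)

def note_throttle(response):
    # throttle_time_ms reports how long the broker delayed this
    # request due to quotas (KIP-124); it is informational only.
    throttle_ms = getattr(response, 'throttle_time_ms', 0)
    if throttle_ms:
        log.warning('Broker throttled request for %d ms', throttle_ms)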
