
Conversation

emmettbutler
Contributor

This pull request adds a public interface to the consumers that allows client code to set the current partition offsets to whatever they choose. Previously, this had to be done via a semi-public undocumented interface as detailed in this issue.

Given these changes, the new workflow for specifying offsets from which to consume looks like this:

consumer = topic.get_simple_consumer(consumer_group="testgroup")
# get_offset_for_partition stands in for whatever lookup the client uses to
# find the desired offset for a given partition
partition_offset_pairs = [(p, get_offset_for_partition(p))
                          for p in consumer.partitions.itervalues()]
consumer.reset_offsets(partition_offsets=partition_offset_pairs)

This pull request also ties reset_offsets to the fetch locking mechanism so that a partition can never be fetching and resetting its offsets at the same time. This makes the unlocking logic at the end of fetch() notably more complicated, since one of its error conditions involves calling reset_offsets.
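As a rough illustration of that locking relationship, the pattern looks something like the sketch below (simplified: it assumes each internal OwnedPartition carries a fetch_lock, and it glosses over the awkward part, namely that an error path inside fetch() itself calls reset_offsets, which a plain with-block can't express):

import threading

class OwnedPartitionSketch(object):
    """Illustrative stand-in for pykafka's internal OwnedPartition."""

    def __init__(self, partition):
        self.partition = partition
        # Held by whichever operation (fetch or reset) is currently touching
        # this partition, so the two can never run at the same time.
        self.fetch_lock = threading.Lock()

    def fetch(self):
        with self.fetch_lock:
            pass  # issue the FetchRequest, enqueue messages, advance offsets

    def reset_offsets(self, offset):
        with self.fetch_lock:
            pass  # record `offset` as the next offset to fetch for this partition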

The following concern has since been resolved in this pull request:
The only issue I see with this method is that the consumer has a chance to start fetching messages between get_simple_consumer and reset_offsets. To get around this, we could accept the partition_offset_pairs in get_simple_consumer, but that would require some way to obtain a list of partitions before actually instantiating a consumer. This sounds like a road I'd rather not go down, so the question is: how bad is it that it's impossible to start the consumer at an arbitrary offset that's not the head or tail of the partition? My gut tells me it doesn't matter and this solution is ok.
Let me know what you think, @kbourgoin and @yungchin.

@yungchin
Contributor

Cool cool, this is a feature I've good use for.

The only issue I see with this method is that the consumer has a chance to start fetching messages between get_simple_consumer and reset_offsets.

Ah, good point - I hadn't realised that when I posted on #187. So iiuc, that means if you start the consumer and then change the offsets, you might still get a message from consume() at a different offset, because it got onto the queue in between? That could be confusing. Would it be possible to flush the queues on all partitions passed to reset_offsets()?

we could accept the partition_offset_pairs in get_simple_consumer, but that would require some way to obtain a list of partitions before actually instantiating a consumer

That wouldn't be completely terrible either. Presumably, someone using this feature has some other place where they store offsets (I've an app where the offsets sit in postgres), and so they would usually know which partitions they want to begin with.

Although... it might get messy when the number of partitions is changed (by a cluster operator). If you have to provide a list of partitions+offsets on init, you can't make it grab the new partitions. So I guess for that scenario your current solution is better: you can grab whatever partitions there are (i.e. don't specify a list on init), and then reset the offsets for the ones you already knew about.

Note to self: I think I have to handle reset_offsets() in #176, if it's going to be a public method. Otherwise, the rdkafka consumer would completely ignore changed offsets after start().

@emmettbutler
Contributor Author

I'm not totally convinced that there's a good reason to allow setting offsets on __init__, but I do think it's a smart idea to have reset_offsets always flush the internal queues when called by a user. When called internally in response to an error, it shouldn't, but this can be handled with a kwarg.

@yungchin
Contributor

I'm not totally convinced that there's a good reason to allow setting offsets on __init__

Yes, no indeed - after re-reading my earlier ramble, I agree :)

@emmettbutler
Contributor Author

I've added the option to lock and flush the partition queues during an offset reset. This means that users no longer need to manually pause consumption before calling reset_offsets. In implementing this, I also discovered a bug: if consume is called after reset_offsets (_reset_offsets in current master) while there are still messages in the queue, the partition's internal offset counter (last_offset_consumed) is reset to match the message just returned by consume. That means the offset set by reset_offsets is effectively overwritten, and the offset appears not to have been reset. It's quite possible that this is the bug we've been seeing in production in which not all offsets are reset when deploying with reset_offsets_on_start.
To fix this, we now lock all fetcher threads and clear the per-partition message queues before resetting offsets so that any subsequent calls to consume will start from the offset intended by reset_offsets, not from the offset of some other message hanging around in the queue.
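In outline, the new per-partition sequence looks like this (a sketch only; flush() and set_offset() are illustrative names rather than the actual internal methods):

def reset_offsets_sketch(owned_partition_offsets):
    for owned_partition, offset in owned_partition_offsets:
        # Block this partition's fetcher threads first...
        with owned_partition.fetch_lock:
            # ...then drop any already-fetched messages so a later consume()
            # can't overwrite last_offset_consumed with a stale offset...
            owned_partition.flush()
            # ...and only then record the requested offset.
            owned_partition.set_offset(offset)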

@emmettbutler
Contributor Author

This is ready for review @yungchin @kbourgoin. I'll be testing it against real streams as soon as possible, too.

Contributor

Could this become "Iterable of (int, int)" - that is, (partition_id, offset) - instead? Otherwise a user would have to go through self._partitions_by_id to obtain instances of OwnedPartition before they could call this, I think.

Contributor Author

They can actually use the partitions property that's currently in master. I think it's cleaner and more consistent with the rest of the API to supply Partition instances over their ids.

Contributor

I see what you mean about that feeling inconsistent, but on the other hand, as a lazy user of the interface, I'd say the nice thing about ids is that you can save them to disk, and when you read them back you could pass them straight in here.

Maybe I'm missing a trick though, and there's a shortcut somehow. Am I right in thinking you'd have to get the ids, then map them to partition.Partitions (which is what you get from the BalancedConsumer.partitions property) and then again map to simpleconsumer.OwnedPartitions to call reset_offsets?

Contributor Author

Oh, sure - if you're saving partition IDs to disk then you would have to do a bit of work to construct the input to reset_offsets. I think a cleaner solution is to stop this function from accepting OwnedPartition and have it take Partition instead, since OwnedPartition is supposed to be private. I'll hold off on merging until I've figured that out, then.
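For the offsets-on-disk case, the client-side glue could then look roughly like this (assuming the consumer's partitions property maps partition ids to the partition objects reset_offsets expects, and load_saved_offsets is a hypothetical helper returning an {id: offset} dict):

saved = load_saved_offsets()  # hypothetical: {partition_id: offset} read back from disk
consumer = topic.get_simple_consumer(consumer_group="testgroup")
consumer.reset_offsets(partition_offsets=[
    (partition, saved[partition_id])
    for partition_id, partition in consumer.partitions.items()
    if partition_id in saved
])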

@emmettbutler emmettbutler force-pushed the feature/reset_offsets branch from 2876a58 to a3d7d24 Compare June 29, 2015 22:33
@yungchin
Contributor

Ok, so basically I've only lazy complaints about the function signature :)
I agree the lack of exceptions is not a regression from current master, so it makes sense to save that for a separate ticket. As for how it looks under the hood otherwise: no complaints at all.

@emmettbutler
Contributor Author

Awesome, thanks for the feedback. Your comments did make me realize it's no good to expose OwnedPartition to the client, so I need to figure that out.

@emmettbutler emmettbutler force-pushed the feature/reset_offsets branch from b7881b1 to 3388a6b Compare July 10, 2015 18:10
@emmettbutler
Contributor Author

It turns out that kafka-python allows resetting to custom offsets by promising to fetch the given offset next, not by actually committing the given offset to kafka. This is a problem because it creates a mismatch between the offset stored in kafka and the offset stored in the consumer; this is contrary to the design we've tried to maintain in pykafka so far.
Given that we'd like to commit offsets as soon as possible to kafka, we could try simply sending the user-supplied offsets to kafka during reset_offsets in an OffsetRequest. Unfortunately, the OffsetRequest only accepts timestamps, not actual offset values (We agree that this API is slightly funky.). When supplied with a timestamp it doesn't like, the request returns with an error code of 0 and an empty list for the offset, but still appears to set the offset in kafka's offset store. This seems to be a bug or at least a poor design choice in kafka.
In this branch, we allow the user to supply either timestamps or actual offset values to reset_offsets. If the request succeeds and returns an offset (we apparently have to check both the error code and whether the returned offset list is empty), we're all good. If it returns the empty offset list, we simply set the consumer's internal offset to the supplied value and continue on. This mirrors the behavior of kafka-python and provides a bit more versatility.
If the offset provided by the client is invalid, this branch prints an error message and proceeds with the normal offset reset procedure.
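In usage terms, that means a caller can mix timestamp constants and literal offsets in the same call, something like the following (a sketch: OffsetType comes from pykafka.common, the literal 1234 is just an example value, and a topic with at least two partitions is assumed):

from pykafka.common import OffsetType

consumer = topic.get_simple_consumer(consumer_group="testgroup")
partitions = list(consumer.partitions.values())

consumer.reset_offsets(partition_offsets=[
    (partitions[0], OffsetType.EARLIEST),  # timestamp constant: earliest available offset
    (partitions[1], 1234),                 # explicit offset value
])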

@emmettbutler
Contributor Author

This is now ready for review from @yungchin and @kbourgoin. @yungchin, this hasn't changed a ton since you looked at it, but I fixed some deadlocks and improved the interface for user-supplied offsets.

Contributor

Given that partitions has been dropped from the code above this, I think L523-525 may need rewriting to write into owned_partition_offsets instead?

@yungchin
Contributor

Cool, very happy with the addition of held_offsets. The deadlock fix looks perfectly good too (it did take me quite some time to get my head around that commit - it's become quite a challenge to reason about where we may grab and release fetch_lock!)

The new way of handling both timestamps and actual offsets seems a neat solution to me. I should add though that I don't feel particularly qualified to comment on that bit, given that I've only a poor understanding of Kafka's OffsetRequest semantics so far. Probably best if @kbourgoin can give that a read too.

I've added one new note (see above), which seems important for repeating failed requests. Other than that, all happy!

emmettbutler added a commit that referenced this pull request Jul 13, 2015
public interface for offset resetting
@emmettbutler emmettbutler merged commit 2dc86ce into master Jul 13, 2015
@emmettbutler emmettbutler deleted the feature/reset_offsets branch July 13, 2015 23:04
@yungchin yungchin mentioned this pull request Jul 24, 2015
yungchin added a commit that referenced this pull request Jul 28, 2015
As of 2dc86ce (#194) reset_offsets() is a public method that should be
working correctly on a running consumer.  This commit deals with that by
completely nuking the internal rdkafka consumer, which should be ok
unless someone wanted to call reset_offsets() a lot more than expected.
Tests for this are currently on a separate branch, see #213.

As a bonus, this also made fetch_offsets work for a running consumer.

Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>