Tests for SimpleConsumer.reset_offsets #213
Conversation
This avoids a test-cluster quirk where sometimes the first call to fetch_offsets() seems to end up with offsets stored to the cluster in a previous test run. I previously thought cb1ba29 had been sufficient to deal with this problem, but as it turns out, no:

Traceback (most recent call last):
  File "/srv/pykafka/tests/pykafka/test_simpleconsumer.py", line 64, in test_offset_resume
    self.assertEquals(offsets_resumed, offsets_committed)
AssertionError: {0: 99, 1: -1, 2: -1} != {0: 99, 1: -1, 2: 199}

With this commit, I can no longer reproduce that (famous last words).

Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
Just a quick clean up, using the new feature. Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
This reverts commit 4cbe0a1. Those changes resulted in KeyErrors because they perform a flush of owned_partition_offsets when that is still needed by _handle_success() later. I'll try to commit an alternative fix for the problem.
This aims to solve the same problem that 4cbe0a1 addressed, but which I reverted in the parent commit because it caused errors. (The problem it addressed, as far as I understand it, is that the way we updated owned_partition_offsets prior to 4cbe0a1 might drop partitions belonging to brokers that we hadn't actually processed yet.) Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
Using parts_by_error outside the loop where it gets assigned meant that these expressions would only 'see' errors that occurred for the last broker in that loop. The changes in the parent commit enable a quick fix for this. I'm afraid this comes without a proper test case to demonstrate the issue. Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
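A minimal sketch of the scoping bug described in that commit message (the broker/partition structures here are illustrative stand-ins, not pykafka's actual code): because parts_by_error is rebound on every loop iteration, any expression evaluated after the loop only sees the errors reported by the last broker processed.

```python
def collect_errors_buggy(responses_by_broker):
    for broker, response in responses_by_broker.items():
        parts_by_error = {}  # rebound each iteration: earlier brokers' errors are lost
        for partition, error_code in response:
            parts_by_error.setdefault(error_code, []).append(partition)
    return parts_by_error  # reflects only the last broker in the loop


def collect_errors_fixed(responses_by_broker):
    parts_by_error = {}  # accumulate across all brokers instead
    for broker, response in responses_by_broker.items():
        for partition, error_code in response:
            parts_by_error.setdefault(error_code, []).append(partition)
    return parts_by_error
```

With two brokers reporting errors for different partitions, the buggy version silently drops the first broker's entries while the fixed version keeps both.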
Note the FIXME, that I'm still struggling with. Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
@emmett9001 I ran into an issue that came in with 4cbe0a1 (see commit message of 50561ee for details), and have put in an alternative fix here with 3cea51c, and some more changes in 3e76c15. I'd be grateful if you can check on those two commits in particular, because |
tests/pykafka/test_simpleconsumer.py
This detects an off-by-one error, currently (I've committed -4 here to make the tests pass, but it should be -5 as per L102 above). I've no idea yet whether that's an error in the test code or elsewhere: any insight much appreciated.
Here's the test log:
======================================================================
FAIL: Test resetting to user-provided offsets
----------------------------------------------------------------------
Traceback (most recent call last):
File "/srv/pykafka/tests/pykafka/test_simpleconsumer.py", line 107, in test_reset_offsets
self.assertEqual(msg.offset, latest_offs[msg.partition_id] - 5)
AssertionError: 295 != 294
'295 != 294' = '%s != %s' % _common_shorten_repr(295, 294)
'295 != 294' = self._formatMessage('295 != 294', '295 != 294')
>> raise self.failureException('295 != 294')
-------------------- >> begin captured logging << --------------------
pykafka.simpleconsumer: INFO: Resetting offsets for 3 partitions
pykafka.simpleconsumer: INFO: Flushed queue for partition 1
pykafka.simpleconsumer: INFO: Flushed queue for partition 0
pykafka.simpleconsumer: INFO: Flushed queue for partition 2
pykafka.simpleconsumer: DEBUG: Fetcher thread exiting
pykafka.simpleconsumer: INFO: Starting 1 fetcher threads
pykafka.simpleconsumer: INFO: Resetting offsets for 3 partitions
pykafka.simpleconsumer: INFO: Flushed queue for partition 0
pykafka.simpleconsumer: INFO: Flushed queue for partition 2
pykafka.simpleconsumer: INFO: Flushed queue for partition 1
pykafka.simpleconsumer: WARNING: Offset reset for partition 1 to timestamp -6 failed. Setting partition 1's internal counter to -6
pykafka.simpleconsumer: INFO: Resetting offsets in response to OffsetOutOfRangeError
pykafka.simpleconsumer: INFO: Resetting offsets for 1 partitions
pykafka.simpleconsumer: INFO: Flushed queue for partition 1
pykafka.simpleconsumer: WARNING: Offset reset for partition 0 to timestamp 594 failed. Setting partition 0's internal counter to 594
pykafka.simpleconsumer: WARNING: Offset reset for partition 2 to timestamp 294 failed. Setting partition 2's internal counter to 294
pykafka.simpleconsumer: DEBUG: Fetched 5 messages for partition 0
pykafka.simpleconsumer: DEBUG: Partition 0 queue holds 5 messages
pykafka.simpleconsumer: DEBUG: Fetched 5 messages for partition 2
pykafka.simpleconsumer: DEBUG: Partition 2 queue holds 5 messages
--------------------- >> end captured logging << ---------------------
Ah, the problem is in the test assertion, of course: we're comparing a last_offset_consumed to the next msg.offset, so these should be off by one. Sorry for the noise.
I realised the failure noted in the parent commit was actually due to a confusion on my part, caused by the fact that interfaces like Topic.latest_available_offsets return the *next* offset you might want to read, whereas SimpleConsumer.reset_offsets accepts the last offset you want to mark as read. This rewrites the tests a bit to make that explicit. Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
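The off-by-one distinction that commit makes explicit can be shown with a little arithmetic (the numbers here are made up for illustration): Topic.latest_available_offsets() reports the *next* offset that would be written, while SimpleConsumer.reset_offsets() takes the *last* offset to mark as consumed, so the first message read after a reset is one higher than the reset value.

```python
latest = 300               # "latest available": the next offset to be written
last_written = latest - 1  # offset of the newest existing message: 299

# To re-read the final five messages, mark latest - 6 as already consumed:
reset_to = latest - 6      # last offset treated as consumed: 294
first_read = reset_to + 1  # first message consumed after the reset: 295
```

Asserting against `reset_to` instead of `reset_to + 1` produces exactly the kind of one-off failure seen in the test log above.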
Add an assert for when we specify an invalid offset. The rest of the test got rewritten in the process, now a bit less convoluted. Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
This is a good catch, though I'm surprised it wasn't already fixed. I could've sworn I made this exact change somewhere else.
The tests look great, and as far as I can tell the important invariants of |
Thanks for checking this! So 50561ee is the result of a … I then tried to do the same thing with 3cea51c, but wanted to check with you that it really does all the things it should do.
I'm afraid I've one or two more commits coming up, because the last test is now so strict that there's no good way for … (The difference in behaviour is that when you hand SimpleConsumer an invalid offset, it almost immediately detects that through the fetcher thread, so a split second afterwards you've already got an auto-reset valid offset. Whereas in RdKafkaSimpleConsumer, the held offsets only get updated once you've consumed from that partition, so it takes until after a consume() before the valid offset appears.)
Thanks for the explanation, the error case makes sense now.
This deals with a problem where a manual reset_offsets() (which flushes the queues of the affected partitions) followed by a consume() would get stuck in an infinite loop: the semaphore didn't get decremented during the flush(), and so the consume() keeps cycling partitions trying to find messages it thinks are there. The fix seems kind of inefficient, but I hope that's ok because flush() shouldn't ever be part of a performance-critical operation. The accompanying test update is how I hit the problem. Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
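The semaphore bug described in that commit can be modeled in isolation (the class below is an illustrative toy, not pykafka's actual implementation): a shared semaphore counts messages available across partition queues, so flushing a queue must also decrement the semaphore once per discarded message, or consume() will keep cycling partitions looking for messages that no longer exist.

```python
import queue
import threading


class PartitionQueue:
    def __init__(self, messages_available):
        self._q = queue.Queue()
        self._messages_available = messages_available  # shared counter

    def put(self, msg):
        self._q.put(msg)
        self._messages_available.release()  # one more message consumable

    def flush(self):
        flushed = 0
        while True:
            try:
                self._q.get_nowait()
            except queue.Empty:
                break
            # The crucial step: keep the semaphore in sync with the queue,
            # otherwise consume() spins on phantom messages after a flush.
            self._messages_available.acquire(blocking=False)
            flushed += 1
        return flushed
```

Decrementing one permit per drained message is indeed not the most efficient approach, but as the commit notes, flush() is not on a performance-critical path.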
It turned out the "set an invalid offset" part of the last test was asking a bit more than RdKafkaSimpleConsumer (see #176) can deliver: it expected invalid offsets to be reset almost immediately, whereas this reset, if it happens internally to the rdkafka consumer, only gets visible to us after we consume() a message. I think that's ok, for the uses where we do this at all: if we passed an invalid offset to reset_offsets(), that means we have some external offset store that held an invalid offset anyway, so the fact that self.held_offsets may keep reporting that invalid offset back to us for a bit longer (ie until a message is consumed) doesn't make matters worse. Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
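A toy model (purely illustrative, no pykafka imports) contrasting the two behaviours that commit describes: an "eager" consumer's fetcher thread spots an invalid offset almost immediately and auto-resets it, while a "lazy" consumer (like the rdkafka-backed one) only surfaces the reset after a consume().

```python
LAST_VALID_OFFSET = 199  # assume the partition holds offsets 0..199


class EagerConsumer:
    def reset_offsets(self, offset):
        # the fetcher thread detects the invalid offset right away
        self.held_offset = min(offset, LAST_VALID_OFFSET)


class LazyConsumer:
    def reset_offsets(self, offset):
        self.held_offset = offset  # stored as-is, even if invalid

    def consume(self):
        # the auto-reset only becomes visible once we actually consume
        self.held_offset = min(self.held_offset, LAST_VALID_OFFSET)
        return self.held_offset
```

A test that checks held offsets immediately after reset_offsets() passes for the eager consumer but not the lazy one; consuming one message first makes it pass for both, which is the accommodation the commit makes.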
As of 2dc86ce (#194) reset_offsets() is a public method that should be working correctly on a running consumer. This commit deals with that by completely nuking the internal rdkafka consumer, which should be ok unless someone wanted to call reset_offsets() a lot more than expected. Tests for this are currently on a separate branch, see #213. As a bonus, this also made fetch_offsets work for a running consumer. Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
…_offsets

Just to resolve conflicts.

* parsely/master:
  link to python docs
  tuple instead of two args
  pass around source_address to allow KafkaClient to configure it
  Revert "remove tabulate from main requirements"
  remove tabulate from main requirements
  stop using dict() in simpleconsumer
  Fixing a typo in topic.py, partiton instead of partitions

Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>

Conflicts:
  pykafka/simpleconsumer.py
Ok, apologies for slapping on more commits, I missed some problems jumping back and forth between this branch and the rdkafka branch. Should now be really actually totally ready. The additional changes affect |
This is a safer version, that also doesn't need as many comments explaining it to myself. The previous version dangerously assumed that it's always called from code that holds self.fetch_lock, which is bound to blow up in some future use case. Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
Tests and fixes for SimpleConsumer.reset_offsets
This adds tests for reset_offsets (which I'm also looking to use to ensure the implementation of reset_offsets in #176 will be conformant), and fixes some issues with that method which I encountered in the course of writing tests.