
aiokafka consumer doesn't reset offset when offset is out of range in a specific case #261

Closed
tfilipczak opened this issue Nov 23, 2017 · 12 comments

@tfilipczak

Version information:

Kafka cluster: 0.10.2.1
aiokafka: 0.3.1
Python: 3.6.1

Consumer script:

import logging
import sys

from aiokafka import AIOKafkaConsumer
import asyncio


logging.basicConfig(format='%(asctime)-15s %(levelname)s %(module)s:%(lineno)s %(name)s: %(message)s',
        level=logging.INFO)
logger = logging.getLogger()


loop = asyncio.get_event_loop()


async def consume():
    consumer = AIOKafkaConsumer(
            'test-topic',
            loop=loop, bootstrap_servers='192.168.1.101:9092',
            group_id='aiokafka-test',
            # should reset to the earliest available offset whenever the
            # committed offset is out of range
            auto_offset_reset='earliest')
    await consumer.start()
    try:
        async for msg in consumer:
            logger.info("consumed: %s, %s, %s, %s, %s, %s", msg.topic, msg.partition, msg.offset, msg.key, msg.value, msg.timestamp)
    finally:
        await consumer.stop()

loop.run_until_complete(consume())
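
For reference, a minimal producer along these lines is enough to drive step 2 of the reproduce flow below (the payload and the one-message-per-second rate are assumptions, based on what is mentioned later in the thread):

import asyncio

from aiokafka import AIOKafkaProducer

loop = asyncio.get_event_loop()


async def produce():
    producer = AIOKafkaProducer(
            loop=loop, bootstrap_servers='192.168.1.101:9092')
    await producer.start()
    try:
        i = 0
        while True:
            # one small message per second; any steady stream works
            await producer.send_and_wait('test-topic', str(i).encode())
            i += 1
            await asyncio.sleep(1)
    finally:
        await producer.stop()

loop.run_until_complete(produce())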

Bug reproduce flow:

  1. Set the topic retention in Kafka to a short value, for example 5 minutes.
  2. Start producing messages to Kafka.
  3. Start the consumer script; it consumes messages from Kafka correctly.
  4. Stop (kill) the consumer script to build up lag in Kafka; messages are still being produced.
  5. Wait until Kafka deletes part of the data that was not consumed. At that moment our consumer group 'aiokafka-test' has no active members.
~$ kafka-consumer-groups.sh --bootstrap-server 192.168.1.101:9092 --group aiokafka-test --describe
Consumer group 'aiokafka-test' has no active members.
TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
test-topic  6          3263917         3263942         25   -            -     -
test-topic  2          3268233         3268253         20   -            -     -
test-topic  7          3264122         3264139         17   -            -     -
test-topic  8          3262524         3262553         29   -            -     -
test-topic  3          3264791         3264810         19   -            -     -
test-topic  4          3265363         3265386         23   -            -     -
test-topic  0          3262134         3262160         26   -            -     -
test-topic  5          3266556         3266575         19   -            -     -
test-topic  1          3264517         3264539         22   -            -     -
  6. Start the consumer script again. The consumer notices that the offset is out of range (see logs), but does not reset it.
     In this state the consumer does not consume any messages for this group_id.
$ kafka-consumer-groups.sh --bootstrap-server 192.168.1.101:9092 --group aiokafka-test --describe
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-topic 0 3262134 3262198 64 aiokafka-0.3.1-56c9c309-0d0f-4bb9-8a4c-e3353ca5f69d/192.168.1.100 aiokafka-0.3.1
test-topic 1 3264517 3264566 49 aiokafka-0.3.1-56c9c309-0d0f-4bb9-8a4c-e3353ca5f69d/192.168.1.100 aiokafka-0.3.1
test-topic 2 3268233 3268285 52 aiokafka-0.3.1-56c9c309-0d0f-4bb9-8a4c-e3353ca5f69d/192.168.1.100 aiokafka-0.3.1
test-topic 3 3264791 3264834 43 aiokafka-0.3.1-56c9c309-0d0f-4bb9-8a4c-e3353ca5f69d/192.168.1.100 aiokafka-0.3.1
test-topic 4 3265363 3265424 61 aiokafka-0.3.1-56c9c309-0d0f-4bb9-8a4c-e3353ca5f69d/192.168.1.100 aiokafka-0.3.1
test-topic 5 3266556 3266618 62 aiokafka-0.3.1-56c9c309-0d0f-4bb9-8a4c-e3353ca5f69d/192.168.1.100 aiokafka-0.3.1
test-topic 6 3263917 3263978 61 aiokafka-0.3.1-56c9c309-0d0f-4bb9-8a4c-e3353ca5f69d/192.168.1.100 aiokafka-0.3.1
test-topic 7 3264122 3264173 51 aiokafka-0.3.1-56c9c309-0d0f-4bb9-8a4c-e3353ca5f69d/192.168.1.100 aiokafka-0.3.1
test-topic 8 3262524 3262581 57 aiokafka-0.3.1-56c9c309-0d0f-4bb9-8a4c-e3353ca5f69d/192.168.1.100 aiokafka-0.3.1

In logs:

2017-11-23 12:25:31,586 INFO subscription_state:147 kafka.consumer.subscription_state: Updating subscribed topics to: ('test-topic',)
2017-11-23 12:25:31,600 INFO group_coordinator:695 aiokafka.group_coordinator: Discovered coordinator 508 for group aiokafka-test
2017-11-23 12:25:31,602 INFO group_coordinator:312 aiokafka.group_coordinator: Revoking previously assigned partitions set() for group aiokafka-test
2017-11-23 12:25:31,602 INFO group_coordinator:880 aiokafka.group_coordinator: (Re-)joining group aiokafka-test
2017-11-23 12:25:31,607 INFO group_coordinator:936 aiokafka.group_coordinator: Joined group 'aiokafka-test' (generation 136) with member_id aiokafka-0.3.1-56c9c309-0d0f-4bb9-8a4c-e3353ca5f69d
2017-11-23 12:25:31,607 INFO group_coordinator:940 aiokafka.group_coordinator: Elected group leader -- performing partition assignments using roundrobin
2017-11-23 12:25:31,610 INFO group_coordinator:1025 aiokafka.group_coordinator: Successfully synced group aiokafka-test with generation 136
2017-11-23 12:25:31,610 INFO subscription_state:233 kafka.consumer.subscription_state: Updated partition assignment: [TopicPartition(topic='test-topic', partition=0), TopicPartition(topic='test-topic', partition=1), TopicPartition(topic='test-topic', partition=2), TopicPartition(topic='test-topic', partition=3), TopicPartition(topic='test-topic', partition=4), TopicPartition(topic='test-topic', partition=5), TopicPartition(topic='test-topic', partition=6), TopicPartition(topic='test-topic', partition=7), TopicPartition(topic='test-topic', partition=8)]
2017-11-23 12:25:31,610 INFO group_coordinator:390 aiokafka.group_coordinator: Setting newly assigned partitions {TopicPartition(topic='test-topic', partition=5), TopicPartition(topic='test-topic', partition=7), TopicPartition(topic='test-topic', partition=1), TopicPartition(topic='test-topic', partition=8), TopicPartition(topic='test-topic', partition=3), TopicPartition(topic='test-topic', partition=4), TopicPartition(topic='test-topic', partition=6), TopicPartition(topic='test-topic', partition=0), TopicPartition(topic='test-topic', partition=2)} for group aiokafka-test
2017-11-23 12:25:31,801 INFO fetcher:446 aiokafka.fetcher: Fetch offset 3262524 is out of range, resetting offset
2017-11-23 12:25:31,801 INFO fetcher:446 aiokafka.fetcher: Fetch offset 3262134 is out of range, resetting offset
2017-11-23 12:25:31,801 INFO fetcher:446 aiokafka.fetcher: Fetch offset 3265363 is out of range, resetting offset
2017-11-23 12:25:31,802 INFO fetcher:446 aiokafka.fetcher: Fetch offset 3266556 is out of range, resetting offset
2017-11-23 12:25:31,803 INFO fetcher:446 aiokafka.fetcher: Fetch offset 3264517 is out of range, resetting offset
2017-11-23 12:25:31,804 INFO fetcher:446 aiokafka.fetcher: Fetch offset 3264122 is out of range, resetting offset
2017-11-23 12:25:31,804 INFO fetcher:446 aiokafka.fetcher: Fetch offset 3263917 is out of range, resetting offset
2017-11-23 12:25:31,804 INFO fetcher:446 aiokafka.fetcher: Fetch offset 3264791 is out of range, resetting offset
2017-11-23 12:25:31,805 INFO fetcher:446 aiokafka.fetcher: Fetch offset 3268233 is out of range, resetting offset

At debug level, the following appears periodically:

2017-11-23 12:26:56,740 DEBUG group_coordinator:786 aiokafka.group_coordinator: Heartbeat: aiokafka-test[138] aiokafka-0.3.1-091330e5-c8e5-43ae-b348-13bf685b73ce
2017-11-23 12:26:56,741 DEBUG conn:227 aiokafka.conn: <AIOKafkaConnection host=vm-192-168-1-102 port=9092> Response 4: HeartbeatResponse_v0(error_code=0)
2017-11-23 12:26:56,741 DEBUG group_coordinator:819 aiokafka.group_coordinator: Received successful heartbeat response for group aiokafka-test
2017-11-23 12:26:58,840 DEBUG group_coordinator:517 aiokafka.group_coordinator: No offsets to commit

After I ran kafka-console-consumer.sh with group.id=aiokafka-test, the offset was reset and the aiokafka consumer started consuming messages again.
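
A workaround that avoids resetting the group with an external tool is to seek explicitly after start-up. This is only a sketch: it assumes the installed aiokafka exposes the seek_to_beginning() coroutine (newer releases do; 0.3.1 may not) and that the initial partition assignment is already known when start() returns. It also re-reads all retained data, not just the range that was lost:

import asyncio

from aiokafka import AIOKafkaConsumer

loop = asyncio.get_event_loop()


async def consume_from_start():
    consumer = AIOKafkaConsumer(
            'test-topic',
            loop=loop, bootstrap_servers='192.168.1.101:9092',
            group_id='aiokafka-test',
            auto_offset_reset='earliest')
    await consumer.start()
    try:
        # drop the stale committed offsets and fetch from the current
        # log start instead of waiting for the automatic reset
        await consumer.seek_to_beginning()
        async for msg in consumer:
            print(msg.topic, msg.partition, msg.offset)
    finally:
        await consumer.stop()

loop.run_until_complete(consume_from_start())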

@tvoinarovskyi
Member

tvoinarovskyi commented Dec 11, 2017

Hi there, sorry for the long wait. This is quite critical. I have a hunch it was fixed in https://github.com/aio-libs/aiokafka/pull/231/files. Will try to reproduce on master. If you could try it as well, that would be great.

@czchen

czchen commented Dec 15, 2017

Hi @tvoinarovskyi ,

We also have this issue, and the master branch does not work for us. In our case, only partition 4 is reset; the other partitions are not.

In requirements.txt

git+git://github.com/aio-libs/aiokafka.git#egg=aiokafka

In our consumer (via aiokafka)

2017-12-15 11:16:41,621 - aiokafka.consumer.fetcher - INFO - Fetch offset 540644690 is out of range for partition TopicPartition(topic='rt_postback', partition=1), resetting offset
2017-12-15 11:16:41,621 - aiokafka.consumer.fetcher - INFO - Fetch offset 540644690 is out of range for partition TopicPartition(topic='rt_postback', partition=1), resetting offset
2017-12-15 11:16:41,622 - aiokafka.consumer.fetcher - INFO - Fetch offset 540571453 is out of range for partition TopicPartition(topic='rt_postback', partition=7), resetting offset
2017-12-15 11:16:41,622 - aiokafka.consumer.fetcher - INFO - Fetch offset 540571453 is out of range for partition TopicPartition(topic='rt_postback', partition=7), resetting offset
2017-12-15 11:16:41,622 - aiokafka.consumer.fetcher - INFO - Fetch offset 540631693 is out of range for partition TopicPartition(topic='rt_postback', partition=3), resetting offset
2017-12-15 11:16:41,622 - aiokafka.consumer.fetcher - INFO - Fetch offset 540631693 is out of range for partition TopicPartition(topic='rt_postback', partition=3), resetting offset

From kafka-consumer-groups.sh output

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST            CLIENT-ID
rt_postback                    0          540727601       541229666       502065     ...
rt_postback                    1          540818396       541277628       459232     ...
rt_postback                    2          540759567       541219507       459940     ...
rt_postback                    3          540795985       541277870       481885     ...
rt_postback                    4          541224957       541225780       823        ...
rt_postback                    5          540828746       541272914       444168     ...
rt_postback                    6          540718779       541226172       507393     ...
rt_postback                    7          540749927       541271950       522023     ...
rt_postback                    8          540708775       541212706       503931     ...
rt_postback                    9          540806220       541274354       468134     ...
rt_postback                    10         540708422       541207159       498737     ...
rt_postback                    11         540794260       541272072       477812     ...

@tvoinarovskyi tvoinarovskyi added this to the 0.4.0 milestone Dec 15, 2017
@tvoinarovskyi
Member

Will try to reproduce it this weekend and see what can be done then. Thanks a lot for the feedback.

@tvoinarovskyi
Member

@czchen Say, which version of the Kafka broker are you using? I'm having trouble reproducing this on 0.10.1...

@tvoinarovskyi
Member

Tried with master and Kafka 1.0.0. Still no luck. Partitions reset properly:

MacBook-Pro-Taras:kafka_2.11-1.0.0 taras$ ./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group aiokafka-test --describe
Note: This will not show information about old Zookeeper-based consumers.


TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
test-topic                     0          1446            1446            0          aiokafka-0.3.2.dev-14877f42-755b-44c2-9f00-2588fb53f67d/192.168.100.110               aiokafka-0.3.2.dev
test-topic                     1          24              24              0          aiokafka-0.3.2.dev-14877f42-755b-44c2-9f00-2588fb53f67d/192.168.100.110               aiokafka-0.3.2.dev
test-topic                     2          26              26              0          aiokafka-0.3.2.dev-14877f42-755b-44c2-9f00-2588fb53f67d/192.168.100.110               aiokafka-0.3.2.dev
test-topic                     3          31              31              0          aiokafka-0.3.2.dev-14877f42-755b-44c2-9f00-2588fb53f67d/192.168.100.110               aiokafka-0.3.2.dev
test-topic                     4          31              32              1          aiokafka-0.3.2.dev-14877f42-755b-44c2-9f00-2588fb53f67d/192.168.100.110               aiokafka-0.3.2.dev
test-topic                     5          29              29              0          aiokafka-0.3.2.dev-14877f42-755b-44c2-9f00-2588fb53f67d/192.168.100.110               aiokafka-0.3.2.dev
test-topic                     6          22              22              0          aiokafka-0.3.2.dev-14877f42-755b-44c2-9f00-2588fb53f67d/192.168.100.110               aiokafka-0.3.2.dev
test-topic                     7          29              30              1          aiokafka-0.3.2.dev-14877f42-755b-44c2-9f00-2588fb53f67d/192.168.100.110               aiokafka-0.3.2.dev
test-topic                     8          32              32              0          aiokafka-0.3.2.dev-14877f42-755b-44c2-9f00-2588fb53f67d/192.168.100.110               aiokafka-0.3.2.dev
test-topic                     9          22              22              0          aiokafka-0.3.2.dev-14877f42-755b-44c2-9f00-2588fb53f67d/192.168.100.110               aiokafka-0.3.2.dev

Used the script provided by @tfilipczak. The producer sent 1 msg per second.

@czchen

czchen commented Dec 16, 2017

@tvoinarovskyi The Kafka version we use is 1.0.0. Is there any logging we should turn on to help debug this issue? We can help collect logs.

@czchen

czchen commented Jan 10, 2018

Any update on this issue?

@tvoinarovskyi
Member

Hey there, there is quite a bit of progress, but to solve this I had to rewrite a lot. It's mostly finished, but it still needs debugging and some tests still fail. I think I will post a PR soon, though I can't promise it will be this week.

To be specific, the big problem here is that some of the state management that works in kafka-python and the Java client can't be used in asyncio. So I rewrote the subscription_state component, which currently uses kafka-python's implementation. It's quite hard to manage the flow here correctly; a lot of things can happen during a request to the broker.
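
To illustrate the kind of flow being described (a generic sketch, not aiokafka's actual internals): any await is a suspension point, so state captured before a broker request may be stale by the time the response arrives and has to be re-validated against the current assignment.

import asyncio


class Assignment:
    def __init__(self, partitions):
        self.partitions = set(partitions)


async def fetch(partition):
    await asyncio.sleep(0.1)              # stands in for a round trip to the broker
    return "records for partition %s" % partition


async def fetch_routine(state):
    snapshot = sorted(state.partitions)   # assignment as seen before the request
    results = await asyncio.gather(*(fetch(p) for p in snapshot))
    for partition, records in zip(snapshot, results):
        # a rebalance may have revoked the partition while we were awaiting,
        # so only keep data for partitions that are still assigned
        if partition not in state.partitions:
            continue
        print(records)


async def main():
    state = Assignment({0, 1, 2})
    task = asyncio.ensure_future(fetch_routine(state))
    await asyncio.sleep(0.05)             # let the fetch requests go out first
    state.partitions.discard(1)           # simulated rebalance mid-request
    await task


asyncio.get_event_loop().run_until_complete(main())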

@tvoinarovskyi
Member

@czchen Hey there, if it's not too much trouble, could you try #286? It's still missing some tests and cleanup, but the main functional test suite passes.

@czchen

czchen commented Jan 22, 2018

@tvoinarovskyi I got the following error when reproducing this issue. Could you take a look at it?

2018-01-22 17:48:03,453 - aiokafka.consumer.fetcher - ERROR - Unexpected error in fetcher routine                                                     
Traceback (most recent call last):                                                                                                                    
  File ".../venv/src/aiokafka/aiokafka/consumer/fetcher.py", line 360, in _fetch_requests_routine       
    has_new_data = any(fut.result() for fut in done_pending)  
  File ".../venv/src/aiokafka/aiokafka/consumer/fetcher.py", line 360, in <genexpr>
    has_new_data = any(fut.result() for fut in done_pending)                                                                     
  File ".../venv/src/aiokafka/aiokafka/consumer/fetcher.py", line 519, in _proc_fetch_request
    tp_state.await_reset(tp, self._default_reset_strategy)                                                                         
TypeError: await_reset() takes 2 positional arguments but 3 were given     
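
For readers hitting the same traceback: the error just means the call site passes one positional argument more than the method accepts. The names below are hypothetical, not aiokafka's real code, but they reproduce the exact message:

class TopicPartitionState:
    # hypothetical signature: only the reset strategy is expected besides self
    def await_reset(self, strategy):
        self.reset_strategy = strategy


state = TopicPartitionState()
# passing the partition as an extra positional argument reproduces the error:
# TypeError: await_reset() takes 2 positional arguments but 3 were given
state.await_reset("test-topic-0", "earliest")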

@tvoinarovskyi
Member

Hey there, you probably tried a slightly older commit on the PR, sorry. I fixed that typo after covering it properly. It should be good to go now. Try e9ea0b8.
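
For the requirements.txt approach mentioned above, that commit can be pinned directly (a full commit hash is safer, but pip can usually resolve the abbreviated one after cloning):

git+git://github.com/aio-libs/aiokafka.git@e9ea0b8#egg=aiokafka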

@tvoinarovskyi
Member

Should be fixed by #286
