Consumer gets stuck in start() coroutine with JoinGroupCoordinator if the time between Consumer.__init__() and consumer.start() is greater than several minutes #764

Closed
mrmerkone opened this issue Jul 7, 2021 · 1 comment · Fixed by #766

mrmerkone commented Jul 7, 2021

Describe the bug
The consumer does not perform a join group request on consumer.start() and gets stuck if the consumer object was created more than max_poll_interval_ms ago.

Expected behaviour
The consumer performs a join group request on consumer.start().

Environment (please complete the following information):

  • Python 3.8.7
  • aiokafka version 0.7.1 | 0.6.0
  • kafka-python version 2.0.2
  • Kafka Broker version: Apache Kafka 2.2.0-cp2 (I do not have direct access to the broker)
  • Other information: Confluent Platform 5.2.1

Reproducible example

import asyncio
from aiokafka import AIOKafkaConsumer

async def test_with_implicit_subscribe():
    consumer = AIOKafkaConsumer("myTopic", group_id="my_group")
    await asyncio.sleep(900)  # simulates a long initialization that exceeds max_poll_interval_ms
    await consumer.start()  # gets stuck here
    print("i will never be called!")

async def test_with_explicit_subscribe():
    consumer = AIOKafkaConsumer(group_id="my_group")
    await asyncio.sleep(900)  # simulates a long initialization that exceeds max_poll_interval_ms
    consumer.subscribe(["myTopic"])  # subscribe() is a regular method, not a coroutine
    await consumer.start()  # gets stuck here
    print("i will never be called!")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(test_with_implicit_subscribe())
    loop.run_until_complete(test_with_explicit_subscribe())

Logs:

[DEBUG][2021-07-07 12:34:42,739]asyncio: Using selector: KqueueSelector
[INFO][2021-07-07 12:34:42,739]aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'myTopic'})
[DEBUG][2021-07-07 12:49:42,751]aiokafka: Attempting to bootstrap via node at some_prod_host_1:9092
[DEBUG][2021-07-07 12:49:42,780]aiokafka.conn: <AIOKafkaConnection host=some_prod_host_1 port=9092> Request 1: MetadataRequest_v0()
[DEBUG][2021-07-07 12:49:42,992]aiokafka.conn: <AIOKafkaConnection host=some_prod_host_1 port=9092> Response 1: MetadataResponse_v0()
[DEBUG][2021-07-07 12:49:42,996]aiokafka.cluster: Updated cluster metadata to ClusterMetadata(brokers: 2, topics: 1073, groups: 0)
[DEBUG][2021-07-07 12:49:42,996]aiokafka.conn: Closing connection at some_prod_host_1:9092
[DEBUG][2021-07-07 12:49:42,996]aiokafka: Received cluster metadata: ClusterMetadata(brokers: 2, topics: 1073, groups: 0)
[DEBUG][2021-07-07 12:49:42,997]aiokafka: Initiating connection to node 2 at some_prod_host_2
[DEBUG][2021-07-07 12:49:43,113]aiokafka.conn: <AIOKafkaConnection host=some_prod_host_2> Request 1: ApiVersionRequest_v0()
[DEBUG][2021-07-07 12:49:43,205]aiokafka.conn: <AIOKafkaConnection host=some_prod_host_2> Response 1: ApiVersionResponse_v0()
[DEBUG][2021-07-07 12:49:43,205]aiokafka.conn: <AIOKafkaConnection host=some_prod_host_2> Request 2: MetadataRequest_v0()
[DEBUG][2021-07-07 12:49:43,618]aiokafka.conn: <AIOKafkaConnection host=some_prod_host_2> Response 2: MetadataResponse_v0()
[DEBUG][2021-07-07 12:49:43,618]aiokafka.conn: Closing connection at some_prod_host_2
[DEBUG][2021-07-07 12:49:43,619]aiokafka: Sending FindCoordinator request for key my_group to broker 1
[DEBUG][2021-07-07 12:49:43,619]aiokafka: Initiating connection to node 1 at some_prod_host_1:9092
[DEBUG][2021-07-07 12:49:43,620]aiokafka: Initiating connection to node 1 at some_prod_host_1:9092
[DEBUG][2021-07-07 12:49:43,705]aiokafka.conn: <AIOKafkaConnection host=some_prod_host_1 port=9092> Request 1: ApiVersionRequest_v0()
[DEBUG][2021-07-07 12:49:43,805]aiokafka.conn: <AIOKafkaConnection host=some_prod_host_1 port=9092> Response 1: ApiVersionResponse_v0()
[DEBUG][2021-07-07 12:49:43,805]aiokafka.conn: <AIOKafkaConnection host=some_prod_host_1 port=9092> Request 2: FindCoordinatorRequest_v1()
[DEBUG][2021-07-07 12:49:43,805]aiokafka: Sending metadata request MetadataRequest_v1() to node 1
[DEBUG][2021-07-07 12:49:43,805]aiokafka.conn: <AIOKafkaConnection host=some_prod_host_1 port=9092> Request 3: MetadataRequest_v1()
[DEBUG][2021-07-07 12:49:43,880]aiokafka.conn: <AIOKafkaConnection host=some_prod_host_1 port=9092> Response 2: FindCoordinatorResponse_v1()
[DEBUG][2021-07-07 12:49:43,881]aiokafka.conn: <AIOKafkaConnection host=some_prod_host_1 port=9092> Response 3: MetadataResponse_v1()
[DEBUG][2021-07-07 12:49:43,881]aiokafka: Received group coordinator response FindCoordinatorResponse_v1()
[DEBUG][2021-07-07 12:49:43,881]aiokafka: Initiating connection to node 1 at some_prod_host_1:9092
[DEBUG][2021-07-07 12:49:43,882]aiokafka.cluster: Updated cluster metadata to ClusterMetadata(brokers: 2, topics: 1, groups: 0)
[DEBUG][2021-07-07 12:49:44,023]aiokafka.conn: <AIOKafkaConnection host=some_prod_host_1 port=9092> Request 1: ApiVersionRequest_v0()
[DEBUG][2021-07-07 12:49:44,131]aiokafka.conn: <AIOKafkaConnection host=some_prod_host_1 port=9092> Response 1: ApiVersionResponse_v0()
[INFO][2021-07-07 12:49:44,131]aiokafka.consumer.group_coordinator: Discovered coordinator 1 for group my_group
[INFO][2021-07-07 12:49:44,131]aiokafka.consumer.group_coordinator: Revoking previously assigned partitions set() for group my_group
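
The log above ends right after the "Revoking previously assigned partitions" message and shows no join group request. Since the hang only appears when the consumer object is older than max_poll_interval_ms, one possible workaround is to finish the long initialization first and construct the consumer immediately before start(). The sketch below is only an illustration: start_after_init, make_consumer and slow_init are hypothetical names that are not part of aiokafka.

```python
import asyncio


async def start_after_init(make_consumer, slow_init):
    """Run the slow initialization first, then construct and start the consumer.

    make_consumer: hypothetical zero-argument callable that returns an
                   unstarted consumer, e.g.
                   lambda: AIOKafkaConsumer("myTopic", group_id="my_group")
    slow_init:     hypothetical zero-argument coroutine function that
                   performs the long setup.
    """
    await slow_init()
    # The consumer is created only now, so almost no time passes
    # between its construction and start().
    consumer = make_consumer()
    await consumer.start()
    return consumer
```

With aiokafka installed, this could be called as `consumer = await start_after_init(lambda: AIOKafkaConsumer("myTopic", group_id="my_group"), long_initialization)`, where long_initialization stands for the application's own setup coroutine.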

ods (Collaborator) commented Jul 12, 2021

A possible fix is to change this line from

if idle_time >= self._max_poll_interval:

to

if prev_assignment is not None and idle_time >= self._max_poll_interval:
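
The effect of the two conditions can be compared with a small standalone model. This is a simplified sketch, not the aiokafka source: should_skip_rejoin and its guarded flag are hypothetical, and it assumes that idle time is counted from the moment the consumer object is created and that a consumer which has never joined has a prev_assignment of None.

```python
def should_skip_rejoin(prev_assignment, idle_time, max_poll_interval, guarded):
    """Return True when the join attempt would be skipped.

    Hypothetical model of the check discussed above, not library code.
    All times are in seconds.
    """
    if guarded:
        # Proposed condition: only a consumer that already held an
        # assignment can count as idle for too long.
        return prev_assignment is not None and idle_time >= max_poll_interval
    # Original condition: idle time alone decides.
    return idle_time >= max_poll_interval


# Values mirroring the report: 900 s between construction and start(),
# with an assumed max_poll_interval of 300 s.
fresh_consumer_old = should_skip_rejoin(None, 900, 300, guarded=False)
fresh_consumer_new = should_skip_rejoin(None, 900, 300, guarded=True)
print("original check skips join:", fresh_consumer_old)
print("guarded check skips join:", fresh_consumer_new)
```

Under these assumptions the original check skips the join for a consumer that has never been assigned anything, which matches the hang in the report, while the guarded check lets the first join proceed and still backs off for a previously assigned consumer that has been idle too long.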
