KafkaConsumer is losing partition in 1.4.3 version #1590

@nikitacherevko

Description

We had been using version 1.3.5 until we hit an issue with DNS lookups, so we decided to upgrade kafka-python to 1.4.3. But at some point the consumer stops consuming messages (which we are definitely still producing), and the lag keeps growing for the one partition (the group's topic has a single partition in total), so we assume the consumer lost its partition assignment.
Here is how we create the consumer:

 consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers,
                          group_id=group,
                          auto_offset_reset='earliest')
 consumer.subscribe(['{}'.format(topic)])

Here are some relevant logs (most recent first):

Time Level Message
  12:16:08.891 DEBUG Completed autocommit of offsets {} for group staging_kilda-tpe
  12:16:08.839 DEBUG No offsets to commit
  12:16:03.866 DEBUG No partitions assigned; sleeping for 2.94898533821
  12:16:03.815 INFO Successfully joined group staging_kilda-tpe with generation 361
  12:16:03.815 INFO Setting newly assigned partitions set([]) for group staging_kilda-tpe
  12:16:03.815 INFO Updated partition assignment: []
  12:16:03.814 DEBUG Received correlation id: 2
  12:16:03.814 DEBUG Processing response SyncGroupResponse_v0
  12:16:03.814 DEBUG Updated cluster metadata to ClusterMetadata(brokers: 3, topics: 1, groups: 1)
  12:16:03.814 DEBUG <BrokerConnection node_id=1010 host=host1.com:6667 [IPv4 ('192.168.0.138', 6667)]> Response 2 (3.32689285278 ms): SyncGroupResponse_v0(error_code=0, member_assignment='\x00\x00\x00\x00\x00\x01\x00\x16staging_kilda.topo.eng\x00\x00\x00\x00\x00\x00\x00\x00')
  12:16:03.813 DEBUG Received correlation id: 5
  12:16:03.813 DEBUG Processing response MetadataResponse_v1
  12:16:03.813 DEBUG <BrokerConnection node_id=1011 host=host3.com:6667 [IPv4 ('192.168.0.111', 6667)]> Response 5 (1.08695030212 ms): MetadataResponse_v1(brokers=[(node_id=1010, host=u'host1.com', port=6667, rack=u'/default-rack'), (node_id=1012, host=u'host2.com', port=6667, rack=u'/default-rack'), (node_id=1011, host=u'host3.com', port=6667, rack=u'/default-rack')], controller_id=1011, topics=[(error_code=0, topic=u'staging_kilda.topo.eng', is_internal=False, partitions=[(error_code=0, partition=0, leader=1012, replicas=[1012], isr=[1012])])])
  12:16:03.812 DEBUG Sending request MetadataRequest_v1(topics=['staging_kilda.topo.eng'])
  12:16:03.812 DEBUG <BrokerConnection node_id=1011 host=host3.com:6667 [IPv4 ('192.168.0.111', 6667)]> Request 5: MetadataRequest_v1(topics=['staging_kilda.topo.eng'])
  12:16:03.811 DEBUG <BrokerConnection node_id=1010 host=host1.com:6667 [IPv4 ('192.168.0.138', 6667)]> Request 2: SyncGroupRequest_v0(group='staging_kilda-tpe', generation_id=361, member_id=u'kafka-python-1.4.3-fd4b56a0-c514-4f96-8a65-024c31ac4c65', group_assignment=[(member_id=u'kafka-python-1.4.3-fd4b56a0-c514-4f96-8a65-024c31ac4c65', member_metadata='\x00\x00\x00\x00\x00\x01\x00\x16staging_kilda.topo.eng\x00\x00\x00\x00\x00\x00\x00\x00'), (member_id=u'kafka-python-1.4.3-dbcfb90f-cf07-41c1-b40b-9b0dda445010', member_metadata='\x00\x00\x00\x00\x00\x01\x00\x16staging_kilda.topo.eng\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00')])
  12:16:03.811 DEBUG Sending metadata request MetadataRequest_v1(topics=['staging_kilda.topo.eng']) to node 1011
  12:16:03.810 DEBUG Sending request SyncGroupRequest_v0(group='staging_kilda-tpe', generation_id=361, member_id=u'kafka-python-1.4.3-fd4b56a0-c514-4f96-8a65-024c31ac4c65', group_assignment=[(member_id=u'kafka-python-1.4.3-fd4b56a0-c514-4f96-8a65-024c31ac4c65', member_metadata='\x00\x00\x00\x00\x00\x01\x00\x16staging_kilda.topo.eng\x00\x00\x00\x00\x00\x00\x00\x00'), (member_id=u'kafka-python-1.4.3-dbcfb90f-cf07-41c1-b40b-9b0dda445010', member_metadata='\x00\x00\x00\x00\x00\x01\x00\x16staging_kilda.topo.eng\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00')])
  12:16:03.810 DEBUG Sending leader SyncGroup for group staging_kilda-tpe to coordinator 1010: SyncGroupRequest_v0(group='staging_kilda-tpe', generation_id=361, member_id=u'kafka-python-1.4.3-fd4b56a0-c514-4f96-8a65-024c31ac4c65', group_assignment=[(member_id=u'kafka-python-1.4.3-fd4b56a0-c514-4f96-8a65-024c31ac4c65', member_metadata='\x00\x00\x00\x00\x00\x01\x00\x16staging_kilda.topo.eng\x00\x00\x00\x00\x00\x00\x00\x00'), (member_id=u'kafka-python-1.4.3-dbcfb90f-cf07-41c1-b40b-9b0dda445010', member_metadata='\x00\x00\x00\x00\x00\x01\x00\x16staging_kilda.topo.eng\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00')])
  12:16:03.809 DEBUG Finished assignment for group staging_kilda-tpe: {u'kafka-python-1.4.3-fd4b56a0-c514-4f96-8a65-024c31ac4c65': ConsumerProtocolMemberAssignment(version=0, assignment=[(topic=u'staging_kilda.topo.eng', partitions=[])], user_data=''), u'kafka-python-1.4.3-dbcfb90f-cf07-41c1-b40b-9b0dda445010': ConsumerProtocolMemberAssignment(version=0, assignment=[(topic=u'staging_kilda.topo.eng', partitions=[0])], user_data='')}
  12:16:03.809 DEBUG Performing assignment for group staging_kilda-tpe using strategy range with subscriptions {u'kafka-python-1.4.3-fd4b56a0-c514-4f96-8a65-024c31ac4c65': ConsumerProtocolMemberMetadata(version=0, subscription=[u'staging_kilda.topo.eng'], user_data=''), u'kafka-python-1.4.3-dbcfb90f-cf07-41c1-b40b-9b0dda445010': ConsumerProtocolMemberMetadata(version=0, subscription=[u'staging_kilda.topo.eng'], user_data='')}
  12:16:03.808 DEBUG Received successful JoinGroup response for group staging_kilda-tpe: JoinGroupResponse_v1(error_code=0, generation_id=361, group_protocol=u'range', leader_id=u'kafka-python-1.4.3-fd4b56a0-c514-4f96-8a65-024c31ac4c65', member_id=u'kafka-python-1.4.3-fd4b56a0-c514-4f96-8a65-024c31ac4c65', members=[(member_id=u'kafka-python-1.4.3-dbcfb90f-cf07-41c1-b40b-9b0dda445010', member_metadata='\x00\x00\x00\x00\x00\x01\x00\x16staging_kilda.topo.eng\x00\x00\x00\x00'), (member_id=u'kafka-python-1.4.3-fd4b56a0-c514-4f96-8a65-024c31ac4c65', member_metadata='\x00\x00\x00\x00\x00\x01\x00\x16staging_kilda.topo.eng\x00\x00\x00\x00')])
  12:16:03.808 INFO Elected group leader -- performing partition assignments using range
  12:16:03.807 DEBUG <BrokerConnection node_id=1010 host=host1.com:6667 [IPv4 ('192.168.0.138', 6667)]> Response 1 (146.106958389 ms): JoinGroupResponse_v1(error_code=0, generation_id=361, group_protocol=u'range', leader_id=u'kafka-python-1.4.3-fd4b56a0-c514-4f96-8a65-024c31ac4c65', member_id=u'kafka-python-1.4.3-fd4b56a0-c514-4f96-8a65-024c31ac4c65', members=[(member_id=u'kafka-python-1.4.3-dbcfb90f-cf07-41c1-b40b-9b0dda445010', member_metadata='\x00\x00\x00\x00\x00\x01\x00\x16staging_kilda.topo.eng\x00\x00\x00\x00'), (member_id=u'kafka-python-1.4.3-fd4b56a0-c514-4f96-8a65-024c31ac4c65', member_metadata='\x00\x00\x00\x00\x00\x01\x00\x16staging_kilda.topo.eng\x00\x00\x00\x00')])
  12:16:03.806 DEBUG Received correlation id: 1
  12:16:03.806 DEBUG Processing response JoinGroupResponse_v1
  12:16:03.664 DEBUG Updated cluster metadata to ClusterMetadata(brokers: 3, topics: 1, groups: 1)
  12:16:03.663 DEBUG Received correlation id: 4
  12:16:03.663 DEBUG <BrokerConnection node_id=1011 host=host3.com:6667 [IPv4 ('192.168.0.111', 6667)]> Response 4 (1.62696838379 ms): MetadataResponse_v1(brokers=[(node_id=1010, host=u'host1.com', port=6667, rack=u'/default-rack'), (node_id=1012, host=u'host2.com', port=6667, rack=u'/default-rack'), (node_id=1011, host=u'host3.com', port=6667, rack=u'/default-rack')], controller_id=1011, topics=[(error_code=0, topic=u'staging_kilda.topo.eng', is_internal=False, partitions=[(error_code=0, partition=0, leader=1012, replicas=[1012], isr=[1012])])])
  12:16:03.663 DEBUG Processing response MetadataResponse_v1
  12:16:03.662 DEBUG <BrokerConnection node_id=1011 host=host3.com:6667 [IPv4 ('192.168.0.111', 6667)]> Request 4: MetadataRequest_v1(topics=['staging_kilda.topo.eng'])
  12:16:03.661 DEBUG Sending metadata request MetadataRequest_v1(topics=['staging_kilda.topo.eng']) to node 1011
  12:16:03.661 DEBUG <BrokerConnection node_id=1010 host=host1.com:6667 [IPv4 ('192.168.0.138', 6667)]> Request 1: JoinGroupRequest_v1(group='staging_kilda-tpe', session_timeout=10000, rebalance_timeout=300000, member_id='', protocol_type='consumer', group_protocols=[(protocol_name='range', protocol_metadata='\x00\x00\x00\x00\x00\x01\x00\x16staging_kilda.topo.eng\x00\x00\x00\x00'), (protocol_name='roundrobin', protocol_metadata='\x00\x00\x00\x00\x00\x01\x00\x16staging_kilda.topo.eng\x00\x00\x00\x00')])
  12:16:03.661 DEBUG Sending request MetadataRequest_v1(topics=['staging_kilda.topo.eng'])
  12:16:03.660 DEBUG Sending JoinGroup (JoinGroupRequest_v1(group='staging_kilda-tpe', session_timeout=10000, rebalance_timeout=300000, member_id='', protocol_type='consumer', group_protocols=[(protocol_name='range', protocol_metadata='\x00\x00\x00\x00\x00\x01\x00\x16staging_kilda.topo.eng\x00\x00\x00\x00'), (protocol_name='roundrobin', protocol_metadata='\x00\x00\x00\x00\x00\x01\x00\x16staging_kilda.topo.eng\x00\x00\x00\x00')])) to coordinator 1010
  12:16:03.660 DEBUG Sending request JoinGroupRequest_v1(group='staging_kilda-tpe', session_timeout=10000, rebalance_timeout=300000, member_id='', protocol_type='consumer', group_protocols=[(protocol_name='range', protocol_metadata='\x00\x00\x00\x00\x00\x01\x00\x16staging_kilda.topo.eng\x00\x00\x00\x00'), (protocol_name='roundrobin', protocol_metadata='\x00\x00\x00\x00\x00\x01\x00\x16staging_kilda.topo.eng\x00\x00\x00\x00')])
  12:16:03.659 INFO (Re-)joining group staging_kilda-tpe

We definitely have only one consumer per group and one partition.
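For what it's worth, the SyncGroup/JoinGroup logs above list two member IDs in the group (`...-fd4b56a0-...` and `...-dbcfb90f-...`), and under the range strategy with a single partition only one member can receive it. A minimal sketch of single-topic range assignment (an illustration of the strategy's documented behavior, not kafka-python's actual implementation; `range_assign` is a made-up name) reproduces what the "Finished assignment" log shows, with the lexicographically first member getting partition 0 and the other member getting nothing:

```python
def range_assign(member_ids, partitions):
    # Single-topic "range" strategy sketch: sort member IDs, then hand out
    # contiguous partition ranges; the first (partitions % members) members
    # receive one extra partition each.
    members = sorted(member_ids)
    base, extra = divmod(len(partitions), len(members))
    assignment, start = {}, 0
    for i, member in enumerate(members):
        count = base + (1 if i < extra else 0)
        assignment[member] = partitions[start:start + count]
        start += count
    return assignment

# Two members, one partition: only the first sorted member gets an assignment.
print(range_assign(['kafka-python-1.4.3-fd4b56a0',
                    'kafka-python-1.4.3-dbcfb90f'], [0]))
# {'kafka-python-1.4.3-dbcfb90f': [0], 'kafka-python-1.4.3-fd4b56a0': []}
```

So an empty assignment for this consumer is what range assignment would produce if the broker still counted a second (possibly stale) member in the group.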

We have worked around the issue by downgrading kafka-python to 1.4.0, but I assume that is only a temporary solution, and we would like to know whether we are doing something wrong or whether this is a bug in the library.
