
[BUG] Kafka consumer cannot be assigned. #691

@mooc9988

Description

Version & Environment

Commit: 55496072a00f60f5

What went wrong?

The consumer is stuck in ensureCoordinatorReady because the group coordinator never becomes available. Thread dump of the blocked main thread:

"main" #1 prio=5 os_prio=0 cpu=675604.67ms elapsed=61267.64s tid=0x00007f4ee00157c0 nid=0x7303 waiting on condition  [0x00007f4ee65fe000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(java.base@17.0.9/Native Method)
        at org.apache.kafka.common.utils.Utils.sleep(Utils.java:381)
        at org.apache.kafka.common.utils.SystemTime.sleep(SystemTime.java:41)
        at org.apache.kafka.common.utils.Timer.sleep(Timer.java:203)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:286)
        - locked <0x0000000088700000> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:240)
        - locked <0x0000000088700000> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.coordinatorUnknownAndUnreadySync(ConsumerCoordinator.java:497)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:529)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1272)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1236)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
        at io.openmessaging.benchmark.longrunning.service.OneShotKafkaConsumingTaskService.prepare(OneShotKafkaConsumingTaskService.java:97)
        at io.openmessaging.benchmark.WorkloadGenerator.runOneShotKafkaConsumingTask(WorkloadGenerator.java:253)
        at io.openmessaging.benchmark.WorkloadGenerator.runTasks(WorkloadGenerator.java:362)
        at io.openmessaging.benchmark.WorkloadGenerator.longRunning(WorkloadGenerator.java:184)
        at io.openmessaging.benchmark.Benchmark.lambda$main$0(Benchmark.java:192)
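The loop visible in the thread dump can be sketched as follows. This is an illustrative simplification, not the real kafka-clients internals: the CoordinatorLookup interface and method names here are hypothetical stand-ins for the FindCoordinator request, but the shape matches the dump, where the client alternates coordinator lookups with retry.backoff.ms sleeps (the Timer.sleep / Utils.sleep frames) until its timer expires. If the coordinator never becomes known, poll() keeps re-entering this loop and the consumer never gets an assignment.

```java
import java.time.Duration;

// Illustrative sketch (NOT the real AbstractCoordinator code) of the
// coordinator-discovery loop the thread dump shows the main thread stuck in.
public class CoordinatorRetrySketch {
    static final long RETRY_BACKOFF_MS = 100; // Kafka's default retry.backoff.ms

    // Hypothetical stand-in for issuing a FindCoordinator request.
    interface CoordinatorLookup { boolean coordinatorKnown(); }

    // Loop until the coordinator is known or the timer expires; the sleep
    // between attempts is what appears as TIMED_WAITING in the thread dump.
    static boolean ensureCoordinatorReady(CoordinatorLookup lookup, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (System.nanoTime() < deadline) {
            if (lookup.coordinatorKnown()) {
                return true;
            }
            try {
                Thread.sleep(RETRY_BACKOFF_MS); // backoff before retrying the lookup
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false; // timer expired with the coordinator still unknown
    }

    public static void main(String[] args) {
        // A coordinator that never becomes ready, mimicking this issue:
        boolean ready = ensureCoordinatorReady(() -> false, Duration.ofMillis(500));
        System.out.println("coordinator ready: " + ready); // prints "coordinator ready: false"
    }
}
```

In the benchmark, poll() is presumably called in a loop, so each call re-enters this discovery loop and the task never progresses past prepare().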

Consumer-group log on the server, showing the NOT_COORDINATOR error:

[2024-01-17 19:46:55,526] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group testMain-sub-oneShotKafkaConsumingTask in Empty state. Created a new member id consumer-testMain-sub-oneShotKafkaConsumingTask-3-7ea2d309-e1ec-4f7e-9c23-b742f25b3490 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2024-01-17 19:46:55,526] INFO [GroupCoordinator 0]: Preparing to rebalance group testMain-sub-oneShotKafkaConsumingTask in state PreparingRebalance with old generation 0 (__consumer_offsets-45) (reason: Adding new member consumer-testMain-sub-oneShotKafkaConsumingTask-3-7ea2d309-e1ec-4f7e-9c23-b742f25b3490 with group instance id None; client reason: rebalance failed due to MemberIdRequiredException) (kafka.coordinator.group.GroupCoordinator)
[2024-01-17 19:46:58,527] INFO [GroupCoordinator 0]: Stabilized group testMain-sub-oneShotKafkaConsumingTask generation 1 (__consumer_offsets-45) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2024-01-17 19:46:58,528] INFO [GroupCoordinator 0]: Assignment received from leader consumer-testMain-sub-oneShotKafkaConsumingTask-3-7ea2d309-e1ec-4f7e-9c23-b742f25b3490 for group testMain-sub-oneShotKafkaConsumingTask for generation 1. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
[2024-01-17 19:46:58,530] INFO [GroupCoordinator 0]: Preparing to rebalance group testMain-sub-oneShotKafkaConsumingTask in state PreparingRebalance with old generation 1 (__consumer_offsets-45) (reason: Error NOT_COORDINATOR when storing group assignment during SyncGroup (member: consumer-testMain-sub-oneShotKafkaConsumingTask-3-7ea2d309-e1ec-4f7e-9c23-b742f25b3490)) (kafka.coordinator.group.GroupCoordinator)
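The server log above shows a repeating cycle: the member joins, the group stabilizes at generation 1, then persisting the assignment to __consumer_offsets-45 fails with NOT_COORDINATOR, so the group drops back to PreparingRebalance. A minimal state-machine sketch of that cycle (hypothetical names and a deliberately simplified state set, not the real GroupCoordinator code):

```java
// Illustrative sketch of the group-state cycle seen in the server log:
// PreparingRebalance -> "Stabilized" -> SyncGroup store fails with
// NOT_COORDINATOR -> back to PreparingRebalance, forever.
public class RebalanceCycleSketch {
    enum GroupState { EMPTY, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE }

    // If storeAssignmentSucceeds is false (every SyncGroup write to
    // __consumer_offsets fails with NOT_COORDINATOR, as in the log),
    // the group never reaches STABLE no matter how many rounds it runs.
    static GroupState simulate(int rounds, boolean storeAssignmentSucceeds) {
        GroupState state = GroupState.EMPTY;
        int generation = 0;
        for (int i = 0; i < rounds; i++) {
            state = GroupState.PREPARING_REBALANCE; // "Preparing to rebalance group ..."
            generation++;                           // new generation each round
            state = GroupState.COMPLETING_REBALANCE; // "Stabilized group ... generation N"
            if (storeAssignmentSucceeds) {
                return GroupState.STABLE; // assignment persisted to __consumer_offsets
            }
            // "Error NOT_COORDINATOR when storing group assignment during SyncGroup"
            // -> fall through and rebalance again
        }
        return state;
    }

    public static void main(String[] args) {
        System.out.println(simulate(5, false)); // cycle never stabilizes
        System.out.println(simulate(5, true));  // stabilizes on the first round
    }
}
```

The root cause is therefore not on the client: the coordinator cannot durably store the assignment because the partition's backing stream is stuck, as the stream logs below suggest.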

Stream info for __consumer_offsets-45:

[2024-01-17 09:43:25,313] INFO [ElasticLog partition=__consumer_offsets-45 epoch=2] opened existing meta stream: stream_id=87 (kafka.log.streamaspect.ElasticLog$)
[2024-01-17 09:43:25,332] INFO [ElasticLog partition=__consumer_offsets-45 epoch=2] loaded partition meta: ElasticPartitionMeta{startOffset=0, cleanerOffset=0, recoverOffset=0, cleanedShutdown=true} (kafka.log.streamaspect.ElasticLog$)
[2024-01-17 09:43:25,332] INFO [ElasticLog partition=__consumer_offsets-45 epoch=2] loaded no producer snapshots (kafka.log.streamaspect.ElasticLog$)
[2024-01-17 09:43:25,343] INFO [ElasticLog partition=__consumer_offsets-45 epoch=2] loaded log meta: ElasticLogMeta{streamMap={log=126, tim=157, txn=-1}, lastNthSegmentMetas=[ElasticStreamSegmentMeta{baseOffset=0, createTimestamp=1705482090780, lastModifiedTimestamp=0, streamSuffix='', logSize=0, log=[0, -1], time=[0, -1], txn=[0, -1], firstBatchTimestamp=0, timeIndexLastEntry=TimestampOffsetData{timestamp=-1, offset=0}}]} (kafka.log.streamaspect.ElasticLog$)

Meanwhile, object-fetch requests for stream 87 (which backs __consumer_offsets-45, among other streams) remain pending:

[2024-01-18 03:02:41,158] INFO [FetchObjects],[PENDING],streamId=115 startOffset=11 endOffset=16 limit=2 (kafka.log.stream.s3.metadata.StreamMetadataManager)
[2024-01-18 03:02:41,158] INFO [FetchObjects],[PENDING],streamId=86 startOffset=22 endOffset=28 limit=2 (kafka.log.stream.s3.metadata.StreamMetadataManager)
[2024-01-18 03:02:41,158] INFO [FetchObjects],[PENDING],streamId=146 startOffset=5 endOffset=10 limit=2 (kafka.log.stream.s3.metadata.StreamMetadataManager)
[2024-01-18 03:02:41,158] INFO [FetchObjects],[PENDING],streamId=44 startOffset=23 endOffset=28 limit=2 (kafka.log.stream.s3.metadata.StreamMetadataManager)
[2024-01-18 03:02:41,158] INFO [FetchObjects],[PENDING],streamId=55 startOffset=11 endOffset=16 limit=2 (kafka.log.stream.s3.metadata.StreamMetadataManager)
[2024-01-18 03:02:41,158] INFO [FetchObjects],[PENDING],streamId=87 startOffset=5 endOffset=10 limit=2 (kafka.log.stream.s3.metadata.StreamMetadataManager)
[2024-01-18 03:02:41,158] INFO [FetchObjects],[PENDING],streamId=124 startOffset=5 endOffset=10 limit=2 (kafka.log.stream.s3.metadata.StreamMetadataManager)
[2024-01-18 03:02:41,158] INFO [FetchObjects],[PENDING],streamId=76 startOffset=11 endOffset=16 limit=2 (kafka.log.stream.s3.metadata.StreamMetadataManager)
[2024-01-18 03:02:41,658] INFO [FetchObjects],[PENDING],streamId=123 startOffset=5 endOffset=10 limit=2 (kafka.log.stream.s3.metadata.StreamMetadataManager)
[2024-01-18 03:02:41,658] INFO [FetchObjects],[PENDING],streamId=105 startOffset=17 endOffset=22 limit=2 (kafka.log.stream.s3.metadata.StreamMetadataManager)
[2024-01-18 03:02:41,658] INFO [FetchObjects],[PENDING],streamId=99 startOffset=5 endOffset=10 limit=2 (kafka.log.stream.s3.metadata.StreamMetadataManager)
[2024-01-18 03:02:41,658] INFO [FetchObjects],[PENDING],streamId=64 startOffset=29 endOffset=34 limit=2 (kafka.log.stream.s3.metadata.StreamMetadataManager)

What should have happened instead?

How to reproduce the issue?

Additional information

Please attach any relevant logs, backtraces, or metric charts.

Metadata

Labels: bug