
Use seek to manually manage offsets, consumer loses some messages #1443

Closed
zyi1992 opened this issue Oct 11, 2022 · 9 comments

Comments

@zyi1992

zyi1992 commented Oct 11, 2022

I have a script to test at-least-once consumption.

The producer:

import json
import random
import time

from confluent_kafka import Producer
import config

p = Producer({'bootstrap.servers':','.join(config.KAFKA_HOST),})
total_count = 0
c = 0
try:
    for i in range(20000):
        num = random.randint(1, 1000000)
        total_count += num
        a = {'t': num, 'time': time.time()}
        p.produce('test-topic-vv', json.dumps(a))
        c += 1
        if c % 100 == 0:
            p.flush()
finally:
    p.flush()

The consumer:

import json
import random
import sys

from confluent_kafka import Consumer, TopicPartition
import config
c = Consumer({
    'bootstrap.servers':','.join(config.KAFKA_HOST),
    'group.id': 'test-topic-consumer-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.offset.store': False,
    'enable.auto.commit': True,
})
topic = 'test-topic-vv'

def test_for_seek():
    try:
        pp = []
        pp.append(TopicPartition(topic, partition=8))
        c.assign(pp)
        while True:
            msgs = c.consume(num_messages=10, timeout=10)
            if not msgs:
                print('no data and wait')
                for i in c.assignment():
                    print(i.topic, i.partition, i.offset, c.get_watermark_offsets(i))
                continue
            for msg in msgs:
                t1 = msg.partition()
                o1 = msg.offset()
                print('Received message: {} par {} offset {}'.format(msg.value().decode('utf-8'), t1, o1))
            break
    finally:
        c.close()

def test_for_run():
    try:
        c.subscribe([topic])
        total_count = 0
        map_par = {}
        while True:
            msgs = c.consume(num_messages=10, timeout=5)
            if not msgs:
                print('no data and wait')
                for i in c.assignment():
                    print(i.topic, i.partition, i.offset, c.get_watermark_offsets(i))
                continue
            deald = []
            for msg in msgs:
                t1 = msg.partition()
                o1 = msg.offset()
                print('Received message: {} par {} offset {}'.format(msg.value().decode('utf-8'), t1, o1))
                if random.randint(1, 100) == 9:
                    # test for deal failed then retry again
                    print('deal failed will retry msg offset {} partition {}'.format(msg.offset(), msg.partition()))
                    break
                else:
                    total_count += json.loads(msg.value())['t']
                    # test for deal success
                    if t1 in map_par:
                        if map_par[t1] + 1 != o1:
                            raise Exception('deal partition {} except last offset {} current offset {}'.format(t1, map_par[t1], o1))
                    map_par[t1] = o1
                    c.store_offsets(msg)
                    deald.append(msg)
            group_partition = {}
            for msg in msgs:
                if msg in deald:
                    continue
                partition = msg.partition()
                offset = msg.offset()
                if partition in group_partition:
                    group_partition[partition] = min(group_partition[partition], offset)
                else:
                    group_partition[partition] = offset
            # seek to deal failed partition offset
            for k, v in group_partition.items():
                c.seek(TopicPartition(topic, partition=k, offset=v))
                print('deal failed will set msg offset {} partition {}'.format(v, k))
    finally:
        c.close()

if sys.argv[1] == 'test_for_seek':
    test_for_seek()
else:
    test_for_run()

The topic test-topic-vv has 9 partitions.
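For reference, the seek-fallback logic in test_for_run (group the messages that were fetched but not successfully processed by partition, then seek back to the smallest such offset) can be isolated into a small pure-Python helper. The function name and tuple shapes here are my own, not part of the script above:

```python
def failed_seek_offsets(fetched, processed):
    """Given fetched as (partition, offset) pairs and a set of pairs that
    were successfully processed, return {partition: offset} mapping each
    partition with a failure to its smallest unprocessed offset, i.e. the
    position the consumer should seek back to before the next consume()."""
    seek_to = {}
    for partition, offset in fetched:
        if (partition, offset) in processed:
            continue
        if partition in seek_to:
            seek_to[partition] = min(seek_to[partition], offset)
        else:
            seek_to[partition] = offset
    return seek_to
```

The consumer would then call c.seek(TopicPartition(topic, partition=k, offset=v)) for each entry, exactly as test_for_run does.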

First I ran the producer to add some messages to the topic, then consumed them, but I got an exception:

The offset of the next message on partition 8 should have been 7382, but I got 7391.

Then I ran test_for_seek to check the offset the consumer group had actually recorded; it was indeed 7382.

I also checked the broker's group offset record; it was also 7382.

So what happens to the consumer when using seek to manage offsets? I hope someone can help me figure out this problem.

check information

  • confluent_kafka.version()==1.9.2
  • confluent_kafka.libversion()==1.9.2
  • Operating system: Ubuntu 16.04
  • Python 3.8
  • kafka 2.11-1.1.1
@mhowlett added the question, investigate further (it's unclear what the issue is at this time but there is enough interest to look into it), and MEDIUM labels on Oct 24, 2022
@pranavrth
Member

Hey zyi1992,

With your code, I am able to reproduce this issue.

I have tried several different settings while running your code. I did not see the issue when using num_messages=1. I would suggest using that setting to avoid the issue until we reach a conclusion. Performance should be similar to what you have now, as the Python client prefetches messages in order to serve them as quickly as possible.
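A sketch of that workaround, assuming the same consumer configuration as above (enable.auto.offset.store=False with manual store_offsets). The consume/store/seek callables are injected so the loop can be shown without a broker; run_single_message_loop and its parameters are my naming, not an official API:

```python
def run_single_message_loop(consume_one, process, store_offsets, seek, max_iterations=1000):
    """Consume one message at a time (the num_messages=1 workaround).
    On success the offset is stored for the auto-committer; on failure
    the consumer seeks back to the same message so it is redelivered."""
    done = []
    for _ in range(max_iterations):
        msg = consume_one()        # e.g. (c.consume(num_messages=1, timeout=5) or [None])[0]
        if msg is None:
            break
        if process(msg):           # application logic; True means handled
            store_offsets(msg)     # e.g. c.store_offsets(msg)
            done.append(msg)
        else:
            seek(msg)              # e.g. c.seek(TopicPartition(topic, msg.partition(), msg.offset()))
            # the next consume_one() then returns the same message again
    return done
```

Because only one message is in flight per call, a failure never strands already-fetched messages behind the seek, which is where the batch path went wrong.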

@zyi1992
Author

zyi1992 commented Oct 31, 2022

@pranavrth Both num_messages=1 and poll work well for me. I hope batch consume can be fixed soon for better performance.

@edenhill
Contributor

Have you actually observed better performance with num_messages > 1?
I don't think there should be much of a difference.

@zyi1992
Author

zyi1992 commented Nov 1, 2022

@edenhill I want to fetch messages in batches and run the relevant business logic concurrently. With num_messages=1, the degree of concurrency is limited by the number of partitions.
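One possible middle ground while the batch API is affected (my own sketch, not a pattern from this thread): accumulate messages into a batch, fan the processing out to a thread pool, and then store offsets per partition only for the contiguous prefix of successes, which preserves at-least-once delivery:

```python
from concurrent.futures import ThreadPoolExecutor

def process_batch_concurrently(batch, handle, store, workers=4):
    """batch: list of (partition, offset, value) in fetch order.
    handle(value) -> bool runs concurrently across the pool; offsets are
    then stored per partition only for the contiguous prefix of successes,
    so a later seek to the first failed offset redelivers the rest."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = list(pool.map(lambda m: handle(m[2]), batch))
    stored = {}
    blocked = set()  # partitions that have already seen a failure
    for (partition, offset, _), ok in zip(batch, results):
        if partition in blocked or not ok:
            blocked.add(partition)
            continue
        store(partition, offset)   # e.g. c.store_offsets(...)
        stored[partition] = offset
    return stored
```

Messages at or after the first failure in a partition are never stored, so redelivery resumes exactly there; concurrency is bounded by the batch size rather than the partition count.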

@pranavrth added the bug, HIGH, and librdkafka labels and removed the investigate further and MEDIUM labels on Nov 8, 2022
@keithks

keithks commented Nov 29, 2022

@edenhill In our use cases we also want to batch consume up to k messages, and then time slice to do something else less important. We can mitigate by simulating this behaviour but it ends up being quite messy.

@edenhill
Contributor

The problem with batch consumption is that messages may be outdated by the time you get to process them; this is also somewhat true for the non-batched approach, but there you will at most process one outdated message before finding out that there has been a rebalance, revocation, etc.

@keithks

keithks commented Nov 29, 2022

Thanks, that's a fair point. Although in our use case we only have 1 consumer per consumer group so rebalancing would not affect us much. It seems the bug would still affect this scenario from the repro.

@pranavrth
Member

Fixed this particular issue as part of PR confluentinc/librdkafka#4117.

It will be available in the upcoming release.

@pranavrth
Member

The issue you were facing is fixed in the Python v2.0.2 release.

There are other issues as well with the batch consume API. We have highlighted those problems in the release notes of librdkafka (the Known Issues section of the librdkafka v2.0.0 release notes). We will be tackling those issues in the near future.
