enable_auto_commit=False only works after committing consumer offset at least once #2269

Open

Vasco27 opened this issue Oct 12, 2021 · 0 comments

Vasco27 commented Oct 12, 2021

Problem statement

With enable_auto_commit=False, consumer.close(autocommit=False), and no call to consumer.commit(), messages are not re-consumed when the program is restarted. Re-consumption only starts working after we commit an offset for the consumer group at least once.

First Execution:

Producer.py:

import json
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=Config.KAFKA_BROKER_ENDPOINT)
# value is the event payload (defined elsewhere)
producer.send('hems.energy-prices', json.dumps(value).encode())
producer.flush()
producer.close()

Consumer.py:

import json
from kafka import KafkaConsumer

consumer = KafkaConsumer('hems.energy-prices',
                         group_id="energyPricesGroup",
                         bootstrap_servers=Config.KAFKA_BROKER_ENDPOINT,
                         consumer_timeout_ms=100,
                         enable_auto_commit=False)

# Consume events until the program receives an exit signal
# (exitEvent is a threading.Event set elsewhere, e.g. by a signal handler)
while not exitEvent.wait(timeout=0.01):
  try:
    msg = next(consumer)
    event = json.loads(msg.value)
    processEvent(event)
    # consumer.commit()  # Not committing: the same events should be re-delivered after a restart
  except StopIteration:
    pass

consumer.close(autocommit=False)

I successfully received the event and processed it. Since I am not committing the offset, I should receive the same event after restarting the program. However, as the second execution below shows, this does not happen.
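
To verify that nothing was actually committed, the group's committed offset can be inspected directly. The sketch below is my own diagnostic (it assumes the topic has a single partition 0, and uses KafkaConsumer.committed() from kafka-python); it should print None as long as the group has never committed an offset:

from kafka import KafkaConsumer
from kafka.structs import TopicPartition

# Diagnostic only: look up the offset committed for this consumer group.
# Assumes 'hems.energy-prices' has a single partition, 0.
consumer = KafkaConsumer(group_id="energyPricesGroup",
                         bootstrap_servers=Config.KAFKA_BROKER_ENDPOINT,
                         enable_auto_commit=False)
tp = TopicPartition('hems.energy-prices', 0)
print(consumer.committed(tp))  # None -> the group has never committed an offset
consumer.close(autocommit=False)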

Second Execution:

Consumer.py:

consumer = KafkaConsumer('hems.energy-prices',
                         group_id="energyPricesGroup",
                         bootstrap_servers=Config.KAFKA_BROKER_ENDPOINT,
                         consumer_timeout_ms=100,
                         enable_auto_commit=False)

# Consume events until the program receives an exit signal
while not exitEvent.wait(timeout=0.01):
  try:
    msg = next(consumer)
    event = json.loads(msg.value)
    processEvent(event)
    # consumer.commit() # Consumer not committing, should keep receiving the same events after restart
  except StopIteration:
    pass

consumer.close(autocommit=False)

In this execution, I did not receive the event, even though I should have: the consumer offset was never committed.

Workaround:

The workaround we found was to commit an offset for the consumer group at least once.

First Execution:

Producer.py:

producer = KafkaProducer(bootstrap_servers=Config.KAFKA_BROKER_ENDPOINT)
producer.send('hems.energy-prices', json.dumps(value).encode())
producer.flush()
producer.close()

In this first execution, the consumer commits the offset after processing each event.

Consumer.py:

consumer = KafkaConsumer('hems.energy-prices',
                         group_id="energyPricesGroup",
                         bootstrap_servers=Config.KAFKA_BROKER_ENDPOINT,
                         consumer_timeout_ms=100,
                         enable_auto_commit=False)

# Consume events until the program receives an exit signal
while not exitEvent.wait(timeout=0.01):
  try:
    msg = next(consumer)
    event = json.loads(msg.value)
    processEvent(event)
    consumer.commit()
  except StopIteration:
    pass

consumer.close(autocommit=False)

Second Execution:

Consumer.py:

consumer = KafkaConsumer('hems.energy-prices',
                         group_id="energyPricesGroup",
                         bootstrap_servers=Config.KAFKA_BROKER_ENDPOINT,
                         consumer_timeout_ms=100,
                         enable_auto_commit=False)

# Consume events until the program receives an exit signal
while not exitEvent.wait(timeout=0.01):
  try:
    msg = next(consumer)
    event = json.loads(msg.value)
    processEvent(event)
    # consumer.commit() # Consumer not committing
  except StopIteration:
    pass

consumer.close(autocommit=False)

We did not receive the event because the consumer offset had already been committed. This is working as expected.

Third Execution:

Producer.py:

producer = KafkaProducer(bootstrap_servers=Config.KAFKA_BROKER_ENDPOINT)
producer.send('hems.energy-prices', json.dumps(value).encode())
producer.flush()
producer.close()

Consumer.py:

consumer = KafkaConsumer('hems.energy-prices',
                         group_id="energyPricesGroup",
                         bootstrap_servers=Config.KAFKA_BROKER_ENDPOINT,
                         consumer_timeout_ms=100,
                         enable_auto_commit=False)

# Consume events until the program receives an exit signal
while not exitEvent.wait(timeout=0.01):
  try:
    msg = next(consumer)
    event = json.loads(msg.value)
    processEvent(event)
    # consumer.commit() # Consumer not committing
  except StopIteration:
    pass

consumer.close(autocommit=False)

As expected, I received the new event.

Fourth Execution:

Consumer.py:

consumer = KafkaConsumer('hems.energy-prices',
                         group_id="energyPricesGroup",
                         bootstrap_servers=Config.KAFKA_BROKER_ENDPOINT,
                         consumer_timeout_ms=100,
                         enable_auto_commit=False)

# Consume events until the program receives an exit signal
while not exitEvent.wait(timeout=0.01):
  try:
    msg = next(consumer)
    event = json.loads(msg.value)
    processEvent(event)
    # consumer.commit() # Consumer not committing
  except StopIteration:
    pass

consumer.close(autocommit=False)

Here, I received the same event again because I did not commit. In my understanding, this is the correct behavior: a fifth execution of the consumer would deliver the same event once more, as expected.

Does anyone know how or why this is happening? And is there a way to overcome this problem without having to make a first commit?
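
My current suspicion (unverified) is that this is related to auto_offset_reset: when a group has no committed offset at all, the consumer falls back to that setting, which defaults to 'latest' in kafka-python, so a freshly restarted consumer would start at the end of the topic and silently skip the earlier, uncommitted event. If that is the cause, starting the consumer with auto_offset_reset='earliest' might remove the need for an initial commit. A minimal sketch, assuming everything else stays as in the snippets above:

consumer = KafkaConsumer('hems.energy-prices',
                         group_id="energyPricesGroup",
                         bootstrap_servers=Config.KAFKA_BROKER_ENDPOINT,
                         consumer_timeout_ms=100,
                         enable_auto_commit=False,
                         # Fall back to the beginning of the topic, not the end,
                         # when the group has no committed offset yet.
                         auto_offset_reset='earliest')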

My goal is not to consume all the messages; it is to preserve the integrity of the events. By this I mean that if the program crashes while processing an event, it should receive and process the same event again until it is committed (i.e., until the event was fully processed).
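
For completeness, the per-message pattern I am aiming at (at-least-once delivery: commit only after processEvent() succeeds) can also acknowledge the exact message that was processed. This is a sketch against kafka-python 2.0.2, where OffsetAndMetadata is an (offset, metadata) pair; consumer, json, and processEvent are as in the snippets above:

from kafka.structs import TopicPartition, OffsetAndMetadata

for msg in consumer:
    event = json.loads(msg.value)
    processEvent(event)  # if this raises, the commit below never happens
    # Commit the offset *after* this message, so a restart resumes at the next one.
    tp = TopicPartition(msg.topic, msg.partition)
    consumer.commit({tp: OffsetAndMetadata(msg.offset + 1, None)})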

Versions

kafka-python: 2.0.2
kafka docker image: debezium/kafka:1.6
