Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IllegalStateError causes consumer crash when commit is pending and a rebalance happens #94

Open
2 tasks done
bobh66 opened this issue Feb 4, 2021 · 0 comments
Open
2 tasks done

Comments

@bobh66
Copy link
Contributor

bobh66 commented Feb 4, 2021

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

When the Consumer has requested a commit on one or more partitions and a rebalance is triggered before the aiokafka consumer can perform the commit, it is possible that one or more of the partitions being committed is revoked by the rebalance and not re-assigned. This will cause an IllegalStateError exception from the kafka-python client code due to the attempt to commit a partition that isn't owned by the consumer.

The fix is to filter the commit partitions again in the aiokafka consumer commit code to ensure that they were not revoked during the await break.

Expected behavior

Commits after a rebalance should not cause an IllegalStateError exception

Actual behavior

Intermittent IllegalStateError exceptions after a rebalance

Full traceback

[2021-01-21 14:59:16,201] [9] [ERROR] [^---AIOKafkaConsumerThread]: Got exception: IllegalStateError("Partition TopicPartition(topic='spe_kxe_5', partition=40) is not assigned")
Current assignment: '\n+Topic Partition Set-------+\n| topic     | partitions   |\n+-----------+--------------+\n| spe_kxe_5 | {25, 52, 67} |\n+-----------+--------------+'
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/faust/transport/drivers/aiokafka.py", line 523, in _commit
    await consumer.commit(aiokafka_offsets)
  File "/usr/local/lib/python3.7/site-packages/aiokafka/consumer/consumer.py", line 548, in commit
    "Partition {} is not assigned".format(tp))
kafka.errors.IllegalStateError: IllegalStateError: Partition TopicPartition(topic='spe_kxe_5', partition=40) is not assigned

Versions

  • Python version 3.7
  • Faust version 0.4.6
  • Operating system - CentOS
  • Kafka version N/A
  • RocksDB version (if applicable) N/A
bobh66 added a commit to bobh66/faust-1 that referenced this issue Feb 4, 2021
bobh66 added a commit to bobh66/faust-1 that referenced this issue Feb 4, 2021
patkivikram pushed a commit that referenced this issue Feb 5, 2021
* fix partition assignment issues (#93) and commit exceptions (#94)

* fix partition assignment issues (#93) and commit exceptions (#94)

* fix black formatting

* fix unused variable in test

* fix unit test

* fix unused import

* revert partition assignor changes
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant