Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't fail GroupCoordinator._on_join_prepare() if commit_offset() throws exception #230

Conversation

shargan
Copy link
Contributor

@shargan shargan commented Sep 28, 2017

As it currently stands, a Consumer whose membership in a group expires (whether due to message processing that exceeds the session_timeout_ms or a brief network interruption) enters an unrecoverable state. Every cycle of the heartbeat routine calls ensure_active_group(), which attempts to commit outstanding offsets before taking any further action. If membership has expired, however, interacting with the coordinator will throw an UnknownMemberIdError exception and the process will begin again on the next heartbeat cycle.

To illustrate, start a Kafka broker with group.min.session.timeout.ms set to a very low value. Populate some data into the test topic and then run the following code (#229 is required to set the session timeout properly):

import aiokafka
import asyncio
import logging
import sys
import time

logging.basicConfig(level=logging.INFO, stream=sys.stderr)


async def doit(loop):
    consumer = aiokafka.AIOKafkaConsumer(
        loop=loop,
        group_id="test",
        session_timeout_ms=100,
        heartbeat_interval_ms=33,
    )
    consumer.subscribe("test")
    await consumer.start()

    print("\n### Sleeping to induce error\n")
    time.sleep(1)

    await consumer.getmany()
    await asyncio.sleep(1)
    await consumer.stop()

loop = asyncio.get_event_loop()
loop.run_until_complete(doit(loop))
loop.close()

The result:

INFO:kafka.consumer.subscription_state:Updating subscribed topics to: ['test']
INFO:aiokafka.consumer.group_coordinator:Discovered coordinator 1001 for group test
INFO:aiokafka.consumer.group_coordinator:Revoking previously assigned partitions set() for group test
INFO:aiokafka.consumer.group_coordinator:(Re-)joining group test
INFO:aiokafka.consumer.group_coordinator:Joined group 'test' (generation 1) with member_id aiokafka-0.3.2.dev-4470f1dc-e352-4ce5-97eb-2abc16a99414
INFO:aiokafka.consumer.group_coordinator:Elected group leader -- performing partition assignments using roundrobin
INFO:aiokafka.consumer.group_coordinator:Successfully synced group test with generation 1
INFO:kafka.consumer.subscription_state:Updated partition assignment: [TopicPartition(topic='test', partition=0)]
INFO:aiokafka.consumer.group_coordinator:Setting newly assigned partitions {TopicPartition(topic='test', partition=0)} for group test

### Sleeping to induce error

WARNING:aiokafka.consumer.group_coordinator:Heartbeat failed: local member_id was not recognized; resetting and re-joining group
ERROR:aiokafka.consumer.group_coordinator:Heartbeat session expired - marking coordinator dead
WARNING:aiokafka.consumer.group_coordinator:Marking the coordinator dead (node 1001)for group test: None.
INFO:aiokafka.consumer.group_coordinator:Discovered coordinator 1001 for group test
ERROR:aiokafka.consumer.group_coordinator:OffsetCommit failed for group test due to group error ([Error 25] UnknownMemberIdError: test), will rejoin
ERROR:aiokafka.consumer.group_coordinator:Skipping heartbeat: no active group: UnknownMemberIdError('test',)
ERROR:aiokafka.consumer.group_coordinator:OffsetCommit failed for group test due to group error ([Error 25] UnknownMemberIdError: test), will rejoin
ERROR:aiokafka.consumer.group_coordinator:Skipping heartbeat: no active group: UnknownMemberIdError('test',)
ERROR:aiokafka.consumer.group_coordinator:OffsetCommit failed for group test due to group error ([Error 25] UnknownMemberIdError: test), will rejoin
ERROR:aiokafka.consumer.group_coordinator:Skipping heartbeat: no active group: UnknownMemberIdError('test',)
ERROR:aiokafka.consumer.group_coordinator:OffsetCommit failed for group test due to group error ([Error 25] UnknownMemberIdError: test), will rejoin
ERROR:aiokafka.consumer.group_coordinator:Skipping heartbeat: no active group: UnknownMemberIdError('test',)
ERROR:aiokafka.consumer.group_coordinator:OffsetCommit failed for group test due to group error ([Error 25] UnknownMemberIdError: test), will rejoin
ERROR:aiokafka.consumer.group_coordinator:Skipping heartbeat: no active group: UnknownMemberIdError('test',)
ERROR:aiokafka.consumer.group_coordinator:OffsetCommit failed for group test due to group error ([Error 25] UnknownMemberIdError: test), will rejoin
ERROR:aiokafka.consumer.group_coordinator:Skipping heartbeat: no active group: UnknownMemberIdError('test',)
ERROR:aiokafka.consumer.group_coordinator:OffsetCommit failed for group test due to group error ([Error 25] UnknownMemberIdError: test), will rejoin
ERROR:aiokafka.consumer.group_coordinator:Skipping heartbeat: no active group: UnknownMemberIdError('test',)
ERROR:aiokafka.consumer.group_coordinator:OffsetCommit failed for group test due to group error ([Error 25] UnknownMemberIdError: test), will rejoin
ERROR:aiokafka.consumer.group_coordinator:Skipping heartbeat: no active group: UnknownMemberIdError('test',)
ERROR:aiokafka.consumer.group_coordinator:OffsetCommit failed for group test due to group error ([Error 25] UnknownMemberIdError: test), will rejoin
ERROR:aiokafka.consumer.group_coordinator:Skipping heartbeat: no active group: UnknownMemberIdError('test',)
ERROR:aiokafka.consumer.group_coordinator:OffsetCommit failed for group test due to group error ([Error 25] UnknownMemberIdError: test), will rejoin
WARNING:aiokafka.consumer.group_coordinator:Auto offset commit failed: [Error 25] UnknownMemberIdError: test
ERROR:aiokafka.consumer.group_coordinator:LeaveGroup request failed: [Error 25] UnknownMemberIdError

This patch fixes the issue by ignoring exceptions from _maybe_auto_commit_offsets_sync() in _on_join_prepare()

…ows an exception (e.g., when membership in the group has expired)
@codecov
Copy link

codecov bot commented Sep 28, 2017

Codecov Report

Merging #230 into master will increase coverage by <.01%.
The diff coverage is 100%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #230      +/-   ##
==========================================
+ Coverage   96.82%   96.82%   +<.01%     
==========================================
  Files          17       17              
  Lines        2329     2332       +3     
==========================================
+ Hits         2255     2258       +3     
  Misses         74       74
Impacted Files Coverage Δ
aiokafka/consumer/group_coordinator.py 94.18% <100%> (+0.03%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update b3cf633...1949440. Read the comment docs.

@codecov
Copy link

codecov bot commented Sep 28, 2017

Codecov Report

Merging #230 into master will increase coverage by <.01%.
The diff coverage is 100%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #230      +/-   ##
==========================================
+ Coverage   96.82%   96.82%   +<.01%     
==========================================
  Files          17       17              
  Lines        2329     2332       +3     
==========================================
+ Hits         2255     2258       +3     
  Misses         74       74
Impacted Files Coverage Δ
aiokafka/consumer/group_coordinator.py 94.18% <100%> (+0.03%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update b3cf633...1949440. Read the comment docs.

@tvoinarovskyi
Copy link
Member

Ok, great thanks. LGTM 👍

@tvoinarovskyi tvoinarovskyi merged commit 77d5bcf into aio-libs:master Sep 28, 2017
@shargan shargan deleted the shargan/fix-broken-membership-expiration branch September 28, 2017 18:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants