Skip to content

Commit

Permalink
Fixed xfailed test
Browse files Browse the repository at this point in the history
  • Loading branch information
tvoinarovskyi committed Feb 13, 2017
1 parent 9d7e741 commit 6fb2f9e
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 8 deletions.
1 change: 0 additions & 1 deletion aiokafka/group_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,6 @@ class CoordinatorGroupRebalance:
* rejoin_needed
and call methods:
* _perform_assignment
* _on_join_complete
* coordinator_dead
* coordinator_unknown
Expand Down
4 changes: 3 additions & 1 deletion examples/simple_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@

async def consume():
consumer = AIOKafkaConsumer(
"my_topic", loop=loop, bootstrap_servers='localhost:9092')
"my_topic", loop=loop, bootstrap_servers='localhost:9092',
group_id="some_group")
# Get cluster layout and topic/partition allocation
await consumer.start()
consumer.subscribe(topics=("my_topic2", ))
try:
async for msg in consumer:
print(msg.value)
Expand Down
20 changes: 14 additions & 6 deletions tests/test_coordinator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
import pytest
from unittest import mock

from kafka.protocol.group import JoinGroupRequest_v0 as JoinGroupRequest
Expand Down Expand Up @@ -170,7 +169,6 @@ def do_rebalance():
self.assertEqual(coordinator.coordinator_id, None)
yield from client.close()

@pytest.mark.xfail
@run_until_complete
def test_failed_sync_group(self):
client = AIOKafkaClient(loop=self.loop, bootstrap_servers=self.hosts)
Expand All @@ -180,8 +178,18 @@ def test_failed_sync_group(self):
client, subscription, loop=self.loop,
heartbeat_interval_ms=20000)

@asyncio.coroutine
def do_sync_group():
rebalance = CoordinatorGroupRebalance(
coordinator, coordinator.group_id, coordinator.coordinator_id,
subscription.subscription, coordinator._assignors,
coordinator._session_timeout_ms,
coordinator._retry_backoff_ms,
loop=self.loop)
yield from rebalance._on_join_follower()

with self.assertRaises(GroupCoordinatorNotAvailableError):
yield from coordinator._on_join_follower()
yield from do_sync_group()

mocked = mock.MagicMock()
coordinator._client = mocked
Expand All @@ -190,19 +198,19 @@ def test_failed_sync_group(self):
coordinator.coordinator_unknown = asyncio.coroutine(lambda: False)
mocked.send.side_effect = Errors.UnknownMemberIdError()
with self.assertRaises(Errors.UnknownMemberIdError):
yield from coordinator._on_join_follower()
yield from do_sync_group()
self.assertEqual(
coordinator.member_id, JoinGroupRequest.UNKNOWN_MEMBER_ID)

mocked.send.side_effect = Errors.NotCoordinatorForGroupError()
coordinator.coordinator_id = 'some_id'
with self.assertRaises(Errors.NotCoordinatorForGroupError):
yield from coordinator._on_join_follower()
yield from do_sync_group()
self.assertEqual(coordinator.coordinator_id, None)

mocked.send.side_effect = KafkaError()
with self.assertRaises(KafkaError):
yield from coordinator._on_join_follower()
yield from do_sync_group()

# client sends LeaveGroupRequest to group coordinator
# if generation > 0 (means that client is a member of group)
Expand Down

0 comments on commit 6fb2f9e

Please sign in to comment.