Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changed logging messages according to KAFKA-3318 #110

Merged
merged 1 commit into from
Feb 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion aiokafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ def _metadata_update(self, cluster_metadata, topics):

if conn is None:
continue
log.debug("Sending metadata request %s to %s",
log.debug("Sending metadata request %s to node %s",
metadata_request, node_id)

try:
Expand Down
8 changes: 4 additions & 4 deletions aiokafka/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,12 +586,12 @@ def _proc_offset_request(self, partition, timestamp):
return offset
elif error_type in (Errors.NotLeaderForPartitionError,
Errors.UnknownTopicOrPartitionError):
log.warning("Attempt to fetch offsets for partition %s failed due"
" to obsolete leadership information, retrying.",
partition)
log.debug("Attempt to fetch offsets for partition %s failed due"
" to obsolete leadership information, retrying.",
partition)
raise error_type(partition)
else:
log.error(
log.warning(
"Attempt to fetch offsets for partition %s failed due to:"
" %s", partition, error_type)
raise error_type(partition)
Expand Down
120 changes: 70 additions & 50 deletions aiokafka/group_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class GroupCoordinator(object):
https://github.com/apache/kafka/blob/0.10.1.1/clients/src/main/java/\
org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
"""

def __init__(self, client, subscription, *, loop,
group_id='aiokafka-default-group',
session_timeout_ms=30000, heartbeat_interval_ms=3000,
Expand Down Expand Up @@ -227,15 +228,16 @@ def _on_join_prepare(self, generation, member_id):
yield from self._maybe_auto_commit_offsets_sync()

# execute the user's callback before rebalance
log.debug("Revoking previously assigned partitions %s",
self._subscription.assigned_partitions())
log.info("Revoking previously assigned partitions %s for group %s",
self._subscription.assigned_partitions(), self.group_id)
if self._subscription.listener:
try:
revoked = set(self._subscription.assigned_partitions())
self._subscription.listener.on_partitions_revoked(revoked)
except Exception:
log.exception("User provided subscription listener failed"
" on_partitions_revoked")
log.exception("User provided subscription listener %s"
" for group %s failed on_partitions_revoked",
self._subscription.listener, self.group_id)

def _perform_assignment(self, leader_id, assignment_strategy, members):
assignor = self._lookup_assignor(assignment_strategy)
Expand All @@ -254,11 +256,13 @@ def _perform_assignment(self, leader_id, assignment_strategy, members):
self._subscription.group_subscribe(all_subscribed_topics)
self._client.set_topics(self._subscription.group_subscription())

log.debug("Performing %s assignment for subscriptions %s",
assignor.name, member_metadata)
log.debug("Performing assignment for group %s using strategy %s"
" with subscriptions %s", self.group_id, assignor.name,
member_metadata)

assignments = assignor.assign(self._cluster, member_metadata)
log.debug("Finished assignment: %s", assignments)
log.debug("Finished assignment for group %s: %s",
self.group_id, assignments)

group_assignment = {}
for member_id, assignment in assignments.items():
Expand All @@ -284,15 +288,18 @@ def _on_join_complete(self, generation, member_id, protocol,
assignor.on_assignment(assignment)

assigned = set(self._subscription.assigned_partitions())
log.debug("Set newly assigned partitions %s", assigned)
log.info("Setting newly assigned partitions %s for group %s",
assigned, self.group_id)

# execute the user's callback after rebalance
if self._subscription.listener:
try:
self._subscription.listener.on_partitions_assigned(assigned)
except Exception:
log.exception("User provided listener failed on partition"
" assignment: %s", assigned)
log.exception("User provided listener %s for group %s"
" failed on partition assignment: %s",
self._subscription.listener, self.group_id,
assigned)

if self._group_rebalanced_callback:
self._group_rebalanced_callback()
Expand Down Expand Up @@ -429,8 +436,8 @@ def commit_offsets(self, offsets):
[(topic, tp_offsets) for topic, tp_offsets in offset_data.items()]
)

log.debug(
"Sending offset-commit request with %s to %s", offsets, node_id)
log.debug("Sending offset-commit request with %s for group %s to %s",
offsets, self.group_id, node_id)

response = yield from self._send_req(node_id, request)

Expand Down Expand Up @@ -562,7 +569,9 @@ def ensure_coordinator_known(self):
if node_id is None or not (yield from self._client.ready(node_id)):
raise Errors.NoBrokersAvailable()

log.debug("Issuing group metadata request to broker %s", node_id)
log.debug(
"Sending group coordinator request for group %s to broker %s",
self.group_id, node_id)
request = GroupCoordinatorRequest(self.group_id)
try:
resp = yield from self._send_req(node_id, request)
Expand All @@ -573,12 +582,14 @@ def ensure_coordinator_known(self):
if err.retriable is True:
yield from self._client.force_metadata_update()
else:
log.debug("Group metadata response %s", resp)
log.debug("Received group coordinator response %s", resp)
if not (yield from self.coordinator_unknown()):
# We already found the coordinator, so ignore the response
log.debug("Coordinator already known, ignoring response")
break
self.coordinator_id = resp.coordinator_id
log.info("Discovered coordinator %s for group %s",
self.coordinator_id, self.group_id)

def need_rejoin(self):
"""Check whether the group should be rejoined
Expand Down Expand Up @@ -630,9 +641,12 @@ def ensure_active_group(self):
def coordinator_dead(self, error=None):
"""Mark the current coordinator as dead."""
if self.coordinator_id is not None:
log.info("Marking the coordinator dead (node %s): %s.",
self.coordinator_id, error)
log.warning(
"Marking the coordinator dead (node %s)for group %s: %s.",
self.coordinator_id, self.group_id, error)
self.coordinator_id = None
# TODO: Coordinator's state is stored in Zookeeper. Even if it goes
# down the group might not break if a new one is elected fast
self.rejoin_needed = True

@asyncio.coroutine
Expand All @@ -652,7 +666,7 @@ def _heartbeat_task_routine(self):
try:
yield from self.ensure_active_group()
except Errors.KafkaError as err:
log.error("Skipping heartbeat: no active group: %s", err)
log.error("Skipping heartbeat: no active group: %r", err)
last_ok_heartbeat = self.loop.time()
sleep_time = retry_backoff_time
continue
Expand All @@ -662,34 +676,37 @@ def _heartbeat_task_routine(self):
self.group_id, self.generation, self.member_id)
log.debug("Heartbeat: %s[%s] %s",
self.group_id, self.generation, self.member_id)

try:
yield from self._send_req(self.coordinator_id, request)
except (Errors.GroupCoordinatorNotAvailableError,
Errors.NotCoordinatorForGroupError):
log.info(
"Heartbeat failed: coordinator is either not started or"
" not valid; will refresh metadata and retry")
log.warning(
"Heartbeat failed for group %s: coordinator (node %s)"
" is either not started or not valid",
self.group_id, self.coordinator_id)
self.coordinator_dead()
except Errors.RebalanceInProgressError:
log.info(
"Heartbeat failed: group is rebalancing; re-joining group")
log.warning(
"Heartbeat failed for group %s because it is rebalancing",
self.group_id)
self.rejoin_needed = True
except Errors.IllegalGenerationError:
log.info(
"Heartbeat failed: local generation id is not current;"
" re-joining group")
log.warning(
"Heartbeat failed for group %s: generation id is not "
" current.", self.group_id)
self.rejoin_needed = True
except Errors.UnknownMemberIdError:
log.info(
log.warning(
"Heartbeat failed: local member_id was not recognized;"
" resetting and re-joining group")
self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
self.rejoin_needed = True
except Errors.KafkaError as err:
log.error("Heartbeat failed: %s", err)
else:
log.debug("Received successful heartbeat response.")
log.debug(
"Received successful heartbeat response for group %s",
self.group_id)
last_ok_heartbeat = self.loop.time()

if self.rejoin_needed:
Expand Down Expand Up @@ -776,21 +793,23 @@ def perform_group_join(self):
response = yield from self._coordinator._send_req(
self.coordinator_id, request)
except Errors.GroupLoadInProgressError:
log.debug("Attempt to join group %s rejected since coordinator is"
" loading the group.", self.group_id)
log.debug("Attempt to join group %s rejected since coordinator %s"
" is loading the group.", self.group_id,
self.coordinator_id)
except Errors.UnknownMemberIdError:
# reset the member id and retry immediately
self._coordinator.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
log.info(
"Attempt to join group %s failed due to unknown member id,"
" resetting and retrying.", self.group_id)
log.debug(
"Attempt to join group %s failed due to unknown member id",
self.group_id)
return
except (Errors.GroupCoordinatorNotAvailableError,
Errors.NotCoordinatorForGroupError):
Errors.NotCoordinatorForGroupError) as err:
# re-discover the coordinator and retry with backoff
self._coordinator.coordinator_dead()
log.info("Attempt to join group %s failed due to obsolete "
"coordinator information, retrying.", self.group_id)
log.debug("Attempt to join group %s failed due to obsolete "
"coordinator information: %s", self.group_id,
err)
except Errors.KafkaError as err:
log.error(
"Error in join group '%s' response: %s", self.group_id, err)
Expand Down Expand Up @@ -838,8 +857,9 @@ def _on_join_follower(self):
self._coordinator.generation,
self._coordinator.member_id,
{})
log.debug("Issuing follower SyncGroup (%s) to coordinator %s",
request, self.coordinator_id)
log.debug(
"Sending follower SyncGroup for group %s to coordinator %s: %s",
self.group_id, self.coordinator_id, request)
return (yield from self._send_sync_group_request(request))

@asyncio.coroutine
Expand Down Expand Up @@ -874,8 +894,9 @@ def _on_join_leader(self, response):
self._coordinator.member_id,
assignment_req)

log.debug("Issuing leader SyncGroup (%s) to coordinator %s",
request, self.coordinator_id)
log.debug(
"Sending leader SyncGroup for group %s to coordinator %s: %s",
self.group_id, self.coordinator_id, request)
return (yield from self._send_sync_group_request(request))

@asyncio.coroutine
Expand All @@ -887,28 +908,27 @@ def _send_sync_group_request(self, request):
try:
response = yield from self._coordinator._send_req(
self.coordinator_id, request)
log.debug(
"Received successful sync group response for group %s: %s",
self.group_id, response)
log.info("Successfully synced group %s with generation %s",
self.group_id, self._coordinator.generation)
return response.member_assignment
except Errors.RebalanceInProgressError as err:
log.info("SyncGroup for group %s failed due to coordinator"
" rebalance, rejoining the group", self.group_id)
log.debug("SyncGroup for group %s failed due to coordinator"
" rebalance", self.group_id)
raise err
except (Errors.UnknownMemberIdError,
Errors.IllegalGenerationError) as err:
log.info("SyncGroup for group %s failed due to %s,"
" rejoining the group", self.group_id, err)
log.debug("SyncGroup for group %s failed due to %s,",
self.group_id, err)
self._coordinator.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
raise err
except (Errors.GroupCoordinatorNotAvailableError,
Errors.NotCoordinatorForGroupError) as err:
log.info("SyncGroup for group %s failed due to %s, will find new"
" coordinator and rejoin", self.group_id, err)
log.debug("SyncGroup for group %s failed due to %s",
self.group_id, err)
self._coordinator.coordinator_dead()
raise err
except Errors.KafkaError as err:
log.error("Error from SyncGroup: %s", err)
log.error("Unexpected error from SyncGroup: %s", err)
raise err
finally:
if response is None:
Expand Down