26 changes: 25 additions & 1 deletion kafka/client.py
@@ -19,6 +19,21 @@
log = logging.getLogger(__name__)


def time_metric(metric_name):
def decorator(fn):
@functools.wraps(fn)
def wrapper(self, *args, **kwargs):
start_time = time.time()
ret = fn(self, *args, **kwargs)

if self.metrics_responder:
self.metrics_responder(metric_name, time.time() - start_time)

return ret
return wrapper
return decorator


class KafkaClient(object):

CLIENT_ID = b'kafka-python'
@@ -28,12 +43,14 @@ class KafkaClient(object):
# socket timeout.
def __init__(self, hosts, client_id=CLIENT_ID,
timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS,
correlation_id=0):
correlation_id=0,
metrics_responder=None):
# We need one connection to bootstrap
self.client_id = kafka_bytestring(client_id)
self.timeout = timeout
self.hosts = collect_hosts(hosts)
self.correlation_id = correlation_id
self.metrics_responder = metrics_responder

# create connections only when we need them
self.conns = {}
@@ -48,6 +65,7 @@ def __init__(self, hosts, client_id=CLIENT_ID,
# Private API #
##################


def _get_conn(self, host, port):
"""Get or create a connection to a broker using host and port"""
host_key = (host, port)
@@ -422,13 +440,15 @@ def load_metadata_for_topics(self, *topics):
leader, None, None
)

@time_metric('metadata_request_timer')
def send_metadata_request(self, payloads=[], fail_on_error=True,
callback=None):
encoder = KafkaProtocol.encode_metadata_request
decoder = KafkaProtocol.decode_metadata_response

return self._send_broker_unaware_request(payloads, encoder, decoder)

@time_metric('produce_request_timer')
def send_produce_request(self, payloads=[], acks=1, timeout=1000,
fail_on_error=True, callback=None):
"""
@@ -479,6 +499,7 @@ def send_produce_request(self, payloads=[], acks=1, timeout=1000,
if resp is not None and
(not fail_on_error or not self._raise_on_response_error(resp))]

@time_metric('fetch_request_timer')
def send_fetch_request(self, payloads=[], fail_on_error=True,
callback=None, max_wait_time=100, min_bytes=4096):
"""
@@ -499,6 +520,7 @@ def send_fetch_request(self, payloads=[], fail_on_error=True,
return [resp if not callback else callback(resp) for resp in resps
if not fail_on_error or not self._raise_on_response_error(resp)]

@time_metric('offset_request_timer')
def send_offset_request(self, payloads=[], fail_on_error=True,
callback=None):
resps = self._send_broker_aware_request(
@@ -509,6 +531,7 @@ def send_offset_request(self, payloads=[], fail_on_error=True,
return [resp if not callback else callback(resp) for resp in resps
if not fail_on_error or not self._raise_on_response_error(resp)]

@time_metric('offset_commit_request_timer')
def send_offset_commit_request(self, group, payloads=[],
fail_on_error=True, callback=None):
encoder = functools.partial(KafkaProtocol.encode_offset_commit_request,
@@ -519,6 +542,7 @@ def send_offset_commit_request(self, group, payloads=[],
return [resp if not callback else callback(resp) for resp in resps
if not fail_on_error or not self._raise_on_response_error(resp)]

@time_metric('offset_fetch_request_timer')
def send_offset_fetch_request(self, group, payloads=[],
fail_on_error=True, callback=None):

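
A minimal usage sketch (not part of this diff): metrics_responder is assumed to be any callable taking (metric_name, value); the time_metric decorator passes the elapsed seconds for each decorated request. The broker address below is a placeholder.

from kafka.client import KafkaClient

def print_metric(name, value):
    # *_timer metrics arrive as elapsed seconds, *_count metrics as event counts
    print('%s=%s' % (name, value))

client = KafkaClient('localhost:9092', metrics_responder=print_metric)
client.send_metadata_request()  # the time_metric decorator reports 'metadata_request_timer'
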
19 changes: 18 additions & 1 deletion kafka/consumer/kafka.py
@@ -38,6 +38,8 @@
'auto_commit_interval_messages': None,
'consumer_timeout_ms': -1,

'metrics_responder': None,

Owner comment: the upstream client configuration here is metric.reporters. I try to stay consistent, so perhaps metric_reporters and support a list of reporter interfaces?

# Currently unused
'socket_receive_buffer_bytes': 64 * 1024,
'num_consumer_fetchers': 1,
@@ -135,10 +137,13 @@ def configure(self, **configs):
'bootstrap_servers required to configure KafkaConsumer'
)

self.metrics_responder = self._config['metrics_responder']

Owner comment: similarly would like to name this metric_reporters and make it a list

self._client = KafkaClient(
self._config['bootstrap_servers'],
client_id=self._config['client_id'],
timeout=(self._config['socket_timeout_ms'] / 1000.0)
timeout=(self._config['socket_timeout_ms'] / 1000.0),
metrics_responder=self.metrics_responder
)

def set_topic_partitions(self, *topics):
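
The owner's comments above suggest renaming the option to metric_reporters and accepting a list of reporter interfaces, mirroring the upstream metric.reporters config. A rough sketch of that shape, using hypothetical names that do not exist in this diff, could adapt a list of reporters to the single-callable hook the PR currently implements:

class StdoutReporter(object):
    # Hypothetical reporter interface: anything with a record(name, value) method.
    def record(self, name, value):
        print('%s=%s' % (name, value))

def reporters_to_responder(reporters):
    # Fan a single (metric_name, value) callback out to every reporter in the list,
    # bridging the suggested metric_reporters list to the existing metrics_responder hook.
    def responder(name, value):
        for reporter in reporters:
            reporter.record(name, value)
    return responder

# e.g. KafkaConsumer(..., metrics_responder=reporters_to_responder([StdoutReporter()]))
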
@@ -344,6 +349,9 @@ def fetch_messages(self):
for resp in responses:

if isinstance(resp, FailedPayloadsError):
if self.metrics_responder:
self.metrics_responder('failed_payloads_count', 1)

logger.warning('FailedPayloadsError attempting to fetch data')
self._refresh_metadata_on_error()
continue
@@ -353,6 +361,9 @@
try:
check_error(resp)
except OffsetOutOfRangeError:
if self.metrics_responder:
self.metrics_responder('offset_out_of_range_count', 1)

logger.warning('OffsetOutOfRange: topic %s, partition %d, '
'offset %d (Highwatermark: %d)',
topic, partition,
@@ -365,13 +376,19 @@
continue

except NotLeaderForPartitionError:
if self.metrics_responder:
self.metrics_responder('not_leader_for_partition_count', 1)

logger.warning("NotLeaderForPartitionError for %s - %d. "
"Metadata may be out of date",
topic, partition)
self._refresh_metadata_on_error()
continue

except RequestTimedOutError:
if self.metrics_responder:
self.metrics_responder('request_timed_out_count', 1)

logger.warning("RequestTimedOutError for %s - %d",
topic, partition)
continue
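
On the consumer side, a sketch (again not part of this diff) of wiring the new metrics_responder config into KafkaConsumer and aggregating the *_count metrics emitted from fetch_messages(); the topic and bootstrap server are placeholders.

from collections import defaultdict

from kafka import KafkaConsumer

counts = defaultdict(float)

def collect(name, value):
    counts[name] += value  # timers add elapsed seconds, counters add 1 per event

consumer = KafkaConsumer('my-topic',
                         bootstrap_servers=['localhost:9092'],
                         metrics_responder=collect)
for message in consumer:
    pass  # on broker errors, failed_payloads_count etc. accumulate in `counts`
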