
Commit e2b340c

instrument metrics for fetch requests
1 parent 81dc89a commit e2b340c

File tree

3 files changed: +95 -66 lines changed

kafka/consumer/fetcher.py

Lines changed: 91 additions & 63 deletions
@@ -3,11 +3,13 @@
 import collections
 import copy
 import logging
+import time

 import six

 import kafka.errors as Errors
 from kafka.future import Future
+from kafka.metrics.stats import Avg, Count, Max, Rate
 from kafka.protocol.fetch import FetchRequest
 from kafka.protocol.message import PartialMessage
 from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
@@ -40,7 +42,8 @@ class Fetcher(six.Iterator):
         'api_version': (0, 8, 0),
     }

-    def __init__(self, client, subscriptions, **configs):
+    def __init__(self, client, subscriptions, metrics, metric_group_prefix,
+                 **configs):
         """Initialize a Kafka Message Fetcher.

         Keyword Arguments:
@@ -68,8 +71,6 @@ def __init__(self, client, subscriptions, **configs):
                 the messages occurred. This check adds some overhead, so it may
                 be disabled in cases seeking extreme performance. Default: True
         """
-        #metrics=None,
-        #metric_group_prefix='consumer',
         self.config = copy.copy(self.DEFAULT_CONFIG)
         for key in self.config:
             if key in configs:
@@ -83,8 +84,7 @@
         self._record_too_large_partitions = dict() # {topic_partition: offset}
         self._iterator = None
         self._fetch_futures = collections.deque()
-
-        #self.sensors = FetchManagerMetrics(metrics, metric_group_prefix)
+        self._sensors = FetchManagerMetrics(metrics, metric_group_prefix)

     def init_fetches(self):
         """Send FetchRequests asynchronously for all assigned partitions.
@@ -109,7 +109,7 @@ def _init_fetches(self):
             if self._client.ready(node_id):
                 log.debug("Sending FetchRequest to node %s", node_id)
                 future = self._client.send(node_id, request)
-                future.add_callback(self._handle_fetch_response, request)
+                future.add_callback(self._handle_fetch_response, request, time.time())
                 future.add_errback(log.error, 'Fetch to node %s failed: %s', node_id)
                 futures.append(future)
         self._fetch_futures.extend(futures)
@@ -575,10 +575,11 @@ def _create_fetch_requests(self):
                 partition_data.items())
         return requests

-    def _handle_fetch_response(self, request, response):
+    def _handle_fetch_response(self, request, send_time, response):
         """The callback for fetch completion"""
-        #total_bytes = 0
-        #total_count = 0
+        total_bytes = 0
+        total_count = 0
+        recv_time = time.time()

         fetch_offsets = {}
         for topic, partitions in request.topics:
@@ -609,6 +610,7 @@ def _handle_fetch_response(self, request, response):
                                   position)
                         continue

+                    num_bytes = 0
                     partial = None
                     if messages and isinstance(messages[-1][-1], PartialMessage):
                         partial = messages.pop()
@@ -618,18 +620,18 @@
                                   " offset %d to buffered record list", tp,
                                   position)
                         self._records.append((fetch_offset, tp, messages))
-                        #last_offset, _, _ = messages[-1]
-                        #self.sensors.records_fetch_lag.record(highwater - last_offset)
+                        last_offset, _, _ = messages[-1]
+                        self._sensors.records_fetch_lag.record(highwater - last_offset)
+                        num_bytes = sum(msg[1] for msg in messages)
                     elif partial:
                         # we did not read a single message from a non-empty
                         # buffer because that message's size is larger than
                         # fetch size, in this case record this exception
                         self._record_too_large_partitions[tp] = fetch_offset

-                    # TODO: bytes metrics
-                    #self.sensors.record_topic_fetch_metrics(tp.topic, num_bytes, parsed.size());
-                    #totalBytes += num_bytes;
-                    #totalCount += parsed.size();
+                    self._sensors.record_topic_fetch_metrics(topic, num_bytes, len(messages))
+                    total_bytes += num_bytes
+                    total_count += len(messages)
                 elif error_type in (Errors.NotLeaderForPartitionError,
                                     Errors.UnknownTopicOrPartitionError):
                     self._client.cluster.request_update()
@@ -649,56 +651,82 @@
             else:
                 raise error_type('Unexpected error while fetching data')

-        """TOOD - metrics
-        self.sensors.bytesFetched.record(totalBytes)
-        self.sensors.recordsFetched.record(totalCount)
-        self.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime())
-        self.sensors.fetchLatency.record(resp.requestLatencyMs())
+        self._sensors.bytes_fetched.record(total_bytes)
+        self._sensors.records_fetched.record(total_count)
+        self._sensors.fetch_throttle_time_sensor.record(response['throttle_time_ms'])
+        self._sensors.fetch_latency.record((recv_time - send_time) * 1000)


 class FetchManagerMetrics(object):
     def __init__(self, metrics, prefix):
         self.metrics = metrics
-        self.group_name = prefix + "-fetch-manager-metrics"
-
-        self.bytes_fetched = metrics.sensor("bytes-fetched")
-        self.bytes_fetched.add(metrics.metricName("fetch-size-avg", self.group_name,
-            "The average number of bytes fetched per request"), metrics.Avg())
-        self.bytes_fetched.add(metrics.metricName("fetch-size-max", self.group_name,
-            "The maximum number of bytes fetched per request"), metrics.Max())
-        self.bytes_fetched.add(metrics.metricName("bytes-consumed-rate", self.group_name,
-            "The average number of bytes consumed per second"), metrics.Rate())
-
-        self.records_fetched = self.metrics.sensor("records-fetched")
-        self.records_fetched.add(metrics.metricName("records-per-request-avg", self.group_name,
-            "The average number of records in each request"), metrics.Avg())
-        self.records_fetched.add(metrics.metricName("records-consumed-rate", self.group_name,
-            "The average number of records consumed per second"), metrics.Rate())
-
-        self.fetch_latency = metrics.sensor("fetch-latency")
-        self.fetch_latency.add(metrics.metricName("fetch-latency-avg", self.group_name,
-            "The average time taken for a fetch request."), metrics.Avg())
-        self.fetch_latency.add(metrics.metricName("fetch-latency-max", self.group_name,
-            "The max time taken for any fetch request."), metrics.Max())
-        self.fetch_latency.add(metrics.metricName("fetch-rate", self.group_name,
-            "The number of fetch requests per second."), metrics.Rate(sampled_stat=metrics.Count()))
-
-        self.records_fetch_lag = metrics.sensor("records-lag")
-        self.records_fetch_lag.add(metrics.metricName("records-lag-max", self.group_name,
-            "The maximum lag in terms of number of records for any partition in self window"), metrics.Max())
-
-        self.fetch_throttle_time_sensor = metrics.sensor("fetch-throttle-time")
-        self.fetch_throttle_time_sensor.add(metrics.metricName("fetch-throttle-time-avg", self.group_name,
-            "The average throttle time in ms"), metrics.Avg())
-        self.fetch_throttle_time_sensor.add(metrics.metricName("fetch-throttle-time-max", self.group_name,
-            "The maximum throttle time in ms"), metrics.Max())
-
-    def record_topic_fetch_metrics(topic, num_bytes, num_records):
-        # record bytes fetched
-        name = '.'.join(["topic", topic, "bytes-fetched"])
-        self.metrics[name].record(num_bytes);
-
-        # record records fetched
-        name = '.'.join(["topic", topic, "records-fetched"])
-        self.metrics[name].record(num_records)
-    """
+        self.group_name = '%s-fetch-manager-metrics' % prefix
+
+        self.bytes_fetched = metrics.sensor('bytes-fetched')
+        self.bytes_fetched.add(metrics.metric_name('fetch-size-avg', self.group_name,
+            'The average number of bytes fetched per request'), Avg())
+        self.bytes_fetched.add(metrics.metric_name('fetch-size-max', self.group_name,
+            'The maximum number of bytes fetched per request'), Max())
+        self.bytes_fetched.add(metrics.metric_name('bytes-consumed-rate', self.group_name,
+            'The average number of bytes consumed per second'), Rate())
+
+        self.records_fetched = self.metrics.sensor('records-fetched')
+        self.records_fetched.add(metrics.metric_name('records-per-request-avg', self.group_name,
+            'The average number of records in each request'), Avg())
+        self.records_fetched.add(metrics.metric_name('records-consumed-rate', self.group_name,
+            'The average number of records consumed per second'), Rate())
+
+        self.fetch_latency = metrics.sensor('fetch-latency')
+        self.fetch_latency.add(metrics.metric_name('fetch-latency-avg', self.group_name,
+            'The average time taken for a fetch request.'), Avg())
+        self.fetch_latency.add(metrics.metric_name('fetch-latency-max', self.group_name,
+            'The max time taken for any fetch request.'), Max())
+        self.fetch_latency.add(metrics.metric_name('fetch-rate', self.group_name,
+            'The number of fetch requests per second.'), Rate(sampled_stat=Count()))
+
+        self.records_fetch_lag = metrics.sensor('records-lag')
+        self.records_fetch_lag.add(metrics.metric_name('records-lag-max', self.group_name,
+            'The maximum lag in terms of number of records for any partition in self window'), Max())
+
+        self.fetch_throttle_time_sensor = metrics.sensor('fetch-throttle-time')
+        self.fetch_throttle_time_sensor.add(metrics.metric_name('fetch-throttle-time-avg', self.group_name,
+            'The average throttle time in ms'), Avg())
+        self.fetch_throttle_time_sensor.add(metrics.metric_name('fetch-throttle-time-max', self.group_name,
+            'The maximum throttle time in ms'), Max())
+
+    def record_topic_fetch_metrics(self, topic, num_bytes, num_records):
+        metric_tags = {'topic': topic.replace('.', '_')}
+
+        # record bytes fetched
+        name = '.'.join(['topic', topic, 'bytes-fetched'])
+        bytes_fetched = self.metrics.get_sensor(name)
+        if not bytes_fetched:
+            bytes_fetched = self.metrics.sensor(name)
+            bytes_fetched.add(self.metrics.metric_name('fetch-size-avg',
+                    self.group_name,
+                    'The average number of bytes fetched per request for topic %s' % topic,
+                    metric_tags), Avg())
+            bytes_fetched.add(self.metrics.metric_name('fetch-size-max',
+                    self.group_name,
+                    'The maximum number of bytes fetched per request for topic %s' % topic,
+                    metric_tags), Max())
+            bytes_fetched.add(self.metrics.metric_name('bytes-consumed-rate',
+                    self.group_name,
+                    'The average number of bytes consumed per second for topic %s' % topic,
+                    metric_tags), Rate())
+        bytes_fetched.record(num_bytes)
+
+        # record records fetched
+        name = '.'.join(['topic', topic, 'records-fetched'])
+        records_fetched = self.metrics.get_sensor(name)
+        if not records_fetched:
+            records_fetched = self.metrics.sensor(name)
+            records_fetched.add(self.metrics.metric_name('records-per-request-avg',
+                    self.group_name,
+                    'The average number of records in each request for topic %s' % topic,
+                    metric_tags), Avg())
+            records_fetched.add(self.metrics.metric_name('records-consumed-rate',
+                    self.group_name,
+                    'The average number of records consumed per second for topic %s' % topic,
+                    metric_tags), Rate())
+        records_fetched.record(num_records)
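For reference, a rough sketch (not part of this commit) of how the new FetchManagerMetrics sensors can be exercised and inspected standalone. It assumes kafka.metrics.Metrics exposes its registry as a MetricName -> KafkaMetric dict via its metrics attribute, and that KafkaMetric offers a value() accessor:

# Sketch only: exercising the sensors defined in the diff above.
# Assumptions: Metrics().metrics is a {MetricName: KafkaMetric} registry
# and KafkaMetric.value() returns the current measurement.
from kafka.consumer.fetcher import FetchManagerMetrics
from kafka.metrics import Metrics

metrics = Metrics()  # default config, no reporters
sensors = FetchManagerMetrics(metrics, 'consumer')

# Simulate one fetch response: 2048 bytes across 10 messages of 'my-topic'
sensors.bytes_fetched.record(2048)
sensors.records_fetched.record(10)
sensors.fetch_latency.record(35.0)  # ms, mirroring (recv_time - send_time) * 1000
sensors.record_topic_fetch_metrics('my-topic', 2048, 10)

for name, metric in metrics.metrics.items():
    print(name.group, name.name, metric.value())

Note that record_topic_fetch_metrics() registers the per-topic sensors lazily on first use and finds them with metrics.get_sensor(name) afterwards, so repeated fetches for the same topic reuse the same sensors rather than re-registering metrics.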

kafka/consumer/group.py

Lines changed: 2 additions & 2 deletions
@@ -218,7 +218,7 @@ def __init__(self, *topics, **configs):
             reporters.append(DictReporter('kafka.consumer'))
         self._metrics = Metrics(metric_config, reporters)
         metric_group_prefix = 'consumer'
-        # TODO _metrics likely needs to be passed to KafkaClient, Fetcher, etc.
+        # TODO _metrics likely needs to be passed to KafkaClient, etc.

         self._client = KafkaClient(**self.config)

@@ -233,7 +233,7 @@ def __init__(self, *topics, **configs):

         self._subscription = SubscriptionState(self.config['auto_offset_reset'])
         self._fetcher = Fetcher(
-            self._client, self._subscription, **self.config)
+            self._client, self._subscription, self._metrics, metric_group_prefix, **self.config)
         self._coordinator = ConsumerCoordinator(
             self._client, self._subscription, self._metrics, metric_group_prefix,
             assignors=self.config['partition_assignment_strategy'],
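Since the consumer wires up a DictReporter (first hunk above), the new fetch metrics can also be read back as plain dictionaries. A minimal sketch, assuming Metrics accepts a reporters list and DictReporter exposes a snapshot() accessor (both from kafka.metrics, neither shown in this commit):

# Sketch only: observing the fetch metrics through a DictReporter.
# Assumption: DictReporter.snapshot() returns nested {category: {name: value}} dicts.
from kafka.consumer.fetcher import FetchManagerMetrics
from kafka.metrics import DictReporter, Metrics

reporter = DictReporter('kafka.consumer')
metrics = Metrics(reporters=[reporter])
sensors = FetchManagerMetrics(metrics, 'consumer')
sensors.record_topic_fetch_metrics('my-topic', 4096, 20)

# Expected keys look like '<prefix>.<group_name>', e.g.
# 'kafka.consumer.consumer-fetch-manager-metrics'
print(reporter.snapshot())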

test/test_fetcher.py

Lines changed: 2 additions & 1 deletion
@@ -8,6 +8,7 @@
 from kafka.consumer.subscription_state import SubscriptionState
 import kafka.errors as Errors
 from kafka.future import Future
+from kafka.metrics import Metrics
 from kafka.protocol.fetch import FetchRequest
 from kafka.structs import TopicPartition, OffsetAndMetadata

@@ -29,7 +30,7 @@ def fetcher(client, subscription_state):
     subscription_state.assign_from_subscribed(assignment)
     for tp in assignment:
         subscription_state.seek(tp, 0)
-    return Fetcher(client, subscription_state)
+    return Fetcher(client, subscription_state, Metrics(), 'test_fetcher')


 def test_init_fetches(fetcher, mocker):
