Skip to content

Commit

Permalink
[BACKPORT] Send TCP metrics for bytes sent and received to server (#500)
Browse files Browse the repository at this point in the history
* Send TCP metrics for bytes sent and received to server

It would be good to send the `tcp.bytesReceived` and `tcp.bytesSend`
metrics in the metrics blob to the server so that someone can
track how much traffic is passing between the server and the client.

I have manually tested the stats exported to Prometheus and verified
that these two new stats are displayed there correctly.

* add v5 to branch filter in test runner
  • Loading branch information
mdumandag committed Oct 14, 2021
1 parent f1db480 commit b012a97
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 12 deletions.
14 changes: 9 additions & 5 deletions hazelcast/reactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,8 @@ def __init__(self):
loop.shutdown()
loop = _BasicLoop(self.map)
self._loop = loop
self.bytes_sent = 0
self.bytes_received = 0

def start(self):
self._loop.start()
Expand Down Expand Up @@ -421,9 +423,11 @@ def handle_read(self):
try:
while True:
data = self.recv(receive_buffer_size)
bytes_received = len(data)
self._reactor.bytes_received += bytes_received
reader.read(data)
self.last_read_time = time.time()
if len(data) < receive_buffer_size:
if bytes_received < receive_buffer_size:
break
except socket.error as err:
if err.args[0] not in _RETRYABLE_ERROR_CODES:
Expand Down Expand Up @@ -461,7 +465,7 @@ def handle_write(self):
buf.truncate(0)

try:
sent = self.send(bytes_)
bytes_sent = self.send(bytes_)
except socket.error as err:
if err.args[0] in _RETRYABLE_ERROR_CODES:
# Couldn't write the bytes but we should
Expand All @@ -474,9 +478,9 @@ def handle_write(self):
# No exception is thrown during the send
self.last_write_time = time.time()
self.sent_protocol_bytes = True

if sent < len(bytes_):
write_queue.appendleft(bytes_[sent:])
self._reactor.bytes_sent += bytes_sent
if bytes_sent < len(bytes_):
write_queue.appendleft(bytes_[bytes_sent:])

def handle_close(self):
_logger.warning("Connection closed by server")
Expand Down
30 changes: 25 additions & 5 deletions hazelcast/statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
_NEAR_CACHE_DESCRIPTOR_PREFIX = "nearcache"
_NEAR_CACHE_DESCRIPTOR_DISCRIMINATOR = "name"

_TCP_METRICS_PREFIX = "tcp"


class Statistics(object):
def __init__(
Expand Down Expand Up @@ -130,7 +132,7 @@ def _register_system_gauge(self, gauge_name, gauge_fn, value_type=ValueType.LONG
self._registered_system_gauges[gauge_name] = (gauge_fn, value_type)
except Exception as e:
_logger.debug(
"Unable to register the system related gauge %s. Error: %s" % (gauge_name, e)
"Unable to register the system related gauge %s. Error: %s", gauge_name, e
)

def _register_process_gauge(self, gauge_name, gauge_fn, value_type=ValueType.LONG):
Expand All @@ -141,7 +143,7 @@ def _register_process_gauge(self, gauge_name, gauge_fn, value_type=ValueType.LON
self._registered_process_gauges[gauge_name] = (gauge_fn, value_type)
except Exception as e:
_logger.debug(
"Unable to register the process related gauge %s. Error: %s" % (gauge_name, e)
"Unable to register the process related gauge %s. Error: %s", gauge_name, e
)

def _collect_and_send_stats(self):
Expand All @@ -157,6 +159,7 @@ def _collect_and_send_stats(self):
self._add_client_attributes(attributes, connection)
self._add_near_cache_metrics(attributes, compressor)
self._add_system_and_process_metrics(attributes, compressor)
self._add_tcp_metrics(compressor)
self._send_stats(
collection_timestamp, "".join(attributes), compressor.generate_blob(), connection
)
Expand All @@ -180,7 +183,7 @@ def _add_system_and_process_metrics(self, attributes, compressor):
attributes, compressor, gauge_name, value, value_type
)
except:
_logger.exception("Error while collecting '%s'." % gauge_name)
_logger.exception("Error while collecting '%s'.", gauge_name)

if not self._registered_process_gauges:
# Do not create the process object if no process-related
Expand All @@ -195,7 +198,7 @@ def _add_system_and_process_metrics(self, attributes, compressor):
attributes, compressor, gauge_name, value, value_type
)
except:
_logger.exception("Error while collecting '%s'." % gauge_name)
_logger.exception("Error while collecting '%s'.", gauge_name)

def _add_system_or_process_metric(self, attributes, compressor, gauge_name, value, value_type):
# We don't have any metrics that do not have prefix.
Expand Down Expand Up @@ -342,9 +345,26 @@ def _add_near_cache_metric(
self._add_attribute(attributes, metric, value, nc_name_with_prefix)
except:
_logger.exception(
"Error while collecting %s metric for near cache '%s'" % (metric, nc_name)
"Error while collecting %s metric for near cache '%s'.", metric, nc_name
)

def _add_tcp_metrics(self, compressor):
    """Add the reactor's byte counters to the metrics blob.

    Reports ``self._reactor.bytes_sent`` as ``bytesSend`` and
    ``self._reactor.bytes_received`` as ``bytesReceived``, in that order.

    Args:
        compressor: Metrics compressor passed through to ``_add_tcp_metric``.
    """
    # (metric name, reactor attribute holding the counter)
    reactor_counters = (
        ("bytesSend", "bytes_sent"),
        ("bytesReceived", "bytes_received"),
    )
    for metric_name, counter_attr in reactor_counters:
        # Each counter is read right before it is reported.
        self._add_tcp_metric(compressor, metric_name, getattr(self._reactor, counter_attr))

def _add_tcp_metric(
    self, compressor, metric, value, value_type=ValueType.LONG, unit=ProbeUnit.BYTES
):
    """Add a single ``tcp``-prefixed metric to the metrics blob.

    Errors raised while adding the metric are logged and not re-raised.

    Args:
        compressor: Metrics compressor the value is written to.
        metric (str): Metric name, without the ``tcp`` prefix.
        value: Current value of the metric.
        value_type: Value type forwarded to ``_add_metric``.
        unit: Unit recorded in the metric descriptor.
    """
    descriptor = MetricDescriptor(
        metric=metric,
        prefix=_TCP_METRICS_PREFIX,
        unit=unit,
    )
    try:
        self._add_metric(compressor, descriptor, value, value_type)
    except Exception:
        # Deliberately not a bare ``except:``: KeyboardInterrupt and
        # SystemExit must propagate instead of being logged and dropped.
        _logger.exception("Error while collecting '%s.%s'.", _TCP_METRICS_PREFIX, metric)

def _add_metric(self, compressor, descriptor, value, value_type):
if value_type == ValueType.LONG:
compressor.add_long(descriptor, value)
Expand Down
33 changes: 31 additions & 2 deletions tests/integration/backward_compatible/client_test.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import time

from tests.base import HazelcastTestCase
from tests.base import HazelcastTestCase, SingleMemberTestCase
from hazelcast.client import HazelcastClient
from hazelcast.lifecycle import LifecycleState
from tests.hzrc.ttypes import Lang
from tests.util import get_current_timestamp
from tests.util import get_current_timestamp, random_string


class ClientTest(HazelcastTestCase):
Expand Down Expand Up @@ -110,3 +110,32 @@ def get_labels_from_member(self, client_uuid):
client_uuid
)
return self.rc.executeOnController(self.cluster.id, script, Lang.JAVASCRIPT).result


class ClientTcpMetricsTest(SingleMemberTestCase):
    """Checks that the reactor's TCP byte counters grow with client traffic."""

    @classmethod
    def configure_client(cls, config):
        # Use the test cluster's id as the client's cluster name.
        config["cluster_name"] = cls.cluster.id
        return config

    def test_bytes_received(self):
        """``bytes_received`` is already positive and grows after a map read."""
        reactor = self.client._reactor

        baseline = reactor.bytes_received
        self.assertGreater(baseline, 0)

        map_proxy = self.client.get_map(random_string())
        blocking_map = map_proxy.blocking()
        blocking_map.get(random_string())

        self.assertGreater(reactor.bytes_received, baseline)

    def test_bytes_sent(self):
        """``bytes_sent`` is already positive and grows after a map write."""
        reactor = self.client._reactor

        baseline = reactor.bytes_sent
        self.assertGreater(baseline, 0)

        map_proxy = self.client.get_map(random_string())
        blocking_map = map_proxy.blocking()
        blocking_map.set(random_string(), random_string())

        self.assertGreater(reactor.bytes_sent, baseline)

0 comments on commit b012a97

Please sign in to comment.