From b2b5fd253c2493456b5509ca160b1afa5cf2b466 Mon Sep 17 00:00:00 2001 From: talwai Date: Mon, 27 Jun 2016 13:40:54 +0200 Subject: [PATCH 1/8] contrib: add cassandra integration --- .gitignore | 3 + ddtrace/contrib/cassandra/__init__.py | 1 + ddtrace/contrib/cassandra/session.py | 163 ++++++++++++++++++++++++++ ddtrace/util.py | 83 +++++++++++++ 4 files changed, 250 insertions(+) create mode 100644 ddtrace/contrib/cassandra/__init__.py create mode 100644 ddtrace/contrib/cassandra/session.py create mode 100644 ddtrace/util.py diff --git a/.gitignore b/.gitignore index e274e606a3f..0f40e2d8ecf 100644 --- a/.gitignore +++ b/.gitignore @@ -88,3 +88,6 @@ ENV/ # Rope project settings .ropeproject + +# Vim +*.swp diff --git a/ddtrace/contrib/cassandra/__init__.py b/ddtrace/contrib/cassandra/__init__.py new file mode 100644 index 00000000000..40cdf99e8d6 --- /dev/null +++ b/ddtrace/contrib/cassandra/__init__.py @@ -0,0 +1 @@ +# Tracing for cassandra diff --git a/ddtrace/contrib/cassandra/session.py b/ddtrace/contrib/cassandra/session.py new file mode 100644 index 00000000000..bee30893064 --- /dev/null +++ b/ddtrace/contrib/cassandra/session.py @@ -0,0 +1,163 @@ +""" +Trace queries along a session to a cassandra cluster +""" + +# stdlib +import logging + +# project +from ...util import deep_getattr + +# 3p +_installed = False +try: + from cassandra.cluster import Session as session + _installed = True +except ImportError: + session = object + + +log = logging.getLogger(__name__) + + + +def trace(cassandra, tracer, service="cassandra", meta=None): + """ Trace synchronous cassandra commands by patching the client """ + if inspect.ismodule(cassandra) and deep_getattr(cassandra, "Session.execute"): + log.debug("Patching cassandra Session class") + cassandra.Session = functools.partial( + TracedSession, + datadog_tracer=tracer, + datadog_service=service, + ) + elif hasattr(cassandra, "execute"): + log.debug("Patching cassandra Session instance") + safe_patch(cassandra, "execute", _patched_execute_command, service, meta, tracer) + + +class TracedSession(session): + + def __init__(self, *args, **kwargs): + self._datadog_tracer = kwargs.pop("datadog_tracer", None) + self._datadog_service = kwargs.pop("datadog_service", None) + self._datadog_tags = kwargs.pop("datadog_tags", None) + super(TracedSession, self).__init__(*args, **kwargs) + + def execute(self, query, *args, **options): + if not self._datadog_tracer: + return session.execute(query, *args, **options) + + with self._datadog_tracer.trace("cassandra.query", service=self._datadog_service) as span: + query_string = _sanitize_query(query) + span.resource = query_string + + span.set_tag("query", query_string) + + span.set_tags(_extract_session_metas(self)) + cluster = getattr(self, cluster, None) + span.set_tags(_extract_cluster_metas(cluster)) + + result = None + try: + result = super(TracedSession, self).execute(query, *args, **options) + return result + finally: + span.set_tags(_extract_result_metas(result)) + + +def _patched_execute_command(orig_command, service, meta, tracer): + log.debug("Patching cassandra.Session.execute call for service %s", service) + + def traced_execute_command(self, query, *args, **options): + with tracer.trace("cassandra.query", service=service) as span: + query_string = _sanitize_query(query) + + span.resource = query_string + span.set_tag("query", query_string) + + span.set_tags(_extract_session_metas(self)) + cluster = getattr(self, cluster, None) + span.set_tags(_extract_cluster_metas(cluster)) + + try: + result = orig_command(self, query, *args, **options) + return result + finally: + span.set_tags(_extract_result_metas(result)) + + return traced_execute_command + + +def _extract_session_metas(session): + metas = {} + + if getattr(session, "keyspace", None): + # NOTE the keyspace can be overridden explicitly in the query itself + # e.g. "Select * from trace.hash_to_resource" + # currently we don't account for this, which is probably fine + # since the keyspace info is contained in the query even if the metadata disagrees + metas["keyspace"] = session.keyspace.lower() + + return metas + +def _extract_cluster_metas(cluster): + metas = {} + if deep_getattr(cluster, "metadata.cluster_name"): + metas["cluster_name"] = cluster.metadata.cluster_name + # Needed for hostname grouping + metas["out.section"] = cluster.metadata.cluster_name + + if getattr(cluster, "port", None): + metas["port"] = cluster.port + + if getattr(cluster, "contact_points", None): + metas["contact_points"] = cluster.contact_points + # Use the first contact point as a persistent host + if isinstance(cluster.contact_points, list) and len(cluster.contact_points) > 0: + metas["out.host"] = cluster.contact_points[0] + + if getattr(cluster, "compression", None): + metas["compression"] = cluster.compression + if getattr(cluster, "cql_version", None): + metas["cql_version"] = cluster.cql_version + + return metas + +def _extract_result_metas(result): + metas = {} + if deep_getattr(result, "response_future.query"): + query = result.response_future.query + + if getattr(query, "consistency_level", None): + metas["consistency_level"] = query.consistency_level + if getattr(query, "keyspace", None): + # Overrides session.keyspace if the query has been prepared against a particular + # keyspace + metas["keyspace"] = query.keyspace.lower() + + if hasattr(result, "has_more_pages"): + if result.has_more_pages: + metas["paginated"] = True + else: + metas["paginated"] = False + + # NOTE(aaditya): this number only reflects the first page of results + # which could be misleading. But a true count would require iterating through + # all pages which is expensive + if hasattr(result, "current_rows"): + result_rows = result.current_rows or [] + metas["db.rowcount"] = len(result_rows) + + return metas + +def _sanitize_query(query): + """ Sanitize the query to something ready for the agent receiver + - Cast to unicode + - truncate if needed + """ + # TODO (aaditya): fix this hacky type check. we need it to avoid circular imports + if type(query).__name__ in ('SimpleStatement', 'PreparedStatement'): + # reset query if a string is available + query = getattr(query, "query_string", query) + + return unicode(query)[:RESOURCE_MAX_LENGTH] diff --git a/ddtrace/util.py b/ddtrace/util.py new file mode 100644 index 00000000000..6727289165f --- /dev/null +++ b/ddtrace/util.py @@ -0,0 +1,83 @@ +""" +Generic utilities for tracers +""" + +import inspect + + +def deep_getattr(obj, attr_string, default=None): + """ + Returns the attribute of `obj` at the dotted path given by `attr_string` + If no such attribute is reachable, returns `default` + + >>> deep_getattr(cass, "cluster") + >> deep_getattr(cass, "cluster.metadata.partitioner") + u'org.apache.cassandra.dht.Murmur3Partitioner' + + >>> deep_getattr(cass, "i.dont.exist", default="default") + 'default' + """ + attrs = attr_string.split('.') + for attr in attrs: + try: + obj = getattr(obj, attr) + except AttributeError: + return default + + return obj + + +# monkey patch all the things +# subtle: +# if this is the module/class we can go yolo as methods are unbound +# and just stored like that in class __dict__ +# if this is an instance, we have to unbind the current and rebind our +# patched method + +# Also subtle: +# If patchable is an instance and if we've already patched at the module/class level +# then patchable[key] contains an already patched command! +# To workaround this, check if patchable or patchable.__class__ are _dogtraced +# If is isn't, nothing to worry about, patch the key as usual +# But if it is, search for a "__dd_orig_{key}" method on the class, which is +# the original unpatched method we wish to trace. + +def safe_patch(patchable, key, patch_func, service, meta): + """ takes patch_func (signature: takes the orig_method that is + wrapped in the monkey patch == UNBOUND + service and meta) and + attach the patched result to patchable at patchable.key + """ + + def _get_original_method(thing, key): + orig = None + if hasattr(thing, '_dogtraced'): + # Search for original method + orig = getattr(thing, "__dd_orig_{}".format(key), None) + else: + orig = getattr(thing, key) + # Set it for the next time we attempt to patch `thing` + setattr(thing, "__dd_orig_{}".format(key), orig) + + return orig + + if inspect.isclass(patchable) or inspect.ismodule(patchable): + orig = _get_original_method(patchable, key) + if not orig: + # Should never happen + return + elif hasattr(patchable, '__class__'): + orig = _get_original_method(patchable.__class__, key) + if not orig: + # Should never happen + return + else: + return + + dest = patch_func(orig, service, meta) + + if inspect.isclass(patchable) or inspect.ismodule(patchable): + setattr(patchable, key, dest) + elif hasattr(patchable, '__class__'): + setattr(patchable, key, dest.__get__(patchable, patchable.__class__)) From f1c5155872b4e48caadf8ac01166e78e1556f538 Mon Sep 17 00:00:00 2001 From: talwai Date: Mon, 27 Jun 2016 16:32:03 +0200 Subject: [PATCH 2/8] trace/contrib: Add some cass tests --- ddtrace/contrib/cassandra/__init__.py | 3 + ddtrace/contrib/cassandra/session.py | 20 ++++-- ddtrace/util.py | 4 +- tests/contrib/cassandra/__init__.py | 0 tests/contrib/cassandra/test.py | 97 +++++++++++++++++++++++++++ 5 files changed, 116 insertions(+), 8 deletions(-) create mode 100644 tests/contrib/cassandra/__init__.py create mode 100644 tests/contrib/cassandra/test.py diff --git a/ddtrace/contrib/cassandra/__init__.py b/ddtrace/contrib/cassandra/__init__.py index 40cdf99e8d6..bc5a0a8d410 100644 --- a/ddtrace/contrib/cassandra/__init__.py +++ b/ddtrace/contrib/cassandra/__init__.py @@ -1 +1,4 @@ # Tracing for cassandra +from .session import trace + +__all__ = ['trace'] diff --git a/ddtrace/contrib/cassandra/session.py b/ddtrace/contrib/cassandra/session.py index bee30893064..7fb6e4bc3e2 100644 --- a/ddtrace/contrib/cassandra/session.py +++ b/ddtrace/contrib/cassandra/session.py @@ -3,10 +3,13 @@ """ # stdlib +import functools +import inspect import logging + # project -from ...util import deep_getattr +from ...util import deep_getattr, safe_patch # 3p _installed = False @@ -18,7 +21,7 @@ log = logging.getLogger(__name__) - +RESOURCE_MAX_LENGTH=5000 def trace(cassandra, tracer, service="cassandra", meta=None): @@ -29,10 +32,11 @@ def trace(cassandra, tracer, service="cassandra", meta=None): TracedSession, datadog_tracer=tracer, datadog_service=service, + datadog_tags=meta, ) elif hasattr(cassandra, "execute"): log.debug("Patching cassandra Session instance") - safe_patch(cassandra, "execute", _patched_execute_command, service, meta, tracer) + safe_patch(cassandra, "execute", patch_execute, service, meta, tracer) class TracedSession(session): @@ -54,7 +58,7 @@ def execute(self, query, *args, **options): span.set_tag("query", query_string) span.set_tags(_extract_session_metas(self)) - cluster = getattr(self, cluster, None) + cluster = getattr(self, "cluster", None) span.set_tags(_extract_cluster_metas(cluster)) result = None @@ -65,7 +69,7 @@ def execute(self, query, *args, **options): span.set_tags(_extract_result_metas(result)) -def _patched_execute_command(orig_command, service, meta, tracer): +def patch_execute(orig_command, service, meta, tracer): log.debug("Patching cassandra.Session.execute call for service %s", service) def traced_execute_command(self, query, *args, **options): @@ -76,9 +80,10 @@ def traced_execute_command(self, query, *args, **options): span.set_tag("query", query_string) span.set_tags(_extract_session_metas(self)) - cluster = getattr(self, cluster, None) + cluster = getattr(self, "cluster", None) span.set_tags(_extract_cluster_metas(cluster)) + result = None try: result = orig_command(self, query, *args, **options) return result @@ -125,6 +130,9 @@ def _extract_cluster_metas(cluster): def _extract_result_metas(result): metas = {} + if not result: + return metas + if deep_getattr(result, "response_future.query"): query = result.response_future.query diff --git a/ddtrace/util.py b/ddtrace/util.py index 6727289165f..4d229c5ccc6 100644 --- a/ddtrace/util.py +++ b/ddtrace/util.py @@ -44,7 +44,7 @@ def deep_getattr(obj, attr_string, default=None): # But if it is, search for a "__dd_orig_{key}" method on the class, which is # the original unpatched method we wish to trace. -def safe_patch(patchable, key, patch_func, service, meta): +def safe_patch(patchable, key, patch_func, service, meta, tracer): """ takes patch_func (signature: takes the orig_method that is wrapped in the monkey patch == UNBOUND + service and meta) and attach the patched result to patchable at patchable.key @@ -75,7 +75,7 @@ def _get_original_method(thing, key): else: return - dest = patch_func(orig, service, meta) + dest = patch_func(orig, service, meta, tracer) if inspect.isclass(patchable) or inspect.ismodule(patchable): setattr(patchable, key, dest) diff --git a/tests/contrib/cassandra/__init__.py b/tests/contrib/cassandra/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/contrib/cassandra/test.py b/tests/contrib/cassandra/test.py new file mode 100644 index 00000000000..0cc047a2bf2 --- /dev/null +++ b/tests/contrib/cassandra/test.py @@ -0,0 +1,97 @@ +import unittest +from nose.tools import eq_ + +# We should probably be smarter than that +try: + from cassandra.cluster import Cluster +except ImportError: + Cluster = None + +from ddtrace.contrib.cassandra import trace as trace_cassandra +from ddtrace.tracer import Tracer + +from ...test_tracer import DummyWriter + +class CassandraTest(unittest.TestCase): + """Needs a running cassandra at localhost:9042""" + + TEST_QUERY = "SELECT * from test.person" + TEST_KEYSPACE = "test" + + def setUp(self): + if not Cluster: + raise unittest.SkipTest("cassandra.cluster.Cluster is not available.") + + self.cluster = Cluster(port=9040) + session = self.cluster.connect() + session.execute("""CREATE KEYSPACE test WITH REPLICATION = { + 'class' : 'SimpleStrategy', + 'replication_factor': 1 + }""") + session.execute("CREATE TABLE test.person (name text PRIMARY KEY, age int, description text)") + session.execute("""INSERT INTO test.person (name, age, description) VALUES ('Cassandra', 100, 'A cruel mistress')""") + + + def _assert_result_correct(self, result): + eq_(len(result.current_rows), 1) + for r in result: + eq_(r.name, "Cassandra") + eq_(r.age, 100) + eq_(r.description, "A cruel mistress") + + def test_cassandra_instance(self): + """ + Tests patching a cassandra Session instance + """ + writer = DummyWriter() + tracer = Tracer(writer=writer) + session = self.cluster.connect("test") + + trace_cassandra(session, tracer) + result = session.execute(self.TEST_QUERY) + self._assert_result_correct(result) + + spans = writer.pop() + assert spans + eq_(len(spans), 1) + + span = spans[0] + eq_(span.service, "cassandra") + eq_(span.resource, self.TEST_QUERY) + eq_(span.get_tag("keyspace"), self.TEST_KEYSPACE) + eq_(span.get_tag("port"), "9040") + eq_(span.get_tag("db.rowcount"), "1") + eq_(span.get_tag("out.host"), "127.0.0.1") + + def test_cassandra_class(self): + """ + Tests patching a cassandra Session class + """ + writer = DummyWriter() + tracer = Tracer(writer=writer) + + import cassandra.cluster + trace_cassandra(cassandra.cluster, tracer) + session = Cluster(port=9040).connect("test") + result = session.execute(self.TEST_QUERY) + self._assert_result_correct(result) + + spans = writer.pop() + assert spans + + # Should be sending one request to "USE " and another for the actual query + eq_(len(spans), 2) + use, query = spans[0], spans[1] + + eq_(use.service, "cassandra") + eq_(use.resource, "USE %s" % self.TEST_KEYSPACE) + + eq_(query.service, "cassandra") + eq_(query.resource, self.TEST_QUERY) + eq_(query.get_tag("keyspace"), self.TEST_KEYSPACE) + eq_(query.get_tag("port"), "9040") + eq_(query.get_tag("db.rowcount"), "1") + eq_(query.get_tag("out.host"), "127.0.0.1") + + def tearDown(self): + self.cluster.connect().execute("DROP KEYSPACE IF EXISTS test") From 91057fe256ed841ae96b53e924c5462593ce41b7 Mon Sep 17 00:00:00 2001 From: talwai Date: Mon, 27 Jun 2016 16:39:51 +0200 Subject: [PATCH 3/8] trace/contrib: circle tests for cass --- circle.yml | 1 + tests/contrib/cassandra/test.py | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/circle.yml b/circle.yml index 0379c2d6573..4b5aa115354 100644 --- a/circle.yml +++ b/circle.yml @@ -9,6 +9,7 @@ dependencies: test: override: - docker run -d -p 9200:9200 elasticsearch:2.3 + - docker run -d -p 9042:9042 cassandra:3 - docker run -d -p 5432:5432 -e POSTGRES_PASSWORD=test -e POSTGRES_USER=test -e POSTGRES_DB=test postgres:9.5 - python2.7 setup.py test - python3.4 setup.py test diff --git a/tests/contrib/cassandra/test.py b/tests/contrib/cassandra/test.py index 0cc047a2bf2..f9bef3428db 100644 --- a/tests/contrib/cassandra/test.py +++ b/tests/contrib/cassandra/test.py @@ -22,7 +22,7 @@ def setUp(self): if not Cluster: raise unittest.SkipTest("cassandra.cluster.Cluster is not available.") - self.cluster = Cluster(port=9040) + self.cluster = Cluster(port=9042) session = self.cluster.connect() session.execute("""CREATE KEYSPACE test WITH REPLICATION = { 'class' : 'SimpleStrategy', @@ -59,7 +59,7 @@ def test_cassandra_instance(self): eq_(span.service, "cassandra") eq_(span.resource, self.TEST_QUERY) eq_(span.get_tag("keyspace"), self.TEST_KEYSPACE) - eq_(span.get_tag("port"), "9040") + eq_(span.get_tag("port"), "9042") eq_(span.get_tag("db.rowcount"), "1") eq_(span.get_tag("out.host"), "127.0.0.1") @@ -72,7 +72,7 @@ def test_cassandra_class(self): import cassandra.cluster trace_cassandra(cassandra.cluster, tracer) - session = Cluster(port=9040).connect("test") + session = Cluster(port=9042).connect("test") result = session.execute(self.TEST_QUERY) self._assert_result_correct(result) @@ -89,7 +89,7 @@ def test_cassandra_class(self): eq_(query.service, "cassandra") eq_(query.resource, self.TEST_QUERY) eq_(query.get_tag("keyspace"), self.TEST_KEYSPACE) - eq_(query.get_tag("port"), "9040") + eq_(query.get_tag("port"), "9042") eq_(query.get_tag("db.rowcount"), "1") eq_(query.get_tag("out.host"), "127.0.0.1") From b470a1567765d43a3d5e326c93bca76111545a36 Mon Sep 17 00:00:00 2001 From: talwai Date: Mon, 27 Jun 2016 16:54:35 +0200 Subject: [PATCH 4/8] trace/contrib: add cass driver to reqs --- setup.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 4c3370b2e9c..7873925311d 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,8 @@ 'blinker', 'elasticsearch', 'psycopg2', - 'django' + 'django', + 'cassandra-driver' ] setup( From 64b640aaaa1057be2153d2797c71d1de609b81a4 Mon Sep 17 00:00:00 2001 From: talwai Date: Mon, 27 Jun 2016 17:12:41 +0200 Subject: [PATCH 5/8] trace/contrib: use stringify --- ddtrace/contrib/cassandra/__init__.py | 10 ++- ddtrace/contrib/cassandra/session.py | 124 ++++++++++---------------- ddtrace/ext/cassandra.py | 10 +++ ddtrace/util.py | 30 +++---- tests/contrib/cassandra/test.py | 47 +++------- 5 files changed, 92 insertions(+), 129 deletions(-) create mode 100644 ddtrace/ext/cassandra.py diff --git a/ddtrace/contrib/cassandra/__init__.py b/ddtrace/contrib/cassandra/__init__.py index bc5a0a8d410..7a372d28071 100644 --- a/ddtrace/contrib/cassandra/__init__.py +++ b/ddtrace/contrib/cassandra/__init__.py @@ -1,4 +1,8 @@ -# Tracing for cassandra -from .session import trace +from .util import require_modules -__all__ = ['trace'] +required_modules = ['cassandra.cluster'] + +with require_modules(required_modules) as missing_modules: + if not missing_modules: + from .session import trace + __all__ = ['trace'] diff --git a/ddtrace/contrib/cassandra/session.py b/ddtrace/contrib/cassandra/session.py index 7fb6e4bc3e2..063a29265b3 100644 --- a/ddtrace/contrib/cassandra/session.py +++ b/ddtrace/contrib/cassandra/session.py @@ -9,89 +9,65 @@ # project +from ...compat import stringify from ...util import deep_getattr, safe_patch +from ...ext import net as netx, cass as cassx # 3p -_installed = False -try: - from cassandra.cluster import Session as session - _installed = True -except ImportError: - session = object +from cassandra.cluster import Session log = logging.getLogger(__name__) -RESOURCE_MAX_LENGTH=5000 - - -def trace(cassandra, tracer, service="cassandra", meta=None): - """ Trace synchronous cassandra commands by patching the client """ - if inspect.ismodule(cassandra) and deep_getattr(cassandra, "Session.execute"): - log.debug("Patching cassandra Session class") - cassandra.Session = functools.partial( - TracedSession, - datadog_tracer=tracer, - datadog_service=service, - datadog_tags=meta, - ) - elif hasattr(cassandra, "execute"): - log.debug("Patching cassandra Session instance") - safe_patch(cassandra, "execute", patch_execute, service, meta, tracer) - -class TracedSession(session): - - def __init__(self, *args, **kwargs): - self._datadog_tracer = kwargs.pop("datadog_tracer", None) - self._datadog_service = kwargs.pop("datadog_service", None) - self._datadog_tags = kwargs.pop("datadog_tags", None) - super(TracedSession, self).__init__(*args, **kwargs) - - def execute(self, query, *args, **options): - if not self._datadog_tracer: - return session.execute(query, *args, **options) +RESOURCE_MAX_LENGTH=5000 +DEFAULT_SERVICE = "cassandra" - with self._datadog_tracer.trace("cassandra.query", service=self._datadog_service) as span: - query_string = _sanitize_query(query) - span.resource = query_string - span.set_tag("query", query_string) +def get_traced_cassandra(tracer, service=DEFAULT_SERVICE, meta=None): + return _get_traced_cluster(cassandra.cluster, tracer, service, meta - span.set_tags(_extract_session_metas(self)) - cluster = getattr(self, "cluster", None) - span.set_tags(_extract_cluster_metas(cluster)) - result = None - try: - result = super(TracedSession, self).execute(query, *args, **options) - return result - finally: - span.set_tags(_extract_result_metas(result)) +def _get_traced_cluster(cassandra, tracer, service="cassandra", meta=None): + """ Trace synchronous cassandra commands by patching the Session class """ + class TracedSession(Session): + def __init__(self, *args, **kwargs): + self._datadog_tracer = kwargs.pop("datadog_tracer", None) + self._datadog_service = kwargs.pop("datadog_service", None) + self._datadog_tags = kwargs.pop("datadog_tags", None) + super(TracedSession, self).__init__(*args, **kwargs) -def patch_execute(orig_command, service, meta, tracer): - log.debug("Patching cassandra.Session.execute call for service %s", service) + def execute(self, query, *args, **options): + if not self._datadog_tracer: + return session.execute(query, *args, **options) - def traced_execute_command(self, query, *args, **options): - with tracer.trace("cassandra.query", service=service) as span: - query_string = _sanitize_query(query) + with self._datadog_tracer.trace("cassandra.query", service=self._datadog_service) as span: + query_string = _sanitize_query(query) + span.resource = query_string - span.resource = query_string - span.set_tag("query", query_string) + span.set_tags(_extract_session_metas(self)) + cluster = getattr(self, "cluster", None) + span.set_tags(_extract_cluster_metas(cluster)) - span.set_tags(_extract_session_metas(self)) - cluster = getattr(self, "cluster", None) - span.set_tags(_extract_cluster_metas(cluster)) + result = None + try: + result = super(TracedSession, self).execute(query, *args, **options) + return result + finally: + span.set_tags(_extract_result_metas(result)) - result = None - try: - result = orig_command(self, query, *args, **options) - return result - finally: - span.set_tags(_extract_result_metas(result)) + class TracedCluster(cassandra.Cluster): - return traced_execute_command + def connect(): + cassandra.Session = functools.partial( + TracedSession, + datadog_tracer=tracer, + datadog_service=service, + datadog_tags=meta, + ) + return super(TracedCluster, self).connect() + return TracedCluster def _extract_session_metas(session): metas = {} @@ -101,7 +77,7 @@ def _extract_session_metas(session): # e.g. "Select * from trace.hash_to_resource" # currently we don't account for this, which is probably fine # since the keyspace info is contained in the query even if the metadata disagrees - metas["keyspace"] = session.keyspace.lower() + metas[cassx.KEYSPACE] = session.keyspace.lower() return metas @@ -109,17 +85,15 @@ def _extract_cluster_metas(cluster): metas = {} if deep_getattr(cluster, "metadata.cluster_name"): metas["cluster_name"] = cluster.metadata.cluster_name - # Needed for hostname grouping - metas["out.section"] = cluster.metadata.cluster_name if getattr(cluster, "port", None): - metas["port"] = cluster.port + metas[netx.TARGET_PORT] = cluster.port if getattr(cluster, "contact_points", None): metas["contact_points"] = cluster.contact_points # Use the first contact point as a persistent host if isinstance(cluster.contact_points, list) and len(cluster.contact_points) > 0: - metas["out.host"] = cluster.contact_points[0] + metas[netx.TARGET_PORT] = cluster.contact_points[0] if getattr(cluster, "compression", None): metas["compression"] = cluster.compression @@ -137,24 +111,24 @@ def _extract_result_metas(result): query = result.response_future.query if getattr(query, "consistency_level", None): - metas["consistency_level"] = query.consistency_level + metas[cassx.CONSISTENCY_LEVEL] = query.consistency_level if getattr(query, "keyspace", None): # Overrides session.keyspace if the query has been prepared against a particular # keyspace - metas["keyspace"] = query.keyspace.lower() + metas[cassx.KEYSPACE] = query.keyspace.lower() if hasattr(result, "has_more_pages"): if result.has_more_pages: - metas["paginated"] = True + metas[cassx.PAGINATED] = True else: - metas["paginated"] = False + metas[cassx.PAGINATED] = False # NOTE(aaditya): this number only reflects the first page of results # which could be misleading. But a true count would require iterating through # all pages which is expensive if hasattr(result, "current_rows"): result_rows = result.current_rows or [] - metas["db.rowcount"] = len(result_rows) + metas[cassx.ROW_COUNT] = len(result_rows) return metas @@ -168,4 +142,4 @@ def _sanitize_query(query): # reset query if a string is available query = getattr(query, "query_string", query) - return unicode(query)[:RESOURCE_MAX_LENGTH] + return stringify(query)[:RESOURCE_MAX_LENGTH] diff --git a/ddtrace/ext/cassandra.py b/ddtrace/ext/cassandra.py new file mode 100644 index 00000000000..a86bb6abc2b --- /dev/null +++ b/ddtrace/ext/cassandra.py @@ -0,0 +1,10 @@ + +# the type of the spans +TYPE = "cassandra" + +# tags +KEYSPACE = "cassandra.keyspace" +CONSISTENCY_LEVEL = "cassandra.consistency_level" +PAGINATED = "cassandra.paginated" +ROW_COUNT = "cassandra.row_count" + diff --git a/ddtrace/util.py b/ddtrace/util.py index 4d229c5ccc6..f62db068737 100644 --- a/ddtrace/util.py +++ b/ddtrace/util.py @@ -29,25 +29,25 @@ def deep_getattr(obj, attr_string, default=None): return obj -# monkey patch all the things -# subtle: -# if this is the module/class we can go yolo as methods are unbound -# and just stored like that in class __dict__ -# if this is an instance, we have to unbind the current and rebind our -# patched method - -# Also subtle: -# If patchable is an instance and if we've already patched at the module/class level -# then patchable[key] contains an already patched command! -# To workaround this, check if patchable or patchable.__class__ are _dogtraced -# If is isn't, nothing to worry about, patch the key as usual -# But if it is, search for a "__dd_orig_{key}" method on the class, which is -# the original unpatched method we wish to trace. - def safe_patch(patchable, key, patch_func, service, meta, tracer): """ takes patch_func (signature: takes the orig_method that is wrapped in the monkey patch == UNBOUND + service and meta) and attach the patched result to patchable at patchable.key + + + - if this is the module/class we can rely on methods being unbound, and just have to + update the __dict__ + + - if this is an instance, we have to unbind the current and rebind our + patched method + + - If patchable is an instance and if we've already patched at the module/class level + then patchable[key] contains an already patched command! + To workaround this, check if patchable or patchable.__class__ are _dogtraced + If is isn't, nothing to worry about, patch the key as usual + But if it is, search for a "__dd_orig_{key}" method on the class, which is + the original unpatched method we wish to trace. + """ def _get_original_method(thing, key): diff --git a/tests/contrib/cassandra/test.py b/tests/contrib/cassandra/test.py index f9bef3428db..897bec84a9b 100644 --- a/tests/contrib/cassandra/test.py +++ b/tests/contrib/cassandra/test.py @@ -1,13 +1,12 @@ import unittest from nose.tools import eq_ -# We should probably be smarter than that -try: - from cassandra.cluster import Cluster -except ImportError: - Cluster = None +from ddtrace.contrib.cassandra import missing_modules +if missing_modules: + raise unittest.SkipTest("Missing dependencies %s" % missing_modules) -from ddtrace.contrib.cassandra import trace as trace_cassandra +from cassandra.cluster import Cluster +from ddtrace.contrib.cassandra import trace as get_traced_cassandra from ddtrace.tracer import Tracer from ...test_tracer import DummyWriter @@ -45,34 +44,11 @@ def test_cassandra_instance(self): """ writer = DummyWriter() tracer = Tracer(writer=writer) - session = self.cluster.connect("test") - trace_cassandra(session, tracer) - result = session.execute(self.TEST_QUERY) - self._assert_result_correct(result) - - spans = writer.pop() - assert spans - eq_(len(spans), 1) - span = spans[0] - eq_(span.service, "cassandra") - eq_(span.resource, self.TEST_QUERY) - eq_(span.get_tag("keyspace"), self.TEST_KEYSPACE) - eq_(span.get_tag("port"), "9042") - eq_(span.get_tag("db.rowcount"), "1") - eq_(span.get_tag("out.host"), "127.0.0.1") + TracedCluster = get_traced_cluster(tracer) + session = TracedCluster(port=9042).connect() - def test_cassandra_class(self): - """ - Tests patching a cassandra Session class - """ - writer = DummyWriter() - tracer = Tracer(writer=writer) - - import cassandra.cluster - trace_cassandra(cassandra.cluster, tracer) - session = Cluster(port=9042).connect("test") result = session.execute(self.TEST_QUERY) self._assert_result_correct(result) @@ -85,13 +61,12 @@ def test_cassandra_class(self): eq_(use.service, "cassandra") eq_(use.resource, "USE %s" % self.TEST_KEYSPACE) - eq_(query.service, "cassandra") eq_(query.resource, self.TEST_QUERY) - eq_(query.get_tag("keyspace"), self.TEST_KEYSPACE) - eq_(query.get_tag("port"), "9042") - eq_(query.get_tag("db.rowcount"), "1") - eq_(query.get_tag("out.host"), "127.0.0.1") + eq_(query.get_tag(cassx.KEYSPACE), self.TEST_KEYSPACE) + eq_(query.get_tag(netx.TARGET_PORT), "9042") + eq_(query.get_tag(cassx.ROW_COUNT), "1") + eq_(query.get_tag(netx.TARGET_HOST), "127.0.0.1") def tearDown(self): self.cluster.connect().execute("DROP KEYSPACE IF EXISTS test") From be46fcbdf965333ce83c244544c851435d0760a0 Mon Sep 17 00:00:00 2001 From: talwai Date: Thu, 30 Jun 2016 18:34:44 +0200 Subject: [PATCH 6/8] trace/contrib: improve cass testing --- ddtrace/contrib/cassandra/__init__.py | 6 +++--- ddtrace/contrib/cassandra/session.py | 14 +++++++------- tests/contrib/cassandra/test.py | 13 +++++++------ 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/ddtrace/contrib/cassandra/__init__.py b/ddtrace/contrib/cassandra/__init__.py index 7a372d28071..3a35a0278ed 100644 --- a/ddtrace/contrib/cassandra/__init__.py +++ b/ddtrace/contrib/cassandra/__init__.py @@ -1,8 +1,8 @@ -from .util import require_modules +from ..util import require_modules required_modules = ['cassandra.cluster'] with require_modules(required_modules) as missing_modules: if not missing_modules: - from .session import trace - __all__ = ['trace'] + from .session import get_traced_cassandra + __all__ = ['get_traced_cassanra'] diff --git a/ddtrace/contrib/cassandra/session.py b/ddtrace/contrib/cassandra/session.py index 063a29265b3..93a73701fd8 100644 --- a/ddtrace/contrib/cassandra/session.py +++ b/ddtrace/contrib/cassandra/session.py @@ -11,10 +11,10 @@ # project from ...compat import stringify from ...util import deep_getattr, safe_patch -from ...ext import net as netx, cass as cassx +from ...ext import net as netx, cassandra as cassx # 3p -from cassandra.cluster import Session +import cassandra.cluster log = logging.getLogger(__name__) @@ -24,12 +24,12 @@ def get_traced_cassandra(tracer, service=DEFAULT_SERVICE, meta=None): - return _get_traced_cluster(cassandra.cluster, tracer, service, meta + return _get_traced_cluster(cassandra.cluster, tracer, service, meta) def _get_traced_cluster(cassandra, tracer, service="cassandra", meta=None): """ Trace synchronous cassandra commands by patching the Session class """ - class TracedSession(Session): + class TracedSession(cassandra.Session): def __init__(self, *args, **kwargs): self._datadog_tracer = kwargs.pop("datadog_tracer", None) @@ -58,14 +58,14 @@ def execute(self, query, *args, **options): class TracedCluster(cassandra.Cluster): - def connect(): + def connect(self, *args, **kwargs): cassandra.Session = functools.partial( TracedSession, datadog_tracer=tracer, datadog_service=service, datadog_tags=meta, ) - return super(TracedCluster, self).connect() + return super(TracedCluster, self).connect(*args, **kwargs) return TracedCluster @@ -93,7 +93,7 @@ def _extract_cluster_metas(cluster): metas["contact_points"] = cluster.contact_points # Use the first contact point as a persistent host if isinstance(cluster.contact_points, list) and len(cluster.contact_points) > 0: - metas[netx.TARGET_PORT] = cluster.contact_points[0] + metas[netx.TARGET_HOST] = cluster.contact_points[0] if getattr(cluster, "compression", None): metas["compression"] = cluster.compression diff --git a/tests/contrib/cassandra/test.py b/tests/contrib/cassandra/test.py index 897bec84a9b..8a5a4c51dbc 100644 --- a/tests/contrib/cassandra/test.py +++ b/tests/contrib/cassandra/test.py @@ -6,8 +6,9 @@ raise unittest.SkipTest("Missing dependencies %s" % missing_modules) from cassandra.cluster import Cluster -from ddtrace.contrib.cassandra import trace as get_traced_cassandra +from ddtrace.contrib.cassandra import get_traced_cassandra from ddtrace.tracer import Tracer +from ddtrace.ext import net as netx, cassandra as cassx from ...test_tracer import DummyWriter @@ -38,16 +39,16 @@ def _assert_result_correct(self, result): eq_(r.age, 100) eq_(r.description, "A cruel mistress") - def test_cassandra_instance(self): + def test_get_traced_cassandra(self): """ - Tests patching a cassandra Session instance + Tests a traced cassandra Cluster """ writer = DummyWriter() tracer = Tracer(writer=writer) - TracedCluster = get_traced_cluster(tracer) - session = TracedCluster(port=9042).connect() + TracedCluster = get_traced_cassandra(tracer) + session = TracedCluster(port=9042).connect(self.TEST_KEYSPACE) result = session.execute(self.TEST_QUERY) self._assert_result_correct(result) @@ -58,9 +59,9 @@ def test_cassandra_instance(self): # Should be sending one request to "USE " and another for the actual query eq_(len(spans), 2) use, query = spans[0], spans[1] - eq_(use.service, "cassandra") eq_(use.resource, "USE %s" % self.TEST_KEYSPACE) + eq_(query.service, "cassandra") eq_(query.resource, self.TEST_QUERY) eq_(query.get_tag(cassx.KEYSPACE), self.TEST_KEYSPACE) From 71dcf63b3bf34dcd2e035f7b7a3e493654e16040 Mon Sep 17 00:00:00 2001 From: talwai Date: Fri, 1 Jul 2016 10:49:13 +0200 Subject: [PATCH 7/8] trace/contrib: fix cassandra session patching, add tests --- ddtrace/contrib/cassandra/session.py | 21 +++++++------ tests/contrib/cassandra/test.py | 46 ++++++++++++++++++++++++---- 2 files changed, 51 insertions(+), 16 deletions(-) diff --git a/ddtrace/contrib/cassandra/session.py b/ddtrace/contrib/cassandra/session.py index 93a73701fd8..22ab4516876 100644 --- a/ddtrace/contrib/cassandra/session.py +++ b/ddtrace/contrib/cassandra/session.py @@ -30,11 +30,11 @@ def get_traced_cassandra(tracer, service=DEFAULT_SERVICE, meta=None): def _get_traced_cluster(cassandra, tracer, service="cassandra", meta=None): """ Trace synchronous cassandra commands by patching the Session class """ class TracedSession(cassandra.Session): + _datadog_tracer = tracer + _datadog_service = service + _datadog_tags = meta def __init__(self, *args, **kwargs): - self._datadog_tracer = kwargs.pop("datadog_tracer", None) - self._datadog_service = kwargs.pop("datadog_service", None) - self._datadog_tags = kwargs.pop("datadog_tags", None) super(TracedSession, self).__init__(*args, **kwargs) def execute(self, query, *args, **options): @@ -59,13 +59,14 @@ def execute(self, query, *args, **options): class TracedCluster(cassandra.Cluster): def connect(self, *args, **kwargs): - cassandra.Session = functools.partial( - TracedSession, - datadog_tracer=tracer, - datadog_service=service, - datadog_tags=meta, - ) - return super(TracedCluster, self).connect(*args, **kwargs) + orig = cassandra.Session + cassandra.Session = TracedSession + traced_session = super(TracedCluster, self).connect(*args, **kwargs) + + # unpatch the Session class so we don't wrap already traced sessions + cassandra.Session = orig + + return traced_session return TracedCluster diff --git a/tests/contrib/cassandra/test.py b/tests/contrib/cassandra/test.py index 8a5a4c51dbc..35a42835814 100644 --- a/tests/contrib/cassandra/test.py +++ b/tests/contrib/cassandra/test.py @@ -8,7 +8,7 @@ from cassandra.cluster import Cluster from ddtrace.contrib.cassandra import get_traced_cassandra from ddtrace.tracer import Tracer -from ddtrace.ext import net as netx, cassandra as cassx +from ddtrace.ext import net as netx, cassandra as cassx, errors as errx from ...test_tracer import DummyWriter @@ -39,15 +39,18 @@ def _assert_result_correct(self, result): eq_(r.age, 100) eq_(r.description, "A cruel mistress") - def test_get_traced_cassandra(self): - """ - Tests a traced cassandra Cluster - """ + def _traced_cluster(self): writer = DummyWriter() tracer = Tracer(writer=writer) + TracedCluster = get_traced_cassandra(tracer) + return TracedCluster, writer - TracedCluster = get_traced_cassandra(tracer) + def test_get_traced_cassandra(self): + """ + Tests a traced cassandra Cluster + """ + TracedCluster, writer = self._traced_cluster() session = TracedCluster(port=9042).connect(self.TEST_KEYSPACE) result = session.execute(self.TEST_QUERY) @@ -69,5 +72,36 @@ def test_get_traced_cassandra(self): eq_(query.get_tag(cassx.ROW_COUNT), "1") eq_(query.get_tag(netx.TARGET_HOST), "127.0.0.1") + def test_trace_with_service(self): + """ + Tests tracing with a custom service + """ + writer = DummyWriter() + tracer = Tracer(writer=writer) + TracedCluster = get_traced_cassandra(tracer, service="custom") + session = TracedCluster(port=9042).connect(self.TEST_KEYSPACE) + + result = session.execute(self.TEST_QUERY) + spans = writer.pop() + assert spans + eq_(len(spans), 2) + use, query = spans[0], spans[1] + eq_(use.service, "custom") + eq_(query.service, "custom") + + def test_trace_error(self): + TracedCluster, writer = self._traced_cluster() + session = TracedCluster(port=9042).connect(self.TEST_KEYSPACE) + + with self.assertRaises(Exception): + session.execute("select * from test.i_dont_exist limit 1") + + spans = writer.pop() + assert spans + use, query = spans[0], spans[1] + eq_(query.error, 1) + for k in (errx.ERROR_MSG, errx.ERROR_TYPE, errx.ERROR_STACK): + assert query.get_tag(k) + def tearDown(self): self.cluster.connect().execute("DROP KEYSPACE IF EXISTS test") From 1060e04a8f6bbace529cd13d5735e73823836e35 Mon Sep 17 00:00:00 2001 From: talwai Date: Fri, 1 Jul 2016 12:15:09 +0200 Subject: [PATCH 8/8] trace/contrib: add cass span type --- ddtrace/contrib/cassandra/session.py | 1 + tests/contrib/cassandra/test.py | 2 ++ 2 files changed, 3 insertions(+) diff --git a/ddtrace/contrib/cassandra/session.py b/ddtrace/contrib/cassandra/session.py index 22ab4516876..06156a0e59f 100644 --- a/ddtrace/contrib/cassandra/session.py +++ b/ddtrace/contrib/cassandra/session.py @@ -44,6 +44,7 @@ def execute(self, query, *args, **options): with self._datadog_tracer.trace("cassandra.query", service=self._datadog_service) as span: query_string = _sanitize_query(query) span.resource = query_string + span.span_type = cassx.TYPE span.set_tags(_extract_session_metas(self)) cluster = getattr(self, "cluster", None) diff --git a/tests/contrib/cassandra/test.py b/tests/contrib/cassandra/test.py index 35a42835814..c9a4d172d10 100644 --- a/tests/contrib/cassandra/test.py +++ b/tests/contrib/cassandra/test.py @@ -67,6 +67,8 @@ def test_get_traced_cassandra(self): eq_(query.service, "cassandra") eq_(query.resource, self.TEST_QUERY) + eq_(query.span_type, cassx.TYPE) + eq_(query.get_tag(cassx.KEYSPACE), self.TEST_KEYSPACE) eq_(query.get_tag(netx.TARGET_PORT), "9042") eq_(query.get_tag(cassx.ROW_COUNT), "1")