diff --git a/.gitignore b/.gitignore index e274e606a3f..0f40e2d8ecf 100644 --- a/.gitignore +++ b/.gitignore @@ -88,3 +88,6 @@ ENV/ # Rope project settings .ropeproject + +# Vim +*.swp diff --git a/circle.yml b/circle.yml index 0379c2d6573..4b5aa115354 100644 --- a/circle.yml +++ b/circle.yml @@ -9,6 +9,7 @@ dependencies: test: override: - docker run -d -p 9200:9200 elasticsearch:2.3 + - docker run -d -p 9042:9042 cassandra:3 - docker run -d -p 5432:5432 -e POSTGRES_PASSWORD=test -e POSTGRES_USER=test -e POSTGRES_DB=test postgres:9.5 - python2.7 setup.py test - python3.4 setup.py test diff --git a/ddtrace/contrib/cassandra/__init__.py b/ddtrace/contrib/cassandra/__init__.py new file mode 100644 index 00000000000..3a35a0278ed --- /dev/null +++ b/ddtrace/contrib/cassandra/__init__.py @@ -0,0 +1,8 @@ +from ..util import require_modules + +required_modules = ['cassandra.cluster'] + +with require_modules(required_modules) as missing_modules: + if not missing_modules: + from .session import get_traced_cassandra + __all__ = ['get_traced_cassandra'] diff --git a/ddtrace/contrib/cassandra/session.py b/ddtrace/contrib/cassandra/session.py new file mode 100644 index 00000000000..06156a0e59f --- /dev/null +++ b/ddtrace/contrib/cassandra/session.py @@ -0,0 +1,147 @@ +""" +Trace queries along a session to a cassandra cluster +""" + +# stdlib +import functools +import inspect +import logging + + +# project +from ...compat import stringify +from ...util import deep_getattr, safe_patch +from ...ext import net as netx, cassandra as cassx + +# 3p +import cassandra.cluster + + +log = logging.getLogger(__name__) + +RESOURCE_MAX_LENGTH=5000 +DEFAULT_SERVICE = "cassandra" + + +def get_traced_cassandra(tracer, service=DEFAULT_SERVICE, meta=None): + return _get_traced_cluster(cassandra.cluster, tracer, service, meta) + +
def _get_traced_cluster(cassandra, tracer, service="cassandra", meta=None): + """ Trace synchronous cassandra commands by patching the Session class """ + class 
TracedSession(cassandra.Session): + _datadog_tracer = tracer + _datadog_service = service + _datadog_tags = meta + + def __init__(self, *args, **kwargs): + super(TracedSession, self).__init__(*args, **kwargs) + + def execute(self, query, *args, **options): + if not self._datadog_tracer: + return super(TracedSession, self).execute(query, *args, **options) + + with self._datadog_tracer.trace("cassandra.query", service=self._datadog_service) as span: + query_string = _sanitize_query(query) + span.resource = query_string + span.span_type = cassx.TYPE + + span.set_tags(_extract_session_metas(self)) + cluster = getattr(self, "cluster", None) + span.set_tags(_extract_cluster_metas(cluster)) + + result = None + try: + result = super(TracedSession, self).execute(query, *args, **options) + return result + finally: + span.set_tags(_extract_result_metas(result)) + + class TracedCluster(cassandra.Cluster): + + def connect(self, *args, **kwargs): + orig = cassandra.Session + cassandra.Session = TracedSession + traced_session = super(TracedCluster, self).connect(*args, **kwargs) + + # unpatch the Session class so we don't wrap already traced sessions + cassandra.Session = orig + + return traced_session + + return TracedCluster + +def _extract_session_metas(session): + metas = {} + + if getattr(session, "keyspace", None): + # NOTE the keyspace can be overridden explicitly in the query itself + # e.g. 
"Select * from trace.hash_to_resource" + # currently we don't account for this, which is probably fine + # since the keyspace info is contained in the query even if the metadata disagrees + metas[cassx.KEYSPACE] = session.keyspace.lower() + + return metas + +def _extract_cluster_metas(cluster): + metas = {} + if deep_getattr(cluster, "metadata.cluster_name"): + metas["cluster_name"] = cluster.metadata.cluster_name + + if getattr(cluster, "port", None): + metas[netx.TARGET_PORT] = cluster.port + + if getattr(cluster, "contact_points", None): + metas["contact_points"] = cluster.contact_points + # Use the first contact point as a persistent host + if isinstance(cluster.contact_points, list) and len(cluster.contact_points) > 0: + metas[netx.TARGET_HOST] = cluster.contact_points[0] + + if getattr(cluster, "compression", None): + metas["compression"] = cluster.compression + if getattr(cluster, "cql_version", None): + metas["cql_version"] = cluster.cql_version + + return metas + +def _extract_result_metas(result): + metas = {} + if not result: + return metas + + if deep_getattr(result, "response_future.query"): + query = result.response_future.query + + if getattr(query, "consistency_level", None): + metas[cassx.CONSISTENCY_LEVEL] = query.consistency_level + if getattr(query, "keyspace", None): + # Overrides session.keyspace if the query has been prepared against a particular + # keyspace + metas[cassx.KEYSPACE] = query.keyspace.lower() + + if hasattr(result, "has_more_pages"): + if result.has_more_pages: + metas[cassx.PAGINATED] = True + else: + metas[cassx.PAGINATED] = False + + # NOTE(aaditya): this number only reflects the first page of results + # which could be misleading. 
But a true count would require iterating through + all pages which is expensive + if hasattr(result, "current_rows"): + result_rows = result.current_rows or [] + metas[cassx.ROW_COUNT] = len(result_rows) + + return metas + +def _sanitize_query(query): + """ Sanitize the query to something ready for the agent receiver + - Cast to unicode + - truncate if needed + """ + # TODO (aaditya): fix this hacky type check. we need it to avoid circular imports + if type(query).__name__ in ('SimpleStatement', 'PreparedStatement'): + # reset query if a string is available + query = getattr(query, "query_string", query) + + return stringify(query)[:RESOURCE_MAX_LENGTH] diff --git a/ddtrace/ext/cassandra.py b/ddtrace/ext/cassandra.py new file mode 100644 index 00000000000..a86bb6abc2b --- /dev/null +++ b/ddtrace/ext/cassandra.py @@ -0,0 +1,10 @@ + +# the type of the spans +TYPE = "cassandra" + +# tags +KEYSPACE = "cassandra.keyspace" +CONSISTENCY_LEVEL = "cassandra.consistency_level" +PAGINATED = "cassandra.paginated" +ROW_COUNT = "cassandra.row_count" + diff --git a/ddtrace/util.py b/ddtrace/util.py new file mode 100644 index 00000000000..f62db068737 --- /dev/null +++ b/ddtrace/util.py @@ -0,0 +1,83 @@ +""" +Generic utilities for tracers +""" + +import inspect + + +def deep_getattr(obj, attr_string, default=None): + """ + Returns the attribute of `obj` at the dotted path given by `attr_string` + If no such attribute is reachable, returns `default` + + >>> deep_getattr(cass, "cluster") + >>> deep_getattr(cass, "cluster.metadata.partitioner") + u'org.apache.cassandra.dht.Murmur3Partitioner' + + >>> deep_getattr(cass, "i.dont.exist", default="default") + 'default' + """ + attrs = attr_string.split('.') + for attr in attrs: + try: + obj = getattr(obj, attr) + except AttributeError: + return default + + return obj + + +def safe_patch(patchable, key, patch_func, service, meta, tracer): + """ takes patch_func (signature: takes the orig_method that is + wrapped in the monkey patch == 
UNBOUND + service and meta) and + attach the patched result to patchable at patchable.key + + + - if this is the module/class we can rely on methods being unbound, and just have to + update the __dict__ + + - if this is an instance, we have to unbind the current and rebind our + patched method + + - If patchable is an instance and if we've already patched at the module/class level + then patchable[key] contains an already patched command! + To workaround this, check if patchable or patchable.__class__ are _dogtraced + If is isn't, nothing to worry about, patch the key as usual + But if it is, search for a "__dd_orig_{key}" method on the class, which is + the original unpatched method we wish to trace. + + """ + + def _get_original_method(thing, key): + orig = None + if hasattr(thing, '_dogtraced'): + # Search for original method + orig = getattr(thing, "__dd_orig_{}".format(key), None) + else: + orig = getattr(thing, key) + # Set it for the next time we attempt to patch `thing` + setattr(thing, "__dd_orig_{}".format(key), orig) + + return orig + + if inspect.isclass(patchable) or inspect.ismodule(patchable): + orig = _get_original_method(patchable, key) + if not orig: + # Should never happen + return + elif hasattr(patchable, '__class__'): + orig = _get_original_method(patchable.__class__, key) + if not orig: + # Should never happen + return + else: + return + + dest = patch_func(orig, service, meta, tracer) + + if inspect.isclass(patchable) or inspect.ismodule(patchable): + setattr(patchable, key, dest) + elif hasattr(patchable, '__class__'): + setattr(patchable, key, dest.__get__(patchable, patchable.__class__)) diff --git a/setup.py b/setup.py index 4c3370b2e9c..7873925311d 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,8 @@ 'blinker', 'elasticsearch', 'psycopg2', - 'django' + 'django', + 'cassandra-driver' ] setup( diff --git a/tests/contrib/cassandra/__init__.py b/tests/contrib/cassandra/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff 
--git a/tests/contrib/cassandra/test.py b/tests/contrib/cassandra/test.py new file mode 100644 index 00000000000..c9a4d172d10 --- /dev/null +++ b/tests/contrib/cassandra/test.py @@ -0,0 +1,109 @@ +import unittest +from nose.tools import eq_ + +from ddtrace.contrib.cassandra import missing_modules +if missing_modules: + raise unittest.SkipTest("Missing dependencies %s" % missing_modules) + +from cassandra.cluster import Cluster +from ddtrace.contrib.cassandra import get_traced_cassandra +from ddtrace.tracer import Tracer +from ddtrace.ext import net as netx, cassandra as cassx, errors as errx + +from ...test_tracer import DummyWriter + +class CassandraTest(unittest.TestCase): + """Needs a running cassandra at localhost:9042""" + + TEST_QUERY = "SELECT * from test.person" + TEST_KEYSPACE = "test" + + def setUp(self): + if not Cluster: + raise unittest.SkipTest("cassandra.cluster.Cluster is not available.") + + self.cluster = Cluster(port=9042) + session = self.cluster.connect() + session.execute("""CREATE KEYSPACE test WITH REPLICATION = { + 'class' : 'SimpleStrategy', + 'replication_factor': 1 + }""") + session.execute("CREATE TABLE test.person (name text PRIMARY KEY, age int, description text)") + session.execute("""INSERT INTO test.person (name, age, description) VALUES ('Cassandra', 100, 'A cruel mistress')""") + + + def _assert_result_correct(self, result): + eq_(len(result.current_rows), 1) + for r in result: + eq_(r.name, "Cassandra") + eq_(r.age, 100) + eq_(r.description, "A cruel mistress") + + def _traced_cluster(self): + writer = DummyWriter() + tracer = Tracer(writer=writer) + TracedCluster = get_traced_cassandra(tracer) + return TracedCluster, writer + + + def test_get_traced_cassandra(self): + """ + Tests a traced cassandra Cluster + """ + TracedCluster, writer = self._traced_cluster() + session = TracedCluster(port=9042).connect(self.TEST_KEYSPACE) + + result = session.execute(self.TEST_QUERY) + self._assert_result_correct(result) + + spans = 
writer.pop() + assert spans + + # Should be sending one request to "USE " and another for the actual query + eq_(len(spans), 2) + use, query = spans[0], spans[1] + eq_(use.service, "cassandra") + eq_(use.resource, "USE %s" % self.TEST_KEYSPACE) + + eq_(query.service, "cassandra") + eq_(query.resource, self.TEST_QUERY) + eq_(query.span_type, cassx.TYPE) + + eq_(query.get_tag(cassx.KEYSPACE), self.TEST_KEYSPACE) + eq_(query.get_tag(netx.TARGET_PORT), "9042") + eq_(query.get_tag(cassx.ROW_COUNT), "1") + eq_(query.get_tag(netx.TARGET_HOST), "127.0.0.1") + + def test_trace_with_service(self): + """ + Tests tracing with a custom service + """ + writer = DummyWriter() + tracer = Tracer(writer=writer) + TracedCluster = get_traced_cassandra(tracer, service="custom") + session = TracedCluster(port=9042).connect(self.TEST_KEYSPACE) + + result = session.execute(self.TEST_QUERY) + spans = writer.pop() + assert spans + eq_(len(spans), 2) + use, query = spans[0], spans[1] + eq_(use.service, "custom") + eq_(query.service, "custom") + + def test_trace_error(self): + TracedCluster, writer = self._traced_cluster() + session = TracedCluster(port=9042).connect(self.TEST_KEYSPACE) + + with self.assertRaises(Exception): + session.execute("select * from test.i_dont_exist limit 1") + + spans = writer.pop() + assert spans + use, query = spans[0], spans[1] + eq_(query.error, 1) + for k in (errx.ERROR_MSG, errx.ERROR_TYPE, errx.ERROR_STACK): + assert query.get_tag(k) + + def tearDown(self): + self.cluster.connect().execute("DROP KEYSPACE IF EXISTS test")