Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,6 @@ ENV/

# Rope project settings
.ropeproject

# Vim
*.swp
1 change: 1 addition & 0 deletions circle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ dependencies:
test:
override:
- docker run -d -p 9200:9200 elasticsearch:2.3
- docker run -d -p 9042:9042 cassandra:3
- docker run -d -p 5432:5432 -e POSTGRES_PASSWORD=test -e POSTGRES_USER=test -e POSTGRES_DB=test postgres:9.5
- python2.7 setup.py test
- python3.4 setup.py test
8 changes: 8 additions & 0 deletions ddtrace/contrib/cassandra/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from ..util import require_modules

required_modules = ['cassandra.cluster']

with require_modules(required_modules) as missing_modules:
if not missing_modules:
from .session import get_traced_cassandra
__all__ = ['get_traced_cassanra']
147 changes: 147 additions & 0 deletions ddtrace/contrib/cassandra/session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
"""
Trace queries along a session to a cassandra cluster
"""

# stdlib
import functools
import inspect
import logging


# project
from ...compat import stringify
from ...util import deep_getattr, safe_patch
from ...ext import net as netx, cassandra as cassx

# 3p
import cassandra.cluster


log = logging.getLogger(__name__)

RESOURCE_MAX_LENGTH=5000
DEFAULT_SERVICE = "cassandra"


def get_traced_cassandra(tracer, service=DEFAULT_SERVICE, meta=None):
return _get_traced_cluster(cassandra.cluster, tracer, service, meta)


def _get_traced_cluster(cassandra, tracer, service="cassandra", meta=None):
""" Trace synchronous cassandra commands by patching the Session class """
class TracedSession(cassandra.Session):
_datadog_tracer = tracer
_datadog_service = service
_datadog_tags = meta

def __init__(self, *args, **kwargs):
super(TracedSession, self).__init__(*args, **kwargs)

def execute(self, query, *args, **options):
if not self._datadog_tracer:
return session.execute(query, *args, **options)

with self._datadog_tracer.trace("cassandra.query", service=self._datadog_service) as span:
query_string = _sanitize_query(query)
span.resource = query_string
span.span_type = cassx.TYPE

span.set_tags(_extract_session_metas(self))
cluster = getattr(self, "cluster", None)
span.set_tags(_extract_cluster_metas(cluster))

result = None
try:
result = super(TracedSession, self).execute(query, *args, **options)
return result
finally:
span.set_tags(_extract_result_metas(result))

class TracedCluster(cassandra.Cluster):

def connect(self, *args, **kwargs):
orig = cassandra.Session
cassandra.Session = TracedSession
traced_session = super(TracedCluster, self).connect(*args, **kwargs)

# unpatch the Session class so we don't wrap already traced sessions
cassandra.Session = orig

return traced_session

return TracedCluster

def _extract_session_metas(session):
metas = {}

if getattr(session, "keyspace", None):
# NOTE the keyspace can be overridden explicitly in the query itself
# e.g. "Select * from trace.hash_to_resource"
# currently we don't account for this, which is probably fine
# since the keyspace info is contained in the query even if the metadata disagrees
metas[cassx.KEYSPACE] = session.keyspace.lower()

return metas

def _extract_cluster_metas(cluster):
metas = {}
if deep_getattr(cluster, "metadata.cluster_name"):
metas["cluster_name"] = cluster.metadata.cluster_name

if getattr(cluster, "port", None):
metas[netx.TARGET_PORT] = cluster.port

if getattr(cluster, "contact_points", None):
metas["contact_points"] = cluster.contact_points
# Use the first contact point as a persistent host
if isinstance(cluster.contact_points, list) and len(cluster.contact_points) > 0:
metas[netx.TARGET_HOST] = cluster.contact_points[0]

if getattr(cluster, "compression", None):
metas["compression"] = cluster.compression
if getattr(cluster, "cql_version", None):
metas["cql_version"] = cluster.cql_version

return metas

def _extract_result_metas(result):
metas = {}
if not result:
return metas

if deep_getattr(result, "response_future.query"):
query = result.response_future.query

if getattr(query, "consistency_level", None):
metas[cassx.CONSISTENCY_LEVEL] = query.consistency_level
if getattr(query, "keyspace", None):
# Overrides session.keyspace if the query has been prepared against a particular
# keyspace
metas[cassx.KEYSPACE] = query.keyspace.lower()

if hasattr(result, "has_more_pages"):
if result.has_more_pages:
metas[cassx.PAGINATED] = True
else:
metas[cassx.PAGINATED] = False

# NOTE(aaditya): this number only reflects the first page of results
# which could be misleading. But a true count would require iterating through
# all pages which is expensive
if hasattr(result, "current_rows"):
result_rows = result.current_rows or []
metas[cassx.ROW_COUNT] = len(result_rows)

return metas

def _sanitize_query(query):
""" Sanitize the query to something ready for the agent receiver
- Cast to unicode
- truncate if needed
"""
# TODO (aaditya): fix this hacky type check. we need it to avoid circular imports
Copy link
Contributor

@clutchski clutchski Jun 27, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about

cql = getattr(query, 'query_string', None) or query

if type(query).__name__ in ('SimpleStatement', 'PreparedStatement'):
# reset query if a string is available
query = getattr(query, "query_string", query)

return stringify(query)[:RESOURCE_MAX_LENGTH]
10 changes: 10 additions & 0 deletions ddtrace/ext/cassandra.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@

# the type of the spans
TYPE = "cassandra"

# tags
KEYSPACE = "cassandra.keyspace"
CONSISTENCY_LEVEL = "cassandra.consistency_level"
PAGINATED = "cassandra.paginated"
ROW_COUNT = "cassandra.row_count"

83 changes: 83 additions & 0 deletions ddtrace/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""
Generic utilities for tracers
"""

import inspect


def deep_getattr(obj, attr_string, default=None):
"""
Returns the attribute of `obj` at the dotted path given by `attr_string`
If no such attribute is reachable, returns `default`

>>> deep_getattr(cass, "cluster")
<cassandra.cluster.Cluster object at 0xa20c350

>>> deep_getattr(cass, "cluster.metadata.partitioner")
u'org.apache.cassandra.dht.Murmur3Partitioner'

>>> deep_getattr(cass, "i.dont.exist", default="default")
'default'
"""
attrs = attr_string.split('.')
for attr in attrs:
try:
obj = getattr(obj, attr)
except AttributeError:
return default

return obj


def safe_patch(patchable, key, patch_func, service, meta, tracer):
""" takes patch_func (signature: takes the orig_method that is
wrapped in the monkey patch == UNBOUND + service and meta) and
attach the patched result to patchable at patchable.key


- if this is the module/class we can rely on methods being unbound, and just have to
update the __dict__

- if this is an instance, we have to unbind the current and rebind our
patched method

- If patchable is an instance and if we've already patched at the module/class level
then patchable[key] contains an already patched command!
To workaround this, check if patchable or patchable.__class__ are _dogtraced
If is isn't, nothing to worry about, patch the key as usual
But if it is, search for a "__dd_orig_{key}" method on the class, which is
the original unpatched method we wish to trace.

"""

def _get_original_method(thing, key):
orig = None
if hasattr(thing, '_dogtraced'):
# Search for original method
orig = getattr(thing, "__dd_orig_{}".format(key), None)
else:
orig = getattr(thing, key)
# Set it for the next time we attempt to patch `thing`
setattr(thing, "__dd_orig_{}".format(key), orig)

return orig

if inspect.isclass(patchable) or inspect.ismodule(patchable):
orig = _get_original_method(patchable, key)
if not orig:
# Should never happen
return
elif hasattr(patchable, '__class__'):
orig = _get_original_method(patchable.__class__, key)
if not orig:
# Should never happen
return
else:
return

dest = patch_func(orig, service, meta, tracer)

if inspect.isclass(patchable) or inspect.ismodule(patchable):
setattr(patchable, key, dest)
elif hasattr(patchable, '__class__'):
setattr(patchable, key, dest.__get__(patchable, patchable.__class__))
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
'blinker',
'elasticsearch',
'psycopg2',
'django'
'django',
'cassandra-driver'
]

setup(
Expand Down
Empty file.
109 changes: 109 additions & 0 deletions tests/contrib/cassandra/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import unittest
from nose.tools import eq_

from ddtrace.contrib.cassandra import missing_modules
if missing_modules:
raise unittest.SkipTest("Missing dependencies %s" % missing_modules)

from cassandra.cluster import Cluster
from ddtrace.contrib.cassandra import get_traced_cassandra
from ddtrace.tracer import Tracer
from ddtrace.ext import net as netx, cassandra as cassx, errors as errx

from ...test_tracer import DummyWriter

class CassandraTest(unittest.TestCase):
"""Needs a running cassandra at localhost:9042"""

TEST_QUERY = "SELECT * from test.person"
TEST_KEYSPACE = "test"

def setUp(self):
if not Cluster:
raise unittest.SkipTest("cassandra.cluster.Cluster is not available.")

self.cluster = Cluster(port=9042)
session = self.cluster.connect()
session.execute("""CREATE KEYSPACE test WITH REPLICATION = {
'class' : 'SimpleStrategy',
'replication_factor': 1
}""")
session.execute("CREATE TABLE test.person (name text PRIMARY KEY, age int, description text)")
session.execute("""INSERT INTO test.person (name, age, description) VALUES ('Cassandra', 100, 'A cruel mistress')""")


def _assert_result_correct(self, result):
eq_(len(result.current_rows), 1)
for r in result:
eq_(r.name, "Cassandra")
eq_(r.age, 100)
eq_(r.description, "A cruel mistress")

def _traced_cluster(self):
writer = DummyWriter()
tracer = Tracer(writer=writer)
TracedCluster = get_traced_cassandra(tracer)
return TracedCluster, writer


def test_get_traced_cassandra(self):
"""
Tests a traced cassandra Cluster
"""
TracedCluster, writer = self._traced_cluster()
session = TracedCluster(port=9042).connect(self.TEST_KEYSPACE)

result = session.execute(self.TEST_QUERY)
self._assert_result_correct(result)

spans = writer.pop()
assert spans

# Should be sending one request to "USE <keyspace>" and another for the actual query
eq_(len(spans), 2)
use, query = spans[0], spans[1]
eq_(use.service, "cassandra")
eq_(use.resource, "USE %s" % self.TEST_KEYSPACE)

eq_(query.service, "cassandra")
eq_(query.resource, self.TEST_QUERY)
eq_(query.span_type, cassx.TYPE)

eq_(query.get_tag(cassx.KEYSPACE), self.TEST_KEYSPACE)
eq_(query.get_tag(netx.TARGET_PORT), "9042")
eq_(query.get_tag(cassx.ROW_COUNT), "1")
eq_(query.get_tag(netx.TARGET_HOST), "127.0.0.1")

def test_trace_with_service(self):
"""
Tests tracing with a custom service
"""
writer = DummyWriter()
tracer = Tracer(writer=writer)
TracedCluster = get_traced_cassandra(tracer, service="custom")
session = TracedCluster(port=9042).connect(self.TEST_KEYSPACE)

result = session.execute(self.TEST_QUERY)
spans = writer.pop()
assert spans
eq_(len(spans), 2)
use, query = spans[0], spans[1]
eq_(use.service, "custom")
eq_(query.service, "custom")

def test_trace_error(self):
TracedCluster, writer = self._traced_cluster()
session = TracedCluster(port=9042).connect(self.TEST_KEYSPACE)

with self.assertRaises(Exception):
session.execute("select * from test.i_dont_exist limit 1")

spans = writer.pop()
assert spans
use, query = spans[0], spans[1]
eq_(query.error, 1)
for k in (errx.ERROR_MSG, errx.ERROR_TYPE, errx.ERROR_STACK):
assert query.get_tag(k)

def tearDown(self):
self.cluster.connect().execute("DROP KEYSPACE IF EXISTS test")