diff --git a/README.md b/README.md index cc57ac5..df7fa07 100644 --- a/README.md +++ b/README.md @@ -91,6 +91,10 @@ See: [Server Interaction](https://github.com/pilosa/python-pilosa/blob/master/do See: [Importing and Exporting Data](https://github.com/pilosa/python-pilosa/blob/master/docs/imports.md) +### Other Documentation + +* [Tracing](docs/tracing.md) + ## Contributing See: [CONTRIBUTING](https://github.com/pilosa/python-pilosa/blob/master/CONTRIBUTING.md) diff --git a/docs/tracing.md b/docs/tracing.md new file mode 100644 index 0000000..6882250 --- /dev/null +++ b/docs/tracing.md @@ -0,0 +1,94 @@ +# Tracing + +Python-Pilosa supports distributed tracing via the [OpenTracing](https://opentracing.io/) API. + +In order to use a tracer with Python-Pilosa, you should: +1. Create the tracer, +2. Pass the `tracer=tracer_object` to `Client()`. + +In this document, we will be using the [Jaeger](https://www.jaegertracing.io) tracer, but OpenTracing has support for [other tracing systems](https://opentracing.io/docs/supported-tracers/). + +## Running the Pilosa Server + +Let's run a temporary Pilosa container: + + $ docker run -it --rm -p 10101:10101 pilosa/pilosa:v1.2.0 + +Check that you can access Pilosa: + + $ curl localhost:10101 + Welcome. Pilosa is running. Visit https://www.pilosa.com/docs/ for more information. + +## Running the Jaeger Server + +Let's run a Jaeger Server container: + + $ docker run -it --rm -p 6831:6831/udp -p 5775:5775/udp -p 16686:16686 jaegertracing/all-in-one:latest + ...Jaeger UI... + +## Writing the Sample Code + +The sample code depdends on the Jaeger Python client, so let's install it first: + + $ pip install jaeger-client + +Save the following sample code as `pypilosa-tracing.py`: +```python +from pilosa import Client +from jaeger_client import Config + + +def get_tracer(service_name): + config = Config( + config={ + 'sampler': { + 'type': 'const', + 'param': 1, + }, + 'local_agent': { + 'reporting_host': '127.0.0.1' + } + }, + service_name=service_name + ) + return config.new_tracer() + + +def main(): + # Create the tracer. + tracer = get_tracer("python_client_test") + + # Create the client, and pass the tracer. + client = Client(":10101", tracer=tracer) + + # Read the schema from the server. + # This should create a trace on the Jaeger server. + schema = client.schema() + + # Create and sync the sample schema. + # This should create a trace on the Jaeger server. + my_index = schema.index("my-index") + my_field = my_index.field("my-field") + client.sync_schema(schema) + + # Run a query on Pilosa. + # This should create a trace on the Jaeger server. + client.query(my_field.set(1, 1000)) + + tracer.close() + + +if __name__ == "__main__": + main() +``` + +## Checking the Tracing Data + +Run the sample code: + + $ python pypilosa-tracing.py + +* Open http://localhost:16686 in your web browser to visit Jaeger UI. +* Click on the *Search* tab and select `python_pilosa_test` in the *Service* dropdown on the right. +* Click on *Find Traces* button at the bottom left. +* You should see a couple of traces, such as: `Client.Query`, `Client.CreateField`, `Client.Schema`, etc. diff --git a/pilosa/client.py b/pilosa/client.py index 08c4e69..ed841e5 100644 --- a/pilosa/client.py +++ b/pilosa/client.py @@ -40,6 +40,7 @@ from datetime import datetime import urllib3 +from opentracing.tracer import Tracer from roaring import Bitmap from .exceptions import PilosaError, PilosaURIError, IndexExistsError, FieldExistsError @@ -96,7 +97,8 @@ class Client(object): def __init__(self, cluster_or_uri=None, connect_timeout=30000, socket_timeout=300000, pool_size_per_route=10, pool_size_total=100, retry_count=3, - tls_skip_verify=False, tls_ca_certificate_path="", use_manual_address=False): + tls_skip_verify=False, tls_ca_certificate_path="", use_manual_address=False, + tracer=None): """Creates a Client. :param object cluster_or_uri: A ``pilosa.Cluster`` or ``pilosa.URI` instance @@ -110,6 +112,7 @@ def __init__(self, cluster_or_uri=None, connect_timeout=30000, socket_timeout=30 :param bool tls_skip_verify: Do not verify the TLS certificate of the server (Not recommended for production) :param str tls_ca_certificate_path: Server's TLS certificate (Useful when using self-signed certificates) :param bool use_manual_address: Forces the client to use only the manual server address + :param opentracing.tracer.Tracer tracer: Set the OpenTracing tracer. See: https://opentracing.io * See `Pilosa Python Client/Server Interaction `_. """ @@ -126,6 +129,7 @@ def __init__(self, cluster_or_uri=None, connect_timeout=30000, socket_timeout=30 self.logger = logging.getLogger("pilosa") self.__coordinator_lock = threading.RLock() self.__coordinator_uri = None + self.tracer = tracer or Tracer() if cluster_or_uri is None: self.cluster = Cluster(URI()) @@ -160,27 +164,28 @@ def query(self, query, column_attrs=False, exclude_columns=False, exclude_attrs= """ serialized_query = query.serialize() request = _QueryRequest(serialized_query.query, - column_attrs=column_attrs, - exclude_columns=exclude_columns, - exclude_row_attrs=exclude_attrs, - shards=shards) + column_attrs=column_attrs, + exclude_columns=exclude_columns, + exclude_row_attrs=exclude_attrs, + shards=shards) path = "/index/%s/query" % query.index.name - try: - headers = { - "Content-Type": "application/x-protobuf", - "Accept": "application/x-protobuf", - "PQL-Version": PQL_VERSION, - } - response = self.__http_request("POST", path, - data=request.to_protobuf(), - headers=headers, - use_coordinator=serialized_query.has_keys) - warning = response.getheader("warning") - if warning: - self.logger.warning(warning) - return QueryResponse._from_protobuf(response.data) - except PilosaServerError as e: - raise PilosaError(e.content) + with self.tracer.start_span("Client.Query") as span: + try: + headers = { + "Content-Type": "application/x-protobuf", + "Accept": "application/x-protobuf", + "PQL-Version": PQL_VERSION, + } + response = self.__http_request("POST", path, + data=request.to_protobuf(), + headers=headers, + use_coordinator=serialized_query.has_keys) + warning = response.getheader("warning") + if warning: + self.logger.warning(warning) + return QueryResponse._from_protobuf(response.data) + except PilosaServerError as e: + raise PilosaError(e.content) def create_index(self, index): """Creates an index on the server using the given Index object. @@ -190,12 +195,13 @@ def create_index(self, index): """ path = "/index/%s" % index.name data = index._get_options_string() - try: - self.__http_request("POST", path, data=data) - except PilosaServerError as e: - if e.response.status == 409: - raise IndexExistsError - raise + with self.tracer.start_span("Client.CreateIndex") as scope: + try: + self.__http_request("POST", path, data=data) + except PilosaServerError as e: + if e.response.status == 409: + raise IndexExistsError + raise def delete_index(self, index): """Deletes the given index on the server. @@ -204,7 +210,8 @@ def delete_index(self, index): :raises pilosa.PilosaError: if the index does not exist """ path = "/index/%s" % index.name - self.__http_request("DELETE", path) + with self.tracer.start_span("Client.DeleteIndex") as scope: + self.__http_request("DELETE", path) def create_field(self, field): """Creates a field on the server using the given Field object. @@ -214,12 +221,13 @@ def create_field(self, field): """ data = field._get_options_string() path = "/index/%s/field/%s" % (field.index.name, field.name) - try: - self.__http_request("POST", path, data=data) - except PilosaServerError as e: - if e.response.status == 409: - raise FieldExistsError - raise + with self.tracer.start_span("Client.CreateField") as scope: + try: + self.__http_request("POST", path, data=data) + except PilosaServerError as e: + if e.response.status == 409: + raise FieldExistsError + raise def delete_field(self, field): @@ -229,7 +237,8 @@ def delete_field(self, field): :raises pilosa.PilosaError: if the field does not exist """ path = "/index/%s/field/%s" % (field.index.name, field.name) - self.__http_request("DELETE", path) + with self.tracer.start_span("Client.DeleteField") as scope: + self.__http_request("DELETE", path) def ensure_index(self, index): """Creates an index on the server if it does not exist. @@ -262,17 +271,18 @@ def schema(self): :rtype: pilosa.Schema """ schema = Schema() - for index_info in self._read_schema(): - index_options = index_info.get("options", {}) - index = schema.index(index_info["name"], - keys=index_options.get("keys", False), - track_existence=index_options.get("trackExistence", False), - shard_width=index_info.get("shardWidth", 0)) - for field_info in index_info.get("fields") or []: - if field_info["name"] in RESERVED_FIELDS: - continue - options = decode_field_meta_options(field_info) - index.field(field_info["name"], **options) + with self.tracer.start_span("Client.Schema") as scope: + for index_info in self._read_schema(): + index_options = index_info.get("options", {}) + index = schema.index(index_info["name"], + keys=index_options.get("keys", False), + track_existence=index_options.get("trackExistence", False), + shard_width=index_info.get("shardWidth", 0)) + for field_info in index_info.get("fields") or []: + if field_info["name"] in RESERVED_FIELDS: + continue + options = decode_field_meta_options(field_info) + index.field(field_info["name"], **options) return schema @@ -283,28 +293,29 @@ def sync_schema(self, schema): :param pilosa.Schema schema: Local schema to be synced """ - server_schema = self.schema() - - # find out local - remote schema - diff_schema = schema._diff(server_schema) - # create indexes and fields which doesn't exist on the server side - for index_name, index in diff_schema._indexes.items(): - if index_name not in server_schema._indexes: - self.ensure_index(index) - for field_name, field in index._fields.items(): - if field_name not in RESERVED_FIELDS: - self.ensure_field(field) - - # find out remote - local schema - diff_schema = server_schema._diff(schema) - for index_name, index in diff_schema._indexes.items(): - local_index = schema._indexes.get(index_name) - if local_index is None: - schema._indexes[index_name] = index - else: + with self.tracer.start_span("Client.SyncSchema") as scope: + server_schema = self.schema() + + # find out local - remote schema + diff_schema = schema._diff(server_schema) + # create indexes and fields which doesn't exist on the server side + for index_name, index in diff_schema._indexes.items(): + if index_name not in server_schema._indexes: + self.ensure_index(index) for field_name, field in index._fields.items(): if field_name not in RESERVED_FIELDS: - local_index._fields[field_name] = field + self.ensure_field(field) + + # find out remote - local schema + diff_schema = server_schema._diff(schema) + for index_name, index in diff_schema._indexes.items(): + local_index = schema._indexes.get(index_name) + if local_index is None: + schema._indexes[index_name] = index + else: + for field_name, field in index._fields.items(): + if field_name not in RESERVED_FIELDS: + local_index._fields[field_name] = field def import_field(self, field, bit_reader, batch_size=100000, fast_import=False, clear=False): """Imports a field using the given bit reader @@ -316,8 +327,9 @@ def import_field(self, field, bit_reader, batch_size=100000, fast_import=False, :param clear: clear bits instead of setting them """ shard_width = field.index.shard_width or DEFAULT_SHARD_WIDTH - for shard, columns in batch_columns(bit_reader, batch_size, shard_width): - self._import_data(field, shard, columns, fast_import, clear) + with self.tracer.start_span("Client.ImportField") as scope: + for shard, columns in batch_columns(bit_reader, batch_size, shard_width): + self._import_data(field, shard, columns, fast_import, clear) def http_request(self, method, path, data=None, headers=None): """Sends an HTTP request to the Pilosa server @@ -331,7 +343,9 @@ def http_request(self, method, path, data=None, headers=None): :return: HTTP response """ - return self.__http_request(method, path, data=data, headers=headers) + span = self.tracer.start_span("Client.HttpRequest") + with self.tracer.start_span(span) as scope: + return self.__http_request(method, path, data=data, headers=headers) def _import_data(self, field, shard, data, fast_import, clear): if field.field_type != "int": @@ -422,7 +436,7 @@ def __http_request(self, method, path, data=None, headers=None, use_coordinator= else: uri = "%s%s" % (self.__get_address(), path) try: - self.logger.debug("Request: %s %s %s", method, uri) + self.logger.debug("Request: %s %s", method, uri) response = self.__client.request(method, uri, body=data, headers=headers) break except urllib3.exceptions.MaxRetryError as e: diff --git a/requirements/main.txt b/requirements/main.txt index f21930c..91bf94d 100644 --- a/requirements/main.txt +++ b/requirements/main.txt @@ -1,3 +1,5 @@ urllib3==1.24.1 protobuf==3.6.1 pilosa-roaring==0.2.0 +opentracing==1.3.0 +