Skip to content
This repository was archived by the owner on Sep 28, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ See: [Server Interaction](https://github.com/pilosa/python-pilosa/blob/master/do

See: [Importing and Exporting Data](https://github.com/pilosa/python-pilosa/blob/master/docs/imports.md)

### Other Documentation

* [Tracing](docs/tracing.md)

## Contributing

See: [CONTRIBUTING](https://github.com/pilosa/python-pilosa/blob/master/CONTRIBUTING.md)
Expand Down
94 changes: 94 additions & 0 deletions docs/tracing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# Tracing

Python-Pilosa supports distributed tracing via the [OpenTracing](https://opentracing.io/) API.

In order to use a tracer with Python-Pilosa, you should:
1. Create the tracer,
2. Pass the `tracer=tracer_object` to `Client()`.

In this document, we will be using the [Jaeger](https://www.jaegertracing.io) tracer, but OpenTracing has support for [other tracing systems](https://opentracing.io/docs/supported-tracers/).

## Running the Pilosa Server

Let's run a temporary Pilosa container:

$ docker run -it --rm -p 10101:10101 pilosa/pilosa:v1.2.0

Check that you can access Pilosa:

$ curl localhost:10101
Welcome. Pilosa is running. Visit https://www.pilosa.com/docs/ for more information.

## Running the Jaeger Server

Let's run a Jaeger Server container:

$ docker run -it --rm -p 6831:6831/udp -p 5775:5775/udp -p 16686:16686 jaegertracing/all-in-one:latest
...<title>Jaeger UI</title>...

## Writing the Sample Code

The sample code depends on the Jaeger Python client, so let's install it first:

$ pip install jaeger-client

Save the following sample code as `pypilosa-tracing.py`:
```python
from pilosa import Client
from jaeger_client import Config


def get_tracer(service_name):
    """Build a Jaeger tracer that reports under ``service_name``.

    :param str service_name: Name shown in the *Service* dropdown of Jaeger UI
    :return: a tracer created by ``jaeger_client.Config.new_tracer()``
    """
    tracer_settings = {
        # Constant sampler with param 1: sample every trace.
        'sampler': {
            'type': 'const',
            'param': 1,
        },
        # Report spans to the Jaeger agent on the local machine.
        'local_agent': {
            'reporting_host': '127.0.0.1'
        }
    }
    config = Config(
        config=tracer_settings,
        service_name=service_name
    )
    return config.new_tracer()


def main():
    """Run a few Pilosa client calls with tracing enabled.

    Each client call below is expected to produce a trace on the Jaeger server.
    The tracer is always closed on exit, even if one of the calls fails.
    """
    # Local import keeps the sample's top-level import list unchanged.
    import time

    # Create the tracer.
    tracer = get_tracer("python_client_test")
    try:
        # Create the client, and pass the tracer.
        client = Client(":10101", tracer=tracer)

        # Read the schema from the server.
        # This should create a trace on the Jaeger server.
        schema = client.schema()

        # Create and sync the sample schema.
        # This should create a trace on the Jaeger server.
        my_index = schema.index("my-index")
        my_field = my_index.field("my-field")
        client.sync_schema(schema)

        # Run a query on Pilosa.
        # This should create a trace on the Jaeger server.
        client.query(my_field.set(1, 1000))
    finally:
        # Give the Jaeger reporter a moment to flush buffered spans before
        # closing; the Jaeger Python client README recommends this pause.
        time.sleep(2)
        tracer.close()


# Run the sample only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
```

## Checking the Tracing Data

Run the sample code:

$ python pypilosa-tracing.py

* Open http://localhost:16686 in your web browser to visit Jaeger UI.
* Click on the *Search* tab and select `python_client_test` (the service name passed to `get_tracer` in the sample code) in the *Service* dropdown on the right.
* Click on the *Find Traces* button at the bottom left.
* You should see a couple of traces, such as: `Client.Query`, `Client.CreateField`, `Client.Schema`, etc.
154 changes: 84 additions & 70 deletions pilosa/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from datetime import datetime

import urllib3
from opentracing.tracer import Tracer
from roaring import Bitmap

from .exceptions import PilosaError, PilosaURIError, IndexExistsError, FieldExistsError
Expand Down Expand Up @@ -96,7 +97,8 @@ class Client(object):

def __init__(self, cluster_or_uri=None, connect_timeout=30000, socket_timeout=300000,
pool_size_per_route=10, pool_size_total=100, retry_count=3,
tls_skip_verify=False, tls_ca_certificate_path="", use_manual_address=False):
tls_skip_verify=False, tls_ca_certificate_path="", use_manual_address=False,
tracer=None):
"""Creates a Client.

:param object cluster_or_uri: A ``pilosa.Cluster`` or ``pilosa.URI` instance
Expand All @@ -110,6 +112,7 @@ def __init__(self, cluster_or_uri=None, connect_timeout=30000, socket_timeout=30
:param bool tls_skip_verify: Do not verify the TLS certificate of the server (Not recommended for production)
:param str tls_ca_certificate_path: Server's TLS certificate (Useful when using self-signed certificates)
:param bool use_manual_address: Forces the client to use only the manual server address
:param opentracing.tracer.Tracer tracer: Set the OpenTracing tracer. See: https://opentracing.io

* See `Pilosa Python Client/Server Interaction <https://github.com/pilosa/python-pilosa/blob/master/docs/server-interaction.md>`_.
"""
Expand All @@ -126,6 +129,7 @@ def __init__(self, cluster_or_uri=None, connect_timeout=30000, socket_timeout=30
self.logger = logging.getLogger("pilosa")
self.__coordinator_lock = threading.RLock()
self.__coordinator_uri = None
self.tracer = tracer or Tracer()

if cluster_or_uri is None:
self.cluster = Cluster(URI())
Expand Down Expand Up @@ -160,27 +164,28 @@ def query(self, query, column_attrs=False, exclude_columns=False, exclude_attrs=
"""
serialized_query = query.serialize()
request = _QueryRequest(serialized_query.query,
column_attrs=column_attrs,
exclude_columns=exclude_columns,
exclude_row_attrs=exclude_attrs,
shards=shards)
column_attrs=column_attrs,
exclude_columns=exclude_columns,
exclude_row_attrs=exclude_attrs,
shards=shards)
path = "/index/%s/query" % query.index.name
try:
headers = {
"Content-Type": "application/x-protobuf",
"Accept": "application/x-protobuf",
"PQL-Version": PQL_VERSION,
}
response = self.__http_request("POST", path,
data=request.to_protobuf(),
headers=headers,
use_coordinator=serialized_query.has_keys)
warning = response.getheader("warning")
if warning:
self.logger.warning(warning)
return QueryResponse._from_protobuf(response.data)
except PilosaServerError as e:
raise PilosaError(e.content)
with self.tracer.start_span("Client.Query") as span:
try:
headers = {
"Content-Type": "application/x-protobuf",
"Accept": "application/x-protobuf",
"PQL-Version": PQL_VERSION,
}
response = self.__http_request("POST", path,
data=request.to_protobuf(),
headers=headers,
use_coordinator=serialized_query.has_keys)
warning = response.getheader("warning")
if warning:
self.logger.warning(warning)
return QueryResponse._from_protobuf(response.data)
except PilosaServerError as e:
raise PilosaError(e.content)

def create_index(self, index):
"""Creates an index on the server using the given Index object.
Expand All @@ -190,12 +195,13 @@ def create_index(self, index):
"""
path = "/index/%s" % index.name
data = index._get_options_string()
try:
self.__http_request("POST", path, data=data)
except PilosaServerError as e:
if e.response.status == 409:
raise IndexExistsError
raise
with self.tracer.start_span("Client.CreateIndex") as scope:
try:
self.__http_request("POST", path, data=data)
except PilosaServerError as e:
if e.response.status == 409:
raise IndexExistsError
raise

def delete_index(self, index):
"""Deletes the given index on the server.
Expand All @@ -204,7 +210,8 @@ def delete_index(self, index):
:raises pilosa.PilosaError: if the index does not exist
"""
path = "/index/%s" % index.name
self.__http_request("DELETE", path)
with self.tracer.start_span("Client.DeleteIndex") as scope:
self.__http_request("DELETE", path)

def create_field(self, field):
"""Creates a field on the server using the given Field object.
Expand All @@ -214,12 +221,13 @@ def create_field(self, field):
"""
data = field._get_options_string()
path = "/index/%s/field/%s" % (field.index.name, field.name)
try:
self.__http_request("POST", path, data=data)
except PilosaServerError as e:
if e.response.status == 409:
raise FieldExistsError
raise
with self.tracer.start_span("Client.CreateField") as scope:
try:
self.__http_request("POST", path, data=data)
except PilosaServerError as e:
if e.response.status == 409:
raise FieldExistsError
raise


def delete_field(self, field):
Expand All @@ -229,7 +237,8 @@ def delete_field(self, field):
:raises pilosa.PilosaError: if the field does not exist
"""
path = "/index/%s/field/%s" % (field.index.name, field.name)
self.__http_request("DELETE", path)
with self.tracer.start_span("Client.DeleteField") as scope:
self.__http_request("DELETE", path)

def ensure_index(self, index):
"""Creates an index on the server if it does not exist.
Expand Down Expand Up @@ -262,17 +271,18 @@ def schema(self):
:rtype: pilosa.Schema
"""
schema = Schema()
for index_info in self._read_schema():
index_options = index_info.get("options", {})
index = schema.index(index_info["name"],
keys=index_options.get("keys", False),
track_existence=index_options.get("trackExistence", False),
shard_width=index_info.get("shardWidth", 0))
for field_info in index_info.get("fields") or []:
if field_info["name"] in RESERVED_FIELDS:
continue
options = decode_field_meta_options(field_info)
index.field(field_info["name"], **options)
with self.tracer.start_span("Client.Schema") as scope:
for index_info in self._read_schema():
index_options = index_info.get("options", {})
index = schema.index(index_info["name"],
keys=index_options.get("keys", False),
track_existence=index_options.get("trackExistence", False),
shard_width=index_info.get("shardWidth", 0))
for field_info in index_info.get("fields") or []:
if field_info["name"] in RESERVED_FIELDS:
continue
options = decode_field_meta_options(field_info)
index.field(field_info["name"], **options)

return schema

Expand All @@ -283,28 +293,29 @@ def sync_schema(self, schema):

:param pilosa.Schema schema: Local schema to be synced
"""
server_schema = self.schema()

# find out local - remote schema
diff_schema = schema._diff(server_schema)
# create indexes and fields which doesn't exist on the server side
for index_name, index in diff_schema._indexes.items():
if index_name not in server_schema._indexes:
self.ensure_index(index)
for field_name, field in index._fields.items():
if field_name not in RESERVED_FIELDS:
self.ensure_field(field)

# find out remote - local schema
diff_schema = server_schema._diff(schema)
for index_name, index in diff_schema._indexes.items():
local_index = schema._indexes.get(index_name)
if local_index is None:
schema._indexes[index_name] = index
else:
with self.tracer.start_span("Client.SyncSchema") as scope:
server_schema = self.schema()

# find out local - remote schema
diff_schema = schema._diff(server_schema)
# create indexes and fields which doesn't exist on the server side
for index_name, index in diff_schema._indexes.items():
if index_name not in server_schema._indexes:
self.ensure_index(index)
for field_name, field in index._fields.items():
if field_name not in RESERVED_FIELDS:
local_index._fields[field_name] = field
self.ensure_field(field)

# find out remote - local schema
diff_schema = server_schema._diff(schema)
for index_name, index in diff_schema._indexes.items():
local_index = schema._indexes.get(index_name)
if local_index is None:
schema._indexes[index_name] = index
else:
for field_name, field in index._fields.items():
if field_name not in RESERVED_FIELDS:
local_index._fields[field_name] = field

def import_field(self, field, bit_reader, batch_size=100000, fast_import=False, clear=False):
"""Imports a field using the given bit reader
Expand All @@ -316,8 +327,9 @@ def import_field(self, field, bit_reader, batch_size=100000, fast_import=False,
:param clear: clear bits instead of setting them
"""
shard_width = field.index.shard_width or DEFAULT_SHARD_WIDTH
for shard, columns in batch_columns(bit_reader, batch_size, shard_width):
self._import_data(field, shard, columns, fast_import, clear)
with self.tracer.start_span("Client.ImportField") as scope:
for shard, columns in batch_columns(bit_reader, batch_size, shard_width):
self._import_data(field, shard, columns, fast_import, clear)

def http_request(self, method, path, data=None, headers=None):
"""Sends an HTTP request to the Pilosa server
Expand All @@ -331,7 +343,9 @@ def http_request(self, method, path, data=None, headers=None):
:return: HTTP response

"""
return self.__http_request(method, path, data=data, headers=headers)
span = self.tracer.start_span("Client.HttpRequest")
with self.tracer.start_span(span) as scope:
return self.__http_request(method, path, data=data, headers=headers)

def _import_data(self, field, shard, data, fast_import, clear):
if field.field_type != "int":
Expand Down Expand Up @@ -422,7 +436,7 @@ def __http_request(self, method, path, data=None, headers=None, use_coordinator=
else:
uri = "%s%s" % (self.__get_address(), path)
try:
self.logger.debug("Request: %s %s %s", method, uri)
self.logger.debug("Request: %s %s", method, uri)
response = self.__client.request(method, uri, body=data, headers=headers)
break
except urllib3.exceptions.MaxRetryError as e:
Expand Down
2 changes: 2 additions & 0 deletions requirements/main.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
urllib3==1.24.1
protobuf==3.6.1
pilosa-roaring==0.2.0
opentracing==1.3.0