Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion btrdb/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from btrdb.stream import Stream, StreamSet
from btrdb.utils.general import unpack_stream_descriptor
from btrdb.utils.conversion import to_uuid
from btrdb.exceptions import NotFound
from btrdb.exceptions import NotFound, InvalidOperation

##########################################################################
## Module Variables
Expand Down Expand Up @@ -346,3 +346,6 @@ def collection_metadata(self, prefix):
pyTags = {tag.key: tag.count for tag in tags}
pyAnn = {ann.key: ann.count for ann in annotations}
return pyTags, pyAnn

def __reduce__(self):
    """
    Refuse to reduce (pickle) this object.

    ``__reduce__`` is the hook Python's pickle protocol calls to obtain a
    serializable description of an instance. This implementation never
    returns one.

    Raises
    ------
    InvalidOperation
        Always raised, every time the method is invoked.
    """
    message = "BTrDB object cannot be reduced."
    raise InvalidOperation(message)
56 changes: 56 additions & 0 deletions btrdb/utils/ray.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from functools import partial

import ray

import btrdb
from btrdb.conn import BTrDB

def register_serializer(conn_str=None, apikey=None, profile=None):
    """
    Register a custom ray serializer/deserializer pair for the BTrDB class.

    The connection arguments given here are bound to the deserializer, so
    they are the ones used when a BTrDB object is deserialized.

    Parameters
    ----------
    conn_str: str, default=None
        The address and port of the cluster to connect to, e.g. `192.168.1.1:4411`.
        If set to None, will look in the environment variable `$BTRDB_ENDPOINTS`
        (recommended).
    apikey: str, default=None
        The API key used to authenticate requests (optional). If None, the key
        is looked up from the environment variable `$BTRDB_API_KEY`.
    profile: str, default=None
        The name of a profile containing the required connection information as
        found in the user's predictive grid credentials file
        `~/.predictivegrid/credentials.yaml`.
    """
    # Freeze the connection parameters into the deserializer up front so the
    # callable handed to ray carries them along.
    bound_deserializer = partial(
        btrdb_deserializer,
        conn_str=conn_str,
        apikey=apikey,
        profile=profile,
    )
    ray.register_custom_serializer(
        BTrDB,
        serializer=btrdb_serializer,
        deserializer=bound_deserializer,
    )

def btrdb_serializer(_):
    """
    Serialize function registered with ray for BTrDB objects.

    Parameters
    ----------
    _: object
        The object handed over for serialization. It is ignored.

    Returns
    -------
    None
        Always None; no state from the argument is captured.
    """
    # Nothing from the input is kept, so there is nothing to return.
    return None

def btrdb_deserializer(_, conn_str=None, apikey=None, profile=None):
    """
    Deserialize function registered with ray for BTrDB objects.

    The first positional argument is ignored; the result is obtained by
    calling `btrdb.connect` with the supplied connection parameters.

    Parameters
    ----------
    _: object
        The serialized value handed over for deserialization. It is ignored.
    conn_str: str, default=None
        The address and port of the cluster to connect to, e.g. `192.168.1.1:4411`.
        If set to None, will look in the environment variable `$BTRDB_ENDPOINTS`
        (recommended).
    apikey: str, default=None
        The API key used to authenticate requests (optional). If None, the key
        is looked up from the environment variable `$BTRDB_API_KEY`.
    profile: str, default=None
        The name of a profile containing the required connection information as
        found in the user's predictive grid credentials file
        `~/.predictivegrid/credentials.yaml`.

    Returns
    -------
    db : BTrDB
        An instance of the BTrDB context to directly interact with the database.
    """
    connect_kwargs = {
        "conn_str": conn_str,
        "apikey": apikey,
        "profile": profile,
    }
    return btrdb.connect(**connect_kwargs)
1 change: 1 addition & 0 deletions docs/source/working.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ to interact with the BTrDB database.
working/stream-view-data
working/streamsets
working/multiprocessing
working/ray
58 changes: 58 additions & 0 deletions docs/source/working/ray.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
Working with Ray
================================

To use BTrDB connection, stream, and streamset objects with the parallelization library Ray,
a special serializer is required. BTrDB provides a utility function that registers the serializer with Ray.
An example is shown below.

Setting up the ray serializer
-----------------------------
.. code-block:: python

import btrdb
import ray
from btrdb.utils.ray import register_serializer

uuids = ["b19592fc-fb71-4f61-9d49-8646d4b1c2a1",
"07b2cff3-e957-4fa9-b1b3-e14d5afb1e63"]
ray.init()

conn_params = {"profile": "profile_name"}

# register serializer with the connection parameters
register_serializer(**conn_params)

conn = btrdb.connect(**conn_params)

# BTrDB connection object can be passed as an argument
# to a ray remote function
@ray.remote
def test_btrdb(conn):
print(conn.info())

# Stream object can be passed as an argument
# to a ray remote function
@ray.remote
def test_stream(stream):
print(stream.earliest())

# StreamSet object can be passed as an argument
# to a ray remote function
@ray.remote
def test_streamset(streamset):
print(streamset.earliest())
print(streamset)


ids = [test_btrdb.remote(conn),
test_stream.remote(conn.stream_from_uuid(uuids[0])),
test_streamset.remote(conn.streams(*uuids))]

ray.get(ids)
# output of test_btrdb
>>(pid=28479) {'majorVersion': 5, 'build': '5.10.5', 'proxy': {'proxyEndpoints': []}}
# output of test_stream
>>(pid=28482) (RawPoint(1533210100000000000, 0.0), 0)
# output of test_streamset
>>(pid=28481) (RawPoint(1533210100000000000, 0.0), RawPoint(1533210100000000000, 0.0))
>>(pid=28481) StreamSet with 2 streams