diff --git a/btrdb/conn.py b/btrdb/conn.py index e3b6e5a..bfb0eb8 100644 --- a/btrdb/conn.py +++ b/btrdb/conn.py @@ -26,7 +26,7 @@ from btrdb.stream import Stream, StreamSet from btrdb.utils.general import unpack_stream_descriptor from btrdb.utils.conversion import to_uuid -from btrdb.exceptions import NotFound +from btrdb.exceptions import NotFound, InvalidOperation ########################################################################## ## Module Variables @@ -346,3 +346,6 @@ def collection_metadata(self, prefix): pyTags = {tag.key: tag.count for tag in tags} pyAnn = {ann.key: ann.count for ann in annotations} return pyTags, pyAnn + + def __reduce__(self): + raise InvalidOperation("BTrDB object cannot be reduced.") diff --git a/btrdb/utils/ray.py b/btrdb/utils/ray.py new file mode 100644 index 0000000..8df9527 --- /dev/null +++ b/btrdb/utils/ray.py @@ -0,0 +1,56 @@ +from functools import partial + +import ray + +import btrdb +from btrdb.conn import BTrDB + +def register_serializer(conn_str=None, apikey=None, profile=None): + """ + Register serializer for BTrDB Object + Parameters + ---------- + conn_str: str, default=None + The address and port of the cluster to connect to, e.g. `192.168.1.1:4411`. + If set to None, will look in the environment variable `$BTRDB_ENDPOINTS` + (recommended). + apikey: str, default=None + The API key used to authenticate requests (optional). If None, the key + is looked up from the environment variable `$BTRDB_API_KEY`. + profile: str, default=None + The name of a profile containing the required connection information as + found in the user's predictive grid credentials file + `~/.predictivegrid/credentials.yaml`. + """ + ray.register_custom_serializer( + BTrDB, serializer=btrdb_serializer, deserializer=partial(btrdb_deserializer, conn_str=conn_str, apikey=apikey, profile=profile)) + +def btrdb_serializer(_): + """ + sererialize function + """ + return None + +def btrdb_deserializer(_, conn_str=None, apikey=None, profile=None): + """ + deserialize function + + Parameters + ---------- + conn_str: str, default=None + The address and port of the cluster to connect to, e.g. `192.168.1.1:4411`. + If set to None, will look in the environment variable `$BTRDB_ENDPOINTS` + (recommended). + apikey: str, default=None + The API key used to authenticate requests (optional). If None, the key + is looked up from the environment variable `$BTRDB_API_KEY`. + profile: str, default=None + The name of a profile containing the required connection information as + found in the user's predictive grid credentials file + `~/.predictivegrid/credentials.yaml`. + Returns + ------- + db : BTrDB + An instance of the BTrDB context to directly interact with the database. + """ + return btrdb.connect(conn_str=conn_str, apikey=apikey, profile=profile) diff --git a/docs/source/working.rst b/docs/source/working.rst index 46d5cba..8773907 100644 --- a/docs/source/working.rst +++ b/docs/source/working.rst @@ -15,3 +15,4 @@ to interact with the BTrDB database. working/stream-view-data working/streamsets working/multiprocessing + working/ray diff --git a/docs/source/working/ray.rst b/docs/source/working/ray.rst new file mode 100644 index 0000000..98a7fbd --- /dev/null +++ b/docs/source/working/ray.rst @@ -0,0 +1,58 @@ +Working with Ray +================================ + +To use BTrDB connection, stream and streamsets objects in the parallelization library ray, +a special serializer is required. BTrDB provides a utility function that register the serializer with ray. +An example is shown below. + +Setting up the ray serializer +----------------------------- +.. code-block:: python + + import btrdb + import ray + from btrdb.utils.ray import register_serializer + + uuids = ["b19592fc-fb71-4f61-9d49-8646d4b1c2a1", + "07b2cff3-e957-4fa9-b1b3-e14d5afb1e63"] + ray.init() + + conn_params = {"profile": "profile_name"} + + # register serializer with the connection parameters + register_serializer(**conn_params) + + conn = btrdb.connect(**conn_params) + + # BTrDB connection object can be passed as an argument + # to a ray remote function + @ray.remote + def test_btrdb(conn): + print(conn.info()) + + # Stream object can be passed as an argument + # to a ray remote function + @ray.remote + def test_stream(stream): + print(stream.earliest()) + + # StreamSet object can be passed as an argument + # to a ray remote function + @ray.remote + def test_streamset(streamset): + print(streamset.earliest()) + print(streamset) + + + ids = [test_btrdb.remote(conn), + test_stream.remote(conn.stream_from_uuid(uuids[0])), + test_streamset.remote(conn.streams(*uuids))] + + ray.get(ids) + # output of test_btrdb + >>(pid=28479) {'majorVersion': 5, 'build': '5.10.5', 'proxy': {'proxyEndpoints': []}} + # output of test_stream + >>(pid=28482) (RawPoint(1533210100000000000, 0.0), 0) + # output of test_streamset + >>(pid=28481) (RawPoint(1533210100000000000, 0.0), RawPoint(1533210100000000000, 0.0)) + >>(pid=28481) StreamSet with 2 streams \ No newline at end of file