Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions btrdb/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def obliterate(self, uu):
result = self.stub.Obliterate(params)
BTrDBError.checkProtoStat(result.stat)

def setStreamAnnotations(self, uu, expected, changes):
def setStreamAnnotations(self, uu, expected, changes, removals):
annkvlist = []
for k, v in changes.items():
if v is None:
Expand All @@ -77,7 +77,7 @@ def setStreamAnnotations(self, uu, expected, changes):
ov = btrdb_pb2.OptValue(value = v)
kv = btrdb_pb2.KeyOptValue(key = k, val = ov)
annkvlist.append(kv)
params = btrdb_pb2.SetStreamAnnotationsParams(uuid=uu.bytes, expectedPropertyVersion=expected, changes=annkvlist)
params = btrdb_pb2.SetStreamAnnotationsParams(uuid=uu.bytes, expectedPropertyVersion=expected, changes=annkvlist, removals=removals)
result = self.stub.SetStreamAnnotations(params)
BTrDBError.checkProtoStat(result.stat)

Expand Down
64 changes: 48 additions & 16 deletions btrdb/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def exists(self):
Returns
-------
bool
Indicates whether stream exists.
Indicates whether stream is extant in the BTrDB server.
"""

if self._known_to_exist:
Expand Down Expand Up @@ -425,7 +425,7 @@ def _update_tags_collection(self, tags, collection):
collection=collection
)

def _update_annotations(self, annotations, encoder):
def _update_annotations(self, annotations, encoder, replace):
# make a copy of the annotations to prevent accidental mutable object mutation
serialized = deepcopy(annotations)
if encoder is not None:
Expand All @@ -434,32 +434,64 @@ def _update_annotations(self, annotations, encoder):
for k, v in serialized.items()
}

removals = []
if replace:
removals = [i for i in self._annotations.keys() if i not in annotations.keys()]

self._btrdb.ep.setStreamAnnotations(
uu=self.uuid,
expected=self._property_version,
changes=serialized
changes=serialized,
removals=removals
)

def update(self, tags=None, annotations=None, collection=None, encoder=AnnotationEncoder):
def update(self, tags=None, annotations=None, collection=None, encoder=AnnotationEncoder, replace=False):
"""
Updates metadata including tags, annotations, and collection.
Updates metadata including tags, annotations, and collection as an
UPSERT operation.

By default, the update will only affect the keys and values in the
specified tags and annotations dictionaries, inserting them if they
don't exist, or updating the value for the key if it does exist. If
any of the update arguments (i.e. tags, annotations, collection) are
None, they will remain unchanged in the database.

To delete either tags or annotations, you must specify exactly which
keys and values you want set for the field and set `replace=True`. For
example:

>>> annotations, _ = stream.anotations()
>>> del annotations["key_to_delete"]
>>> stream.update(annotations=annotations, replace=True)

This ensures that all of the keys and values for the annotations are
preserved except for the key to be deleted.

Parameters
----------
tags: dict
dict of tag information for the stream.
annotations: dict
dict of annotation information for the stream.
collection: str
The collection prefix for a stream
encoder: json.JSONEncoder or None
JSON encoder to class to use for annotation serializations, set to
None to prevent JSON encoding of the annotations.
-----------
tags : dict, optional
Specify the tag key/value pairs as a dictionary to upsert or
replace. If None, the tags will remain unchanged in the database.
annotations : dict, optional
Specify the annotations key/value pairs as a dictionary to upsert
or replace. If None, the annotations will remain unchanged in the
database.
collection : str, optional
Specify a new collection for the stream. If None, the collection
will remain unchanged.
encoder : json.JSONEncoder or None
JSON encoder class to use for annotation serialization. Set to None
to prevent JSON encoding of the annotations.
replace : bool, default: False
Replace all annotations or tags with the specified dictionaries
instead of performing the normal upsert operation. Specifying True
is the only way to remove annotation keys.

Returns
-------
int
The version of the metadata (separate from the version of the data)
also known as the "property version".

"""
if tags is None and annotations is None and collection is None:
Expand All @@ -479,7 +511,7 @@ def update(self, tags=None, annotations=None, collection=None, encoder=Annotatio
self.refresh_metadata()

if annotations is not None:
self._update_annotations(annotations, encoder)
self._update_annotations(annotations, encoder, replace)
self.refresh_metadata()

return self._property_version
Expand Down
29 changes: 26 additions & 3 deletions docs/source/working/stream-manage-metadata.rst
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ is also returned when asking for annotations. This version number is incremente
whenever metadata (tags, annotations, collection, etc.) are updated but not when
making changes to the underlying time series data.

By default the method will attempt to provide a cached copy of the annotations
however you can request the latest version from the server using the `refresh`
argument.

.. code-block:: python

stream.annotations(refresh=True)
Expand All @@ -76,8 +80,14 @@ just a convenience as this value can also be found within the tags.
Updating Metadata
----------------------------
An :code:`update` method is available if you would like to make changes to
the tags, annotations, or collection. Note that a single operation could make
multiple updates to the property version.
the tags, annotations, or collection. By default, all updates are implemented
as an UPSERT operation and a single change could result in multiple increments
to the property version (the version of the metadata).

Upon calling this method, the library will first verify that the local property version of your
stream object matches the version found on the server. If the two versions
do not match then you will not be allowed to perform an update as this implies
that the data has already been changed by another user or process.

.. code-block:: python

Expand All @@ -87,4 +97,17 @@ multiple updates to the property version.
'state': 'VT',
'created': '2018-01-01 12:42:03 -0500'
}
prop_version = stream.update(collection=collection, annotations=annotations)
property_version = stream.update(
collection=collection,
annotations=annotations
)

If you would like to remove any keys from your annotations you must use the `replace=True` keyword argument. This will ensure that the annotations dictionary you provide completely replaces the existing values rather than perform an UPSERT operation. The example below shows how you could remove an existing key from the annotations dictionary.

.. code-block:: python

annotations, _ = stream.anotations()
del annotations["key_to_delete"]
stream.update(annotations=annotations, replace=True)


39 changes: 38 additions & 1 deletion tests/btrdb/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,8 @@ def test_update_annotations(self):
'frequency': '30',
'control': '2019-11-07 13:21:23.000000-0500',
"calibrate": '{"racf": 1.8, "pacf": 0.005}',
}
},
removals=[],
)
stream._btrdb.ep.setStreamTags.assert_not_called()

Expand Down Expand Up @@ -327,6 +328,9 @@ def test_update_annotations_nested_conversions(self):
)

def test_update_annotations_no_encoder(self):
"""
Assert update annotations works with None as encoder argument
"""
uu = uuid.UUID('0d22a53b-e2ef-4e0a-ab89-b2d48fb2592a')
endpoint = Mock(Endpoint)
endpoint.streamInfo = Mock(return_value=("koala", 42, {}, {}, None))
Expand All @@ -340,11 +344,44 @@ def test_update_annotations_no_encoder(self):
uu=uu,
expected=42,
changes=annotations,
removals=[],
)

# TODO: mock json.dumps
# assert mock_dumps.assert_not_called()

def test_update_annotations_replace(self):
"""
Assert that replace argument will add proper keys to removals array in
endpoint call.
"""
uu = uuid.UUID('0d22a53b-e2ef-4e0a-ab89-b2d48fb2592a')
endpoint = Mock(Endpoint)
endpoint.streamInfo = Mock(return_value=("koala", 42, {}, {"phase": "A", "source": "PJM"}, None))
stream = Stream(btrdb=BTrDB(endpoint), uuid=uu)

annotations = {"foo": "this is a string", "phase": "A", }

stream.refresh_metadata()

# remove one of the keys and add new ones
stream.update(annotations=annotations, replace=True)
stream._btrdb.ep.setStreamAnnotations.assert_called_once_with(
uu=uu,
expected=42,
changes=annotations,
removals=["source"],
)

# clear annotations
stream.update(annotations={}, replace=True)
stream._btrdb.ep.setStreamAnnotations.assert_called_with(
uu=uu,
expected=42,
changes={},
removals=["phase", "source"],
)

##########################################################################
## exists tests
##########################################################################
Expand Down