diff --git a/btrdb/endpoint.py b/btrdb/endpoint.py index bb33c84..bc95886 100644 --- a/btrdb/endpoint.py +++ b/btrdb/endpoint.py @@ -66,7 +66,7 @@ def obliterate(self, uu): result = self.stub.Obliterate(params) BTrDBError.checkProtoStat(result.stat) - def setStreamAnnotations(self, uu, expected, changes): + def setStreamAnnotations(self, uu, expected, changes, removals): annkvlist = [] for k, v in changes.items(): if v is None: @@ -77,7 +77,7 @@ def setStreamAnnotations(self, uu, expected, changes): ov = btrdb_pb2.OptValue(value = v) kv = btrdb_pb2.KeyOptValue(key = k, val = ov) annkvlist.append(kv) - params = btrdb_pb2.SetStreamAnnotationsParams(uuid=uu.bytes, expectedPropertyVersion=expected, changes=annkvlist) + params = btrdb_pb2.SetStreamAnnotationsParams(uuid=uu.bytes, expectedPropertyVersion=expected, changes=annkvlist, removals=removals) result = self.stub.SetStreamAnnotations(params) BTrDBError.checkProtoStat(result.stat) diff --git a/btrdb/stream.py b/btrdb/stream.py index 7d19544..d4d01fc 100644 --- a/btrdb/stream.py +++ b/btrdb/stream.py @@ -108,7 +108,7 @@ def exists(self): Returns ------- bool - Indicates whether stream exists. + Indicates whether stream is extant in the BTrDB server. """ if self._known_to_exist: @@ -425,7 +425,7 @@ def _update_tags_collection(self, tags, collection): collection=collection ) - def _update_annotations(self, annotations, encoder): + def _update_annotations(self, annotations, encoder, replace): # make a copy of the annotations to prevent accidental mutable object mutation serialized = deepcopy(annotations) if encoder is not None: @@ -434,32 +434,64 @@ def _update_annotations(self, annotations, encoder): for k, v in serialized.items() } + removals = [] + if replace: + removals = [i for i in self._annotations.keys() if i not in annotations.keys()] + self._btrdb.ep.setStreamAnnotations( uu=self.uuid, expected=self._property_version, - changes=serialized + changes=serialized, + removals=removals ) - def update(self, tags=None, annotations=None, collection=None, encoder=AnnotationEncoder): + def update(self, tags=None, annotations=None, collection=None, encoder=AnnotationEncoder, replace=False): """ - Updates metadata including tags, annotations, and collection. + Updates metadata including tags, annotations, and collection as an + UPSERT operation. + + By default, the update will only affect the keys and values in the + specified tags and annotations dictionaries, inserting them if they + don't exist, or updating the value for the key if it does exist. If + any of the update arguments (i.e. tags, annotations, collection) are + None, they will remain unchanged in the database. + + To delete either tags or annotations, you must specify exactly which + keys and values you want set for the field and set `replace=True`. For + example: + + >>> annotations, _ = stream.anotations() + >>> del annotations["key_to_delete"] + >>> stream.update(annotations=annotations, replace=True) + + This ensures that all of the keys and values for the annotations are + preserved except for the key to be deleted. Parameters - ---------- - tags: dict - dict of tag information for the stream. - annotations: dict - dict of annotation information for the stream. - collection: str - The collection prefix for a stream - encoder: json.JSONEncoder or None - JSON encoder to class to use for annotation serializations, set to - None to prevent JSON encoding of the annotations. + ----------- + tags : dict, optional + Specify the tag key/value pairs as a dictionary to upsert or + replace. If None, the tags will remain unchanged in the database. + annotations : dict, optional + Specify the annotations key/value pairs as a dictionary to upsert + or replace. If None, the annotations will remain unchanged in the + database. + collection : str, optional + Specify a new collection for the stream. If None, the collection + will remain unchanged. + encoder : json.JSONEncoder or None + JSON encoder class to use for annotation serialization. Set to None + to prevent JSON encoding of the annotations. + replace : bool, default: False + Replace all annotations or tags with the specified dictionaries + instead of performing the normal upsert operation. Specifying True + is the only way to remove annotation keys. Returns ------- int The version of the metadata (separate from the version of the data) + also known as the "property version". """ if tags is None and annotations is None and collection is None: @@ -479,7 +511,7 @@ def update(self, tags=None, annotations=None, collection=None, encoder=Annotatio self.refresh_metadata() if annotations is not None: - self._update_annotations(annotations, encoder) + self._update_annotations(annotations, encoder, replace) self.refresh_metadata() return self._property_version diff --git a/docs/source/working/stream-manage-metadata.rst b/docs/source/working/stream-manage-metadata.rst index 7c53db1..a6c38fc 100644 --- a/docs/source/working/stream-manage-metadata.rst +++ b/docs/source/working/stream-manage-metadata.rst @@ -50,6 +50,10 @@ is also returned when asking for annotations. This version number is incremente whenever metadata (tags, annotations, collection, etc.) are updated but not when making changes to the underlying time series data. +By default the method will attempt to provide a cached copy of the annotations +however you can request the latest version from the server using the `refresh` +argument. + .. code-block:: python stream.annotations(refresh=True) @@ -76,8 +80,14 @@ just a convenience as this value can also be found within the tags. Updating Metadata ---------------------------- An :code:`update` method is available if you would like to make changes to -the tags, annotations, or collection. Note that a single operation could make -multiple updates to the property version. +the tags, annotations, or collection. By default, all updates are implemented +as an UPSERT operation and a single change could result in multiple increments +to the property version (the version of the metadata). + +Upon calling this method, the library will first verify that the local property version of your +stream object matches the version found on the server. If the two versions +do not match then you will not be allowed to perform an update as this implies +that the data has already been changed by another user or process. .. code-block:: python @@ -87,4 +97,17 @@ multiple updates to the property version. 'state': 'VT', 'created': '2018-01-01 12:42:03 -0500' } - prop_version = stream.update(collection=collection, annotations=annotations) + property_version = stream.update( + collection=collection, + annotations=annotations + ) + +If you would like to remove any keys from your annotations you must use the `replace=True` keyword argument. This will ensure that the annotations dictionary you provide completely replaces the existing values rather than perform an UPSERT operation. The example below shows how you could remove an existing key from the annotations dictionary. + +.. code-block:: python + + annotations, _ = stream.anotations() + del annotations["key_to_delete"] + stream.update(annotations=annotations, replace=True) + + diff --git a/tests/btrdb/test_stream.py b/tests/btrdb/test_stream.py index 17ce446..4ced514 100644 --- a/tests/btrdb/test_stream.py +++ b/tests/btrdb/test_stream.py @@ -276,7 +276,8 @@ def test_update_annotations(self): 'frequency': '30', 'control': '2019-11-07 13:21:23.000000-0500', "calibrate": '{"racf": 1.8, "pacf": 0.005}', - } + }, + removals=[], ) stream._btrdb.ep.setStreamTags.assert_not_called() @@ -327,6 +328,9 @@ def test_update_annotations_nested_conversions(self): ) def test_update_annotations_no_encoder(self): + """ + Assert update annotations works with None as encoder argument + """ uu = uuid.UUID('0d22a53b-e2ef-4e0a-ab89-b2d48fb2592a') endpoint = Mock(Endpoint) endpoint.streamInfo = Mock(return_value=("koala", 42, {}, {}, None)) @@ -340,11 +344,44 @@ def test_update_annotations_no_encoder(self): uu=uu, expected=42, changes=annotations, + removals=[], ) # TODO: mock json.dumps # assert mock_dumps.assert_not_called() + def test_update_annotations_replace(self): + """ + Assert that replace argument will add proper keys to removals array in + endpoint call. + """ + uu = uuid.UUID('0d22a53b-e2ef-4e0a-ab89-b2d48fb2592a') + endpoint = Mock(Endpoint) + endpoint.streamInfo = Mock(return_value=("koala", 42, {}, {"phase": "A", "source": "PJM"}, None)) + stream = Stream(btrdb=BTrDB(endpoint), uuid=uu) + + annotations = {"foo": "this is a string", "phase": "A", } + + stream.refresh_metadata() + + # remove one of the keys and add new ones + stream.update(annotations=annotations, replace=True) + stream._btrdb.ep.setStreamAnnotations.assert_called_once_with( + uu=uu, + expected=42, + changes=annotations, + removals=["source"], + ) + + # clear annotations + stream.update(annotations={}, replace=True) + stream._btrdb.ep.setStreamAnnotations.assert_called_with( + uu=uu, + expected=42, + changes={}, + removals=["phase", "source"], + ) + ########################################################################## ## exists tests ##########################################################################