From 2deb32c73f9ee8942992ad7f2fa3a59e8d3402c9 Mon Sep 17 00:00:00 2001 From: Michael Chestnut Date: Tue, 23 Feb 2021 10:05:05 -0600 Subject: [PATCH 1/7] adds error handling for grpc errors --- btrdb/conn.py | 4 +- btrdb/endpoint.py | 262 +++++++++++++++++++++++++++------------ btrdb/exceptions.py | 19 ++- tests/btrdb/test_conn.py | 6 +- 4 files changed, 208 insertions(+), 83 deletions(-) diff --git a/btrdb/conn.py b/btrdb/conn.py index 8bb8a2b..55058a4 100644 --- a/btrdb/conn.py +++ b/btrdb/conn.py @@ -26,7 +26,7 @@ from btrdb.stream import Stream, StreamSet from btrdb.utils.general import unpack_stream_descriptor from btrdb.utils.conversion import to_uuid -from btrdb.exceptions import NotFound, InvalidOperation +from btrdb.exceptions import StreamNotFoundError, InvalidOperation ########################################################################## ## Module Variables @@ -187,7 +187,7 @@ def streams(self, *identifiers, versions=None, is_collection_prefix=False): if len(found) == 1: streams.append(found[0]) continue - raise NotFound(f"Could not identify stream `{ident}`") + raise StreamNotFoundError(f"Could not identify stream `{ident}`") raise ValueError(f"Could not identify stream based on `{ident}`. Identifier must be UUID or collection/name.") diff --git a/btrdb/endpoint.py b/btrdb/endpoint.py index adcea0e..e174915 100644 --- a/btrdb/endpoint.py +++ b/btrdb/endpoint.py @@ -25,45 +25,80 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +import grpc + from btrdb.grpcinterface import btrdb_pb2 from btrdb.grpcinterface import btrdb_pb2_grpc from btrdb.point import RawPoint -from btrdb.exceptions import BTrDBError +from btrdb.exceptions import BTrDBError, handle_grpc_error from btrdb.utils.general import unpack_stream_descriptor + class Endpoint(object): def __init__(self, channel): self.stub = btrdb_pb2_grpc.BTrDBStub(channel) - def rawValues(self, uu, start, end, version = 0): - params = btrdb_pb2.RawValuesParams(uuid = uu.bytes, start = start, end = end, versionMajor = version) - for result in self.stub.RawValues(params): - BTrDBError.checkProtoStat(result.stat) - yield result.values, result.versionMajor + def rawValues(self, uu, start, end, version=0): + params = btrdb_pb2.RawValuesParams( + uuid=uu.bytes, start=start, end=end, versionMajor=version + ) + try: + for result in self.stub.RawValues(params): + BTrDBError.checkProtoStat(result.stat) + yield result.values, result.versionMajor + except grpc.RpcError as e: + handle_grpc_error(e) - def alignedWindows(self, uu, start, end, pointwidth, version = 0): - params = btrdb_pb2.AlignedWindowsParams(uuid = uu.bytes, start = start, end = end, versionMajor = version, pointWidth = int(pointwidth)) - for result in self.stub.AlignedWindows(params): - BTrDBError.checkProtoStat(result.stat) - yield result.values, result.versionMajor + def alignedWindows(self, uu, start, end, pointwidth, version=0): + params = btrdb_pb2.AlignedWindowsParams( + uuid=uu.bytes, + start=start, + end=end, + versionMajor=version, + pointWidth=int(pointwidth), + ) + try: + for result in self.stub.AlignedWindows(params): + BTrDBError.checkProtoStat(result.stat) + yield result.values, result.versionMajor + except grpc.RpcError as e: + handle_grpc_error(e) - def windows(self, uu, start, end, width, depth, version = 0): - params = btrdb_pb2.WindowsParams(uuid = uu.bytes, start = start, end = end, versionMajor = version, width = width, depth = depth) - for result in self.stub.Windows(params): - BTrDBError.checkProtoStat(result.stat) - yield result.values, result.versionMajor + def windows(self, uu, start, end, width, depth, version=0): + params = btrdb_pb2.WindowsParams( + uuid=uu.bytes, + start=start, + end=end, + versionMajor=version, + width=width, + depth=depth, + ) + try: + for result in self.stub.Windows(params): + BTrDBError.checkProtoStat(result.stat) + yield result.values, result.versionMajor + except grpc.RpcError as e: + handle_grpc_error(e) def streamInfo(self, uu, omitDescriptor, omitVersion): - params = btrdb_pb2.StreamInfoParams(uuid = uu.bytes, omitVersion = omitVersion, omitDescriptor = omitDescriptor) - result = self.stub.StreamInfo(params) + params = btrdb_pb2.StreamInfoParams( + uuid=uu.bytes, omitVersion=omitVersion, omitDescriptor=omitDescriptor + ) + try: + result = self.stub.StreamInfo(params) + except grpc.RpcError as e: + handle_grpc_error(e) desc = result.descriptor BTrDBError.checkProtoStat(result.stat) tagsanns = unpack_stream_descriptor(desc) return desc.collection, desc.propertyVersion, tagsanns[0], tagsanns[1], result.versionMajor def obliterate(self, uu): - params = btrdb_pb2.ObliterateParams(uuid = uu.bytes) - result = self.stub.Obliterate(params) + params = btrdb_pb2.ObliterateParams(uuid=uu.bytes) + try: + result = self.stub.Obliterate(params) + except grpc.RpcError as e: + handle_grpc_error(e) BTrDBError.checkProtoStat(result.stat) def setStreamAnnotations(self, uu, expected, changes, removals): @@ -74,11 +109,19 @@ def setStreamAnnotations(self, uu, expected, changes, removals): else: if isinstance(v, str): v = v.encode("utf-8") - ov = btrdb_pb2.OptValue(value = v) - kv = btrdb_pb2.KeyOptValue(key = k, val = ov) + ov = btrdb_pb2.OptValue(value=v) + kv = btrdb_pb2.KeyOptValue(key=k, val=ov) annkvlist.append(kv) - params = btrdb_pb2.SetStreamAnnotationsParams(uuid=uu.bytes, expectedPropertyVersion=expected, changes=annkvlist, removals=removals) - result = self.stub.SetStreamAnnotations(params) + params = btrdb_pb2.SetStreamAnnotationsParams( + uuid=uu.bytes, + expectedPropertyVersion=expected, + changes=annkvlist, + removals=removals, + ) + try: + result = self.stub.SetStreamAnnotations(params) + except grpc.RpcError as e: + handle_grpc_error(e) BTrDBError.checkProtoStat(result.stat) def setStreamTags(self, uu, expected, tags, collection): @@ -89,11 +132,19 @@ def setStreamTags(self, uu, expected, tags, collection): else: if isinstance(v, str): v = v.encode("utf-8") - ov = btrdb_pb2.OptValue(value = v) - kv = btrdb_pb2.KeyOptValue(key = k, val = ov) + ov = btrdb_pb2.OptValue(value=v) + kv = btrdb_pb2.KeyOptValue(key=k, val=ov) tag_data.append(kv) - params = btrdb_pb2.SetStreamTagsParams(uuid=uu.bytes, expectedPropertyVersion=expected, tags=tag_data, collection=collection) - result = self.stub.SetStreamTags(params) + params = btrdb_pb2.SetStreamTagsParams( + uuid=uu.bytes, + expectedPropertyVersion=expected, + tags=tag_data, + collection=collection, + ) + try: + result = self.stub.SetStreamTags(params) + except grpc.RpcError as e: + handle_grpc_error(e) BTrDBError.checkProtoStat(result.stat) def create(self, uu, collection, tags, annotations): @@ -105,8 +156,13 @@ def create(self, uu, collection, tags, annotations): for k, v in annotations.items(): kv = btrdb_pb2.KeyOptValue(key = k, val = btrdb_pb2.OptValue(value=v)) annkvlist.append(kv) - params = btrdb_pb2.CreateParams(uuid = uu.bytes, collection = collection, tags = tagkvlist, annotations = annkvlist) - result = self.stub.Create(params) + params = btrdb_pb2.CreateParams( + uuid=uu.bytes, collection=collection, tags=tagkvlist, annotations=annkvlist + ) + try: + result = self.stub.Create(params) + except grpc.RpcError as e: + handle_grpc_error(e) BTrDBError.checkProtoStat(result.stat) def listCollections(self, prefix): @@ -118,9 +174,12 @@ def listCollections(self, prefix): collection paths : list[str] """ params = btrdb_pb2.ListCollectionsParams(prefix=prefix) - for msg in self.stub.ListCollections(params): - BTrDBError.checkProtoStat(msg.stat) - yield msg.collections + try: + for msg in self.stub.ListCollections(params): + BTrDBError.checkProtoStat(msg.stat) + yield msg.collections + except grpc.RpcError as e: + handle_grpc_error(e) def lookupStreams(self, collection, isCollectionPrefix, tags, annotations): tagkvlist = [] @@ -130,8 +189,8 @@ def lookupStreams(self, collection, isCollectionPrefix, tags, annotations): else: if isinstance(v, str): v = v.encode("utf-8") - ov = btrdb_pb2.OptValue(value = v) - kv = btrdb_pb2.KeyOptValue(key = k, val = ov) + ov = btrdb_pb2.OptValue(value=v) + kv = btrdb_pb2.KeyOptValue(key=k, val=ov) tagkvlist.append(kv) annkvlist = [] for k, v in annotations.items(): @@ -140,87 +199,136 @@ def lookupStreams(self, collection, isCollectionPrefix, tags, annotations): else: if isinstance(v, str): v = v.encode("utf-8") - ov = btrdb_pb2.OptValue(value = v) - kv = btrdb_pb2.KeyOptValue(key = k, val = ov) + ov = btrdb_pb2.OptValue(value=v) + kv = btrdb_pb2.KeyOptValue(key=k, val=ov) annkvlist.append(kv) - params = btrdb_pb2.LookupStreamsParams(collection = collection, isCollectionPrefix = isCollectionPrefix, tags = tagkvlist, annotations = annkvlist) - for result in self.stub.LookupStreams(params): - BTrDBError.checkProtoStat(result.stat) - yield result.results + params = btrdb_pb2.LookupStreamsParams( + collection=collection, + isCollectionPrefix=isCollectionPrefix, + tags=tagkvlist, + annotations=annkvlist, + ) + try: + for result in self.stub.LookupStreams(params): + BTrDBError.checkProtoStat(result.stat) + yield result.results + except grpc.RpcError as e: + handle_grpc_error(e) def nearest(self, uu, time, version, backward): - params = btrdb_pb2.NearestParams(uuid = uu.bytes, time = time, versionMajor = version, backward = backward) - result = self.stub.Nearest(params) + params = btrdb_pb2.NearestParams( + uuid=uu.bytes, time=time, versionMajor=version, backward=backward + ) + try: + result = self.stub.Nearest(params) + except grpc.RpcError as e: + handle_grpc_error(e) BTrDBError.checkProtoStat(result.stat) return result.value, result.versionMajor def changes(self, uu, fromVersion, toVersion, resolution): - params = btrdb_pb2.ChangesParams(uuid = uu.bytes, fromMajor = fromVersion, toMajor = toVersion, resolution = resolution) - for result in self.stub.Changes(params): - BTrDBError.checkProtoStat(result.stat) - yield result.ranges, result.versionMajor + params = btrdb_pb2.ChangesParams( + uuid=uu.bytes, + fromMajor=fromVersion, + toMajor=toVersion, + resolution=resolution, + ) + try: + for result in self.stub.Changes(params): + BTrDBError.checkProtoStat(result.stat) + yield result.ranges, result.versionMajor + except grpc.RpcError as e: + handle_grpc_error(e) def insert(self, uu, values, policy): policy_map = { - 'never': btrdb_pb2.MergePolicy.NEVER, - 'equal': btrdb_pb2.MergePolicy.EQUAL, - 'retain': btrdb_pb2.MergePolicy.RETAIN, - 'replace': btrdb_pb2.MergePolicy.REPLACE, + "never": btrdb_pb2.MergePolicy.NEVER, + "equal": btrdb_pb2.MergePolicy.EQUAL, + "retain": btrdb_pb2.MergePolicy.RETAIN, + "replace": btrdb_pb2.MergePolicy.REPLACE, } protoValues = RawPoint.to_proto_list(values) - params = btrdb_pb2.InsertParams(uuid = uu.bytes, sync = False, values = protoValues, merge_policy = policy_map[policy]) - result = self.stub.Insert(params) + params = btrdb_pb2.InsertParams( + uuid=uu.bytes, + sync=False, + values=protoValues, + merge_policy=policy_map[policy], + ) + try: + result = self.stub.Insert(params) + except grpc.RpcError as e: + handle_grpc_error(e) BTrDBError.checkProtoStat(result.stat) return result.versionMajor def deleteRange(self, uu, start, end): - params = btrdb_pb2.DeleteParams(uuid = uu.bytes, start = start, end = end) - result = self.stub.Delete(params) + params = btrdb_pb2.DeleteParams(uuid=uu.bytes, start=start, end=end) + try: + result = self.stub.Delete(params) + except grpc.RpcError as e: + handle_grpc_error(e) BTrDBError.checkProtoStat(result.stat) return result.versionMajor def info(self): params = btrdb_pb2.InfoParams() - result = self.stub.Info(params) + try: + result = self.stub.Info(params) + except grpc.RpcError as e: + handle_grpc_error(e) BTrDBError.checkProtoStat(result.stat) return result def faultInject(self, typ, args): - params = btrdb_pb2.FaultInjectParams(type = typ, params = args) - result = self.stub.FaultInject(params) + params = btrdb_pb2.FaultInjectParams(type=typ, params=args) + try: + result = self.stub.FaultInject(params) + except grpc.RpcError as e: + handle_grpc_error(e) BTrDBError.checkProtoStat(result.stat) return result.rv def flush(self, uu): - params = btrdb_pb2.FlushParams(uuid = uu.bytes) - result = self.stub.Flush(params) + params = btrdb_pb2.FlushParams(uuid=uu.bytes) + try: + result = self.stub.Flush(params) + except grpc.RpcError as e: + handle_grpc_error(e) BTrDBError.checkProtoStat(result.stat) def getMetadataUsage(self, prefix): - params = btrdb_pb2.MetadataUsageParams(prefix = prefix) - result = self.stub.GetMetadataUsage(params) + params = btrdb_pb2.MetadataUsageParams(prefix=prefix) + try: + result = self.stub.GetMetadataUsage(params) + except grpc.RpcError as e: + handle_grpc_error(e) BTrDBError.checkProtoStat(result.stat) return result.tags, result.annotations def generateCSV(self, queryType, start, end, width, depth, includeVersions, *streams): protoStreams = [btrdb_pb2.StreamCSVConfig(version = stream[0], - label = stream[1], - uuid = stream[2].bytes) + label = stream[1], + uuid = stream[2].bytes) for stream in streams] params = btrdb_pb2.GenerateCSVParams(queryType = queryType.to_proto(), - startTime = start, - endTime = end, - windowSize = width, - depth = depth, - includeVersions = includeVersions, - streams = protoStreams) - result = self.stub.GenerateCSV(params) - for result in self.stub.GenerateCSV(params): - BTrDBError.checkProtoStat(result.stat) - yield result.row + startTime = start, + endTime = end, + windowSize = width, + depth = depth, + includeVersions = includeVersions, + streams = protoStreams) + try: + for result in self.stub.GenerateCSV(params): + BTrDBError.checkProtoStat(result.stat) + yield result.row + except grpc.RpcError as e: + handle_grpc_error(e) def sql_query(self, stmt, params=[]): request = btrdb_pb2.SQLQueryParams(query=stmt, params=params) - for page in self.stub.SQLQuery(request): - BTrDBError.checkProtoStat(page.stat) - yield page.SQLQueryRow + try: + for page in self.stub.SQLQuery(request): + BTrDBError.checkProtoStat(page.stat) + yield page.SQLQueryRow + except grpc.RpcError as e: + handle_grpc_error(e) \ No newline at end of file diff --git a/btrdb/exceptions.py b/btrdb/exceptions.py index 3c22c4a..a0e90a1 100644 --- a/btrdb/exceptions.py +++ b/btrdb/exceptions.py @@ -11,11 +11,22 @@ Module for custom exceptions """ +# Handles grpc errors +def handle_grpc_error(err): + if err.details() == "[404] stream does not exist": + raise StreamNotFoundError( + "Stream not found with provided uuid" + ) from None + elif err.details() == "failed to connect to all addresses": + raise ConnectionError() from None + raise err + class BTrDBError(Exception): """ The primary exception for grpc related errors. """ + def __init__(self, code, msg, mash): self.code = code self.msg = msg @@ -50,36 +61,42 @@ class ConnectionError(Exception): """ pass + class InvalidOperation(Exception): """ An invalid BTrDB operation has been requested. """ pass -class NotFound(Exception): + +class StreamNotFoundError(Exception): """ A problem interacting with the BTrDB server. """ pass + class BTRDBValueError(ValueError): """ A problem interacting with the BTrDB server. """ pass + class BTRDBTypeError(TypeError): """ A problem interacting with the BTrDB server. """ pass + class CredentialsFileNotFound(FileNotFoundError): """ The credentials file could not be found. """ pass + class ProfileNotFound(Exception): """ A requested profile could not be found in the credentials file. diff --git a/tests/btrdb/test_conn.py b/tests/btrdb/test_conn.py index 202395f..f863187 100644 --- a/tests/btrdb/test_conn.py +++ b/tests/btrdb/test_conn.py @@ -145,17 +145,17 @@ def test_streams_handles_path(self, mock_func): @patch('btrdb.conn.BTrDB.streams_in_collection') def test_streams_raises_err(self, mock_func): """ - Assert streams raises NotFound + Assert streams raises StreamNotFoundError """ db = BTrDB(None) ident = "zoo/animal/dog" mock_func.return_value = [] - with pytest.raises(NotFound) as exc: + with pytest.raises(StreamNotFoundError) as exc: db.streams(ident) mock_func.return_value = [1,2] - with pytest.raises(NotFound) as exc: + with pytest.raises(StreamNotFoundError) as exc: db.streams(ident) # check that does not raise if one returned From 4c9d4e19f6942ea9744f312f8474d737096a870e Mon Sep 17 00:00:00 2001 From: Michael Chestnut Date: Tue, 2 Mar 2021 10:43:35 -0600 Subject: [PATCH 2/7] create custom btrdb exceptions, add decorator for error handling, update tests --- btrdb/endpoint.py | 183 ++++++++++++++----------------------- btrdb/errors.py | 163 +++++++++++++++++++++++++++++++++ btrdb/exceptions.py | 134 +++++++++++++++++---------- btrdb/stream.py | 43 +++++---- tests/btrdb/test_stream.py | 40 ++++---- 5 files changed, 367 insertions(+), 196 deletions(-) create mode 100644 btrdb/errors.py diff --git a/btrdb/endpoint.py b/btrdb/endpoint.py index e174915..0a05845 100644 --- a/btrdb/endpoint.py +++ b/btrdb/endpoint.py @@ -25,30 +25,28 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -import grpc - from btrdb.grpcinterface import btrdb_pb2 from btrdb.grpcinterface import btrdb_pb2_grpc from btrdb.point import RawPoint -from btrdb.exceptions import BTrDBError, handle_grpc_error +from btrdb.exceptions import BTrDBError from btrdb.utils.general import unpack_stream_descriptor +from btrdb.errors import error_handler, gen_error_handler, check_proto_stat class Endpoint(object): def __init__(self, channel): self.stub = btrdb_pb2_grpc.BTrDBStub(channel) + @gen_error_handler def rawValues(self, uu, start, end, version=0): params = btrdb_pb2.RawValuesParams( uuid=uu.bytes, start=start, end=end, versionMajor=version ) - try: - for result in self.stub.RawValues(params): - BTrDBError.checkProtoStat(result.stat) - yield result.values, result.versionMajor - except grpc.RpcError as e: - handle_grpc_error(e) + for result in self.stub.RawValues(params): + check_proto_stat(result.stat) + yield result.values, result.versionMajor + @gen_error_handler def alignedWindows(self, uu, start, end, pointwidth, version=0): params = btrdb_pb2.AlignedWindowsParams( uuid=uu.bytes, @@ -57,13 +55,11 @@ def alignedWindows(self, uu, start, end, pointwidth, version=0): versionMajor=version, pointWidth=int(pointwidth), ) - try: - for result in self.stub.AlignedWindows(params): - BTrDBError.checkProtoStat(result.stat) - yield result.values, result.versionMajor - except grpc.RpcError as e: - handle_grpc_error(e) + for result in self.stub.AlignedWindows(params): + check_proto_stat(result.stat) + yield result.values, result.versionMajor + @gen_error_handler def windows(self, uu, start, end, width, depth, version=0): params = btrdb_pb2.WindowsParams( uuid=uu.bytes, @@ -73,34 +69,28 @@ def windows(self, uu, start, end, width, depth, version=0): width=width, depth=depth, ) - try: - for result in self.stub.Windows(params): - BTrDBError.checkProtoStat(result.stat) - yield result.values, result.versionMajor - except grpc.RpcError as e: - handle_grpc_error(e) + for result in self.stub.Windows(params): + check_proto_stat(result.stat) + yield result.values, result.versionMajor + @error_handler def streamInfo(self, uu, omitDescriptor, omitVersion): params = btrdb_pb2.StreamInfoParams( uuid=uu.bytes, omitVersion=omitVersion, omitDescriptor=omitDescriptor ) - try: - result = self.stub.StreamInfo(params) - except grpc.RpcError as e: - handle_grpc_error(e) + result = self.stub.StreamInfo(params) desc = result.descriptor - BTrDBError.checkProtoStat(result.stat) + check_proto_stat(result.stat) tagsanns = unpack_stream_descriptor(desc) return desc.collection, desc.propertyVersion, tagsanns[0], tagsanns[1], result.versionMajor + @error_handler def obliterate(self, uu): params = btrdb_pb2.ObliterateParams(uuid=uu.bytes) - try: - result = self.stub.Obliterate(params) - except grpc.RpcError as e: - handle_grpc_error(e) - BTrDBError.checkProtoStat(result.stat) + result = self.stub.Obliterate(params) + check_proto_stat(result.stat) + @error_handler def setStreamAnnotations(self, uu, expected, changes, removals): annkvlist = [] for k, v in changes.items(): @@ -118,12 +108,10 @@ def setStreamAnnotations(self, uu, expected, changes, removals): changes=annkvlist, removals=removals, ) - try: - result = self.stub.SetStreamAnnotations(params) - except grpc.RpcError as e: - handle_grpc_error(e) - BTrDBError.checkProtoStat(result.stat) + result = self.stub.SetStreamAnnotations(params) + check_proto_stat(result.stat) + @error_handler def setStreamTags(self, uu, expected, tags, collection): tag_data = [] for k, v in tags.items(): @@ -141,12 +129,10 @@ def setStreamTags(self, uu, expected, tags, collection): tags=tag_data, collection=collection, ) - try: - result = self.stub.SetStreamTags(params) - except grpc.RpcError as e: - handle_grpc_error(e) - BTrDBError.checkProtoStat(result.stat) + result = self.stub.SetStreamTags(params) + check_proto_stat(result.stat) + @error_handler def create(self, uu, collection, tags, annotations): tagkvlist = [] for k, v in tags.items(): @@ -159,12 +145,10 @@ def create(self, uu, collection, tags, annotations): params = btrdb_pb2.CreateParams( uuid=uu.bytes, collection=collection, tags=tagkvlist, annotations=annkvlist ) - try: - result = self.stub.Create(params) - except grpc.RpcError as e: - handle_grpc_error(e) - BTrDBError.checkProtoStat(result.stat) + result = self.stub.Create(params) + check_proto_stat(result.stat) + @gen_error_handler def listCollections(self, prefix): """ Returns a generator for windows of collection paths matching search @@ -174,13 +158,11 @@ def listCollections(self, prefix): collection paths : list[str] """ params = btrdb_pb2.ListCollectionsParams(prefix=prefix) - try: - for msg in self.stub.ListCollections(params): - BTrDBError.checkProtoStat(msg.stat) - yield msg.collections - except grpc.RpcError as e: - handle_grpc_error(e) + for msg in self.stub.ListCollections(params): + check_proto_stat(msg.stat) + yield msg.collections + @gen_error_handler def lookupStreams(self, collection, isCollectionPrefix, tags, annotations): tagkvlist = [] for k, v in tags.items(): @@ -208,24 +190,20 @@ def lookupStreams(self, collection, isCollectionPrefix, tags, annotations): tags=tagkvlist, annotations=annkvlist, ) - try: - for result in self.stub.LookupStreams(params): - BTrDBError.checkProtoStat(result.stat) - yield result.results - except grpc.RpcError as e: - handle_grpc_error(e) + for result in self.stub.LookupStreams(params): + check_proto_stat(result.stat) + yield result.results + @error_handler def nearest(self, uu, time, version, backward): params = btrdb_pb2.NearestParams( uuid=uu.bytes, time=time, versionMajor=version, backward=backward ) - try: - result = self.stub.Nearest(params) - except grpc.RpcError as e: - handle_grpc_error(e) - BTrDBError.checkProtoStat(result.stat) + result = self.stub.Nearest(params) + check_proto_stat(result.stat) return result.value, result.versionMajor - + + @gen_error_handler def changes(self, uu, fromVersion, toVersion, resolution): params = btrdb_pb2.ChangesParams( uuid=uu.bytes, @@ -233,13 +211,11 @@ def changes(self, uu, fromVersion, toVersion, resolution): toMajor=toVersion, resolution=resolution, ) - try: - for result in self.stub.Changes(params): - BTrDBError.checkProtoStat(result.stat) - yield result.ranges, result.versionMajor - except grpc.RpcError as e: - handle_grpc_error(e) + for result in self.stub.Changes(params): + check_proto_stat(result.stat) + yield result.ranges, result.versionMajor + @error_handler def insert(self, uu, values, policy): policy_map = { "never": btrdb_pb2.MergePolicy.NEVER, @@ -254,57 +230,45 @@ def insert(self, uu, values, policy): values=protoValues, merge_policy=policy_map[policy], ) - try: - result = self.stub.Insert(params) - except grpc.RpcError as e: - handle_grpc_error(e) - BTrDBError.checkProtoStat(result.stat) + result = self.stub.Insert(params) + check_proto_stat(result.stat) return result.versionMajor + @error_handler def deleteRange(self, uu, start, end): params = btrdb_pb2.DeleteParams(uuid=uu.bytes, start=start, end=end) - try: - result = self.stub.Delete(params) - except grpc.RpcError as e: - handle_grpc_error(e) - BTrDBError.checkProtoStat(result.stat) + result = self.stub.Delete(params) + check_proto_stat(result.stat) return result.versionMajor + @error_handler def info(self): params = btrdb_pb2.InfoParams() - try: - result = self.stub.Info(params) - except grpc.RpcError as e: - handle_grpc_error(e) - BTrDBError.checkProtoStat(result.stat) + result = self.stub.Info(params) + check_proto_stat(result.stat) return result + @error_handler def faultInject(self, typ, args): params = btrdb_pb2.FaultInjectParams(type=typ, params=args) - try: - result = self.stub.FaultInject(params) - except grpc.RpcError as e: - handle_grpc_error(e) - BTrDBError.checkProtoStat(result.stat) + result = self.stub.FaultInject(params) + check_proto_stat(result.stat) return result.rv + @error_handler def flush(self, uu): params = btrdb_pb2.FlushParams(uuid=uu.bytes) - try: - result = self.stub.Flush(params) - except grpc.RpcError as e: - handle_grpc_error(e) - BTrDBError.checkProtoStat(result.stat) + result = self.stub.Flush(params) + check_proto_stat(result.stat) + @error_handler def getMetadataUsage(self, prefix): params = btrdb_pb2.MetadataUsageParams(prefix=prefix) - try: - result = self.stub.GetMetadataUsage(params) - except grpc.RpcError as e: - handle_grpc_error(e) - BTrDBError.checkProtoStat(result.stat) + result = self.stub.GetMetadataUsage(params) + check_proto_stat(result.stat) return result.tags, result.annotations + @gen_error_handler def generateCSV(self, queryType, start, end, width, depth, includeVersions, *streams): protoStreams = [btrdb_pb2.StreamCSVConfig(version = stream[0], label = stream[1], @@ -317,18 +281,13 @@ def generateCSV(self, queryType, start, end, width, depth, includeVersions, *str depth = depth, includeVersions = includeVersions, streams = protoStreams) - try: - for result in self.stub.GenerateCSV(params): - BTrDBError.checkProtoStat(result.stat) - yield result.row - except grpc.RpcError as e: - handle_grpc_error(e) + for result in self.stub.GenerateCSV(params): + check_proto_stat(result.stat) + yield result.row + @gen_error_handler def sql_query(self, stmt, params=[]): request = btrdb_pb2.SQLQueryParams(query=stmt, params=params) - try: - for page in self.stub.SQLQuery(request): - BTrDBError.checkProtoStat(page.stat) - yield page.SQLQueryRow - except grpc.RpcError as e: - handle_grpc_error(e) \ No newline at end of file + for page in self.stub.SQLQuery(request): + check_proto_stat(page.stat) + yield page.SQLQueryRow diff --git a/btrdb/errors.py b/btrdb/errors.py new file mode 100644 index 0000000..9f3d4db --- /dev/null +++ b/btrdb/errors.py @@ -0,0 +1,163 @@ +# btrdb.errors +# Module for error handling +# +# Author: PingThings +# Created: Thurs Feb 25 2021 +# +# For license information, see LICENSE.txt +# ID: errors.py [] michael.chestnut@pingthings.io $ + +########################################################################## +## Imports +########################################################################## + +import grpc +from functools import partial, wraps +from btrdb.exceptions import ( + BTrDBError, + BTRDBServerError, + NoSuchPoint, + StreamNotFoundError, + InvalidCollection, + InvalidTagKey, + InvalidTagValue, + InvalidTimeRange, + InvalidPointWidth, + StreamExists, + AmbiguousStream, + BadValue, + RecycledUUID, + BadSQLValue, + VersionNotAvailable +) + +########################################################################## +## Error mapping +########################################################################## + +# Errors that we have custom Exceptions for +BTRDB_ERRORS = { + 401: NoSuchPoint, + 404: StreamNotFoundError, + 407: InvalidCollection, + 408: InvalidTagKey, + 409: InvalidTagValue, + 413: InvalidTimeRange, + 415: InvalidPointWidth, + 417: StreamExists, + 418: AmbiguousStream, + 425: BadValue, + 429: RecycledUUID, + 441: BadSQLValue, + 450: VersionNotAvailable +} + +# All of these raise BTRDBServerError +BTRDB_SERVER_ERRORS = [ + 402, # ContextError + 403, # InsertFailure + 405, # WrongEndpoint + 414, # InsertTooBig + 421, # WrongArgs + 423, # AnnotationVersionMismatch + 424, # FaultInjectionDisabled + 426, # ResourceDepleted + 427, # InvalidVersions + 431, # ObliterateDisabled + 432, # CephError + 433, # NodeExisted + 434, # JournalError + 438, # InvalidParameter + 440, # MetadataConnectionError + 452, # OverlappingTrimRange + 453, # LockFailed + 454, # NoLeafNode + 455, # TierCacheError + 456, # StreamEmpty + 457, # TieredStorageBackendError + 458, # TieredStorageOutOfBounds + 459, # TieredStorageTemporaryError + 500, # InvariantFailure + 501, # NotImplemented + 502, # UnsupportedRollback +] + +########################################################################## +## Decorators +########################################################################## + +def consume_generator(fn, *args, **kwargs): + yield from fn(*args, **kwargs) + +def _error(fn, generator): + """ + Base decorator that checks endpoint functions for grpc.RpcErrors + + Parameters + ---------- + fn: function + generator: bool + specifies whether or not the input function is a generator + """ + # allows input func to keep its name and metadata + @wraps(fn) + def wrap(*args, **kwargs): + try: + if generator: + # putting yield directly in this function turns it into a generator, + # so keeping it separate + consume_generator(fn, *args, **kwargs) + return fn(*args, **kwargs) + except grpc.RpcError as e: + handle_grpc_error(e) + return wrap + + +# Create two partial functions to be used as error handling decorators for +# endpoint functions. One is used for regular endpoints, and one is used for +# endpoints that are generators +error_handler = partial(_error, generator=False) +gen_error_handler = partial(_error, generator=True) + +########################################################################## +## gRPC error handling +########################################################################## + +# NOTE: this function relies on matching strings and isn't really ideal. +# this is more of a band-aid solution while we figure out how to send +# better errors from btrdb-server +def handle_grpc_error(err): + """ + Gets called by endpoint functions when a gRPC error is encountered. + Checks error details strings to catch known errors, if error is not + known then a generic BTrDBError gets raised + + Parameters + ---------- + err: grpc.RpcError + """ + details = err.details() + if details == "[404] stream does not exist": + raise StreamNotFoundError("Stream not found with provided uuid") from None + elif details == "failed to connect to all addresses": + raise ConnectionError("Failed to connect to BTrDB") from None + elif any(e in err.debug_error_string() for e in BTRDB_SERVER_ERRORS): + raise BTRDBServerError("An error has occured with btrdb-server") from None + raise BTrDBError(err.code(), details, None) from None + +def check_proto_stat(stat): + """ + Checks status of result after gRPC request and raises appropriate + error based on status code + + Parameters + ---------- + stat: btrdb_pb2.Status + """ + code = stat.code + if code != 0: + if code in BTRDB_ERRORS: + raise BTRDB_ERRORS[code](stat.msg) + elif code in BTRDB_SERVER_ERRORS: + raise BTRDBServerError(stat.msg) + raise BTrDBError(stat.msg) \ No newline at end of file diff --git a/btrdb/exceptions.py b/btrdb/exceptions.py index a0e90a1..0d3486d 100644 --- a/btrdb/exceptions.py +++ b/btrdb/exceptions.py @@ -7,98 +7,132 @@ # For license information, see LICENSE.txt # ID: exceptions.py [] allen@pingthings.io $ -""" -Module for custom exceptions -""" - -# Handles grpc errors -def handle_grpc_error(err): - if err.details() == "[404] stream does not exist": - raise StreamNotFoundError( - "Stream not found with provided uuid" - ) from None - elif err.details() == "failed to connect to all addresses": - raise ConnectionError() from None - raise err - +########################################################################## +## BTrDB Exceptions +########################################################################## class BTrDBError(Exception): """ The primary exception for grpc related errors. """ + pass - def __init__(self, code, msg, mash): - self.code = code - self.msg = msg - self.mash = mash - - @staticmethod - def fromProtoStat(protoStatus): - return BTrDBError(protoStatus.code, protoStatus.msg, protoStatus.mash) +class ConnectionError(BTrDBError): + """ + An error has occurred while trying to establish a connection with BTrDB. + """ + pass - @staticmethod - def checkProtoStat(protoStatus): - stat = BTrDBError.fromProtoStat(protoStatus) - if stat.isError(): - raise stat +class StreamNotFoundError(BTrDBError): + """ + A problem interacting with the BTrDB server. + """ + pass - def isError(self): - return self.code != 0 +class CredentialsFileNotFound(FileNotFoundError, BTrDBError): + """ + The credentials file could not be found. + """ + pass - def __repr__(self): - return "{3}({0}, {1}, {2})".format(repr(self.code), repr(self.msg), repr(self.mash), self.__class__.__name__) +class ProfileNotFound(BTrDBError): + """ + A requested profile could not be found in the credentials file. + """ + pass - def __str__(self): - if self.isError(): - return "[{0}] {1}".format(self.code, self.msg) - else: - return "" +class BTRDBServerError(BTrDBError): + """ + An error occured with btrdb-server. + """ + pass +class BTRDBTypeError(TypeError, BTrDBError): + """ + A problem interacting with the BTrDB server. + """ + pass -class ConnectionError(Exception): +class InvalidOperation(BTrDBError): """ - An error has occurred while interacting with the BTrDB server or when trying to establish a connection. + An invalid BTrDB operation has been requested. """ pass +class StreamExists(InvalidOperation): + """ + Create() has been attempted and the uuid already exists + """ + pass -class InvalidOperation(Exception): +class AmbiguousStream(InvalidOperation): """ - An invalid BTrDB operation has been requested. + Create() has been attempted and uuid is different, but collection and tags already exist """ pass +class BTRDBValueError(ValueError, BTrDBError): + """ + An invalid value has been passed to a BTrDB operation. + """ + pass -class StreamNotFoundError(Exception): +class InvalidCollection(BTRDBValueError): """ - A problem interacting with the BTrDB server. + Collection name is invalid. It is either too long or not a valid string """ pass +class InvalidTagKey(BTRDBValueError): + """ + Tag key is invalid. Must be one of ("name", "unit", "ingress", "distiller") + """ + pass -class BTRDBValueError(ValueError): +class InvalidTagValue(BTRDBValueError): """ - A problem interacting with the BTrDB server. + Tag value is invalid. It is either too long or not a valid string """ pass +class InvalidTimeRange(BTRDBValueError): + """ + Insert contains a timestamp outside the range of (btrdb.MINIMUM_TIME, btrdb.MAXIMUM_TIME) + """ + pass -class BTRDBTypeError(TypeError): +class InvalidPointWidth(BTRDBValueError): """ - A problem interacting with the BTrDB server. + Valid pointwidths are (0, 64) """ pass +class BadValue(BTRDBValueError): + """ + Returned when you try to insert None values + """ + pass -class CredentialsFileNotFound(FileNotFoundError): +class RecycledUUID(BTRDBValueError): """ - The credentials file could not be found. + Returned if you try to create a stream with a uuid that matches a previously deleted stream """ pass +class BadSQLValue(BTRDBValueError): + """ + Invalid parameters have been passed to metadata db + """ + pass -class ProfileNotFound(Exception): +class VersionNotAvailable(BTRDBValueError): """ - A requested profile could not be found in the credentials file. + When querying a stream at a pruned version """ pass + +class NoSuchPoint(BTRDBValueError): + """ + If you ask for next/previous point and there isn't one + """ + pass \ No newline at end of file diff --git a/btrdb/stream.py b/btrdb/stream.py index fbb43d4..109346c 100644 --- a/btrdb/stream.py +++ b/btrdb/stream.py @@ -24,10 +24,18 @@ from btrdb.utils.buffer import PointBuffer from btrdb.point import RawPoint, StatPoint from btrdb.transformers import StreamSetTransformer -from btrdb.exceptions import BTrDBError, InvalidOperation from btrdb.utils.timez import currently_as_ns, to_nanoseconds from btrdb.utils.conversion import AnnotationEncoder, AnnotationDecoder from btrdb.utils.general import pointwidth as pw +from btrdb.exceptions import ( + BTrDBError, + BTRDBTypeError, + BTRDBValueError, + InvalidOperation, + InvalidCollection, + StreamNotFoundError, + NoSuchPoint +) ########################################################################## @@ -70,7 +78,7 @@ def __init__(self, btrdb, uuid, **db_values): setattr(self, "_{}".format(key), value) if db_values: bad_keys = ", ".join(db_values.keys()) - raise TypeError("got unexpected db_values argument(s) '{}'".format(bad_keys)) + raise BTRDBTypeError("got unexpected db_values argument(s) '{}'".format(bad_keys)) self._btrdb = btrdb self._uuid = uuid @@ -119,7 +127,7 @@ def exists(self): self.refresh_metadata() return True except BTrDBError as bte: - if bte.code == 404: + if isinstance(bte, StreamNotFoundError): return False raise bte @@ -442,7 +450,7 @@ def _update_tags_collection(self, tags, collection): tags = self.tags() if tags is None else tags collection = self.collection if collection is None else collection if collection is None: - raise ValueError("collection must be provided to update tags or collection") + raise BTRDBValueError("collection must be provided to update tags or collection") self._btrdb.ep.setStreamTags( uu=self.uuid, @@ -521,16 +529,16 @@ def update(self, tags=None, annotations=None, collection=None, encoder=Annotatio """ if tags is None and annotations is None and collection is None: - raise ValueError("you must supply a tags, annotations, or collection argument") + raise BTRDBValueError("you must supply a tags, annotations, or collection argument") if tags is not None and isinstance(tags, dict) is False: - raise TypeError("tags must be of type dict") + raise BTRDBTypeError("tags must be of type dict") if annotations is not None and isinstance(annotations, dict) is False: - raise TypeError("annotations must be of type dict") + raise BTRDBTypeError("annotations must be of type dict") if collection is not None and isinstance(collection, str) is False: - raise TypeError("collection must be of type string") + raise InvalidCollection("collection must be of type string") if tags is not None or collection is not None: self._update_tags_collection(tags, collection) @@ -755,12 +763,11 @@ def nearest(self, time, version, backward=False): the value was retrieved at (tuple(RawPoint, int)). """ - try: rp, version = self._btrdb.ep.nearest(self._uuid, to_nanoseconds(time), version, backward) except BTrDBError as exc: - if exc.code != 401: + if not isinstance(exc, NoSuchPoint): raise return None @@ -837,11 +844,11 @@ def pin_versions(self, versions=None): """ if versions is not None: if not isinstance(versions, dict): - raise TypeError("`versions` argument must be dict") + raise BTRDBTypeError("`versions` argument must be dict") for key in versions.keys(): if not isinstance(key, uuidlib.UUID): - raise TypeError("version keys must be type UUID") + raise BTRDBTypeError("version keys must be type UUID") self._pinned_versions = self._latest_versions() if not versions else versions @@ -976,7 +983,7 @@ def current(self): start = params.get("start", None) if (end is not None and end <= now) or (start is not None and start > now): - raise ValueError("current time is not included in filtered stream range") + raise BTRDBValueError("current time is not included in filtered stream range") for s in self._streams: version = self.versions()[s.uuid] @@ -1040,7 +1047,7 @@ def filter(self, start=None, end=None, collection=None, name=None, unit=None, elif isinstance(collection, str): obj._streams = [s for s in obj._streams if s.collection.lower() == collection.lower()] else: - raise TypeError("collection must be string or compiled regex") + raise BTRDBTypeError("collection must be string or compiled regex") # filter by name if name is not None: @@ -1049,7 +1056,7 @@ def filter(self, start=None, end=None, collection=None, name=None, unit=None, elif isinstance(name, str): obj._streams = [s for s in obj._streams if s.name.lower() == name.lower()] else: - raise TypeError("name must be string or compiled regex") + raise BTRDBTypeError("name must be string or compiled regex") # filter by unit if unit is not None: @@ -1058,7 +1065,7 @@ def filter(self, start=None, end=None, collection=None, name=None, unit=None, elif isinstance(unit, str): obj._streams = [s for s in obj._streams if s.tags().get("unit", "").lower() == unit.lower()] else: - raise TypeError("unit must be string or compiled regex") + raise BTRDBTypeError("unit must be string or compiled regex") # filter by tags if tags: @@ -1339,7 +1346,7 @@ def __init__(self, start=None, end=None): self.end = to_nanoseconds(end) if end else None if self.start is None and self.end is None: - raise ValueError("A valid `start` or `end` must be supplied") + raise BTRDBValueError("A valid `start` or `end` must be supplied") if self.start is not None and self.end is not None and self.start >= self.end: - raise ValueError("`start` must be strictly less than `end` argument") + raise BTRDBValueError("`start` must be strictly less than `end` argument") diff --git a/tests/btrdb/test_stream.py b/tests/btrdb/test_stream.py index a36fab6..3ba9461 100644 --- a/tests/btrdb/test_stream.py +++ b/tests/btrdb/test_stream.py @@ -29,7 +29,13 @@ from btrdb import MINIMUM_TIME, MAXIMUM_TIME from btrdb.stream import Stream, StreamSet, StreamFilter, INSERT_BATCH_SIZE from btrdb.point import RawPoint, StatPoint -from btrdb.exceptions import BTrDBError, InvalidOperation +from btrdb.exceptions import ( + BTrDBError, + InvalidOperation, + StreamNotFoundError, + InvalidCollection, + NoSuchPoint +) from btrdb.grpcinterface import btrdb_pb2 RawPointProto = btrdb_pb2.RawPoint @@ -198,7 +204,7 @@ def test_update_arguments(self): assert "annotations must be of type dict" in str(exc) # collection not string - with pytest.raises(TypeError) as exc: + with pytest.raises(InvalidCollection) as exc: stream.update(collection=42) assert "collection must be of type string" in str(exc) @@ -416,7 +422,9 @@ def test_exists_returns_false_on_404(self): """ uu = uuid.UUID('0d22a53b-e2ef-4e0a-ab89-b2d48fb2592a') stream = Stream(btrdb=BTrDB(Mock(Endpoint)), uuid=uu) - stream.refresh_metadata = Mock(side_effect=BTrDBError(code=404, msg="hello", mash="")) + stream.refresh_metadata = Mock(side_effect=StreamNotFoundError( + "stream not found with provided uuid" + )) assert stream.exists() == False assert stream.refresh_metadata.call_count == 1 @@ -627,7 +635,7 @@ def test_earliest_swallows_exception(self): uu = uuid.UUID('0d22a53b-e2ef-4e0a-ab89-b2d48fb2592a') endpoint = Mock(Endpoint) stream = Stream(btrdb=BTrDB(endpoint), uuid=uu) - endpoint.nearest = Mock(side_effect=BTrDBError(401,"empty",None)) + endpoint.nearest = Mock(side_effect=NoSuchPoint("next point does not exist")) assert stream.earliest() is None endpoint.nearest.assert_called_once_with(uu, MINIMUM_TIME, 0, False) @@ -640,11 +648,11 @@ def test_earliest_passes_exception(self): uu = uuid.UUID('0d22a53b-e2ef-4e0a-ab89-b2d48fb2592a') endpoint = Mock(Endpoint) stream = Stream(btrdb=BTrDB(endpoint), uuid=uu) - endpoint.nearest = Mock(side_effect=BTrDBError(999,"empty",None)) + endpoint.nearest = Mock(side_effect=BTrDBError("empty")) with pytest.raises(BTrDBError) as exc: stream.earliest() - assert exc.value.code == 999 + assert exc.value.args[0] == "empty" endpoint.nearest.assert_called_once_with(uu, MINIMUM_TIME, 0, False) @@ -669,7 +677,7 @@ def test_latest_swallows_exception(self): uu = uuid.UUID('0d22a53b-e2ef-4e0a-ab89-b2d48fb2592a') endpoint = Mock(Endpoint) stream = Stream(btrdb=BTrDB(endpoint), uuid=uu) - endpoint.nearest = Mock(side_effect=BTrDBError(401,"empty",None)) + endpoint.nearest = Mock(side_effect=NoSuchPoint("empty")) assert stream.latest() is None endpoint.nearest.assert_called_once_with(uu, MAXIMUM_TIME, 0, True) @@ -682,11 +690,11 @@ def test_latest_passes_exception(self): uu = uuid.UUID('0d22a53b-e2ef-4e0a-ab89-b2d48fb2592a') endpoint = Mock(Endpoint) stream = Stream(btrdb=BTrDB(endpoint), uuid=uu) - endpoint.nearest = Mock(side_effect=BTrDBError(999,"empty",None)) + endpoint.nearest = Mock(side_effect=BTrDBError("empty")) with pytest.raises(BTrDBError) as exc: stream.latest() - assert exc.value.code == 999 + assert exc.value.args[0] == "empty" endpoint.nearest.assert_called_once_with(uu, MAXIMUM_TIME, 0, True) @@ -715,7 +723,7 @@ def test_currently_swallows_exception(self, mocked): uu = uuid.UUID('0d22a53b-e2ef-4e0a-ab89-b2d48fb2592a') endpoint = Mock(Endpoint) stream = Stream(btrdb=BTrDB(endpoint), uuid=uu) - endpoint.nearest = Mock(side_effect=BTrDBError(401,"empty",None)) + endpoint.nearest = Mock(side_effect=NoSuchPoint("empty")) ns_fake_time = 1514808000000000000 mocked.return_value = ns_fake_time @@ -731,13 +739,13 @@ def test_currently_passes_exception(self, mocked): uu = uuid.UUID('0d22a53b-e2ef-4e0a-ab89-b2d48fb2592a') endpoint = Mock(Endpoint) stream = Stream(btrdb=BTrDB(endpoint), uuid=uu) - endpoint.nearest = Mock(side_effect=BTrDBError(999,"empty",None)) + endpoint.nearest = Mock(side_effect=BTrDBError("empty")) ns_fake_time = 1514808000000000000 mocked.return_value = ns_fake_time with pytest.raises(BTrDBError) as exc: stream.current() - assert exc.value.code == 999 + assert exc.value.args[0] == "empty" endpoint.nearest.assert_called_once_with(uu, ns_fake_time, 0, True) @@ -798,7 +806,7 @@ def test_nearest_swallows_exception(self): uu = uuid.UUID('0d22a53b-e2ef-4e0a-ab89-b2d48fb2592a') endpoint = Mock(Endpoint) stream = Stream(btrdb=BTrDB(endpoint), uuid=uu) - endpoint.nearest = Mock(side_effect=BTrDBError(401,"empty",None)) + endpoint.nearest = Mock(side_effect=NoSuchPoint("next point does not exist")) assert stream.nearest(0, 0, False) is None endpoint.nearest.assert_called_once_with(uu, 0, 0, False) @@ -811,11 +819,11 @@ def test_nearest_passes_exception(self): uu = uuid.UUID('0d22a53b-e2ef-4e0a-ab89-b2d48fb2592a') endpoint = Mock(Endpoint) stream = Stream(btrdb=BTrDB(endpoint), uuid=uu) - endpoint.nearest = Mock(side_effect=BTrDBError(999,"empty",None)) + endpoint.nearest = Mock(side_effect=BTrDBError("foo")) with pytest.raises(BTrDBError) as exc: stream.nearest(0, 0, False) - assert exc.value.code == 999 + assert exc.value.args[0] == "foo" endpoint.nearest.assert_called_once_with(uu, 0, 0, False) @@ -858,7 +866,7 @@ def test_obliterate_allows_error(self): """ uu = uuid.UUID('0d22a53b-e2ef-4e0a-ab89-b2d48fb2592a') endpoint = Mock(Endpoint) - endpoint.obliterate = Mock(side_effect=BTrDBError(code=404, msg="hello", mash="")) + endpoint.obliterate = Mock(side_effect=StreamNotFoundError()) stream = Stream(btrdb=BTrDB(endpoint), uuid=uu) with pytest.raises(BTrDBError): From 00099e425a9be0de96e83e842a29c39dc8361250 Mon Sep 17 00:00:00 2001 From: Michael Chestnut Date: Thu, 4 Mar 2021 09:57:04 -0600 Subject: [PATCH 3/7] fixes trailing whitespace --- btrdb/errors.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/btrdb/errors.py b/btrdb/errors.py index 9f3d4db..86b9901 100644 --- a/btrdb/errors.py +++ b/btrdb/errors.py @@ -124,7 +124,7 @@ def wrap(*args, **kwargs): ########################################################################## # NOTE: this function relies on matching strings and isn't really ideal. -# this is more of a band-aid solution while we figure out how to send +# this is more of a band-aid solution while we figure out how to send # better errors from btrdb-server def handle_grpc_error(err): """ From f310ba8dd5df4cd819ce50ba4e3671089ff5605a Mon Sep 17 00:00:00 2001 From: Michael Chestnut Date: Thu, 4 Mar 2021 15:21:54 -0600 Subject: [PATCH 4/7] catch and reformat permissions denied error --- btrdb/errors.py | 9 ++++++--- btrdb/exceptions.py | 6 ++++++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/btrdb/errors.py b/btrdb/errors.py index 86b9901..91ec041 100644 --- a/btrdb/errors.py +++ b/btrdb/errors.py @@ -28,7 +28,8 @@ BadValue, RecycledUUID, BadSQLValue, - VersionNotAvailable + VersionNotAvailable, + PermissionDenied ) ########################################################################## @@ -141,9 +142,11 @@ def handle_grpc_error(err): raise StreamNotFoundError("Stream not found with provided uuid") from None elif details == "failed to connect to all addresses": raise ConnectionError("Failed to connect to BTrDB") from None - elif any(e in err.debug_error_string() for e in BTRDB_SERVER_ERRORS): + elif any(str(e) in err.debug_error_string() for e in BTRDB_SERVER_ERRORS): raise BTRDBServerError("An error has occured with btrdb-server") from None - raise BTrDBError(err.code(), details, None) from None + elif str(err.code()) == "StatusCode.PERMISSION_DENIED": + raise PermissionDenied(details) from None + raise BTrDBError(details) from None def check_proto_stat(stat): """ diff --git a/btrdb/exceptions.py b/btrdb/exceptions.py index 0d3486d..fdb9362 100644 --- a/btrdb/exceptions.py +++ b/btrdb/exceptions.py @@ -71,6 +71,12 @@ class AmbiguousStream(InvalidOperation): """ pass +class PermissionDenied(InvalidOperation): + """ + User does not have permission to perform an operation + """ + pass + class BTRDBValueError(ValueError, BTrDBError): """ An invalid value has been passed to a BTrDB operation. From bfe037c3de4de386e8d187ff5f7d77ac96bdf7af Mon Sep 17 00:00:00 2001 From: Michael Chestnut Date: Fri, 5 Mar 2021 09:36:22 -0600 Subject: [PATCH 5/7] simplified error handling decorator --- btrdb/endpoint.py | 18 +++++++++--------- btrdb/errors.py | 19 ++++++------------- 2 files changed, 15 insertions(+), 22 deletions(-) diff --git a/btrdb/endpoint.py b/btrdb/endpoint.py index 0a05845..63244a3 100644 --- a/btrdb/endpoint.py +++ b/btrdb/endpoint.py @@ -30,14 +30,14 @@ from btrdb.point import RawPoint from btrdb.exceptions import BTrDBError from btrdb.utils.general import unpack_stream_descriptor -from btrdb.errors import error_handler, gen_error_handler, check_proto_stat +from btrdb.errors import error_handler, check_proto_stat class Endpoint(object): def __init__(self, channel): self.stub = btrdb_pb2_grpc.BTrDBStub(channel) - @gen_error_handler + @error_handler def rawValues(self, uu, start, end, version=0): params = btrdb_pb2.RawValuesParams( uuid=uu.bytes, start=start, end=end, versionMajor=version @@ -46,7 +46,7 @@ def rawValues(self, uu, start, end, version=0): check_proto_stat(result.stat) yield result.values, result.versionMajor - @gen_error_handler + @error_handler def alignedWindows(self, uu, start, end, pointwidth, version=0): params = btrdb_pb2.AlignedWindowsParams( uuid=uu.bytes, @@ -59,7 +59,7 @@ def alignedWindows(self, uu, start, end, pointwidth, version=0): check_proto_stat(result.stat) yield result.values, result.versionMajor - @gen_error_handler + @error_handler def windows(self, uu, start, end, width, depth, version=0): params = btrdb_pb2.WindowsParams( uuid=uu.bytes, @@ -148,7 +148,7 @@ def create(self, uu, collection, tags, annotations): result = self.stub.Create(params) check_proto_stat(result.stat) - @gen_error_handler + @error_handler def listCollections(self, prefix): """ Returns a generator for windows of collection paths matching search @@ -162,7 +162,7 @@ def listCollections(self, prefix): check_proto_stat(msg.stat) yield msg.collections - @gen_error_handler + @error_handler def lookupStreams(self, collection, isCollectionPrefix, tags, annotations): tagkvlist = [] for k, v in tags.items(): @@ -203,7 +203,7 @@ def nearest(self, uu, time, version, backward): check_proto_stat(result.stat) return result.value, result.versionMajor - @gen_error_handler + @error_handler def changes(self, uu, fromVersion, toVersion, resolution): params = btrdb_pb2.ChangesParams( uuid=uu.bytes, @@ -268,7 +268,7 @@ def getMetadataUsage(self, prefix): check_proto_stat(result.stat) return result.tags, result.annotations - @gen_error_handler + @error_handler def generateCSV(self, queryType, start, end, width, depth, includeVersions, *streams): protoStreams = [btrdb_pb2.StreamCSVConfig(version = stream[0], label = stream[1], @@ -285,7 +285,7 @@ def generateCSV(self, queryType, start, end, width, depth, includeVersions, *str check_proto_stat(result.stat) yield result.row - @gen_error_handler + @error_handler def sql_query(self, stmt, params=[]): request = btrdb_pb2.SQLQueryParams(query=stmt, params=params) for page in self.stub.SQLQuery(request): diff --git a/btrdb/errors.py b/btrdb/errors.py index 91ec041..322d41b 100644 --- a/btrdb/errors.py +++ b/btrdb/errors.py @@ -12,6 +12,7 @@ ########################################################################## import grpc +import inspect from functools import partial, wraps from btrdb.exceptions import ( BTrDBError, @@ -90,21 +91,19 @@ def consume_generator(fn, *args, **kwargs): yield from fn(*args, **kwargs) -def _error(fn, generator): +def error_handler(fn): """ Base decorator that checks endpoint functions for grpc.RpcErrors Parameters ---------- fn: function - generator: bool - specifies whether or not the input function is a generator """ # allows input func to keep its name and metadata @wraps(fn) def wrap(*args, **kwargs): try: - if generator: + if inspect.isgeneratorfunction(fn): # putting yield directly in this function turns it into a generator, # so keeping it separate consume_generator(fn, *args, **kwargs) @@ -113,13 +112,6 @@ def wrap(*args, **kwargs): handle_grpc_error(e) return wrap - -# Create two partial functions to be used as error handling decorators for -# endpoint functions. One is used for regular endpoints, and one is used for -# endpoints that are generators -error_handler = partial(_error, generator=False) -gen_error_handler = partial(_error, generator=True) - ########################################################################## ## gRPC error handling ########################################################################## @@ -129,7 +121,7 @@ def wrap(*args, **kwargs): # better errors from btrdb-server def handle_grpc_error(err): """ - Gets called by endpoint functions when a gRPC error is encountered. + Called by endpoint functions when a gRPC error is encountered. Checks error details strings to catch known errors, if error is not known then a generic BTrDBError gets raised @@ -142,12 +134,13 @@ def handle_grpc_error(err): raise StreamNotFoundError("Stream not found with provided uuid") from None elif details == "failed to connect to all addresses": raise ConnectionError("Failed to connect to BTrDB") from None - elif any(str(e) in err.debug_error_string() for e in BTRDB_SERVER_ERRORS): + elif any(str(e) in err.details() for e in BTRDB_SERVER_ERRORS): raise BTRDBServerError("An error has occured with btrdb-server") from None elif str(err.code()) == "StatusCode.PERMISSION_DENIED": raise PermissionDenied(details) from None raise BTrDBError(details) from None + def check_proto_stat(stat): """ Checks status of result after gRPC request and raises appropriate From fe43e0c08326438846dba6f7954291dca0d390ec Mon Sep 17 00:00:00 2001 From: Michael Chestnut Date: Thu, 18 Mar 2021 15:43:44 -0500 Subject: [PATCH 6/7] minor error handling decoratorfixes, standardized and more descriptive exception docstrings --- btrdb/endpoint.py | 3 +- btrdb/errors.py | 159 -------------------------------------- btrdb/exceptions.py | 184 ++++++++++++++++++++++++++++++++++++++------ 3 files changed, 162 insertions(+), 184 deletions(-) delete mode 100644 btrdb/errors.py diff --git a/btrdb/endpoint.py b/btrdb/endpoint.py index 63244a3..de5378e 100644 --- a/btrdb/endpoint.py +++ b/btrdb/endpoint.py @@ -28,9 +28,8 @@ from btrdb.grpcinterface import btrdb_pb2 from btrdb.grpcinterface import btrdb_pb2_grpc from btrdb.point import RawPoint -from btrdb.exceptions import BTrDBError +from btrdb.exceptions import BTrDBError, error_handler, check_proto_stat from btrdb.utils.general import unpack_stream_descriptor -from btrdb.errors import error_handler, check_proto_stat class Endpoint(object): diff --git a/btrdb/errors.py b/btrdb/errors.py deleted file mode 100644 index 322d41b..0000000 --- a/btrdb/errors.py +++ /dev/null @@ -1,159 +0,0 @@ -# btrdb.errors -# Module for error handling -# -# Author: PingThings -# Created: Thurs Feb 25 2021 -# -# For license information, see LICENSE.txt -# ID: errors.py [] michael.chestnut@pingthings.io $ - -########################################################################## -## Imports -########################################################################## - -import grpc -import inspect -from functools import partial, wraps -from btrdb.exceptions import ( - BTrDBError, - BTRDBServerError, - NoSuchPoint, - StreamNotFoundError, - InvalidCollection, - InvalidTagKey, - InvalidTagValue, - InvalidTimeRange, - InvalidPointWidth, - StreamExists, - AmbiguousStream, - BadValue, - RecycledUUID, - BadSQLValue, - VersionNotAvailable, - PermissionDenied -) - -########################################################################## -## Error mapping -########################################################################## - -# Errors that we have custom Exceptions for -BTRDB_ERRORS = { - 401: NoSuchPoint, - 404: StreamNotFoundError, - 407: InvalidCollection, - 408: InvalidTagKey, - 409: InvalidTagValue, - 413: InvalidTimeRange, - 415: InvalidPointWidth, - 417: StreamExists, - 418: AmbiguousStream, - 425: BadValue, - 429: RecycledUUID, - 441: BadSQLValue, - 450: VersionNotAvailable -} - -# All of these raise BTRDBServerError -BTRDB_SERVER_ERRORS = [ - 402, # ContextError - 403, # InsertFailure - 405, # WrongEndpoint - 414, # InsertTooBig - 421, # WrongArgs - 423, # AnnotationVersionMismatch - 424, # FaultInjectionDisabled - 426, # ResourceDepleted - 427, # InvalidVersions - 431, # ObliterateDisabled - 432, # CephError - 433, # NodeExisted - 434, # JournalError - 438, # InvalidParameter - 440, # MetadataConnectionError - 452, # OverlappingTrimRange - 453, # LockFailed - 454, # NoLeafNode - 455, # TierCacheError - 456, # StreamEmpty - 457, # TieredStorageBackendError - 458, # TieredStorageOutOfBounds - 459, # TieredStorageTemporaryError - 500, # InvariantFailure - 501, # NotImplemented - 502, # UnsupportedRollback -] - -########################################################################## -## Decorators -########################################################################## - -def consume_generator(fn, *args, **kwargs): - yield from fn(*args, **kwargs) - -def error_handler(fn): - """ - Base decorator that checks endpoint functions for grpc.RpcErrors - - Parameters - ---------- - fn: function - """ - # allows input func to keep its name and metadata - @wraps(fn) - def wrap(*args, **kwargs): - try: - if inspect.isgeneratorfunction(fn): - # putting yield directly in this function turns it into a generator, - # so keeping it separate - consume_generator(fn, *args, **kwargs) - return fn(*args, **kwargs) - except grpc.RpcError as e: - handle_grpc_error(e) - return wrap - -########################################################################## -## gRPC error handling -########################################################################## - -# NOTE: this function relies on matching strings and isn't really ideal. -# this is more of a band-aid solution while we figure out how to send -# better errors from btrdb-server -def handle_grpc_error(err): - """ - Called by endpoint functions when a gRPC error is encountered. - Checks error details strings to catch known errors, if error is not - known then a generic BTrDBError gets raised - - Parameters - ---------- - err: grpc.RpcError - """ - details = err.details() - if details == "[404] stream does not exist": - raise StreamNotFoundError("Stream not found with provided uuid") from None - elif details == "failed to connect to all addresses": - raise ConnectionError("Failed to connect to BTrDB") from None - elif any(str(e) in err.details() for e in BTRDB_SERVER_ERRORS): - raise BTRDBServerError("An error has occured with btrdb-server") from None - elif str(err.code()) == "StatusCode.PERMISSION_DENIED": - raise PermissionDenied(details) from None - raise BTrDBError(details) from None - - -def check_proto_stat(stat): - """ - Checks status of result after gRPC request and raises appropriate - error based on status code - - Parameters - ---------- - stat: btrdb_pb2.Status - """ - code = stat.code - if code != 0: - if code in BTRDB_ERRORS: - raise BTRDB_ERRORS[code](stat.msg) - elif code in BTRDB_SERVER_ERRORS: - raise BTRDBServerError(stat.msg) - raise BTrDBError(stat.msg) \ No newline at end of file diff --git a/btrdb/exceptions.py b/btrdb/exceptions.py index fdb9362..7ee29f8 100644 --- a/btrdb/exceptions.py +++ b/btrdb/exceptions.py @@ -7,138 +7,276 @@ # For license information, see LICENSE.txt # ID: exceptions.py [] allen@pingthings.io $ +########################################################################## +## Imports +########################################################################## + +import grpc +import inspect +from functools import wraps + +########################################################################## +## Decorators +########################################################################## + +def consume_generator(fn, *args, **kwargs): + # when a generator is passed back to the calling function, it may encounter an error + # when trying to call next(), in that case we want to yield an Exception + try: + yield from fn(*args, **kwargs) + except grpc.RpcError as e: + handle_grpc_error(e) + +def error_handler(fn): + """ + decorates endpoint functions and checks for grpc.RpcErrors + + Parameters + ---------- + fn: function + """ + # allows input func to keep its name and metadata + @wraps(fn) + def wrap(*args, **kwargs): + if inspect.isgeneratorfunction(fn): + return consume_generator(fn, *args, **kwargs) + try: + return fn(*args, **kwargs) + except grpc.RpcError as e: + handle_grpc_error(e) + return wrap + +########################################################################## +## gRPC error handling +########################################################################## + +# NOTE: this function relies on matching strings and isn't really ideal. +# this is more of a band-aid solution while we figure out how to send +# better errors from btrdb-server +def handle_grpc_error(err): + """ + Called by endpoint functions when a gRPC error is encountered. + Checks error details strings to catch known errors, if error is not + known then a generic BTrDBError gets raised + + Parameters + ---------- + err: grpc.RpcError + """ + details = err.details() + if details == "[404] stream does not exist": + raise StreamNotFoundError("Stream not found with provided uuid") from None + elif details == "failed to connect to all addresses": + raise ConnectionError("Failed to connect to BTrDB") from None + elif any(str(e) in err.details() for e in BTRDB_SERVER_ERRORS): + raise BTRDBServerError("An error has occured with btrdb-server") from None + elif str(err.code()) == "StatusCode.PERMISSION_DENIED": + raise PermissionDenied(details) from None + raise BTrDBError(details) from None + + +def check_proto_stat(stat): + """ + Checks status of result after gRPC request and raises appropriate + error based on status code + + Parameters + ---------- + stat: btrdb_pb2.Status + """ + code = stat.code + if code != 0: + if code in BTRDB_ERRORS: + raise BTRDB_ERRORS[code](stat.msg) + elif code in BTRDB_SERVER_ERRORS: + raise BTRDBServerError(stat.msg) + raise BTrDBError(stat.msg) + ########################################################################## ## BTrDB Exceptions ########################################################################## class BTrDBError(Exception): """ - The primary exception for grpc related errors. + The primary exception for BTrDB errors. """ pass class ConnectionError(BTrDBError): """ - An error has occurred while trying to establish a connection with BTrDB. + Raised when an error occurrs while trying to establish a connection with BTrDB. """ pass class StreamNotFoundError(BTrDBError): """ - A problem interacting with the BTrDB server. + Raised when attempting to perform an operation on a stream that does not exist in + the specified BTrDB allocation. """ pass class CredentialsFileNotFound(FileNotFoundError, BTrDBError): """ - The credentials file could not be found. + Raised when a credentials file could not be found. """ pass class ProfileNotFound(BTrDBError): """ - A requested profile could not be found in the credentials file. + Raised when a requested profile could not be found in the credentials file. """ pass class BTRDBServerError(BTrDBError): """ - An error occured with btrdb-server. + Raised when an error occurs with btrdb-server. """ pass class BTRDBTypeError(TypeError, BTrDBError): """ - A problem interacting with the BTrDB server. + Raised when attempting to perform an operation with an invalid type. """ pass class InvalidOperation(BTrDBError): """ - An invalid BTrDB operation has been requested. + Raised when an invalid BTrDB operation has been requested. """ pass class StreamExists(InvalidOperation): """ - Create() has been attempted and the uuid already exists + Raised when create() has been attempted and the uuid already exists. """ pass class AmbiguousStream(InvalidOperation): """ - Create() has been attempted and uuid is different, but collection and tags already exist + Raised when create() has been attempted and uuid is different, but collection and tags already exist """ pass class PermissionDenied(InvalidOperation): """ - User does not have permission to perform an operation + Raised when user does not have permission to perform an operation. """ pass class BTRDBValueError(ValueError, BTrDBError): """ - An invalid value has been passed to a BTrDB operation. + Raised when an invalid value has been passed to a BTrDB operation. """ pass class InvalidCollection(BTRDBValueError): """ - Collection name is invalid. It is either too long or not a valid string + Raised when a collection name is invalid. It is either too long or not a valid string. """ pass class InvalidTagKey(BTRDBValueError): """ - Tag key is invalid. Must be one of ("name", "unit", "ingress", "distiller") + Raised when a tag key is invalid. Must be one of ("name", "unit", "ingress", "distiller"). """ pass class InvalidTagValue(BTRDBValueError): """ - Tag value is invalid. It is either too long or not a valid string + Raised when a tag value is invalid. It is either too long or not a valid string. """ pass class InvalidTimeRange(BTRDBValueError): """ - Insert contains a timestamp outside the range of (btrdb.MINIMUM_TIME, btrdb.MAXIMUM_TIME) + Raised when insert data contains a timestamp outside the range of (btrdb.MINIMUM_TIME, btrdb.MAXIMUM_TIME) """ pass class InvalidPointWidth(BTRDBValueError): """ - Valid pointwidths are (0, 64) + Raised when attempting to use a pointwidth that is not a whole number between 0 and 64 (exclusive). """ pass class BadValue(BTRDBValueError): """ - Returned when you try to insert None values + Raised when attempting to insert data that contains non-float values such as None. """ pass class RecycledUUID(BTRDBValueError): """ - Returned if you try to create a stream with a uuid that matches a previously deleted stream + Raised when attempting to create a stream with a uuid that matches a previously deleted stream. """ pass class BadSQLValue(BTRDBValueError): """ - Invalid parameters have been passed to metadata db + Raised when invalid parameters have been passed to metadata db. """ pass class VersionNotAvailable(BTRDBValueError): """ - When querying a stream at a pruned version + Raised when querying a stream at a pruned or otherwise invalid version number. """ pass class NoSuchPoint(BTRDBValueError): """ - If you ask for next/previous point and there isn't one + Raised when asking for next/previous point and there isn't one. """ - pass \ No newline at end of file + pass + + +########################################################################## +## Exception mapping +########################################################################## + +# Errors that we have custom Exceptions for +BTRDB_ERRORS = { + 401: NoSuchPoint, + 404: StreamNotFoundError, + 407: InvalidCollection, + 408: InvalidTagKey, + 409: InvalidTagValue, + 413: InvalidTimeRange, + 415: InvalidPointWidth, + 417: StreamExists, + 418: AmbiguousStream, + 425: BadValue, + 429: RecycledUUID, + 441: BadSQLValue, + 450: VersionNotAvailable +} + +# All of these raise BTRDBServerError +BTRDB_SERVER_ERRORS = [ + 402, # ContextError + 403, # InsertFailure + 405, # WrongEndpoint + 414, # InsertTooBig + 421, # WrongArgs + 423, # AnnotationVersionMismatch + 424, # FaultInjectionDisabled + 426, # ResourceDepleted + 427, # InvalidVersions + 431, # ObliterateDisabled + 432, # CephError + 433, # NodeExisted + 434, # JournalError + 438, # InvalidParameter + 440, # MetadataConnectionError + 452, # OverlappingTrimRange + 453, # LockFailed + 454, # NoLeafNode + 455, # TierCacheError + 456, # StreamEmpty + 457, # TieredStorageBackendError + 458, # TieredStorageOutOfBounds + 459, # TieredStorageTemporaryError + 500, # InvariantFailure + 501, # NotImplemented + 502, # UnsupportedRollback +] \ No newline at end of file From a236fd2a85e0fc8d2a39e148e563e2817bb3fbb6 Mon Sep 17 00:00:00 2001 From: Michael Chestnut Date: Fri, 19 Mar 2021 12:12:02 -0500 Subject: [PATCH 7/7] fix grpc imports --- btrdb/exceptions.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/btrdb/exceptions.py b/btrdb/exceptions.py index 7ee29f8..2007503 100644 --- a/btrdb/exceptions.py +++ b/btrdb/exceptions.py @@ -11,8 +11,8 @@ ## Imports ########################################################################## -import grpc import inspect +from grpc import RpcError from functools import wraps ########################################################################## @@ -24,7 +24,7 @@ def consume_generator(fn, *args, **kwargs): # when trying to call next(), in that case we want to yield an Exception try: yield from fn(*args, **kwargs) - except grpc.RpcError as e: + except RpcError as e: handle_grpc_error(e) def error_handler(fn): @@ -42,7 +42,7 @@ def wrap(*args, **kwargs): return consume_generator(fn, *args, **kwargs) try: return fn(*args, **kwargs) - except grpc.RpcError as e: + except RpcError as e: handle_grpc_error(e) return wrap