Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions btrdb/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from btrdb.stream import Stream, StreamSet
from btrdb.utils.general import unpack_stream_descriptor
from btrdb.utils.conversion import to_uuid
from btrdb.exceptions import NotFound, InvalidOperation
from btrdb.exceptions import StreamNotFoundError, InvalidOperation

##########################################################################
## Module Variables
Expand Down Expand Up @@ -187,7 +187,7 @@ def streams(self, *identifiers, versions=None, is_collection_prefix=False):
if len(found) == 1:
streams.append(found[0])
continue
raise NotFound(f"Could not identify stream `{ident}`")
raise StreamNotFoundError(f"Could not identify stream `{ident}`")

raise ValueError(f"Could not identify stream based on `{ident}`. Identifier must be UUID or collection/name.")

Expand Down
190 changes: 128 additions & 62 deletions btrdb/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,44 +28,68 @@
from btrdb.grpcinterface import btrdb_pb2
from btrdb.grpcinterface import btrdb_pb2_grpc
from btrdb.point import RawPoint
from btrdb.exceptions import BTrDBError
from btrdb.exceptions import BTrDBError, error_handler, check_proto_stat
from btrdb.utils.general import unpack_stream_descriptor


class Endpoint(object):
def __init__(self, channel):
self.stub = btrdb_pb2_grpc.BTrDBStub(channel)

def rawValues(self, uu, start, end, version = 0):
params = btrdb_pb2.RawValuesParams(uuid = uu.bytes, start = start, end = end, versionMajor = version)
@error_handler
def rawValues(self, uu, start, end, version=0):
params = btrdb_pb2.RawValuesParams(
uuid=uu.bytes, start=start, end=end, versionMajor=version
)
for result in self.stub.RawValues(params):
BTrDBError.checkProtoStat(result.stat)
check_proto_stat(result.stat)
yield result.values, result.versionMajor

def alignedWindows(self, uu, start, end, pointwidth, version = 0):
params = btrdb_pb2.AlignedWindowsParams(uuid = uu.bytes, start = start, end = end, versionMajor = version, pointWidth = int(pointwidth))
@error_handler
def alignedWindows(self, uu, start, end, pointwidth, version=0):
params = btrdb_pb2.AlignedWindowsParams(
uuid=uu.bytes,
start=start,
end=end,
versionMajor=version,
pointWidth=int(pointwidth),
)
for result in self.stub.AlignedWindows(params):
BTrDBError.checkProtoStat(result.stat)
check_proto_stat(result.stat)
yield result.values, result.versionMajor

def windows(self, uu, start, end, width, depth, version = 0):
params = btrdb_pb2.WindowsParams(uuid = uu.bytes, start = start, end = end, versionMajor = version, width = width, depth = depth)
@error_handler
def windows(self, uu, start, end, width, depth, version=0):
params = btrdb_pb2.WindowsParams(
uuid=uu.bytes,
start=start,
end=end,
versionMajor=version,
width=width,
depth=depth,
)
for result in self.stub.Windows(params):
BTrDBError.checkProtoStat(result.stat)
check_proto_stat(result.stat)
yield result.values, result.versionMajor

@error_handler
def streamInfo(self, uu, omitDescriptor, omitVersion):
params = btrdb_pb2.StreamInfoParams(uuid = uu.bytes, omitVersion = omitVersion, omitDescriptor = omitDescriptor)
params = btrdb_pb2.StreamInfoParams(
uuid=uu.bytes, omitVersion=omitVersion, omitDescriptor=omitDescriptor
)
result = self.stub.StreamInfo(params)
desc = result.descriptor
BTrDBError.checkProtoStat(result.stat)
check_proto_stat(result.stat)
tagsanns = unpack_stream_descriptor(desc)
return desc.collection, desc.propertyVersion, tagsanns[0], tagsanns[1], result.versionMajor

@error_handler
def obliterate(self, uu):
params = btrdb_pb2.ObliterateParams(uuid = uu.bytes)
params = btrdb_pb2.ObliterateParams(uuid=uu.bytes)
result = self.stub.Obliterate(params)
BTrDBError.checkProtoStat(result.stat)
check_proto_stat(result.stat)

@error_handler
def setStreamAnnotations(self, uu, expected, changes, removals):
annkvlist = []
for k, v in changes.items():
Expand All @@ -74,13 +98,19 @@ def setStreamAnnotations(self, uu, expected, changes, removals):
else:
if isinstance(v, str):
v = v.encode("utf-8")
ov = btrdb_pb2.OptValue(value = v)
kv = btrdb_pb2.KeyOptValue(key = k, val = ov)
ov = btrdb_pb2.OptValue(value=v)
kv = btrdb_pb2.KeyOptValue(key=k, val=ov)
annkvlist.append(kv)
params = btrdb_pb2.SetStreamAnnotationsParams(uuid=uu.bytes, expectedPropertyVersion=expected, changes=annkvlist, removals=removals)
params = btrdb_pb2.SetStreamAnnotationsParams(
uuid=uu.bytes,
expectedPropertyVersion=expected,
changes=annkvlist,
removals=removals,
)
result = self.stub.SetStreamAnnotations(params)
BTrDBError.checkProtoStat(result.stat)
check_proto_stat(result.stat)

@error_handler
def setStreamTags(self, uu, expected, tags, collection):
tag_data = []
for k, v in tags.items():
Expand All @@ -89,13 +119,19 @@ def setStreamTags(self, uu, expected, tags, collection):
else:
if isinstance(v, str):
v = v.encode("utf-8")
ov = btrdb_pb2.OptValue(value = v)
kv = btrdb_pb2.KeyOptValue(key = k, val = ov)
ov = btrdb_pb2.OptValue(value=v)
kv = btrdb_pb2.KeyOptValue(key=k, val=ov)
tag_data.append(kv)
params = btrdb_pb2.SetStreamTagsParams(uuid=uu.bytes, expectedPropertyVersion=expected, tags=tag_data, collection=collection)
params = btrdb_pb2.SetStreamTagsParams(
uuid=uu.bytes,
expectedPropertyVersion=expected,
tags=tag_data,
collection=collection,
)
result = self.stub.SetStreamTags(params)
BTrDBError.checkProtoStat(result.stat)
check_proto_stat(result.stat)

@error_handler
def create(self, uu, collection, tags, annotations):
tagkvlist = []
for k, v in tags.items():
Expand All @@ -105,10 +141,13 @@ def create(self, uu, collection, tags, annotations):
for k, v in annotations.items():
kv = btrdb_pb2.KeyOptValue(key = k, val = btrdb_pb2.OptValue(value=v))
annkvlist.append(kv)
params = btrdb_pb2.CreateParams(uuid = uu.bytes, collection = collection, tags = tagkvlist, annotations = annkvlist)
params = btrdb_pb2.CreateParams(
uuid=uu.bytes, collection=collection, tags=tagkvlist, annotations=annkvlist
)
result = self.stub.Create(params)
BTrDBError.checkProtoStat(result.stat)
check_proto_stat(result.stat)

@error_handler
def listCollections(self, prefix):
"""
Returns a generator for windows of collection paths matching search
Expand All @@ -119,9 +158,10 @@ def listCollections(self, prefix):
"""
params = btrdb_pb2.ListCollectionsParams(prefix=prefix)
for msg in self.stub.ListCollections(params):
BTrDBError.checkProtoStat(msg.stat)
check_proto_stat(msg.stat)
yield msg.collections

@error_handler
def lookupStreams(self, collection, isCollectionPrefix, tags, annotations):
tagkvlist = []
for k, v in tags.items():
Expand All @@ -130,8 +170,8 @@ def lookupStreams(self, collection, isCollectionPrefix, tags, annotations):
else:
if isinstance(v, str):
v = v.encode("utf-8")
ov = btrdb_pb2.OptValue(value = v)
kv = btrdb_pb2.KeyOptValue(key = k, val = ov)
ov = btrdb_pb2.OptValue(value=v)
kv = btrdb_pb2.KeyOptValue(key=k, val=ov)
tagkvlist.append(kv)
annkvlist = []
for k, v in annotations.items():
Expand All @@ -140,87 +180,113 @@ def lookupStreams(self, collection, isCollectionPrefix, tags, annotations):
else:
if isinstance(v, str):
v = v.encode("utf-8")
ov = btrdb_pb2.OptValue(value = v)
kv = btrdb_pb2.KeyOptValue(key = k, val = ov)
ov = btrdb_pb2.OptValue(value=v)
kv = btrdb_pb2.KeyOptValue(key=k, val=ov)
annkvlist.append(kv)
params = btrdb_pb2.LookupStreamsParams(collection = collection, isCollectionPrefix = isCollectionPrefix, tags = tagkvlist, annotations = annkvlist)
params = btrdb_pb2.LookupStreamsParams(
collection=collection,
isCollectionPrefix=isCollectionPrefix,
tags=tagkvlist,
annotations=annkvlist,
)
for result in self.stub.LookupStreams(params):
BTrDBError.checkProtoStat(result.stat)
check_proto_stat(result.stat)
yield result.results

@error_handler
def nearest(self, uu, time, version, backward):
params = btrdb_pb2.NearestParams(uuid = uu.bytes, time = time, versionMajor = version, backward = backward)
params = btrdb_pb2.NearestParams(
uuid=uu.bytes, time=time, versionMajor=version, backward=backward
)
result = self.stub.Nearest(params)
BTrDBError.checkProtoStat(result.stat)
check_proto_stat(result.stat)
return result.value, result.versionMajor


@error_handler
def changes(self, uu, fromVersion, toVersion, resolution):
params = btrdb_pb2.ChangesParams(uuid = uu.bytes, fromMajor = fromVersion, toMajor = toVersion, resolution = resolution)
params = btrdb_pb2.ChangesParams(
uuid=uu.bytes,
fromMajor=fromVersion,
toMajor=toVersion,
resolution=resolution,
)
for result in self.stub.Changes(params):
BTrDBError.checkProtoStat(result.stat)
check_proto_stat(result.stat)
yield result.ranges, result.versionMajor

@error_handler
def insert(self, uu, values, policy):
policy_map = {
'never': btrdb_pb2.MergePolicy.NEVER,
'equal': btrdb_pb2.MergePolicy.EQUAL,
'retain': btrdb_pb2.MergePolicy.RETAIN,
'replace': btrdb_pb2.MergePolicy.REPLACE,
"never": btrdb_pb2.MergePolicy.NEVER,
"equal": btrdb_pb2.MergePolicy.EQUAL,
"retain": btrdb_pb2.MergePolicy.RETAIN,
"replace": btrdb_pb2.MergePolicy.REPLACE,
}
protoValues = RawPoint.to_proto_list(values)
params = btrdb_pb2.InsertParams(uuid = uu.bytes, sync = False, values = protoValues, merge_policy = policy_map[policy])
params = btrdb_pb2.InsertParams(
uuid=uu.bytes,
sync=False,
values=protoValues,
merge_policy=policy_map[policy],
)
result = self.stub.Insert(params)
BTrDBError.checkProtoStat(result.stat)
check_proto_stat(result.stat)
return result.versionMajor

@error_handler
def deleteRange(self, uu, start, end):
params = btrdb_pb2.DeleteParams(uuid = uu.bytes, start = start, end = end)
params = btrdb_pb2.DeleteParams(uuid=uu.bytes, start=start, end=end)
result = self.stub.Delete(params)
BTrDBError.checkProtoStat(result.stat)
check_proto_stat(result.stat)
return result.versionMajor

@error_handler
def info(self):
params = btrdb_pb2.InfoParams()
result = self.stub.Info(params)
BTrDBError.checkProtoStat(result.stat)
check_proto_stat(result.stat)
return result

@error_handler
def faultInject(self, typ, args):
params = btrdb_pb2.FaultInjectParams(type = typ, params = args)
params = btrdb_pb2.FaultInjectParams(type=typ, params=args)
result = self.stub.FaultInject(params)
BTrDBError.checkProtoStat(result.stat)
check_proto_stat(result.stat)
return result.rv

@error_handler
def flush(self, uu):
params = btrdb_pb2.FlushParams(uuid = uu.bytes)
params = btrdb_pb2.FlushParams(uuid=uu.bytes)
result = self.stub.Flush(params)
BTrDBError.checkProtoStat(result.stat)
check_proto_stat(result.stat)

@error_handler
def getMetadataUsage(self, prefix):
params = btrdb_pb2.MetadataUsageParams(prefix = prefix)
params = btrdb_pb2.MetadataUsageParams(prefix=prefix)
result = self.stub.GetMetadataUsage(params)
BTrDBError.checkProtoStat(result.stat)
check_proto_stat(result.stat)
return result.tags, result.annotations

@error_handler
def generateCSV(self, queryType, start, end, width, depth, includeVersions, *streams):
protoStreams = [btrdb_pb2.StreamCSVConfig(version = stream[0],
label = stream[1],
uuid = stream[2].bytes)
label = stream[1],
uuid = stream[2].bytes)
for stream in streams]
params = btrdb_pb2.GenerateCSVParams(queryType = queryType.to_proto(),
startTime = start,
endTime = end,
windowSize = width,
depth = depth,
includeVersions = includeVersions,
streams = protoStreams)
result = self.stub.GenerateCSV(params)
startTime = start,
endTime = end,
windowSize = width,
depth = depth,
includeVersions = includeVersions,
streams = protoStreams)
for result in self.stub.GenerateCSV(params):
BTrDBError.checkProtoStat(result.stat)
check_proto_stat(result.stat)
yield result.row

@error_handler
def sql_query(self, stmt, params=[]):
request = btrdb_pb2.SQLQueryParams(query=stmt, params=params)
for page in self.stub.SQLQuery(request):
BTrDBError.checkProtoStat(page.stat)
check_proto_stat(page.stat)
yield page.SQLQueryRow
Loading