From 001865c138ef773c6ff13a35992019b25c6c6344 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 29 Apr 2016 12:51:41 -0700 Subject: [PATCH 1/4] Improve unix timestamp -> datetime conversion, update to riak_pb 2.1.3.0 --- riak/codecs/ttb.py | 23 +++---- riak/pb/messages.py | 8 +-- riak/pb/riak_pb2.py | 72 +------------------- riak/pb/riak_ts_pb2.py | 109 ++++++++---------------------- riak/tests/__init__.py | 8 +++ riak/tests/base.py | 37 +++------- riak/tests/test_timeseries_ttb.py | 2 +- riak/tests/test_util.py | 65 +++++++++++++++++- riak/transports/tcp/transport.py | 17 ++--- riak/transports/transport.py | 4 +- riak/util.py | 15 ++-- riak_pb | 2 +- 12 files changed, 149 insertions(+), 213 deletions(-) diff --git a/riak/codecs/ttb.py b/riak/codecs/ttb.py index 6bb81f0d..e8358326 100644 --- a/riak/codecs/ttb.py +++ b/riak/codecs/ttb.py @@ -6,6 +6,7 @@ from riak import RiakError from riak.codecs import Codec, Msg +from riak.pb.messages import MSG_CODE_TS_TTB_MSG from riak.ts_object import TsColumns from riak.util import bytes_to_str, unix_time_millis, \ datetime_from_unix_time_millis @@ -23,9 +24,6 @@ tsdelreq_a = Atom('tsdelreq') timestamp_a = Atom('timestamp') -# TODO RTS-842 -MSG_CODE_TS_TTB = 104 - class TtbCodec(Codec): ''' @@ -36,7 +34,7 @@ def __init__(self, **unused_args): super(TtbCodec, self).__init__(**unused_args) def parse_msg(self, msg_code, data): - if msg_code != MSG_CODE_TS_TTB: + if msg_code != MSG_CODE_TS_TTB_MSG: raise RiakError("TTB can't parse code: {}".format(msg_code)) if len(data) > 0: decoded = decode(data) @@ -61,6 +59,7 @@ def encode_to_ts_cell(self, cell): else: if isinstance(cell, datetime.datetime): ts = unix_time_millis(cell) + # logging.debug('encoded datetime %s as %s', cell, ts) return ts elif isinstance(cell, bool): return cell @@ -84,19 +83,19 @@ def encode_timeseries_keyreq(self, table, key, is_delete=False): else: raise ValueError("key must be a list") - mc = MSG_CODE_TS_TTB - rc = MSG_CODE_TS_TTB + mc = MSG_CODE_TS_TTB_MSG + rc = MSG_CODE_TS_TTB_MSG req_atom = tsgetreq_a if is_delete: req_atom = tsdelreq_a - # TODO RTS-842 timeout is last + # TODO FUTURE add timeout as last param req = req_atom, table.name, \ [self.encode_to_ts_cell(k) for k in key_vals], udef_a return Msg(mc, encode(req), rc) def validate_timeseries_put_resp(self, resp_code, resp): - if resp is None and resp_code == MSG_CODE_TS_TTB: + if resp is None and resp_code == MSG_CODE_TS_TTB_MSG: return True if resp is not None: return True @@ -123,8 +122,8 @@ def encode_timeseries_put(self, tsobj): req_r.append(self.encode_to_ts_cell(cell)) req_rows.append(tuple(req_r)) req = tsputreq_a, tsobj.table.name, [], req_rows - mc = MSG_CODE_TS_TTB - rc = MSG_CODE_TS_TTB + mc = MSG_CODE_TS_TTB_MSG + rc = MSG_CODE_TS_TTB_MSG return Msg(mc, encode(req), rc) else: raise RiakError("TsObject requires a list of rows") @@ -135,8 +134,8 @@ def encode_timeseries_query(self, table, query, interpolations=None): q = q.format(table=table.name) tsi = tsinterpolation_a, q, [] req = tsqueryreq_a, tsi, False, [] - mc = MSG_CODE_TS_TTB - rc = MSG_CODE_TS_TTB + mc = MSG_CODE_TS_TTB_MSG + rc = MSG_CODE_TS_TTB_MSG return Msg(mc, encode(req), rc) def decode_timeseries(self, resp_ttb, tsobj): diff --git a/riak/pb/messages.py b/riak/pb/messages.py index 9bbf284a..76c25e82 100644 --- a/riak/pb/messages.py +++ b/riak/pb/messages.py @@ -81,9 +81,7 @@ MSG_CODE_TS_COVERAGE_RESP = 101 MSG_CODE_TS_COVERAGE_ENTRY = 102 MSG_CODE_TS_RANGE = 103 -MSG_CODE_TS_TTB_PUT_REQ = 104 -MSG_CODE_TOGGLE_ENCODING_REQ = 110 -MSG_CODE_TOGGLE_ENCODING_RESP = 111 +MSG_CODE_TS_TTB_MSG = 104 MSG_CODE_AUTH_REQ = 253 MSG_CODE_AUTH_RESP = 254 MSG_CODE_START_TLS = 255 @@ -168,9 +166,7 @@ MSG_CODE_TS_COVERAGE_RESP: riak.pb.riak_ts_pb2.TsCoverageResp, MSG_CODE_TS_COVERAGE_ENTRY: riak.pb.riak_ts_pb2.TsCoverageEntry, MSG_CODE_TS_RANGE: riak.pb.riak_ts_pb2.TsRange, - MSG_CODE_TS_TTB_PUT_REQ: riak.pb.riak_ts_pb2.TsTtbPutReq, - MSG_CODE_TOGGLE_ENCODING_REQ: riak.pb.riak_pb2.RpbToggleEncodingReq, - MSG_CODE_TOGGLE_ENCODING_RESP: riak.pb.riak_pb2.RpbToggleEncodingResp, + MSG_CODE_TS_TTB_MSG: None, MSG_CODE_AUTH_REQ: riak.pb.riak_pb2.RpbAuthReq, MSG_CODE_AUTH_RESP: None, MSG_CODE_START_TLS: None diff --git a/riak/pb/riak_pb2.py b/riak/pb/riak_pb2.py index d55a142c..a757940a 100644 --- a/riak/pb/riak_pb2.py +++ b/riak/pb/riak_pb2.py @@ -14,7 +14,7 @@ DESCRIPTOR = _descriptor.FileDescriptor( name='riak.proto', package='', - serialized_pb='\n\nriak.proto\"/\n\x0cRpbErrorResp\x12\x0e\n\x06\x65rrmsg\x18\x01 \x02(\x0c\x12\x0f\n\x07\x65rrcode\x18\x02 \x02(\r\"<\n\x14RpbGetServerInfoResp\x12\x0c\n\x04node\x18\x01 \x01(\x0c\x12\x16\n\x0eserver_version\x18\x02 \x01(\x0c\"%\n\x07RpbPair\x12\x0b\n\x03key\x18\x01 \x02(\x0c\x12\r\n\x05value\x18\x02 \x01(\x0c\"/\n\x0fRpbGetBucketReq\x12\x0e\n\x06\x62ucket\x18\x01 \x02(\x0c\x12\x0c\n\x04type\x18\x02 \x01(\x0c\"2\n\x10RpbGetBucketResp\x12\x1e\n\x05props\x18\x01 \x02(\x0b\x32\x0f.RpbBucketProps\"O\n\x0fRpbSetBucketReq\x12\x0e\n\x06\x62ucket\x18\x01 \x02(\x0c\x12\x1e\n\x05props\x18\x02 \x02(\x0b\x32\x0f.RpbBucketProps\x12\x0c\n\x04type\x18\x03 \x01(\x0c\"1\n\x11RpbResetBucketReq\x12\x0e\n\x06\x62ucket\x18\x01 \x02(\x0c\x12\x0c\n\x04type\x18\x02 \x01(\x0c\"#\n\x13RpbGetBucketTypeReq\x12\x0c\n\x04type\x18\x01 \x02(\x0c\"C\n\x13RpbSetBucketTypeReq\x12\x0c\n\x04type\x18\x01 \x02(\x0c\x12\x1e\n\x05props\x18\x02 \x02(\x0b\x32\x0f.RpbBucketProps\"-\n\tRpbModFun\x12\x0e\n\x06module\x18\x01 \x02(\x0c\x12\x10\n\x08\x66unction\x18\x02 \x02(\x0c\"9\n\rRpbCommitHook\x12\x1a\n\x06modfun\x18\x01 \x01(\x0b\x32\n.RpbModFun\x12\x0c\n\x04name\x18\x02 \x01(\x0c\"\xb0\x05\n\x0eRpbBucketProps\x12\r\n\x05n_val\x18\x01 \x01(\r\x12\x12\n\nallow_mult\x18\x02 \x01(\x08\x12\x17\n\x0flast_write_wins\x18\x03 \x01(\x08\x12!\n\tprecommit\x18\x04 \x03(\x0b\x32\x0e.RpbCommitHook\x12\x1c\n\rhas_precommit\x18\x05 \x01(\x08:\x05\x66\x61lse\x12\"\n\npostcommit\x18\x06 \x03(\x0b\x32\x0e.RpbCommitHook\x12\x1d\n\x0ehas_postcommit\x18\x07 \x01(\x08:\x05\x66\x61lse\x12 \n\x0c\x63hash_keyfun\x18\x08 \x01(\x0b\x32\n.RpbModFun\x12\x1b\n\x07linkfun\x18\t \x01(\x0b\x32\n.RpbModFun\x12\x12\n\nold_vclock\x18\n \x01(\r\x12\x14\n\x0cyoung_vclock\x18\x0b \x01(\r\x12\x12\n\nbig_vclock\x18\x0c \x01(\r\x12\x14\n\x0csmall_vclock\x18\r \x01(\r\x12\n\n\x02pr\x18\x0e \x01(\r\x12\t\n\x01r\x18\x0f \x01(\r\x12\t\n\x01w\x18\x10 \x01(\r\x12\n\n\x02pw\x18\x11 \x01(\r\x12\n\n\x02\x64w\x18\x12 \x01(\r\x12\n\n\x02rw\x18\x13 \x01(\r\x12\x14\n\x0c\x62\x61sic_quorum\x18\x14 \x01(\x08\x12\x13\n\x0bnotfound_ok\x18\x15 \x01(\x08\x12\x0f\n\x07\x62\x61\x63kend\x18\x16 \x01(\x0c\x12\x0e\n\x06search\x18\x17 \x01(\x08\x12)\n\x04repl\x18\x18 \x01(\x0e\x32\x1b.RpbBucketProps.RpbReplMode\x12\x14\n\x0csearch_index\x18\x19 \x01(\x0c\x12\x10\n\x08\x64\x61tatype\x18\x1a \x01(\x0c\x12\x12\n\nconsistent\x18\x1b \x01(\x08\x12\x12\n\nwrite_once\x18\x1c \x01(\x08\">\n\x0bRpbReplMode\x12\t\n\x05\x46\x41LSE\x10\x00\x12\x0c\n\x08REALTIME\x10\x01\x12\x0c\n\x08\x46ULLSYNC\x10\x02\x12\x08\n\x04TRUE\x10\x03\",\n\nRpbAuthReq\x12\x0c\n\x04user\x18\x01 \x02(\x0c\x12\x10\n\x08password\x18\x02 \x02(\x0c\"*\n\x14RpbToggleEncodingReq\x12\x12\n\nuse_native\x18\x01 \x02(\x08\"+\n\x15RpbToggleEncodingResp\x12\x12\n\nuse_native\x18\x01 \x02(\x08\x42!\n\x17\x63om.basho.riak.protobufB\x06RiakPB') + serialized_pb='\n\nriak.proto\"/\n\x0cRpbErrorResp\x12\x0e\n\x06\x65rrmsg\x18\x01 \x02(\x0c\x12\x0f\n\x07\x65rrcode\x18\x02 \x02(\r\"<\n\x14RpbGetServerInfoResp\x12\x0c\n\x04node\x18\x01 \x01(\x0c\x12\x16\n\x0eserver_version\x18\x02 \x01(\x0c\"%\n\x07RpbPair\x12\x0b\n\x03key\x18\x01 \x02(\x0c\x12\r\n\x05value\x18\x02 \x01(\x0c\"/\n\x0fRpbGetBucketReq\x12\x0e\n\x06\x62ucket\x18\x01 \x02(\x0c\x12\x0c\n\x04type\x18\x02 \x01(\x0c\"2\n\x10RpbGetBucketResp\x12\x1e\n\x05props\x18\x01 \x02(\x0b\x32\x0f.RpbBucketProps\"O\n\x0fRpbSetBucketReq\x12\x0e\n\x06\x62ucket\x18\x01 \x02(\x0c\x12\x1e\n\x05props\x18\x02 \x02(\x0b\x32\x0f.RpbBucketProps\x12\x0c\n\x04type\x18\x03 \x01(\x0c\"1\n\x11RpbResetBucketReq\x12\x0e\n\x06\x62ucket\x18\x01 \x02(\x0c\x12\x0c\n\x04type\x18\x02 \x01(\x0c\"#\n\x13RpbGetBucketTypeReq\x12\x0c\n\x04type\x18\x01 \x02(\x0c\"C\n\x13RpbSetBucketTypeReq\x12\x0c\n\x04type\x18\x01 \x02(\x0c\x12\x1e\n\x05props\x18\x02 \x02(\x0b\x32\x0f.RpbBucketProps\"-\n\tRpbModFun\x12\x0e\n\x06module\x18\x01 \x02(\x0c\x12\x10\n\x08\x66unction\x18\x02 \x02(\x0c\"9\n\rRpbCommitHook\x12\x1a\n\x06modfun\x18\x01 \x01(\x0b\x32\n.RpbModFun\x12\x0c\n\x04name\x18\x02 \x01(\x0c\"\xb0\x05\n\x0eRpbBucketProps\x12\r\n\x05n_val\x18\x01 \x01(\r\x12\x12\n\nallow_mult\x18\x02 \x01(\x08\x12\x17\n\x0flast_write_wins\x18\x03 \x01(\x08\x12!\n\tprecommit\x18\x04 \x03(\x0b\x32\x0e.RpbCommitHook\x12\x1c\n\rhas_precommit\x18\x05 \x01(\x08:\x05\x66\x61lse\x12\"\n\npostcommit\x18\x06 \x03(\x0b\x32\x0e.RpbCommitHook\x12\x1d\n\x0ehas_postcommit\x18\x07 \x01(\x08:\x05\x66\x61lse\x12 \n\x0c\x63hash_keyfun\x18\x08 \x01(\x0b\x32\n.RpbModFun\x12\x1b\n\x07linkfun\x18\t \x01(\x0b\x32\n.RpbModFun\x12\x12\n\nold_vclock\x18\n \x01(\r\x12\x14\n\x0cyoung_vclock\x18\x0b \x01(\r\x12\x12\n\nbig_vclock\x18\x0c \x01(\r\x12\x14\n\x0csmall_vclock\x18\r \x01(\r\x12\n\n\x02pr\x18\x0e \x01(\r\x12\t\n\x01r\x18\x0f \x01(\r\x12\t\n\x01w\x18\x10 \x01(\r\x12\n\n\x02pw\x18\x11 \x01(\r\x12\n\n\x02\x64w\x18\x12 \x01(\r\x12\n\n\x02rw\x18\x13 \x01(\r\x12\x14\n\x0c\x62\x61sic_quorum\x18\x14 \x01(\x08\x12\x13\n\x0bnotfound_ok\x18\x15 \x01(\x08\x12\x0f\n\x07\x62\x61\x63kend\x18\x16 \x01(\x0c\x12\x0e\n\x06search\x18\x17 \x01(\x08\x12)\n\x04repl\x18\x18 \x01(\x0e\x32\x1b.RpbBucketProps.RpbReplMode\x12\x14\n\x0csearch_index\x18\x19 \x01(\x0c\x12\x10\n\x08\x64\x61tatype\x18\x1a \x01(\x0c\x12\x12\n\nconsistent\x18\x1b \x01(\x08\x12\x12\n\nwrite_once\x18\x1c \x01(\x08\">\n\x0bRpbReplMode\x12\t\n\x05\x46\x41LSE\x10\x00\x12\x0c\n\x08REALTIME\x10\x01\x12\x0c\n\x08\x46ULLSYNC\x10\x02\x12\x08\n\x04TRUE\x10\x03\",\n\nRpbAuthReq\x12\x0c\n\x04user\x18\x01 \x02(\x0c\x12\x10\n\x08password\x18\x02 \x02(\x0c\x42!\n\x17\x63om.basho.riak.protobufB\x06RiakPB') @@ -678,62 +678,6 @@ serialized_end=1344, ) - -_RPBTOGGLEENCODINGREQ = _descriptor.Descriptor( - name='RpbToggleEncodingReq', - full_name='RpbToggleEncodingReq', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='use_native', full_name='RpbToggleEncodingReq.use_native', index=0, - number=1, type=8, cpp_type=7, label=2, - has_default_value=False, default_value=False, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=1346, - serialized_end=1388, -) - - -_RPBTOGGLEENCODINGRESP = _descriptor.Descriptor( - name='RpbToggleEncodingResp', - full_name='RpbToggleEncodingResp', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='use_native', full_name='RpbToggleEncodingResp.use_native', index=0, - number=1, type=8, cpp_type=7, label=2, - has_default_value=False, default_value=False, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=1390, - serialized_end=1433, -) - _RPBGETBUCKETRESP.fields_by_name['props'].message_type = _RPBBUCKETPROPS _RPBSETBUCKETREQ.fields_by_name['props'].message_type = _RPBBUCKETPROPS _RPBSETBUCKETTYPEREQ.fields_by_name['props'].message_type = _RPBBUCKETPROPS @@ -757,8 +701,6 @@ DESCRIPTOR.message_types_by_name['RpbCommitHook'] = _RPBCOMMITHOOK DESCRIPTOR.message_types_by_name['RpbBucketProps'] = _RPBBUCKETPROPS DESCRIPTOR.message_types_by_name['RpbAuthReq'] = _RPBAUTHREQ -DESCRIPTOR.message_types_by_name['RpbToggleEncodingReq'] = _RPBTOGGLEENCODINGREQ -DESCRIPTOR.message_types_by_name['RpbToggleEncodingResp'] = _RPBTOGGLEENCODINGRESP @add_metaclass(_reflection.GeneratedProtocolMessageType) class RpbErrorResp(_message.Message): @@ -838,18 +780,6 @@ class RpbAuthReq(_message.Message): # @@protoc_insertion_point(class_scope:RpbAuthReq) -@add_metaclass(_reflection.GeneratedProtocolMessageType) -class RpbToggleEncodingReq(_message.Message): - DESCRIPTOR = _RPBTOGGLEENCODINGREQ - - # @@protoc_insertion_point(class_scope:RpbToggleEncodingReq) - -@add_metaclass(_reflection.GeneratedProtocolMessageType) -class RpbToggleEncodingResp(_message.Message): - DESCRIPTOR = _RPBTOGGLEENCODINGRESP - - # @@protoc_insertion_point(class_scope:RpbToggleEncodingResp) - DESCRIPTOR.has_options = True DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), '\n\027com.basho.riak.protobufB\006RiakPB') diff --git a/riak/pb/riak_ts_pb2.py b/riak/pb/riak_ts_pb2.py index ce9b250f..6e2ee149 100644 --- a/riak/pb/riak_ts_pb2.py +++ b/riak/pb/riak_ts_pb2.py @@ -16,7 +16,7 @@ DESCRIPTOR = _descriptor.FileDescriptor( name='riak_ts.proto', package='', - serialized_pb='\n\rriak_ts.proto\x1a\nriak.proto\"[\n\nTsQueryReq\x12\x1f\n\x05query\x18\x01 \x01(\x0b\x32\x10.TsInterpolation\x12\x15\n\x06stream\x18\x02 \x01(\x08:\x05\x66\x61lse\x12\x15\n\rcover_context\x18\x03 \x01(\x0c\"^\n\x0bTsQueryResp\x12%\n\x07\x63olumns\x18\x01 \x03(\x0b\x32\x14.TsColumnDescription\x12\x14\n\x04rows\x18\x02 \x03(\x0b\x32\x06.TsRow\x12\x12\n\x04\x64one\x18\x03 \x01(\x08:\x04true\"@\n\x08TsGetReq\x12\r\n\x05table\x18\x01 \x02(\x0c\x12\x14\n\x03key\x18\x02 \x03(\x0b\x32\x07.TsCell\x12\x0f\n\x07timeout\x18\x03 \x01(\r\"H\n\tTsGetResp\x12%\n\x07\x63olumns\x18\x01 \x03(\x0b\x32\x14.TsColumnDescription\x12\x14\n\x04rows\x18\x02 \x03(\x0b\x32\x06.TsRow\"V\n\x08TsPutReq\x12\r\n\x05table\x18\x01 \x02(\x0c\x12%\n\x07\x63olumns\x18\x02 \x03(\x0b\x32\x14.TsColumnDescription\x12\x14\n\x04rows\x18\x03 \x03(\x0b\x32\x06.TsRow\"Y\n\x0bTsTtbPutReq\x12\r\n\x05table\x18\x01 \x02(\x0c\x12%\n\x07\x63olumns\x18\x02 \x03(\x0b\x32\x14.TsColumnDescription\x12\x14\n\x04rows\x18\x03 \x03(\x0b\x32\x06.TsRow\"\x0b\n\tTsPutResp\"P\n\x08TsDelReq\x12\r\n\x05table\x18\x01 \x02(\x0c\x12\x14\n\x03key\x18\x02 \x03(\x0b\x32\x07.TsCell\x12\x0e\n\x06vclock\x18\x03 \x01(\x0c\x12\x0f\n\x07timeout\x18\x04 \x01(\r\"\x0b\n\tTsDelResp\"A\n\x0fTsInterpolation\x12\x0c\n\x04\x62\x61se\x18\x01 \x02(\x0c\x12 \n\x0einterpolations\x18\x02 \x03(\x0b\x32\x08.RpbPair\"@\n\x13TsColumnDescription\x12\x0c\n\x04name\x18\x01 \x02(\x0c\x12\x1b\n\x04type\x18\x02 \x02(\x0e\x32\r.TsColumnType\"\x1f\n\x05TsRow\x12\x16\n\x05\x63\x65lls\x18\x01 \x03(\x0b\x32\x07.TsCell\"{\n\x06TsCell\x12\x15\n\rvarchar_value\x18\x01 \x01(\x0c\x12\x14\n\x0csint64_value\x18\x02 \x01(\x12\x12\x17\n\x0ftimestamp_value\x18\x03 \x01(\x12\x12\x15\n\rboolean_value\x18\x04 \x01(\x08\x12\x14\n\x0c\x64ouble_value\x18\x05 \x01(\x01\"/\n\rTsListKeysReq\x12\r\n\x05table\x18\x01 \x02(\x0c\x12\x0f\n\x07timeout\x18\x02 \x01(\r\"4\n\x0eTsListKeysResp\x12\x14\n\x04keys\x18\x01 \x03(\x0b\x32\x06.TsRow\x12\x0c\n\x04\x64one\x18\x02 \x01(\x08\"q\n\rTsCoverageReq\x12\x1f\n\x05query\x18\x01 \x01(\x0b\x32\x10.TsInterpolation\x12\r\n\x05table\x18\x02 \x02(\x0c\x12\x15\n\rreplace_cover\x18\x03 \x01(\x0c\x12\x19\n\x11unavailable_cover\x18\x04 \x03(\x0c\"3\n\x0eTsCoverageResp\x12!\n\x07\x65ntries\x18\x01 \x03(\x0b\x32\x10.TsCoverageEntry\"[\n\x0fTsCoverageEntry\x12\n\n\x02ip\x18\x01 \x02(\x0c\x12\x0c\n\x04port\x18\x02 \x02(\r\x12\x15\n\rcover_context\x18\x03 \x02(\x0c\x12\x17\n\x05range\x18\x04 \x01(\x0b\x32\x08.TsRange\"\x93\x01\n\x07TsRange\x12\x12\n\nfield_name\x18\x01 \x02(\x0c\x12\x13\n\x0blower_bound\x18\x02 \x02(\x12\x12\x1d\n\x15lower_bound_inclusive\x18\x03 \x02(\x08\x12\x13\n\x0bupper_bound\x18\x04 \x02(\x12\x12\x1d\n\x15upper_bound_inclusive\x18\x05 \x02(\x08\x12\x0c\n\x04\x64\x65sc\x18\x06 \x02(\x0c*O\n\x0cTsColumnType\x12\x0b\n\x07VARCHAR\x10\x00\x12\n\n\x06SINT64\x10\x01\x12\n\n\x06\x44OUBLE\x10\x02\x12\r\n\tTIMESTAMP\x10\x03\x12\x0b\n\x07\x42OOLEAN\x10\x04\x42#\n\x17\x63om.basho.riak.protobufB\x08RiakTsPB') + serialized_pb='\n\rriak_ts.proto\x1a\nriak.proto\"[\n\nTsQueryReq\x12\x1f\n\x05query\x18\x01 \x01(\x0b\x32\x10.TsInterpolation\x12\x15\n\x06stream\x18\x02 \x01(\x08:\x05\x66\x61lse\x12\x15\n\rcover_context\x18\x03 \x01(\x0c\"^\n\x0bTsQueryResp\x12%\n\x07\x63olumns\x18\x01 \x03(\x0b\x32\x14.TsColumnDescription\x12\x14\n\x04rows\x18\x02 \x03(\x0b\x32\x06.TsRow\x12\x12\n\x04\x64one\x18\x03 \x01(\x08:\x04true\"@\n\x08TsGetReq\x12\r\n\x05table\x18\x01 \x02(\x0c\x12\x14\n\x03key\x18\x02 \x03(\x0b\x32\x07.TsCell\x12\x0f\n\x07timeout\x18\x03 \x01(\r\"H\n\tTsGetResp\x12%\n\x07\x63olumns\x18\x01 \x03(\x0b\x32\x14.TsColumnDescription\x12\x14\n\x04rows\x18\x02 \x03(\x0b\x32\x06.TsRow\"V\n\x08TsPutReq\x12\r\n\x05table\x18\x01 \x02(\x0c\x12%\n\x07\x63olumns\x18\x02 \x03(\x0b\x32\x14.TsColumnDescription\x12\x14\n\x04rows\x18\x03 \x03(\x0b\x32\x06.TsRow\"\x0b\n\tTsPutResp\"P\n\x08TsDelReq\x12\r\n\x05table\x18\x01 \x02(\x0c\x12\x14\n\x03key\x18\x02 \x03(\x0b\x32\x07.TsCell\x12\x0e\n\x06vclock\x18\x03 \x01(\x0c\x12\x0f\n\x07timeout\x18\x04 \x01(\r\"\x0b\n\tTsDelResp\"A\n\x0fTsInterpolation\x12\x0c\n\x04\x62\x61se\x18\x01 \x02(\x0c\x12 \n\x0einterpolations\x18\x02 \x03(\x0b\x32\x08.RpbPair\"@\n\x13TsColumnDescription\x12\x0c\n\x04name\x18\x01 \x02(\x0c\x12\x1b\n\x04type\x18\x02 \x02(\x0e\x32\r.TsColumnType\"\x1f\n\x05TsRow\x12\x16\n\x05\x63\x65lls\x18\x01 \x03(\x0b\x32\x07.TsCell\"{\n\x06TsCell\x12\x15\n\rvarchar_value\x18\x01 \x01(\x0c\x12\x14\n\x0csint64_value\x18\x02 \x01(\x12\x12\x17\n\x0ftimestamp_value\x18\x03 \x01(\x12\x12\x15\n\rboolean_value\x18\x04 \x01(\x08\x12\x14\n\x0c\x64ouble_value\x18\x05 \x01(\x01\"/\n\rTsListKeysReq\x12\r\n\x05table\x18\x01 \x02(\x0c\x12\x0f\n\x07timeout\x18\x02 \x01(\r\"4\n\x0eTsListKeysResp\x12\x14\n\x04keys\x18\x01 \x03(\x0b\x32\x06.TsRow\x12\x0c\n\x04\x64one\x18\x02 \x01(\x08\"q\n\rTsCoverageReq\x12\x1f\n\x05query\x18\x01 \x01(\x0b\x32\x10.TsInterpolation\x12\r\n\x05table\x18\x02 \x02(\x0c\x12\x15\n\rreplace_cover\x18\x03 \x01(\x0c\x12\x19\n\x11unavailable_cover\x18\x04 \x03(\x0c\"3\n\x0eTsCoverageResp\x12!\n\x07\x65ntries\x18\x01 \x03(\x0b\x32\x10.TsCoverageEntry\"[\n\x0fTsCoverageEntry\x12\n\n\x02ip\x18\x01 \x02(\x0c\x12\x0c\n\x04port\x18\x02 \x02(\r\x12\x15\n\rcover_context\x18\x03 \x02(\x0c\x12\x17\n\x05range\x18\x04 \x01(\x0b\x32\x08.TsRange\"\x93\x01\n\x07TsRange\x12\x12\n\nfield_name\x18\x01 \x02(\x0c\x12\x13\n\x0blower_bound\x18\x02 \x02(\x12\x12\x1d\n\x15lower_bound_inclusive\x18\x03 \x02(\x08\x12\x13\n\x0bupper_bound\x18\x04 \x02(\x12\x12\x1d\n\x15upper_bound_inclusive\x18\x05 \x02(\x08\x12\x0c\n\x04\x64\x65sc\x18\x06 \x02(\x0c*O\n\x0cTsColumnType\x12\x0b\n\x07VARCHAR\x10\x00\x12\n\n\x06SINT64\x10\x01\x12\n\n\x06\x44OUBLE\x10\x02\x12\r\n\tTIMESTAMP\x10\x03\x12\x0b\n\x07\x42OOLEAN\x10\x04\x42#\n\x17\x63om.basho.riak.protobufB\x08RiakTsPB') _TSCOLUMNTYPE = _descriptor.EnumDescriptor( name='TsColumnType', @@ -47,8 +47,8 @@ ], containing_type=None, options=None, - serialized_start=1450, - serialized_end=1529, + serialized_start=1359, + serialized_end=1438, ) TsColumnType = enum_type_wrapper.EnumTypeWrapper(_TSCOLUMNTYPE) @@ -263,48 +263,6 @@ ) -_TSTTBPUTREQ = _descriptor.Descriptor( - name='TsTtbPutReq', - full_name='TsTtbPutReq', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='table', full_name='TsTtbPutReq.table', index=0, - number=1, type=12, cpp_type=9, label=2, - has_default_value=False, default_value="", - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - _descriptor.FieldDescriptor( - name='columns', full_name='TsTtbPutReq.columns', index=1, - number=2, type=11, cpp_type=10, label=3, - has_default_value=False, default_value=[], - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - _descriptor.FieldDescriptor( - name='rows', full_name='TsTtbPutReq.rows', index=2, - number=3, type=11, cpp_type=10, label=3, - has_default_value=False, default_value=[], - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=446, - serialized_end=535, -) - - _TSPUTRESP = _descriptor.Descriptor( name='TsPutResp', full_name='TsPutResp', @@ -321,8 +279,8 @@ options=None, is_extendable=False, extension_ranges=[], - serialized_start=537, - serialized_end=548, + serialized_start=446, + serialized_end=457, ) @@ -370,8 +328,8 @@ options=None, is_extendable=False, extension_ranges=[], - serialized_start=550, - serialized_end=630, + serialized_start=459, + serialized_end=539, ) @@ -391,8 +349,8 @@ options=None, is_extendable=False, extension_ranges=[], - serialized_start=632, - serialized_end=643, + serialized_start=541, + serialized_end=552, ) @@ -426,8 +384,8 @@ options=None, is_extendable=False, extension_ranges=[], - serialized_start=645, - serialized_end=710, + serialized_start=554, + serialized_end=619, ) @@ -461,8 +419,8 @@ options=None, is_extendable=False, extension_ranges=[], - serialized_start=712, - serialized_end=776, + serialized_start=621, + serialized_end=685, ) @@ -489,8 +447,8 @@ options=None, is_extendable=False, extension_ranges=[], - serialized_start=778, - serialized_end=809, + serialized_start=687, + serialized_end=718, ) @@ -545,8 +503,8 @@ options=None, is_extendable=False, extension_ranges=[], - serialized_start=811, - serialized_end=934, + serialized_start=720, + serialized_end=843, ) @@ -580,8 +538,8 @@ options=None, is_extendable=False, extension_ranges=[], - serialized_start=936, - serialized_end=983, + serialized_start=845, + serialized_end=892, ) @@ -615,8 +573,8 @@ options=None, is_extendable=False, extension_ranges=[], - serialized_start=985, - serialized_end=1037, + serialized_start=894, + serialized_end=946, ) @@ -664,8 +622,8 @@ options=None, is_extendable=False, extension_ranges=[], - serialized_start=1039, - serialized_end=1152, + serialized_start=948, + serialized_end=1061, ) @@ -692,8 +650,8 @@ options=None, is_extendable=False, extension_ranges=[], - serialized_start=1154, - serialized_end=1205, + serialized_start=1063, + serialized_end=1114, ) @@ -741,8 +699,8 @@ options=None, is_extendable=False, extension_ranges=[], - serialized_start=1207, - serialized_end=1298, + serialized_start=1116, + serialized_end=1207, ) @@ -804,8 +762,8 @@ options=None, is_extendable=False, extension_ranges=[], - serialized_start=1301, - serialized_end=1448, + serialized_start=1210, + serialized_end=1357, ) _TSQUERYREQ.fields_by_name['query'].message_type = _TSINTERPOLATION @@ -816,8 +774,6 @@ _TSGETRESP.fields_by_name['rows'].message_type = _TSROW _TSPUTREQ.fields_by_name['columns'].message_type = _TSCOLUMNDESCRIPTION _TSPUTREQ.fields_by_name['rows'].message_type = _TSROW -_TSTTBPUTREQ.fields_by_name['columns'].message_type = _TSCOLUMNDESCRIPTION -_TSTTBPUTREQ.fields_by_name['rows'].message_type = _TSROW _TSDELREQ.fields_by_name['key'].message_type = _TSCELL _TSINTERPOLATION.fields_by_name['interpolations'].message_type = riak.pb.riak_pb2._RPBPAIR _TSCOLUMNDESCRIPTION.fields_by_name['type'].enum_type = _TSCOLUMNTYPE @@ -831,7 +787,6 @@ DESCRIPTOR.message_types_by_name['TsGetReq'] = _TSGETREQ DESCRIPTOR.message_types_by_name['TsGetResp'] = _TSGETRESP DESCRIPTOR.message_types_by_name['TsPutReq'] = _TSPUTREQ -DESCRIPTOR.message_types_by_name['TsTtbPutReq'] = _TSTTBPUTREQ DESCRIPTOR.message_types_by_name['TsPutResp'] = _TSPUTRESP DESCRIPTOR.message_types_by_name['TsDelReq'] = _TSDELREQ DESCRIPTOR.message_types_by_name['TsDelResp'] = _TSDELRESP @@ -876,12 +831,6 @@ class TsPutReq(_message.Message): # @@protoc_insertion_point(class_scope:TsPutReq) -@add_metaclass(_reflection.GeneratedProtocolMessageType) -class TsTtbPutReq(_message.Message): - DESCRIPTOR = _TSTTBPUTREQ - - # @@protoc_insertion_point(class_scope:TsTtbPutReq) - @add_metaclass(_reflection.GeneratedProtocolMessageType) class TsPutResp(_message.Message): DESCRIPTOR = _TSPUTRESP diff --git a/riak/tests/__init__.py b/riak/tests/__init__.py index 0cf7d1d4..a3b82800 100644 --- a/riak/tests/__init__.py +++ b/riak/tests/__init__.py @@ -1,5 +1,7 @@ +import logging import os import socket +import sys from riak.test_server import TestServer from riak.security import SecurityCreds @@ -27,6 +29,12 @@ def hostname_resolves(hostname): except socket.error: return 0 +distutils_debug = os.environ.get('DISTUTILS_DEBUG', '0') +if distutils_debug == '1': + logger = logging.getLogger() + logger.level = logging.DEBUG + logger.addHandler(logging.StreamHandler(sys.stdout)) + HOST = os.environ.get('RIAK_TEST_HOST', '127.0.0.1') PROTOCOL = os.environ.get('RIAK_TEST_PROTOCOL', 'pbc') diff --git a/riak/tests/base.py b/riak/tests/base.py index 97b9607c..4e9f5122 100644 --- a/riak/tests/base.py +++ b/riak/tests/base.py @@ -45,16 +45,16 @@ def create_client(cls, host=None, http_port=None, pb_port=None, if hasattr(cls, 'client_options'): kwargs.update(cls.client_options) - if hasattr(cls, 'logging_enabled') and cls.logging_enabled: - cls.logger.debug("RiakClient(protocol='%s', host='%s', " + - "pb_port='%d', http_port='%d', " + - "credentials='%s', kwargs='%s')", - protocol, - host, - pb_port, - http_port, - credentials, - kwargs) + logger = logging.getLogger() + logger.debug("RiakClient(protocol='%s', host='%s', " + + "pb_port='%d', http_port='%d', " + + "credentials='%s', kwargs='%s')", + protocol, + host, + pb_port, + http_port, + credentials, + kwargs) return RiakClient(protocol=protocol, host=host, @@ -63,23 +63,6 @@ def create_client(cls, host=None, http_port=None, pb_port=None, pb_port=pb_port, **kwargs) - @classmethod - def setUpClass(cls): - cls.logging_enabled = False - distutils_debug = os.environ.get('DISTUTILS_DEBUG', '0') - if distutils_debug == '1': - cls.logging_enabled = True - cls.logger = logging.getLogger() - cls.logger.level = logging.DEBUG - cls.logging_stream_handler = logging.StreamHandler(sys.stdout) - cls.logger.addHandler(cls.logging_stream_handler) - - @classmethod - def tearDownClass(cls): - if hasattr(cls, 'logging_enabled') and cls.logging_enabled: - cls.logger.removeHandler(cls.logging_stream_handler) - cls.logging_enabled = False - def setUp(self): self.bucket_name = self.randname() self.key_name = self.randname() diff --git a/riak/tests/test_timeseries_ttb.py b/riak/tests/test_timeseries_ttb.py index 339dd040..58b1322b 100644 --- a/riak/tests/test_timeseries_ttb.py +++ b/riak/tests/test_timeseries_ttb.py @@ -38,7 +38,7 @@ bd1 = six.u('временные ряды') fiveMins = datetime.timedelta(0, 300) -ts0 = datetime.datetime(2015, 1, 1, 12, 0, 0) +ts0 = datetime.datetime(2015, 1, 1, 12, 1, 2, 987000) ts1 = ts0 + fiveMins diff --git a/riak/tests/test_util.py b/riak/tests/test_util.py index af704516..becfd06a 100644 --- a/riak/tests/test_util.py +++ b/riak/tests/test_util.py @@ -1,10 +1,67 @@ +import datetime import unittest -from riak.util import is_timeseries_supported +from riak.util import is_timeseries_supported, \ + datetime_from_unix_time_millis, \ + unix_time_millis class UtilUnitTests(unittest.TestCase): + # NB: + # 144379690 secs, 987 msecs past epoch + # 144379690987 total msecs past epoch + def test_conv_ms_timestamp_to_datetime_and_back(self): + if is_timeseries_supported(): + # this is what would be stored in Riak TS + v = 144379690987 + dt = datetime_from_unix_time_millis(v) + + # This is how Python represents the above + utp = 144379690.987000 + dtp = datetime.datetime.utcfromtimestamp(utp) + self.assertEqual(dt, dtp) + + utm = unix_time_millis(dt) + self.assertEqual(v, utm) + else: + pass + + def test_conv_datetime_to_unix_millis(self): + # This is the "native" Python unix timestamp including + # microseconds, as float. timedelta "total_seconds()" + # returns a value like this + if is_timeseries_supported(): + v = 144379690.987000 + d = datetime.datetime.utcfromtimestamp(v) + utm = unix_time_millis(d) + self.assertEqual(utm, 144379690987) + else: + pass + + def test_unix_millis_validation(self): + v = 144379690.987 + with self.assertRaises(ValueError): + datetime_from_unix_time_millis(v) + + def test_unix_millis_small_value(self): + if is_timeseries_supported(): + # this is what would be stored in Riak TS + v = 1001 + dt = datetime_from_unix_time_millis(v) + + # This is how Python represents the above + utp = 1.001 + dtp = datetime.datetime.utcfromtimestamp(utp) + self.assertEqual(dt, dtp) + + utm = unix_time_millis(dt) + self.assertEqual(v, utm) + else: + pass + def test_is_timeseries_supported(self): + v = (2, 7, 10) + self.assertEqual(True, is_timeseries_supported(v)) v = (2, 7, 11) self.assertEqual(True, is_timeseries_supported(v)) v = (2, 7, 12) @@ -13,3 +70,9 @@ def test_is_timeseries_supported(self): self.assertEqual(False, is_timeseries_supported(v)) v = (3, 4, 3) self.assertEqual(False, is_timeseries_supported(v)) + v = (3, 4, 4) + self.assertEqual(True, is_timeseries_supported(v)) + v = (3, 4, 5) + self.assertEqual(True, is_timeseries_supported(v)) + v = (3, 5, 1) + self.assertEqual(True, is_timeseries_supported(v)) diff --git a/riak/transports/tcp/transport.py b/riak/transports/tcp/transport.py index 58420767..7d99dcd1 100644 --- a/riak/transports/tcp/transport.py +++ b/riak/transports/tcp/transport.py @@ -1,10 +1,12 @@ import six + import riak.pb.messages from riak import RiakError from riak.codecs import Codec, Msg from riak.codecs.pbuf import PbufCodec -from riak.codecs.ttb import TtbCodec, MSG_CODE_TS_TTB +from riak.codecs.ttb import TtbCodec +from riak.pb.messages import MSG_CODE_TS_TTB_MSG from riak.transports.transport import Transport from riak.ts_object import TsObject @@ -54,7 +56,7 @@ def _get_ttb_codec(self): return codec def _get_codec(self, msg_code): - if msg_code == MSG_CODE_TS_TTB: + if msg_code == MSG_CODE_TS_TTB_MSG: codec = self._get_ttb_codec() elif msg_code == riak.pb.messages.MSG_CODE_TS_GET_REQ: codec = self._get_ttb_codec() @@ -140,7 +142,7 @@ def ts_describe(self, table): return self.ts_query(table, query) def ts_get(self, table, key): - msg_code = MSG_CODE_TS_TTB + msg_code = MSG_CODE_TS_TTB_MSG codec = self._get_codec(msg_code) msg = codec.encode_timeseries_keyreq(table, key) resp_code, resp = self._request(msg, codec) @@ -149,7 +151,7 @@ def ts_get(self, table, key): return tsobj def ts_put(self, tsobj): - msg_code = MSG_CODE_TS_TTB + msg_code = MSG_CODE_TS_TTB_MSG codec = self._get_codec(msg_code) msg = codec.encode_timeseries_put(tsobj) resp_code, resp = self._request(msg, codec) @@ -327,7 +329,7 @@ def stream_mapred(self, inputs, query, timeout=None): def get_index(self, bucket, index, startkey, endkey=None, return_terms=None, max_results=None, continuation=None, timeout=None, term_regex=None): - # TODO RTS-842 NUKE THIS + # TODO FUTURE NUKE THIS MAPRED if not self.pb_indexes(): return self._get_index_mapred_emu(bucket, index, startkey, endkey) @@ -428,10 +430,9 @@ def get_search_schema(self, schema): return codec.decode_get_search_schema(resp) def search(self, index, query, **kwargs): - # TODO RTS-842 NUKE THIS + # TODO FUTURE NUKE THIS MAPRED if not self.pb_search(): return self._search_mapred_emu(index, query) - # TODO RTS-842 six.u() instead? if six.PY2 and isinstance(query, unicode): # noqa query = query.encode('utf8') msg_code = riak.pb.messages.MSG_CODE_SEARCH_QUERY_REQ @@ -527,7 +528,7 @@ def _request(self, msg, codec=None): resp_code, data = self._send_recv(msg_code, data) codec.maybe_riak_error(resp_code, data) codec.maybe_incorrect_code(resp_code, expect) - if resp_code == MSG_CODE_TS_TTB or \ + if resp_code == MSG_CODE_TS_TTB_MSG or \ resp_code in riak.pb.messages.MESSAGE_CLASSES: msg = codec.parse_msg(resp_code, data) else: diff --git a/riak/transports/transport.py b/riak/transports/transport.py index 6e5fee2c..bda18e35 100644 --- a/riak/transports/transport.py +++ b/riak/transports/transport.py @@ -295,7 +295,7 @@ def get_preflist(self, bucket, key): """ raise NotImplementedError - # TODO RTS-842 NUKE THIS + # TODO FUTURE NUKE THIS MAPRED def _search_mapred_emu(self, index, query): """ Emulates a search request via MapReduce. Used in the case @@ -321,7 +321,7 @@ def _search_mapred_emu(self, index, query): result['docs'].append({u'id': key}) return result - # TODO RTS-842 NUKE THIS + # TODO FUTURE NUKE THIS MAPRED def _get_index_mapred_emu(self, bucket, index, startkey, endkey=None): """ Emulates a secondary index request via MapReduce. Used in the diff --git a/riak/util.py b/riak/util.py index 4cbe6c0f..b9ca0e2b 100644 --- a/riak/util.py +++ b/riak/util.py @@ -1,28 +1,35 @@ from __future__ import print_function +import datetime +import decimal import sys import warnings from collections import Mapping from six import string_types, PY2 -import datetime epoch = datetime.datetime.utcfromtimestamp(0) def unix_time_millis(dt): td = dt - epoch - return int(td.total_seconds() * 1000.0) + tdms = ((td.days * 24 * 3600) + td.seconds) * 1000 + ms = td.microseconds // 1000 + return tdms + ms def datetime_from_unix_time_millis(ut): - return datetime.datetime.utcfromtimestamp(ut / 1000.0) + if isinstance(ut, float): + raise ValueError('unix timestamp must not be a float, \ + it must be total milliseconds since epoch as an integer') + utms = ut / 1000.0 + return datetime.datetime.utcfromtimestamp(utms) def is_timeseries_supported(v=None): if v is None: v = sys.version_info - return v < (3,) or v >= (3, 4, 4) + return v < (3,) or v[:3] >= (3, 4, 4) def quacks_like_dict(object): diff --git a/riak_pb b/riak_pb index 341269c1..d14b2c97 160000 --- a/riak_pb +++ b/riak_pb @@ -1 +1 @@ -Subproject commit 341269c19c75fa0557d5aa5fd5ac1f0dfe18cfae +Subproject commit d14b2c9758427f47106ef8064d39415b59076f72 From cabb3737815f8e8d20d801969fa3811a841ee3e9 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 29 Apr 2016 13:54:10 -0700 Subject: [PATCH 2/4] Add ts_convert_timestamp transport option --- riak/codecs/pbuf.py | 16 +++++++++++----- riak/codecs/ttb.py | 14 ++++++++++---- riak/tests/test_timeseries_pbuf.py | 5 +++-- riak/tests/test_timeseries_ttb.py | 8 +++++--- riak/transports/tcp/stream.py | 7 ++++++- riak/transports/tcp/transport.py | 10 +++++++--- 6 files changed, 42 insertions(+), 18 deletions(-) diff --git a/riak/codecs/pbuf.py b/riak/codecs/pbuf.py index a976d1ff..db941a34 100644 --- a/riak/codecs/pbuf.py +++ b/riak/codecs/pbuf.py @@ -773,7 +773,8 @@ def encode_timeseries_query(self, table, query, interpolations=None): rc = riak.pb.messages.MSG_CODE_TS_QUERY_RESP return Msg(mc, req.SerializeToString(), rc) - def decode_timeseries(self, resp, tsobj): + def decode_timeseries(self, resp, tsobj, + convert_timestamp=False): """ Fills an TsObject with the appropriate data and metadata from a TsGetResp / TsQueryResp. @@ -783,6 +784,8 @@ def decode_timeseries(self, resp, tsobj): riak.pb.riak_ts_pb2.TsGetResp :param tsobj: a TsObject :type tsobj: TsObject + :param convert_timestamp: Convert timestamps to datetime objects + :type tsobj: boolean """ if resp.columns is not None: col_names = [] @@ -798,7 +801,7 @@ def decode_timeseries(self, resp, tsobj): for row in resp.rows: tsobj.rows.append( self.decode_timeseries_row( - row, resp.columns)) + row, resp.columns, convert_timestamp)) def decode_timeseries_col_type(self, col_type): # NB: these match the atom names for column types @@ -816,7 +819,8 @@ def decode_timeseries_col_type(self, col_type): msg = 'could not decode column type: {}'.format(col_type) raise RiakError(msg) - def decode_timeseries_row(self, tsrow, tscols=None): + def decode_timeseries_row(self, tsrow, tscols=None, + convert_timestamp=False): """ Decodes a TsRow into a list @@ -850,8 +854,10 @@ def decode_timeseries_row(self, tsrow, tscols=None): if col and col.type != TsColumnType.Value('TIMESTAMP'): raise TypeError('expected TIMESTAMP column') else: - dt = datetime_from_unix_time_millis( - cell.timestamp_value) + dt = cell.timestamp_value + if convert_timestamp: + dt = datetime_from_unix_time_millis( + cell.timestamp_value) row.append(dt) elif cell.HasField('boolean_value'): if col and col.type != TsColumnType.Value('BOOLEAN'): diff --git a/riak/codecs/ttb.py b/riak/codecs/ttb.py index e8358326..3a9b1144 100644 --- a/riak/codecs/ttb.py +++ b/riak/codecs/ttb.py @@ -138,7 +138,8 @@ def encode_timeseries_query(self, table, query, interpolations=None): rc = MSG_CODE_TS_TTB_MSG return Msg(mc, encode(req), rc) - def decode_timeseries(self, resp_ttb, tsobj): + def decode_timeseries(self, resp_ttb, tsobj, + convert_timestamp=False): """ Fills an TsObject with the appropriate data and metadata from a TTB-encoded TsGetResp / TsQueryResp. @@ -147,6 +148,8 @@ def decode_timeseries(self, resp_ttb, tsobj): :type resp_ttb: TTB-encoded tsqueryrsp or tsgetresp :param tsobj: a TsObject :type tsobj: TsObject + :param convert_timestamp: Convert timestamps to datetime objects + :type tsobj: boolean """ if resp_ttb is None: return tsobj @@ -169,7 +172,8 @@ def decode_timeseries(self, resp_ttb, tsobj): tsobj.rows = [] for resp_row in resp_rows: tsobj.rows.append( - self.decode_timeseries_row(resp_row, resp_coltypes)) + self.decode_timeseries_row(resp_row, resp_coltypes, + convert_timestamp)) else: raise RiakError( "Expected 3-tuple in response, got: {}".format(resp_data)) @@ -181,7 +185,7 @@ def decode_timeseries_cols(self, cnames, ctypes): ctypes = [str(ctype) for ctype in ctypes] return TsColumns(cnames, ctypes) - def decode_timeseries_row(self, tsrow, tsct): + def decode_timeseries_row(self, tsrow, tsct, convert_timestamp=False): """ Decodes a TTB-encoded TsRow into a list @@ -189,6 +193,8 @@ def decode_timeseries_row(self, tsrow, tsct): :type tsrow: TTB dncoded row :param tsct: the TTB decoded column types (atoms). :type tsct: list + :param convert_timestamp: Convert timestamps to datetime objects + :type tsobj: boolean :rtype list """ row = [] @@ -198,7 +204,7 @@ def decode_timeseries_row(self, tsrow, tsct): elif isinstance(cell, list) and len(cell) == 0: row.append(None) else: - if tsct[i] == timestamp_a: + if convert_timestamp and tsct[i] == timestamp_a: row.append(datetime_from_unix_time_millis(cell)) else: row.append(cell) diff --git a/riak/tests/test_timeseries_pbuf.py b/riak/tests/test_timeseries_pbuf.py index 50c0818c..8ef75470 100644 --- a/riak/tests/test_timeseries_pbuf.py +++ b/riak/tests/test_timeseries_pbuf.py @@ -161,7 +161,7 @@ def test_decode_data_from_query(self): tsobj = TsObject(None, self.table) c = PbufCodec() - c.decode_timeseries(tqr, tsobj) + c.decode_timeseries(tqr, tsobj, True) self.assertEqual(len(tsobj.rows), len(self.rows)) self.assertEqual(len(tsobj.columns.names), len(tqr.columns)) @@ -197,7 +197,8 @@ def test_decode_data_from_query(self): @unittest.skipUnless(is_timeseries_supported() and RUN_TIMESERIES, 'Timeseries not supported or RUN_TIMESERIES is 0') class TimeseriesPbufTests(IntegrationTestBase, unittest.TestCase): - client_options = {'transport_options': {'use_ttb': False}} + client_options = {'transport_options': + {'use_ttb': False, 'ts_convert_timestamp': True}} @classmethod def setUpClass(cls): diff --git a/riak/tests/test_timeseries_ttb.py b/riak/tests/test_timeseries_ttb.py index 58b1322b..45c2289f 100644 --- a/riak/tests/test_timeseries_ttb.py +++ b/riak/tests/test_timeseries_ttb.py @@ -92,8 +92,9 @@ def test_decode_data_from_get(self): self.assertEqual(r[0], dr[0].encode('utf-8')) self.assertEqual(r[1], dr[1]) self.assertEqual(r[2], dr[2]) - dt = datetime_from_unix_time_millis(dr[3]) - self.assertEqual(r[3], dt) + # NB *not* decoding timestamps + # dt = datetime_from_unix_time_millis(dr[3]) + self.assertEqual(r[3], dr[3]) if i == 0: self.assertEqual(r[4], True) else: @@ -123,7 +124,8 @@ def test_encode_data_for_put(self): @unittest.skipUnless(is_timeseries_supported() and RUN_TIMESERIES, 'Timeseries not supported or RUN_TIMESERIES is 0') class TimeseriesTtbTests(IntegrationTestBase, unittest.TestCase): - client_options = {'transport_options': {'use_ttb': True}} + client_options = {'transport_options': + {'use_ttb': True, 'ts_convert_timestamp': True}} @classmethod def setUpClass(cls): diff --git a/riak/transports/tcp/stream.py b/riak/transports/tcp/stream.py index 3cf0e974..3ef29ce1 100644 --- a/riak/transports/tcp/stream.py +++ b/riak/transports/tcp/stream.py @@ -174,6 +174,10 @@ class PbufTsKeyStream(PbufStream, TtbCodec): _expect = riak.pb.messages.MSG_CODE_TS_LIST_KEYS_RESP + def __init__(self, transport, codec, convert_timestamp=False): + super(PbufTsKeyStream, self).__init__(transport, codec) + self._convert_timestamp = convert_timestamp + def next(self): response = super(PbufTsKeyStream, self).next() @@ -182,7 +186,8 @@ def next(self): keys = [] for tsrow in response.keys: - keys.append(self.codec.decode_timeseries_row(tsrow)) + keys.append(self.codec.decode_timeseries_row(tsrow, + convert_timestamp=self._convert_timestamp)) return keys diff --git a/riak/transports/tcp/transport.py b/riak/transports/tcp/transport.py index 7d99dcd1..dcb597f2 100644 --- a/riak/transports/tcp/transport.py +++ b/riak/transports/tcp/transport.py @@ -38,6 +38,8 @@ def __init__(self, self._pbuf_c = None self._ttb_c = None self._use_ttb = kwargs.get('use_ttb', True) + self._ts_convert_timestamp = \ + kwargs.get('ts_convert_timestamp', False) def _get_pbuf_codec(self): if not self._pbuf_c: @@ -147,7 +149,8 @@ def ts_get(self, table, key): msg = codec.encode_timeseries_keyreq(table, key) resp_code, resp = self._request(msg, codec) tsobj = TsObject(self._client, table) - codec.decode_timeseries(resp, tsobj) + codec.decode_timeseries(resp, tsobj, + self._ts_convert_timestamp) return tsobj def ts_put(self, tsobj): @@ -173,7 +176,8 @@ def ts_query(self, table, query, interpolations=None): msg = codec.encode_timeseries_query(table, query, interpolations) resp_code, resp = self._request(msg, codec) tsobj = TsObject(self._client, table) - codec.decode_timeseries(resp, tsobj) + codec.decode_timeseries(resp, tsobj, + self._ts_convert_timestamp) return tsobj def ts_stream_keys(self, table, timeout=None): @@ -185,7 +189,7 @@ def ts_stream_keys(self, table, timeout=None): codec = self._get_codec(msg_code) msg = codec.encode_timeseries_listkeysreq(table, timeout) self._send_msg(msg.msg_code, msg.data) - return PbufTsKeyStream(self, codec) + return PbufTsKeyStream(self, codec, self._ts_convert_timestamp) def delete(self, robj, rw=None, r=None, w=None, dw=None, pr=None, pw=None, timeout=None): From 568648275d4b7e13ad03cb335c21432bd94e4a04 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 2 May 2016 07:22:46 -0700 Subject: [PATCH 3/4] make grouchy linter happy --- riak/codecs/pbuf.py | 4 ++-- riak/codecs/ttb.py | 4 ++-- riak/tests/base.py | 14 +++----------- riak/tests/test_timeseries_pbuf.py | 2 +- riak/tests/test_timeseries_ttb.py | 5 ++--- riak/transports/tcp/stream.py | 2 +- riak/transports/tcp/transport.py | 4 ++-- riak/util.py | 1 - 8 files changed, 13 insertions(+), 23 deletions(-) diff --git a/riak/codecs/pbuf.py b/riak/codecs/pbuf.py index db941a34..73914352 100644 --- a/riak/codecs/pbuf.py +++ b/riak/codecs/pbuf.py @@ -774,7 +774,7 @@ def encode_timeseries_query(self, table, query, interpolations=None): return Msg(mc, req.SerializeToString(), rc) def decode_timeseries(self, resp, tsobj, - convert_timestamp=False): + convert_timestamp=False): """ Fills an TsObject with the appropriate data and metadata from a TsGetResp / TsQueryResp. @@ -820,7 +820,7 @@ def decode_timeseries_col_type(self, col_type): raise RiakError(msg) def decode_timeseries_row(self, tsrow, tscols=None, - convert_timestamp=False): + convert_timestamp=False): """ Decodes a TsRow into a list diff --git a/riak/codecs/ttb.py b/riak/codecs/ttb.py index 3a9b1144..1c0b3bfe 100644 --- a/riak/codecs/ttb.py +++ b/riak/codecs/ttb.py @@ -139,7 +139,7 @@ def encode_timeseries_query(self, table, query, interpolations=None): return Msg(mc, encode(req), rc) def decode_timeseries(self, resp_ttb, tsobj, - convert_timestamp=False): + convert_timestamp=False): """ Fills an TsObject with the appropriate data and metadata from a TTB-encoded TsGetResp / TsQueryResp. @@ -173,7 +173,7 @@ def decode_timeseries(self, resp_ttb, tsobj, for resp_row in resp_rows: tsobj.rows.append( self.decode_timeseries_row(resp_row, resp_coltypes, - convert_timestamp)) + convert_timestamp)) else: raise RiakError( "Expected 3-tuple in response, got: {}".format(resp_data)) diff --git a/riak/tests/base.py b/riak/tests/base.py index 4e9f5122..b2891b54 100644 --- a/riak/tests/base.py +++ b/riak/tests/base.py @@ -1,8 +1,6 @@ # -*- coding: utf-8 -*- import logging -import os import random -import sys from riak.client import RiakClient from riak.tests import HOST, PROTOCOL, PB_PORT, HTTP_PORT, SECURITY_CREDS @@ -46,15 +44,9 @@ def create_client(cls, host=None, http_port=None, pb_port=None, kwargs.update(cls.client_options) logger = logging.getLogger() - logger.debug("RiakClient(protocol='%s', host='%s', " + - "pb_port='%d', http_port='%d', " + - "credentials='%s', kwargs='%s')", - protocol, - host, - pb_port, - http_port, - credentials, - kwargs) + logger.debug("RiakClient(protocol='%s', host='%s', pb_port='%d', " + "http_port='%d', credentials='%s', kwargs='%s')", + protocol, host, pb_port, http_port, credentials, kwargs) return RiakClient(protocol=protocol, host=host, diff --git a/riak/tests/test_timeseries_pbuf.py b/riak/tests/test_timeseries_pbuf.py index 8ef75470..72f13e0f 100644 --- a/riak/tests/test_timeseries_pbuf.py +++ b/riak/tests/test_timeseries_pbuf.py @@ -198,7 +198,7 @@ def test_decode_data_from_query(self): 'Timeseries not supported or RUN_TIMESERIES is 0') class TimeseriesPbufTests(IntegrationTestBase, unittest.TestCase): client_options = {'transport_options': - {'use_ttb': False, 'ts_convert_timestamp': True}} + {'use_ttb': False, 'ts_convert_timestamp': True}} @classmethod def setUpClass(cls): diff --git a/riak/tests/test_timeseries_ttb.py b/riak/tests/test_timeseries_ttb.py index 45c2289f..45ba6faf 100644 --- a/riak/tests/test_timeseries_ttb.py +++ b/riak/tests/test_timeseries_ttb.py @@ -12,8 +12,7 @@ from riak.ts_object import TsObject from riak.codecs.ttb import TtbCodec from riak.util import str_to_bytes, \ - unix_time_millis, datetime_from_unix_time_millis, \ - is_timeseries_supported + unix_time_millis, is_timeseries_supported from riak.tests import RUN_TIMESERIES from riak.tests.base import IntegrationTestBase @@ -125,7 +124,7 @@ def test_encode_data_for_put(self): 'Timeseries not supported or RUN_TIMESERIES is 0') class TimeseriesTtbTests(IntegrationTestBase, unittest.TestCase): client_options = {'transport_options': - {'use_ttb': True, 'ts_convert_timestamp': True}} + {'use_ttb': True, 'ts_convert_timestamp': True}} @classmethod def setUpClass(cls): diff --git a/riak/transports/tcp/stream.py b/riak/transports/tcp/stream.py index 3ef29ce1..1e913bda 100644 --- a/riak/transports/tcp/stream.py +++ b/riak/transports/tcp/stream.py @@ -187,7 +187,7 @@ def next(self): keys = [] for tsrow in response.keys: keys.append(self.codec.decode_timeseries_row(tsrow, - convert_timestamp=self._convert_timestamp)) + convert_timestamp=self._convert_timestamp)) return keys diff --git a/riak/transports/tcp/transport.py b/riak/transports/tcp/transport.py index d42d4dcc..466ac8d8 100644 --- a/riak/transports/tcp/transport.py +++ b/riak/transports/tcp/transport.py @@ -155,7 +155,7 @@ def ts_get(self, table, key): resp_code, resp = self._request(msg, codec) tsobj = TsObject(self._client, table) codec.decode_timeseries(resp, tsobj, - self._ts_convert_timestamp) + self._ts_convert_timestamp) return tsobj def ts_put(self, tsobj): @@ -182,7 +182,7 @@ def ts_query(self, table, query, interpolations=None): resp_code, resp = self._request(msg, codec) tsobj = TsObject(self._client, table) codec.decode_timeseries(resp, tsobj, - self._ts_convert_timestamp) + self._ts_convert_timestamp) return tsobj def ts_stream_keys(self, table, timeout=None): diff --git a/riak/util.py b/riak/util.py index b9ca0e2b..c422293e 100644 --- a/riak/util.py +++ b/riak/util.py @@ -1,7 +1,6 @@ from __future__ import print_function import datetime -import decimal import sys import warnings From 7b4b01aebf748aae4a59cb4b42ace508484d5885 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 2 May 2016 08:59:23 -0700 Subject: [PATCH 4/4] clean up multi-line string --- riak/util.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/riak/util.py b/riak/util.py index c422293e..7c2ca66d 100644 --- a/riak/util.py +++ b/riak/util.py @@ -19,8 +19,9 @@ def unix_time_millis(dt): def datetime_from_unix_time_millis(ut): if isinstance(ut, float): - raise ValueError('unix timestamp must not be a float, \ - it must be total milliseconds since epoch as an integer') + raise ValueError('unix timestamp must not be a float, ' + 'it must be total milliseconds since ' + 'epoch as an integer') utms = ut / 1000.0 return datetime.datetime.utcfromtimestamp(utms)