Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions riak/codecs/pbuf.py
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,8 @@ def encode_timeseries_query(self, table, query, interpolations=None):
rc = riak.pb.messages.MSG_CODE_TS_QUERY_RESP
return Msg(mc, req.SerializeToString(), rc)

def decode_timeseries(self, resp, tsobj):
def decode_timeseries(self, resp, tsobj,
convert_timestamp=False):
"""
Fills an TsObject with the appropriate data and
metadata from a TsGetResp / TsQueryResp.
Expand All @@ -783,6 +784,8 @@ def decode_timeseries(self, resp, tsobj):
riak.pb.riak_ts_pb2.TsGetResp
:param tsobj: a TsObject
:type tsobj: TsObject
:param convert_timestamp: Convert timestamps to datetime objects
:type tsobj: boolean
"""
if resp.columns is not None:
col_names = []
Expand All @@ -798,7 +801,7 @@ def decode_timeseries(self, resp, tsobj):
for row in resp.rows:
tsobj.rows.append(
self.decode_timeseries_row(
row, resp.columns))
row, resp.columns, convert_timestamp))

def decode_timeseries_col_type(self, col_type):
# NB: these match the atom names for column types
Expand All @@ -816,7 +819,8 @@ def decode_timeseries_col_type(self, col_type):
msg = 'could not decode column type: {}'.format(col_type)
raise RiakError(msg)

def decode_timeseries_row(self, tsrow, tscols=None):
def decode_timeseries_row(self, tsrow, tscols=None,
convert_timestamp=False):
"""
Decodes a TsRow into a list

Expand Down Expand Up @@ -850,8 +854,10 @@ def decode_timeseries_row(self, tsrow, tscols=None):
if col and col.type != TsColumnType.Value('TIMESTAMP'):
raise TypeError('expected TIMESTAMP column')
else:
dt = datetime_from_unix_time_millis(
cell.timestamp_value)
dt = cell.timestamp_value
if convert_timestamp:
dt = datetime_from_unix_time_millis(
cell.timestamp_value)
row.append(dt)
elif cell.HasField('boolean_value'):
if col and col.type != TsColumnType.Value('BOOLEAN'):
Expand Down
37 changes: 21 additions & 16 deletions riak/codecs/ttb.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from riak import RiakError
from riak.codecs import Codec, Msg
from riak.pb.messages import MSG_CODE_TS_TTB_MSG
from riak.ts_object import TsColumns
from riak.util import bytes_to_str, unix_time_millis, \
datetime_from_unix_time_millis
Expand All @@ -23,9 +24,6 @@
tsdelreq_a = Atom('tsdelreq')
timestamp_a = Atom('timestamp')

# TODO RTS-842
MSG_CODE_TS_TTB = 104


class TtbCodec(Codec):
'''
Expand All @@ -36,7 +34,7 @@ def __init__(self, **unused_args):
super(TtbCodec, self).__init__(**unused_args)

def parse_msg(self, msg_code, data):
if msg_code != MSG_CODE_TS_TTB:
if msg_code != MSG_CODE_TS_TTB_MSG:
raise RiakError("TTB can't parse code: {}".format(msg_code))
if len(data) > 0:
decoded = decode(data)
Expand All @@ -61,6 +59,7 @@ def encode_to_ts_cell(self, cell):
else:
if isinstance(cell, datetime.datetime):
ts = unix_time_millis(cell)
# logging.debug('encoded datetime %s as %s', cell, ts)
return ts
elif isinstance(cell, bool):
return cell
Expand All @@ -84,19 +83,19 @@ def encode_timeseries_keyreq(self, table, key, is_delete=False):
else:
raise ValueError("key must be a list")

mc = MSG_CODE_TS_TTB
rc = MSG_CODE_TS_TTB
mc = MSG_CODE_TS_TTB_MSG
rc = MSG_CODE_TS_TTB_MSG
req_atom = tsgetreq_a
if is_delete:
req_atom = tsdelreq_a

# TODO RTS-842 timeout is last
# TODO FUTURE add timeout as last param
req = req_atom, table.name, \
[self.encode_to_ts_cell(k) for k in key_vals], udef_a
return Msg(mc, encode(req), rc)

def validate_timeseries_put_resp(self, resp_code, resp):
if resp is None and resp_code == MSG_CODE_TS_TTB:
if resp is None and resp_code == MSG_CODE_TS_TTB_MSG:
return True
if resp is not None:
return True
Expand All @@ -123,8 +122,8 @@ def encode_timeseries_put(self, tsobj):
req_r.append(self.encode_to_ts_cell(cell))
req_rows.append(tuple(req_r))
req = tsputreq_a, tsobj.table.name, [], req_rows
mc = MSG_CODE_TS_TTB
rc = MSG_CODE_TS_TTB
mc = MSG_CODE_TS_TTB_MSG
rc = MSG_CODE_TS_TTB_MSG
return Msg(mc, encode(req), rc)
else:
raise RiakError("TsObject requires a list of rows")
Expand All @@ -135,11 +134,12 @@ def encode_timeseries_query(self, table, query, interpolations=None):
q = q.format(table=table.name)
tsi = tsinterpolation_a, q, []
req = tsqueryreq_a, tsi, False, []
mc = MSG_CODE_TS_TTB
rc = MSG_CODE_TS_TTB
mc = MSG_CODE_TS_TTB_MSG
rc = MSG_CODE_TS_TTB_MSG
return Msg(mc, encode(req), rc)

def decode_timeseries(self, resp_ttb, tsobj):
def decode_timeseries(self, resp_ttb, tsobj,
convert_timestamp=False):
"""
Fills an TsObject with the appropriate data and
metadata from a TTB-encoded TsGetResp / TsQueryResp.
Expand All @@ -148,6 +148,8 @@ def decode_timeseries(self, resp_ttb, tsobj):
:type resp_ttb: TTB-encoded tsqueryrsp or tsgetresp
:param tsobj: a TsObject
:type tsobj: TsObject
:param convert_timestamp: Convert timestamps to datetime objects
:type tsobj: boolean
"""
if resp_ttb is None:
return tsobj
Expand All @@ -170,7 +172,8 @@ def decode_timeseries(self, resp_ttb, tsobj):
tsobj.rows = []
for resp_row in resp_rows:
tsobj.rows.append(
self.decode_timeseries_row(resp_row, resp_coltypes))
self.decode_timeseries_row(resp_row, resp_coltypes,
convert_timestamp))
else:
raise RiakError(
"Expected 3-tuple in response, got: {}".format(resp_data))
Expand All @@ -182,14 +185,16 @@ def decode_timeseries_cols(self, cnames, ctypes):
ctypes = [str(ctype) for ctype in ctypes]
return TsColumns(cnames, ctypes)

def decode_timeseries_row(self, tsrow, tsct):
def decode_timeseries_row(self, tsrow, tsct, convert_timestamp=False):
"""
Decodes a TTB-encoded TsRow into a list

:param tsrow: the TTB decoded TsRow to decode.
:type tsrow: TTB dncoded row
:param tsct: the TTB decoded column types (atoms).
:type tsct: list
:param convert_timestamp: Convert timestamps to datetime objects
:type tsobj: boolean
:rtype list
"""
row = []
Expand All @@ -199,7 +204,7 @@ def decode_timeseries_row(self, tsrow, tsct):
elif isinstance(cell, list) and len(cell) == 0:
row.append(None)
else:
if tsct[i] == timestamp_a:
if convert_timestamp and tsct[i] == timestamp_a:
row.append(datetime_from_unix_time_millis(cell))
else:
row.append(cell)
Expand Down
8 changes: 2 additions & 6 deletions riak/pb/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,7 @@
MSG_CODE_TS_COVERAGE_RESP = 101
MSG_CODE_TS_COVERAGE_ENTRY = 102
MSG_CODE_TS_RANGE = 103
MSG_CODE_TS_TTB_PUT_REQ = 104
MSG_CODE_TOGGLE_ENCODING_REQ = 110
MSG_CODE_TOGGLE_ENCODING_RESP = 111
MSG_CODE_TS_TTB_MSG = 104
MSG_CODE_AUTH_REQ = 253
MSG_CODE_AUTH_RESP = 254
MSG_CODE_START_TLS = 255
Expand Down Expand Up @@ -168,9 +166,7 @@
MSG_CODE_TS_COVERAGE_RESP: riak.pb.riak_ts_pb2.TsCoverageResp,
MSG_CODE_TS_COVERAGE_ENTRY: riak.pb.riak_ts_pb2.TsCoverageEntry,
MSG_CODE_TS_RANGE: riak.pb.riak_ts_pb2.TsRange,
MSG_CODE_TS_TTB_PUT_REQ: riak.pb.riak_ts_pb2.TsTtbPutReq,
MSG_CODE_TOGGLE_ENCODING_REQ: riak.pb.riak_pb2.RpbToggleEncodingReq,
MSG_CODE_TOGGLE_ENCODING_RESP: riak.pb.riak_pb2.RpbToggleEncodingResp,
MSG_CODE_TS_TTB_MSG: None,
MSG_CODE_AUTH_REQ: riak.pb.riak_pb2.RpbAuthReq,
MSG_CODE_AUTH_RESP: None,
MSG_CODE_START_TLS: None
Expand Down
72 changes: 1 addition & 71 deletions riak/pb/riak_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading