-
Notifications
You must be signed in to change notification settings - Fork 183
Closed
Labels
Milestone
Description
I am writing the following to Riak TS:
['field1_val', 'field2_val', datetime.datetime(2015, 1, 1, 12, 0, 0), 0]
When reading this item from Riak TS I get back the following:
['field1_val', 'field2_val', 1420142400000, 0]
Converting 1420142400000 to datetime I get:
datetime.datetime.utcfromtimestamp(1420142400)
datetime.datetime(2015, 1, 1, 20, 0)
Now if I try to see the unix epoch time in milliseconds for datetime.datetime(2015, 1, 1, 12, 0, 0), I get:
1420113600000
I am using this function to convert to unix epoch milliseconds:
def unix_time_millis(dt):
td = dt - datetime.datetime.utcfromtimestamp(0)
return int(td.total_seconds() * 1000.0)
Somewhere there is a timezone issue. Not sure where.
For completeness, this is my actually code snippet:
test_row_in = ['field1_val', 'field2_val', datetime.datetime(2015, 1, 1, 12, 0, 0), 0]
test_rdd = spark_context.parallelize([test_row_in])
print(test_rdd.collect())
test_df = test_rdd.toDF(['field1', 'field2', 'datetime', 'data'])
one_second = datetime.timedelta(seconds=1)
one_day = datetime.timedelta(days=1)
start = datetime.datetime(2015, 1, 1, 12, 0, 0) - one_day
end = datetime.datetime(2015, 1, 1, 12, 0, 0) + one_day
fmt = """
select * from {table_name}
where datetime >= {start_date}
AND datetime <= {end_date}
AND field1 = '{field1}'
AND field2 = '{field2}'
"""
query = fmt.format(table_name=riak_ts_table_name, start_date=unix_time_millis(start), end_date=unix_time_millis(end), field1='field1_val', field2='field2_val')
assert retry_func_with_timeout(func=test_df.write.format('org.apache.spark.sql.riak').mode('Append').save,
times=10,
timeout=30,
signal=True,
args=[riak_ts_table_name],
use_condition=False,
condition_func=None,
condition_val=None,
test_func=None,
test_args=None
)[0] == True
assert retry_func_with_timeout(func=riak_ts_table.query,
times=10,
timeout=3,
signal=True,
args=[query],
use_condition=True,
condition_func=ts_query_condition,
condition_val=test_rdd.collect(),
test_func=None,
test_args=None
)[0] == True
I am running riak (2.5.3), on OSX El Capitan 10.11.3 (15D21)
Reactions are currently unavailable