In [None]:
# Import and call the Riak client to connect to your Riak TS node or cluster
# See http://docs.basho.com/riak/ts/latest/developing/python/ for more information
# on how to use the Riak client for Python
from riak import RiakClient

# Note: To have Riak TS return timestamp fields as Python dates rather than
# as epoch values, set the 'ts_convert_timestamp' transport option as shown
# below. No host or port is passed here, so the client's defaults are used.
client = RiakClient(transport_options={'ts_convert_timestamp': True})

from datetime import datetime, timedelta
import calendar

# Table the roll up reads from, and table the roll up values are written to
read_table = "WaterMeterData"
write_table = "WaterMeterMonthlyRollUp"

# SQL DDL to create the new table that stores our roll up values.
# The table name is filled in from write_table so that the DDL, the query
# call and the status message below always refer to the same table.
query = """\
CREATE TABLE {table} (
    customer_id       varchar   not null,
    meter_id          varchar   not null,
    reading_date      timestamp not null,
    billing_month     sint64    not null,
    billing_year      sint64    not null,
    total_gallons     double    not null,
    amount_billed     double    not null,
    PRIMARY KEY(
        (quantum(reading_date, 3000, 'd')),
         reading_date, customer_id, meter_id
    )
)
""".format(table=write_table)

# Any failure is printed rather than raised so the remaining cells can
# still be run (for example when this cell is executed a second time).
try:
    client.ts_query(write_table, query)
except Exception as e:
    print(e)
else:
    # Only report success when the DDL call itself did not raise
    print("Table '{}' created successfully".format(write_table))

In [None]:
# Fetch the schema of the table created above and print it,
# one column description per line
rollup_table = client.table(write_table)
description = rollup_table.describe()
column_descriptions = description.rows
for column_desc in column_descriptions:
    print(column_desc)

In [None]:
# Write roll up records to our newly created table

# Function to convert a Python datetime to Unix epoch time in milliseconds
def convert_to_epoch(date_to_convert):
    """Return *date_to_convert* as milliseconds since the Unix epoch.

    Naive datetimes are interpreted as UTC. Timezone-aware datetimes are
    normalised to UTC first, so the result identifies the same instant
    regardless of the offset carried by the input.

    Args:
        date_to_convert: a ``datetime.datetime`` instance.

    Returns:
        Integer number of milliseconds since 1970-01-01T00:00:00 UTC.
    """
    # calendar.timegm expects a UTC time tuple; utctimetuple() applies the
    # UTC offset of aware datetimes and leaves naive ones unchanged.
    whole_seconds = calendar.timegm(date_to_convert.utctimetuple())
    # Time tuples carry no sub-second field, so add the milliseconds back in.
    return whole_seconds * 1000 + date_to_convert.microsecond // 1000

# Fixed inputs for this roll up: one customer/meter pair for one year.
# These are trusted constants; they are interpolated directly into the SQL.
customer_id = 'CUSTOMER-0001'
meter_id = 'METER-0001'
rollup_year = 2016
# amount_billed is total gallons used multiplied by this rate
rate_per_gallon = 0.06

data_set = []

# Loop over months 1 to 12 - January to December
for start_month in range(1, 13):
    start_date = datetime(rollup_year, start_month, 1, 0, 0)
    # The window ends (exclusively) on the first day of the following
    # month; December rolls over into January of the next year.
    if start_month != 12:
        end_date = datetime(rollup_year, start_month + 1, 1, 0, 0)
    else:
        end_date = datetime(rollup_year + 1, 1, 1, 0, 0)

    # SQL to calculate gallons of water consumed for the month
    # max(total_gallons) - min(total_gallons) = total gallons used
    query = """\
        SELECT
            max(total_gallons) - min(total_gallons)
        FROM {table}
        WHERE time_stamp >= {start} AND time_stamp < {end} AND
            customer_id = '{customer}' AND meter_id = '{meter}'
    """.format(
        table=read_table,
        start=convert_to_epoch(start_date),
        end=convert_to_epoch(end_date),
        customer=customer_id,
        meter=meter_id,
    )

    result_set = client.ts_query(read_table, query)

    # Skip months without a usable aggregate instead of failing on the
    # indexing or on the multiplication below.
    rows = result_set.rows
    if not rows or rows[0][0] is None:
        print("No readings found from {} to {}; skipping".format(start_date, end_date))
        continue
    gallons_used = rows[0][0]

    # Create new row for the roll up table and append to data_set.
    # The row is stamped with the window's end date, so its reading_date,
    # billing_month and billing_year refer to the month after the usage.
    new_row = [customer_id, meter_id, end_date, end_date.month, end_date.year,
               gallons_used, gallons_used * rate_per_gallon]
    print(new_row)
    data_set.append(new_row)

# Store the data_set list of lists to WaterMeterMonthlyRollUp
result_message = ""
try:
    table_object = client.table(write_table).new(data_set)
    result = table_object.store()
    result_message = "Records written: {}".format(result)
except Exception as e:
    # Report the failure in the cell output rather than aborting the notebook
    result_message = "Error: {}".format(e)
print(result_message)

In [None]:
# Read all the records we just wrote from WaterMeterMonthlyRollUp.
# The upper bound is exclusive and set to 1 February 2017 because the
# December roll up row is stamped with a reading_date of 1 January 2017.
start_date = datetime(2016, 1, 1, 0, 0)
end_date = datetime(2017, 2, 1, 0, 0)

# The table name comes from write_table so the SQL and the ts_query call
# below always target the same table.
query = """\
    SELECT *
    FROM {table}
    WHERE reading_date >= {start} AND reading_date < {end}
""".format(
    table=write_table,
    start=convert_to_epoch(start_date),
    end=convert_to_epoch(end_date),
)

# Show the generated SQL before running it
print(query)

data_set = client.ts_query(write_table, query)
for row in data_set.rows:
    print(row)