# Using AWS Timestream to read/write tick data

October 2020 - Saeed Amen - https://www.cuemacro.com - saeed@cuemacro.com

## Introduction

In this Jupyter notebook, we expand on the blog article at https://www.cuemacro.com/2020/10/27/using-aws-timestream-for-tick-data/ on Timestream, with more technical details and code.

We'll give a simple example of how to use AWS Timestream. Timestream is AWS's new serverless database designed for storing time series, in particular from IoT devices. However, given that finance is awash with time series, I thought it would be a good idea to see how it can handle high frequency financial market tick data.

Timestream is also fully managed so you don't need to install the database and it should scale without you having to worry about renting more servers etc. which you'd have to do if you managed it yourself. 
It has lots of built in functions for querying the data and has the Timestream Query Language (see https://docs.aws.amazon.com/timestream/latest/developerguide/reference.html), which looks pretty similar to SQL.
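
To give a flavour of the query language, here is a minimal sketch of a helper that builds a query string. The function name `build_tick_query` and its parameters are hypothetical, and the database/table names are only examples. It assumes the `ago()` function and the `measure_name` and `measure_value::double` columns described in the Timestream reference, so check the syntax against the documentation before relying on it.

```python
def build_tick_query(database, table, lookback="1h", limit=1000):
    """Build a Timestream-style query string for recent 'mid' records.

    Hypothetical helper: it only formats a string and does not contact AWS.
    """
    return (
        f'SELECT time, measure_value::double AS mid '
        f'FROM "{database}"."{table}" '
        f"WHERE measure_name = 'mid' AND time >= ago({lookback}) "
        f'ORDER BY time DESC LIMIT {int(limit)}'
    )


# Example usage with illustrative database and table names
print(build_tick_query("fxtick", "EURUSD", lookback="2h", limit=10))
```

Apart from `ago()` and the `::double` suffix on the measure column, the statement reads like ordinary SQL.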

Obviously, there are many other time series databases out there, including kdb+/q, which is heavily used by sell side firms and trading shops working with high frequency tick data. There are also some open source time series databases like InfluxDB or Arctic (used with MongoDB). Many of these databases are also available in the cloud in a fully managed form.

As to which is cheaper, I haven't attempted to do any sort of cost analysis, although [AWS](https://aws.amazon.com/blogs/aws/store-and-access-time-series-data-at-any-scale-with-amazon-timestream-now-generally-available/) claims that: 

    Timestream is a fast, scalable, and serverless time series database service that makes it easy to collect, store, and process trillions of time series events per day up to 1,000 times faster and at as little as 1/10th the cost of a relational database.

My tcapy Python library, for transaction cost analysis of FX spot, actually has wrappers for reading/writing tick data using kdb+/q, InfluxDB and Arctic (download it for free from https://github.com/cuemacro/tcapy).

### The idea of Timestream

Timestream is a serverless database. It is also fully managed, so you don't need to install the database. With a serverless architecture, it should hopefully scale with the data you store. You'll only pay for the space you use.

Timestream stores newer data in memory, whilst older data is moved to a (cheaper) magnetic store. Other time series databases do something similar, storing new data in RAM, whilst old data can be stored on disk.

You can stipulate how long you want Timestream to keep the data in each store. Users querying the data don't need to separate their queries between the memory and magnetic stores. This is done automatically.

### Restrictions on the time of writing data

One important point is that you can't simply record data with any timestamp. [AWS notes](https://aws.amazon.com/blogs/aws/store-and-access-time-series-data-at-any-scale-with-amazon-timestream-now-generally-available/) that

    When writing data in Timestream, you cannot insert data that is older than the retention period of the memory store. For example, in my case I will not be able to insert records older than 1 hour. Similarly, you cannot insert data with a future timestamp.
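
As a rough sketch of that rule, the check below tests whether a timestamp falls inside the memory store retention window. The function name `is_writable` is hypothetical, and the logic is only the simplified rule from the quote above. The service's exact tolerances may differ, so treat it as an illustration rather than a validation of what Timestream will accept.

```python
from datetime import datetime, timedelta, timezone


def is_writable(record_time, memory_retention_hours, now=None):
    """Return True if record_time is neither too old nor in the future.

    Simplified rule: the timestamp must lie between
    (now - memory store retention) and now.
    """
    now = now or datetime.now(timezone.utc)
    oldest_allowed = now - timedelta(hours=memory_retention_hours)
    return oldest_allowed <= record_time <= now


# Fixed reference time so the example is reproducible
now = datetime(2020, 10, 31, 12, 0, tzinfo=timezone.utc)
recent = datetime(2020, 10, 31, 11, 30, tzinfo=timezone.utc)
old_tick = datetime(2019, 1, 1, 22, 2, 37, tzinfo=timezone.utc)

print(is_writable(recent, 1, now))    # True: 30 minutes old, retention is 1 hour
print(is_writable(old_tick, 1, now))  # False: far older than the retention period
```

This is why, later on, the historical tick timestamp is not used as the `Time` of the record.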

### Record, Measure and Dimensions

Each data point is a `Record`, which has a `Time` associated with it and a `Measure`, which is like a database field/column and for which we specify a type. So if we think of tick data, the `Measure` could be the mid-price and the `Time` could be the time at which we write the data to Timestream. We can't specify multiple `Measure` fields. If we are storing tick data, however, we often need to store multiple values for the same time stamp.

Each data point can also have `Dimensions` associated with it, which can have multiple name/value combinations. These will typically be things which don't change much, such as the ticker. 

### Storing multiple fields using Dimensions

Whilst we can't specify multiple `Measure` fields in the same data point, we can point different `Measure` fields to the same `Dimensions`. For example, later, I'll store the `venue_time` in the `Dimensions` field. So we could have a bid value, ask value, mid value etc. all pointing to the same `Dimensions` name/value combinations. When querying the data, we could stitch these back into the format we want. Having two `Time` fields might seem odd. In practice, however, this often happens when collecting tick data: you have the `venue_time` and the time when you snapped the data locally, and there's likely to be a gap between the two.

At this stage, I'm not sure what the performance implications are of using the `Dimensions` to store the `venue_time`, but it does seem like a relatively simple way of storing more complicated structures, where you have multiple fields in the same record. This approach is somewhat different to other existing time series databases like kdb+/q, InfluxDB etc., where it's fairly straightforward to store multiple fields in the same record.
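
The idea can be sketched in plain Python. Both helper names (`build_multi_measure_records`, `stitch_records`) are hypothetical, and the `ticker` dimension is an extra assumption for illustration (the notebook itself only stores `venue_time`). The first function creates one record per measure, all sharing the same `Dimensions`, and the second stitches such records back into one row per `venue_time`.

```python
def build_multi_measure_records(venue_time_ms, write_time_ms, values, ticker="EURUSD"):
    """Create one record per measure, all pointing to the same Dimensions."""
    dimensions = [
        {"Name": "ticker", "Value": ticker},  # assumed extra dimension
        {"Name": "venue_time", "Value": str(venue_time_ms)},
    ]
    return [
        {
            "Dimensions": dimensions,
            "MeasureName": name,
            "MeasureValue": str(value),
            "MeasureValueType": "DOUBLE",
            "Time": str(write_time_ms),
        }
        for name, value in values.items()
    ]


def stitch_records(records):
    """Group records by venue_time into {venue_time: {measure: value}}."""
    rows = {}
    for record in records:
        dims = {d["Name"]: d["Value"] for d in record["Dimensions"]}
        row = rows.setdefault(dims["venue_time"], {})
        row[record["MeasureName"]] = float(record["MeasureValue"])
    return rows


records = build_multi_measure_records(
    1546380157254, 1604148812630, {"bid": 1.14598, "ask": 1.14682})
print(stitch_records(records))
```

In a real workflow the stitching would more likely happen in the query itself or in pandas, but the grouping logic is the same.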


### Further reading on Timestream

Timestream is quite new so there isn't a huge amount of material available on the web for it, but it does appear to be slowly increasing. The first port of call is AWS's documentation which is very comprehensive at https://docs.aws.amazon.com/timestream/index.html.

I'd also recommend reading this introduction to Timestream at https://dev.to/pblitz/aws-timestream-an-intro-4i1j

AWS have put a sample Python app on GitHub, which uses boto3 to access a Timestream database for recording your CPU utilisation, at https://github.com/awslabs/amazon-timestream-tools/tree/master/sample_apps/python. We're going to use that as a reference to play with Timestream from Python!

## Making AWS accessible via Python

Given that Timestream is in the cloud, we need to make sure that AWS services are accessible from Python, whether we are running our process in the cloud (which seems preferable to reduce latency) or locally. Whilst we are using Python, Timestream is also accessible from many other languages.

* Hence, before going through this tutorial, you'll need to go through several steps so AWS services are accessible from your machine
    * You'll need to create an IAM user, with appropriate permissions
        * In our case, this user will need permissions to use Timestream
        * Get the Access key ID and secret access key for the IAM user
    * Install AWS CLI
        * run `sudo apt install awscli`
        * or you can download the zip file
        * run `aws configure` to set the default access key ID, default AWS region etc.
        * this will create files in ~/.aws/credentials and ~/.aws/config
    * AWS CLI instructions at https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2-linux.html#cliv2-linux-install

* Once your credentials are set, we can use boto3, which is an SDK for Python developers to access AWS resources:
    * boto3 instructions https://boto3.amazonaws.com/v1/documentation/api/latest/index.html
    * You can install boto3 using pip
    * If you follow the instructions at https://github.com/cuemacro/teaching/blob/master/pythoncourse/installation/installing_anaconda_and_pycharm.ipynb - you'll create a conda environment `py37class` which includes boto3, and many useful data science libraries, which I use for my Python teaching
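
Before moving on, it can be handy to check that the steps above left a usable credentials file. The sketch below uses only the standard library, and the function name `has_aws_credentials` is hypothetical. It only inspects the shared credentials file. boto3 can also pick up credentials from other places (such as environment variables), so a `False` here does not necessarily mean boto3 will fail.

```python
import configparser
import os


def has_aws_credentials(path="~/.aws/credentials", profile="default"):
    """Return True if the given profile has both keys in the credentials file."""
    parser = configparser.ConfigParser()
    # read() silently ignores a missing file, leaving the parser empty
    parser.read(os.path.expanduser(path))
    if not parser.has_section(profile):
        return False
    section = parser[profile]
    return "aws_access_key_id" in section and "aws_secret_access_key" in section


# The result depends on your own machine
print(has_aws_credentials())
```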

## Download tick data

Let's first download some FX tick data from Dukascopy, a Swiss retail FX broker, which offers access to its dataset. We'll later store it in Timestream. Create a folder under the working directory called `raw_data`, if it doesn't already exist, where we'll initially store the data as flat files.

In [1]:
!mkdir raw_data

mkdir: cannot create directory ‘raw_data’: File exists


Download EURUSD tick data from Dukascopy for 2019 (note: this will take a while!) and dump to a Parquet file. If we re-run our code, it should pick up the Parquet file instead of downloading from the web.

In [2]:
from findatapy.market import Market, MarketDataGenerator, MarketDataRequest

import pandas as pd
import os

market = Market(market_data_generator=MarketDataGenerator())

if os.path.exists('raw_data/EURUSD_2019.gzip'):
    df_tick = pd.read_parquet('raw_data/EURUSD_2019.gzip')
else:
    # Request the whole of 2019, matching the text and the cached file name
    md_request = MarketDataRequest(
        start_date='01 Jan 2019', finish_date='01 Jan 2020',
        fields=['bid', 'ask'], vendor_fields=['bid', 'ask'],
        freq='tick', data_source='dukascopy',
        tickers=['EURUSD'], vendor_tickers=['EURUSD'], category='fx')

    # Reuse the Market object created above rather than building a second one
    df_tick = market.fetch_market(md_request)
    df_tick.to_parquet('raw_data/EURUSD_2019.gzip')

Create a mid price column, which we'll use later.

In [3]:
df_tick['mid'] = (df_tick['EURUSD.bid'] + df_tick['EURUSD.ask'])/2.0

2020-10-31 12:53:31,069 - numexpr.utils - INFO - Note: NumExpr detected 28 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.
2020-10-31 12:53:31,070 - numexpr.utils - INFO - NumExpr defaulting to 8 threads.


Print tick data to see what it looks like.

In [4]:
print(df_tick.head(10))

                         EURUSD.bid  EURUSD.ask       mid
Date                                                     
2019-01-01 22:02:37.254     1.14598     1.14682  1.146400
2019-01-01 22:02:38.590     1.14599     1.14682  1.146405
2019-01-01 22:02:39.138     1.14599     1.14684  1.146415
2019-01-01 22:02:55.787     1.14598     1.14684  1.146410
2019-01-01 22:03:02.060     1.14598     1.14684  1.146410
2019-01-01 22:03:12.290     1.14599     1.14684  1.146415
2019-01-01 22:03:16.253     1.14599     1.14684  1.146415
2019-01-01 22:03:58.115     1.14607     1.14691  1.146490
2019-01-01 22:03:59.146     1.14607     1.14678  1.146425
2019-01-01 22:04:00.208     1.14607     1.14684  1.146455


## Create Timestream database and table

Our next step is to create a Timestream database and table, which we'll later populate with tick data.

We need to create a session to interact with AWS, by using the boto3 Python library. This will pick up our various AWS keys which we've set earlier after installation of AWS CLI.

In [5]:
import boto3
from botocore.config import Config
session = boto3.Session()

Let's create the clients for writing to and reading from Timestream using boto3. Note that old versions of boto3 won't have the `timestream-write` or `timestream-query` services.

In [6]:
# Recommended Timestream write client SDK configuration:
#  - Set SDK retry count to 10.
#  - Use SDK DEFAULT_BACKOFF_STRATEGY
#  - Set RequestTimeout to 20 seconds .
#  - Set max connections to 5000 or higher.
write_client = session.client('timestream-write', config=Config(read_timeout=20, max_pool_connections=5000,
                                                                    retries={'max_attempts': 10}))
query_client = session.client('timestream-query')

2020-10-31 12:53:31,778 - botocore.utils - INFO - IMDS ENDPOINT: http://169.254.169.254/
2020-10-31 12:53:31,784 - botocore.credentials - INFO - Found credentials in shared credentials file: ~/.aws/credentials


Define the database and table names. We also need to define how long data is kept in the Timestream memory store (24h) and how long it then resides in the magnetic store (7 days).

In [7]:
DATABASE_NAME = "fxtick"
TABLE_NAME = "EURUSD"
HT_TTL_HOURS = 24
CT_TTL_DAYS = 7

Write functions for creating the database and the table.

In [8]:
def create_database():
    print("Creating Database")
    try:
        write_client.create_database(DatabaseName=DATABASE_NAME)
        print("Database [%s] created successfully." % DATABASE_NAME)
    except write_client.exceptions.ConflictException:
        print("Database [%s] exists. Skipping database creation" % DATABASE_NAME)
    except Exception as err:
        print("Create database failed:", err)

In [9]:
def create_table():
    print("Creating table")
    retention_properties = {
        'MemoryStoreRetentionPeriodInHours': HT_TTL_HOURS,
        'MagneticStoreRetentionPeriodInDays': CT_TTL_DAYS
    }
    try:
        write_client.create_table(DatabaseName=DATABASE_NAME, TableName=TABLE_NAME,
                                  RetentionProperties=retention_properties)
        print("Table [%s] successfully created." % TABLE_NAME)
    except write_client.exceptions.ConflictException:
        print("Table [%s] exists on database [%s]. Skipping table creation" % (
            TABLE_NAME, DATABASE_NAME))
    except Exception as err:
        print("Create table failed:", err)

Let's now create the database and the table. If they already exist, the functions catch the `ConflictException` and skip creation.

In [10]:
create_database()

Creating Database
Database [fxtick] created successfully.


In [11]:
create_table()

Creating table
Table [EURUSD] successfully created.


## Filling the table with tick data

Create a function to get the current time (in Unix milliseconds). This will be used to populate the `Time` on each record.

In [12]:
import time

def get_current_time():
    return str(int(round(time.time() * 1000)))

Write a function that takes a DataFrame and converts the selected column into a `Measure`. This is then converted into a list of dictionaries. The final step is to take the timestamp of each tick, which we got earlier, and remap it as the `venue_time`. The `Time` field is the time we snap here (i.e. the writing time). The for loop at the end is going to be quite slow. If you were using this in production, it would be worth spending time optimizing this code. There is an AWS project on GitHub, [AWS Data Wrangler](https://github.com/awslabs/aws-data-wrangler), that makes it easier to use Pandas on AWS, but I didn't have enough time to test it.

In [13]:
import numpy as np

def convert_df_to_dict(df, column):
    
    df_conv = pd.DataFrame(index=df.index, data=df[column].values, columns=['MeasureValue'])
    df_conv.index.name = 'venue_time'
    df_conv = df_conv.reset_index()
    df_conv['venue_time'] = df_conv['venue_time'].astype(np.int64) // 10**6

    # Name the measure after the selected column, rather than hard-coding 'mid'
    df_conv['MeasureName'] = column
    df_conv['MeasureValueType'] = 'DOUBLE'
    
    df_conv['MeasureValue'] = df_conv['MeasureValue'].apply(str)
    df_conv['venue_time'] = df_conv['venue_time'].apply(str)
    
    records = df_conv.to_dict('records')
            
    for r in records:
        r['Dimensions'] = [{'Name' : 'venue_time', 'Value' : r['venue_time']}]
        r.pop('venue_time') # Remove this, because written as a Dimension
        r['Time'] = get_current_time() 
    
    return records

This function dumps the data to Timestream.

In [14]:
def write_dict(records):
    print("Writing records")

    try:
        result = write_client.write_records(DatabaseName=DATABASE_NAME, TableName=TABLE_NAME,
                                            Records=records, CommonAttributes={})
        print("WriteRecords Status: [%s]" % result['ResponseMetadata']['HTTPStatusCode'])
    except Exception as err:
        print("Error:", err)
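
The function above passes an empty `CommonAttributes`. As a hedged sketch, fields that are identical across every record (here assumed to be `MeasureName`, `MeasureValueType` and `Time`) could instead be factored out and sent once, which shrinks the payload. The helper name `split_common_attributes` is hypothetical, and I haven't measured how much this helps in practice.

```python
def split_common_attributes(records,
                            shared_keys=("MeasureName", "MeasureValueType", "Time")):
    """Split records into (common attributes, slimmed-down records).

    A key is moved to the common attributes only if every record
    carries it with exactly the same value.
    """
    common = {}
    for key in shared_keys:
        values = [r.get(key) for r in records]
        if records and values[0] is not None and all(v == values[0] for v in values):
            common[key] = values[0]
    slim = [{k: v for k, v in r.items() if k not in common} for r in records]
    return common, slim


sample = [
    {"MeasureName": "mid", "MeasureValueType": "DOUBLE",
     "MeasureValue": "1.1464", "Time": "1604148812630"},
    {"MeasureName": "mid", "MeasureValueType": "DOUBLE",
     "MeasureValue": "1.1465", "Time": "1604148812630"},
]
common, slim = split_common_attributes(sample)
print(common)
print(slim)
```

The two return values would then be passed as `CommonAttributes=common` and `Records=slim` in the `write_records` call.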

Let's convert a small number of points in our tick data, selecting the `mid` point, into a list of records.

In [15]:
tick_records = convert_df_to_dict(df_tick[0:50], 'mid')

The data is now in a form that can be ingested by Timestream, ie. a list of dictionaries. Note, the nested `Dimensions`.

In [16]:
print(tick_records[0:5])

[{'MeasureValue': '1.146399974822998', 'MeasureName': 'mid', 'MeasureValueType': 'DOUBLE', 'Dimensions': [{'Name': 'venue_time', 'Value': '1546380157254'}], 'Time': '1604148812630'}, {'MeasureValue': '1.1464049816131592', 'MeasureName': 'mid', 'MeasureValueType': 'DOUBLE', 'Dimensions': [{'Name': 'venue_time', 'Value': '1546380158590'}], 'Time': '1604148812630'}, {'MeasureValue': '1.1464149951934814', 'MeasureName': 'mid', 'MeasureValueType': 'DOUBLE', 'Dimensions': [{'Name': 'venue_time', 'Value': '1546380159138'}], 'Time': '1604148812630'}, {'MeasureValue': '1.1464099884033203', 'MeasureName': 'mid', 'MeasureValueType': 'DOUBLE', 'Dimensions': [{'Name': 'venue_time', 'Value': '1546380175787'}], 'Time': '1604148812630'}, {'MeasureValue': '1.1464099884033203', 'MeasureName': 'mid', 'MeasureValueType': 'DOUBLE', 'Dimensions': [{'Name': 'venue_time', 'Value': '1546380182060'}], 'Time': '1604148812630'}]


Let's write our list of records to our Timestream database. Note, that sometimes we might get some error associated with the upload, such as rate limit exceeded in Jupyter notebook. If this is the case, we simply start our Jupyter notebook with this additional command line parameter `--NotebookApp.iopub_data_rate_limit=1000000000`.

This might take a while to run, particularly if you're not running it on an EC2 instance in the same region as your Timestream database. We'd also suggest splitting the records into smaller batches and parallelizing the write calls, to cut down the number of records you write in each request. There is a limit to the amount of data you can push to Timestream in one call, which can cause an exception if breached.

In [17]:
write_dict(tick_records)

Writing records
WriteRecords Status: [200]
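
For larger uploads, the batching and parallel writing suggested above could be sketched as follows. The helper names (`chunk_records`, `write_in_parallel`) are hypothetical. The batch size of 100 is an assumption based on the per-request record limit I understand Timestream to have, so check the current quotas. The writer is passed in as a function, so in the notebook it would be `write_dict`, whilst here a simple list stands in for Timestream.

```python
from concurrent.futures import ThreadPoolExecutor

MAX_RECORDS_PER_CALL = 100  # assumed per-request limit, check the AWS quotas


def chunk_records(records, size=MAX_RECORDS_PER_CALL):
    """Yield successive slices of at most `size` records."""
    for start in range(0, len(records), size):
        yield records[start:start + size]


def write_in_parallel(records, write_batch, max_workers=4):
    """Send each batch to write_batch on a thread pool; return the batch count."""
    batches = list(chunk_records(records))
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # list() waits for every call and re-raises any exception from a worker
        list(pool.map(write_batch, batches))
    return len(batches)


# Stand-in writer which just collects the batches it receives
received = []
n_batches = write_in_parallel([{"i": i} for i in range(250)], received.append)
print(n_batches)  # 3 batches: 100 + 100 + 50 records
```

Threads are a reasonable fit here because each write spends most of its time waiting on the network. In the notebook, the call would be `write_in_parallel(tick_records, write_dict)`.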


## Querying the tick data in the database


We've now written the tick data to Timestream. Let's see if we can fetch it. We create a Query object, which runs any query we write. It also iterates through the output to print it to screen. This code is taken from GitHub https://github.com/awslabs/amazon-timestream-tools/blob/master/sample_apps/python/QueryExample.py.

In [18]:
class Query(object):

    def __init__(self, client):
        self.client = client
        self.paginator = client.get_paginator('query')

    # See records ingested into this table so far
    SELECT_ALL = f"SELECT * FROM {DATABASE_NAME}.{TABLE_NAME}"

    def run_query(self, query_string):
        try:
            page_iterator = self.paginator.paginate(QueryString=query_string)
            for page in page_iterator:
                self._parse_query_result(page)
        except Exception as err:
            print("Exception while running query:", err)

    def _parse_query_result(self, query_result):
        column_info = query_result['ColumnInfo']

        print("Metadata: %s" % column_info)
        print("Data: ")
        for row in query_result['Rows']:
            print(self._parse_row(column_info, row))

    def _parse_row(self, column_info, row):
        data = row['Data']
        row_output = []
        for j in range(len(data)):
            info = column_info[j]
            datum = data[j]
            row_output.append(self._parse_datum(info, datum))

        return "{%s}" % str(row_output)

    def _parse_datum(self, info, datum):
        if datum.get('NullValue', False):
            # No trailing comma here, otherwise a one-element tuple is returned
            return "%s=NULL" % info['Name']

        column_type = info['Type']

        # If the column is of TimeSeries Type
        if 'TimeSeriesMeasureValueColumnInfo' in column_type:
            return self._parse_time_series(info, datum)

        # If the column is of Array Type
        elif 'ArrayColumnInfo' in column_type:
            array_values = datum['ArrayValue']
            return "%s=%s" % (info['Name'], self._parse_array(info['Type']['ArrayColumnInfo'], array_values))

        # If the column is of Row Type
        elif 'RowColumnInfo' in column_type:
            row_column_info = info['Type']['RowColumnInfo']
            row_values = datum['RowValue']
            return self._parse_row(row_column_info, row_values)

        # If the column is of Scalar Type
        else:
            return self._parse_column_name(info) + datum['ScalarValue']

    def _parse_time_series(self, info, datum):
        time_series_output = []
        for data_point in datum['TimeSeriesValue']:
            time_series_output.append("{time=%s, value=%s}"
                                      % (data_point['Time'],
                                         self._parse_datum(info['Type']['TimeSeriesMeasureValueColumnInfo'],
                                                           data_point['Value'])))
        return "[%s]" % str(time_series_output)

    def _parse_array(self, array_column_info, array_values):
        array_output = []
        for datum in array_values:
            array_output.append(self._parse_datum(array_column_info, datum))

        return "[%s]" % str(array_output)

    def run_query_with_multiple_pages(self, limit):
        query_with_limit = self.SELECT_ALL + " LIMIT " + str(limit)
        print("Starting query with multiple pages : " + query_with_limit)
        self.run_query(query_with_limit)

    def cancel_query(self):
        print("Starting query: " + self.SELECT_ALL)
        result = self.client.query(QueryString=self.SELECT_ALL)
        print("Cancelling query: " + self.SELECT_ALL)
        try:
            self.client.cancel_query(QueryId=result['QueryId'])
            print("Query has been successfully cancelled")
        except Exception as err:
            print("Cancelling query failed:", err)

    @staticmethod
    def _parse_column_name(info):
        if 'Name' in info:
            return info['Name'] + "="
        else:
            return ""

Let's instantiate the Query object.

In [19]:
query = Query(query_client)

Let's define the query, which is simply to output the time of writing, `venue_time` and the `mid` price.

In [20]:
QUERY_1 = f"""
        SELECT time, venue_time, measure_value::double
        FROM {DATABASE_NAME}.{TABLE_NAME} ORDER BY venue_time, time DESC LIMIT 1000 
        """

Now run the query and print the output, to show the data to the user. In practice, an obvious next step would be to rewrite the Query object, so that it parses the output and converts it into a pandas DataFrame, rather than simply printing every row to the user.

In [21]:
query_output = query.run_query(QUERY_1)

Metadata: [{'Name': 'time', 'Type': {'ScalarType': 'TIMESTAMP'}}, {'Name': 'venue_time', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'measure_value::double', 'Type': {'ScalarType': 'DOUBLE'}}]
Data: 
{['time=2020-10-31 12:53:32.630000000', 'venue_time=1546380157254', 'measure_value::double=1.146399974822998']}
{['time=2020-10-31 12:53:32.630000000', 'venue_time=1546380158590', 'measure_value::double=1.1464049816131592']}
{['time=2020-10-31 12:53:32.630000000', 'venue_time=1546380159138', 'measure_value::double=1.1464149951934814']}
{['time=2020-10-31 12:53:32.630000000', 'venue_time=1546380175787', 'measure_value::double=1.1464099884033203']}
{['time=2020-10-31 12:53:32.630000000', 'venue_time=1546380182060', 'measure_value::double=1.1464099884033203']}
{['time=2020-10-31 12:53:32.630000000', 'venue_time=1546380192290', 'measure_value::double=1.1464149951934814']}
{['time=2020-10-31 12:53:32.630000000', 'venue_time=1546380196253', 'measure_value::double=1.1464149951934814']}
{['time=2
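
As a first step towards returning a DataFrame rather than printing, here is a minimal sketch that turns one page of query results into a list of dictionaries. The function names (`page_to_rows`, `venue_time_to_datetime`) are hypothetical, and it only handles scalar columns like the ones in the output above (array, row and time series columns would need the extra parsing in the Query class). The `example_page` is hand-written to mimic the structure shown in the metadata, not fetched from AWS.

```python
from datetime import datetime, timedelta, timezone


def page_to_rows(page):
    """Convert one query result page into a list of {column: value} dicts."""
    names = [col["Name"] for col in page["ColumnInfo"]]
    rows = []
    for row in page["Rows"]:
        values = [None if d.get("NullValue") else d.get("ScalarValue")
                  for d in row["Data"]]
        rows.append(dict(zip(names, values)))
    return rows


def venue_time_to_datetime(ms_string):
    """Convert a Unix millisecond string back into a UTC datetime."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    return epoch + timedelta(milliseconds=int(ms_string))


example_page = {
    "ColumnInfo": [
        {"Name": "time", "Type": {"ScalarType": "TIMESTAMP"}},
        {"Name": "venue_time", "Type": {"ScalarType": "VARCHAR"}},
        {"Name": "measure_value::double", "Type": {"ScalarType": "DOUBLE"}},
    ],
    "Rows": [
        {"Data": [{"ScalarValue": "2020-10-31 12:53:32.630000000"},
                  {"ScalarValue": "1546380157254"},
                  {"ScalarValue": "1.146399974822998"}]},
    ],
}

rows = page_to_rows(example_page)
print(rows[0]["venue_time"], venue_time_to_datetime(rows[0]["venue_time"]))
```

The resulting list of dictionaries can be handed straight to `pd.DataFrame(rows)`, after which the columns can be converted to numeric and datetime types.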

## Clean up database

Before we leave, let's define functions to delete the table and the database we've just written, so we don't incur any unnecessary storage costs on Timestream.

In [22]:
def delete_table():
        print("Deleting Table")
        try:
            result = write_client.delete_table(DatabaseName=DATABASE_NAME, TableName=TABLE_NAME)
            print("Delete table status [%s]" % result['ResponseMetadata']['HTTPStatusCode'])
        except write_client.exceptions.ResourceNotFoundException:
            print("Table [%s] doesn't exist" % TABLE_NAME)
        except Exception as err:
            print("Delete table failed:", err)

def delete_database():
        print("Deleting Database")
        try:
            result = write_client.delete_database(DatabaseName=DATABASE_NAME)
            print("Delete database status [%s]" % result['ResponseMetadata']['HTTPStatusCode'])
        except write_client.exceptions.ResourceNotFoundException:
            print("Database [%s] doesn't exist" % DATABASE_NAME)
        except Exception as err:
            print("Delete database failed:", err)


Let's now run both functions to delete the table and then the database.

In [23]:
delete_table()

Deleting Table
Delete table status [200]


In [24]:
delete_database()

Deleting Database
Delete database status [200]


## Conclusion

We've seen how it's fairly easy to use Timestream to store tick data. I have only stored a relatively small number of points, so I've not really tested how it scales (which is supposed to be the big benefit of using it!). As noted, to turn this into production code, we'd likely need to write highly optimized code for converting Pandas DataFrames back and forth into formats useful for Timestream. It would also likely be necessary to parallelize the code both for writing to Timestream and for querying the dataset (indeed, this is what I've done in my tcapy library, when fetching tick data).

If you have any feedback about Timestream, I'd be very interested to hear it.