# Persisting Time Series Data to Databases

In [12]:
import pandas as pd
from sqlalchemy import create_engine
import pandas_datareader.data as web

engine = create_engine("postgresql://postgres:password@localhost:5432/postgres")

In [13]:
engine

Engine(postgresql://postgres:***@localhost:5432/postgres)

In [14]:
amzn_df_2020 = web.get_data_yahoo('AMZN', 
                                  start='2020-01-01', 
                                  end='2020-12-31')

In [4]:
amzn_df_2020.head()

Unnamed: 0_level_0,High,Low,Open,Close,Volume,Adj Close
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2020-01-02,1898.01001,1864.150024,1875.0,1898.01001,4029000,1898.01001
2020-01-03,1886.199951,1864.5,1864.5,1874.969971,3764400,1874.969971
2020-01-06,1903.689941,1860.0,1860.0,1902.880005,4061800,1902.880005
2020-01-07,1913.890015,1892.040039,1904.5,1906.859985,4044900,1906.859985
2020-01-08,1911.0,1886.439941,1898.040039,1891.969971,3508000,1891.969971


In [5]:
amzn_df_2020.shape

(253, 6)

In [6]:
amzn_df_2020.to_sql('amazon',
                    engine,
                    if_exists='replace')

In [7]:
query = '''
SELECT EXISTS (
   SELECT FROM information_schema.tables 
   WHERE  table_schema = 'public'
   AND    table_name   = 'amazon'
   );
'''
engine.execute(query).fetchone()

(True,)

In [8]:
query = '''
select count(*) from amazon;
'''
engine.execute(query).fetchone()

(253,)

In [15]:
amzn_df_2021 = web.get_data_yahoo('AMZN', 
                                start='2021-01-01', 
                                end='2021-06-01')

In [10]:
amzn_df_2021.to_sql('amazon',
                    engine,
                    if_exists='append')

In [11]:
amzn_df_2021.shape

(103, 6)

In [12]:
query = '''
select count(*) from amazon;
'''
engine.execute(query).fetchone()

(356,)

### Writing to MySQL 

In [18]:
engine = create_engine("mysql+pymysql://root:password@localhost:3306/stocks")
amzn_df_2020.to_sql('amazon',
                    engine,
                    if_exists='replace')

query = '''
select count(*) from amazon;
'''
engine.execute(query).fetchone()

(253,)

In [19]:
amzn_df_2021.to_sql('amazon',
                    engine,
                    if_exists='append')

query = '''
select count(*) from amazon;
'''
engine.execute(query).fetchone()

(356,)

# Storing Data to MongoDB

In [16]:
import pandas as pd
from pymongo import MongoClient


In [17]:
client = MongoClient('mongodb://localhost:27017')

In [18]:
db = client['stocks']
collection = db['amazon']

In [19]:
amzn_df_2020

Unnamed: 0_level_0,High,Low,Open,Close,Volume,Adj Close
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2020-01-02,1898.010010,1864.150024,1875.000000,1898.010010,4029000,1898.010010
2020-01-03,1886.199951,1864.500000,1864.500000,1874.969971,3764400,1874.969971
2020-01-06,1903.689941,1860.000000,1860.000000,1902.880005,4061800,1902.880005
2020-01-07,1913.890015,1892.040039,1904.500000,1906.859985,4044900,1906.859985
2020-01-08,1911.000000,1886.439941,1898.040039,1891.969971,3508000,1891.969971
...,...,...,...,...,...,...
2020-12-24,3202.000000,3169.000000,3193.899902,3172.689941,1451900,3172.689941
2020-12-28,3304.000000,3172.689941,3194.000000,3283.959961,5686800,3283.959961
2020-12-29,3350.649902,3281.219971,3309.939941,3322.000000,4872900,3322.000000
2020-12-30,3342.100098,3282.469971,3341.000000,3285.850098,3209300,3285.850098


In [20]:
amzn_records = amzn_df_2020.reset_index().to_dict(orient='records')

In [21]:
len(amzn_records)

253

In [22]:
#amzn_df_2020.reset_index().to_dict()

In [23]:
amzn_records[0:1]

[{'Date': Timestamp('2020-01-02 00:00:00'),
  'High': 1898.010009765625,
  'Low': 1864.1500244140625,
  'Open': 1875.0,
  'Close': 1898.010009765625,
  'Volume': 4029000,
  'Adj Close': 1898.010009765625}]

In [24]:
collection.insert_many(amzn_records)

<pymongo.results.InsertManyResult at 0x7fc7d503b0c0>

In [25]:
client.list_database_names()

['admin', 'config', 'local', 'stocks']

In [26]:
db.list_collection_names()

['amazon']

In [27]:
collection.find_one()

{'_id': ObjectId('615d482dad102432df7ea582'),
 'Date': datetime.datetime(2020, 1, 2, 0, 0),
 'High': 1898.010009765625,
 'Low': 1864.1500244140625,
 'Open': 1875.0,
 'Close': 1898.010009765625,
 'Volume': 4029000,
 'Adj Close': 1898.010009765625}

In [28]:
# filter documents that are greater than August 1, 2020
# and retrieve the first record
import datetime
collection.find_one({'Date': {'$gt': datetime.datetime(2020, 8,1)}})

{'_id': ObjectId('615d482dad102432df7ea615'),
 'Date': datetime.datetime(2020, 8, 3, 0, 0),
 'High': 3184.0,
 'Low': 3104.0,
 'Open': 3180.510009765625,
 'Close': 3111.889892578125,
 'Volume': 5074700,
 'Adj Close': 3111.889892578125}

In [29]:
collection.count_documents({})

253

### InsertOneResult

In [30]:
one_record = (amzn_df_2021.reset_index()
                          .iloc[0]
                          .to_dict())
one_record

{'Date': Timestamp('2021-01-04 00:00:00'),
 'High': 3272.0,
 'Low': 3144.02001953125,
 'Open': 3270.0,
 'Close': 3186.6298828125,
 'Volume': 4411400,
 'Adj Close': 3186.6298828125}

In [31]:
result_id = collection.insert_one(one_record)

In [32]:
result_id

<pymongo.results.InsertOneResult at 0x7fc7d3afa9c0>

In [33]:
result_id.inserted_id

ObjectId('615d483bad102432df7ea67f')

In [34]:
# list(collection.find({'Date': {'$gt': datetime.datetime(2020, 8,1)}}, {'Close': 1 }))

### MongoDB Bucketing

In [77]:
bucket = db['stocks_bucket']
amzn_df_2020['month'] = amzn_df_2020.index.month

In [78]:
for month in amzn_df_2020.index.month.unique():
    record = {}
    record['month'] = month
    record['symbol'] = 'AMZN'
    record['price'] = list(amzn_df_2020[amzn_df_2020['month'] == month]['Close'].values)
    bucket.insert_many([record])

In [79]:
bucket.count_documents({})

12

In [80]:
bucket.find_one({'month': 6})

{'_id': ObjectId('615d4e67ad102432df7ea691'),
 'month': 6,
 'symbol': 'AMZN',
 'price': [2471.0400390625,
  2472.409912109375,
  2478.39990234375,
  2460.60009765625,
  2483.0,
  2524.06005859375,
  2600.860107421875,
  2647.449951171875,
  2557.9599609375,
  2545.02001953125,
  2572.679931640625,
  2615.27001953125,
  2640.97998046875,
  2653.97998046875,
  2675.010009765625,
  2713.820068359375,
  2764.409912109375,
  2734.39990234375,
  2754.580078125,
  2692.8701171875,
  2680.3798828125,
  2758.820068359375]}

### MongoDB Time Series Collection

In [174]:
ts = db.create_collection(name = "stocks_ts", 
                         capped =  False,
                         timeseries = {"timeField": "date", 
                                       "metaField": "metadata"})

In [175]:
[i for i in db.list_collections() if i['name'] =='stocks_ts']

[{'name': 'stocks_ts',
  'type': 'timeseries',
  'options': {'timeseries': {'timeField': 'date',
    'metaField': 'metadata',
    'granularity': 'seconds',
    'bucketMaxSpanSeconds': 3600}},
  'info': {'readOnly': False}}]

In [176]:
cols = ['Close']
records = []
for month in amzn_df_2020[cols].iterrows():
    records.append(
        {'metadata': 
                 {'ticker': 'AMZN', 'type': 'close'},
         'date': month[0],
         'price': month[1]['Close']})

In [177]:
records[0:1]

[{'metadata': {'ticker': 'AMZN', 'type': 'close'},
  'date': Timestamp('2020-01-02 00:00:00'),
  'price': 1898.010009765625}]

In [178]:
ts.insert_many(records)

<pymongo.results.InsertManyResult at 0x7fc7d2e29800>

In [179]:
ts.find_one({})

{'date': datetime.datetime(2020, 1, 2, 0, 0),
 'metadata': {'ticker': 'AMZN', 'type': 'close'},
 'price': 1898.010009765625,
 '_id': ObjectId('615d5badad102432df7eb371')}

# Storing Data to Time Series Database (InfluxDB)

In [225]:
from influxdb_client import InfluxDBClient, WriteOptions
from influxdb_client.client.write_api import SYNCHRONOUS
import pandas as pd
from  pathlib import Path

In [226]:
path = Path('../../datasets/Ch5/ExtraSensory/')

In [227]:
file = '0A986513-7828-4D53-AA1F-E02D6DF9561B.features_labels.csv.gz'

In [228]:
columns = ['timestamp',
           'watch_acceleration:magnitude_stats:mean']

df = pd.read_csv(path.joinpath(file),
                usecols=columns)
df = df.fillna(method='backfill')
df.columns = ['timestamp','acc']
df.shape


(3960, 2)

In [229]:
df.head()

Unnamed: 0,timestamp,acc
0,1449601597,995.369977
1,1449601657,995.369977
2,1449601717,995.369977
3,1449601777,996.406005
4,1449601855,1034.180063


In [230]:
df['timestamp'] = pd.to_datetime(df['timestamp'],
                                  origin='unix',
                                  unit='s',
                                  utc=True)


In [231]:
df.head()

Unnamed: 0,timestamp,acc
0,2015-12-08 19:06:37+00:00,995.369977
1,2015-12-08 19:07:37+00:00,995.369977
2,2015-12-08 19:08:37+00:00,995.369977
3,2015-12-08 19:09:37+00:00,996.406005
4,2015-12-08 19:10:55+00:00,1034.180063


In [232]:
df.set_index('timestamp', inplace=True)

In [233]:
df.head()

Unnamed: 0_level_0,acc
timestamp,Unnamed: 1_level_1
2015-12-08 19:06:37+00:00,995.369977
2015-12-08 19:07:37+00:00,995.369977
2015-12-08 19:08:37+00:00,995.369977
2015-12-08 19:09:37+00:00,996.406005
2015-12-08 19:10:55+00:00,1034.180063


In [234]:
bucket = "stocks_ts"
org = "my-org"
token = "c5c0JUoz-joisPCttI6hy8aLccEyaflyfNj1S_Kff34N_4moiCQacH8BLbLzFu4qWTP8ibSk3JNYtv9zlUwxeA=="
client = InfluxDBClient(url="http://localhost:8086", token=token)


In [235]:
client.health()

{'checks': [],
 'commit': '741389781e',
 'message': 'ready for queries and writes',
 'name': 'influxdb',
 'status': 'pass',
 'version': '2.0.5'}

In [236]:
writer = client.write_api(WriteOptions(SYNCHRONOUS,
                     batch_size=500,
                     max_retries=5_000))


writer.write(bucket=bucket,
                org=org,
                record=df,
                write_precision='ns',
                data_frame_measurement_name='acc',
                data_frame_tag_columns=[])

In [237]:
query = '''
         from(bucket: "stocks_ts")
         |> range(start: 2015-12-08)
         '''

result = client.query_api()

influx_df = result.query_data_frame(
                             org=org,
                             query=query,
                             data_frame_index='_time')


In [238]:
influx_df.shape

(3960, 7)

In [239]:
influx_df.columns

Index(['result', 'table', '_start', '_stop', '_value', '_field',
       '_measurement'],
      dtype='object')

In [240]:
influx_df.head()

Unnamed: 0_level_0,result,table,_start,_stop,_value,_field,_measurement
_time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2015-12-08 19:06:37+00:00,_result,0,2015-12-08 00:00:00+00:00,2021-10-06 09:09:35.934953+00:00,995.369977,acc,acc
2015-12-08 19:07:37+00:00,_result,0,2015-12-08 00:00:00+00:00,2021-10-06 09:09:35.934953+00:00,995.369977,acc,acc
2015-12-08 19:08:37+00:00,_result,0,2015-12-08 00:00:00+00:00,2021-10-06 09:09:35.934953+00:00,995.369977,acc,acc
2015-12-08 19:09:37+00:00,_result,0,2015-12-08 00:00:00+00:00,2021-10-06 09:09:35.934953+00:00,996.406005,acc,acc
2015-12-08 19:10:55+00:00,_result,0,2015-12-08 00:00:00+00:00,2021-10-06 09:09:35.934953+00:00,1034.180063,acc,acc


In [253]:
buck = client.buckets_api()

In [259]:
orgs = client.organizations_api()

In [260]:
orgs.find_organizations()

[{'created_at': datetime.datetime(2021, 4, 29, 4, 39, 12, 12543, tzinfo=tzutc()),
  'description': '',
  'id': 'a3012d3fefd8cacb',
  'links': {'_self': '/api/v2/orgs/a3012d3fefd8cacb',
            'buckets': '/api/v2/buckets?org=my-org',
            'dashboards': '/api/v2/dashboards?org=my-org',
            'labels': '/api/v2/orgs/a3012d3fefd8cacb/labels',
            'members': '/api/v2/orgs/a3012d3fefd8cacb/members',
            'owners': '/api/v2/orgs/a3012d3fefd8cacb/owners',
            'secrets': '/api/v2/orgs/a3012d3fefd8cacb/secrets',
            'tasks': '/api/v2/tasks?org=my-org'},
  'name': 'my-org',
  'status': 'active',
  'updated_at': datetime.datetime(2021, 4, 29, 4, 39, 12, 12543, tzinfo=tzutc())}]

In [257]:

buck.create_bucket(bucket_name='test',org_id="a3012d3fefd8cacb")

{'created_at': datetime.datetime(2021, 10, 6, 9, 28, 55, 728822, tzinfo=tzutc()),
 'description': None,
 'id': 'df17bea4ea61514e',
 'labels': [],
 'links': {'_self': '/api/v2/buckets/df17bea4ea61514e',
           'labels': '/api/v2/buckets/df17bea4ea61514e/labels',
           'members': '/api/v2/buckets/df17bea4ea61514e/members',
           'org': '/api/v2/orgs/a3012d3fefd8cacb',
           'owners': '/api/v2/buckets/df17bea4ea61514e/owners',
           'write': '/api/v2/write?org=a3012d3fefd8cacb&bucket=df17bea4ea61514e'},
 'name': 'test',
 'org_id': 'a3012d3fefd8cacb',
 'retention_rules': [{'every_seconds': 0,
                      'shard_group_duration_seconds': 604800,
                      'type': 'expire'}],
 'rp': None,
 'type': 'user',
 'updated_at': datetime.datetime(2021, 10, 6, 9, 28, 55, 728843, tzinfo=tzutc())}

In [255]:
buck.find_buckets()

{'buckets': [{'created_at': datetime.datetime(2021, 5, 2, 5, 46, 43, 442760, tzinfo=tzutc()),
              'description': None,
              'id': '0c85863de71971cd',
              'labels': [],
              'links': {'_self': '/api/v2/buckets/0c85863de71971cd',
                        'labels': '/api/v2/buckets/0c85863de71971cd/labels',
                        'members': '/api/v2/buckets/0c85863de71971cd/members',
                        'org': '/api/v2/orgs/a3012d3fefd8cacb',
                        'owners': '/api/v2/buckets/0c85863de71971cd/owners',
                        'write': '/api/v2/write?org=a3012d3fefd8cacb&bucket=0c85863de71971cd'},
              'name': 'tscookbook',
              'org_id': 'a3012d3fefd8cacb',
              'retention_rules': [{'every_seconds': 0,
                                   'shard_group_duration_seconds': 604800,
                                   'type': 'expire'}],
              'rp': None,
              'type': 'user',
              'updat