# Chapter 5
## Persisting Time Series Data to Databases

## Technical Requirements

In [2]:
# !conda install -c conda-forge yfinance -y

In [1]:
import yfinance as yf
import pandas as pd

amzn = yf.Ticker("AMZN")
amzn_hist = amzn.history(start="2019-01-01", end="2023-12-31")
amzn_hist.info()

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 1258 entries, 2019-01-02 00:00:00-05:00 to 2023-12-29 00:00:00-05:00
Data columns (total 7 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   Open          1258 non-null   float64
 1   High          1258 non-null   float64
 2   Low           1258 non-null   float64
 3   Close         1258 non-null   float64
 4   Volume        1258 non-null   int64  
 5   Dividends     1258 non-null   float64
 6   Stock Splits  1258 non-null   float64
dtypes: float64(6), int64(1)
memory usage: 78.6 KB


In [2]:
amzn_hist.index = amzn_hist.index.tz_localize(None).normalize()
amzn_hist = amzn_hist[['Open', 'High', 'Low', 'Close', 'Volume']]

In [3]:
amzn_hist.info()

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 1258 entries, 2019-01-02 to 2023-12-29
Data columns (total 5 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   Open    1258 non-null   float64
 1   High    1258 non-null   float64
 2   Low     1258 non-null   float64
 3   Close   1258 non-null   float64
 4   Volume  1258 non-null   int64  
dtypes: float64(4), int64(1)
memory usage: 59.0 KB


In [4]:
print(amzn_hist)

                  Open        High         Low       Close     Volume
Date                                                                 
2019-01-02   73.260002   77.667999   73.046501   76.956497  159662000
2019-01-03   76.000504   76.900002   74.855499   75.014000  139512000
2019-01-04   76.500000   79.699997   75.915497   78.769501  183652000
2019-01-07   80.115501   81.727997   79.459503   81.475502  159864000
2019-01-08   83.234497   83.830498   80.830498   82.829002  177628000
...                ...         ...         ...         ...        ...
2023-12-22  153.770004  154.350006  152.710007  153.419998   29514100
2023-12-26  153.559998  153.979996  153.029999  153.410004   25067200
2023-12-27  153.559998  154.779999  153.119995  153.339996   31434700
2023-12-28  153.720001  154.080002  152.949997  153.380005   27057000
2023-12-29  153.100006  153.889999  151.029999  151.940002   39823200

[1258 rows x 5 columns]


### get_stock_data() function

In [20]:
import yfinance as yf
import pandas as pd

def get_stock_data(ticker, start, end=None):
    """Fetch stock data and return cleaned DataFrame"""
    stock_data = yf.Ticker(ticker)
    hist = stock_data.history(start=start, end=end)

    # Standardize date format and select key columns
    hist.index = hist.index.tz_localize(None).normalize()
    return hist[['Open', 'High', 'Low', 'Close', 'Volume']]
    

In [21]:
amzn_hist = get_stock_data('AMZN', '2019-01-01', '2023-12-31')
amzn_hist.head()

Unnamed: 0_level_0,Open,High,Low,Close,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2019-01-02,73.260002,77.667999,73.046501,76.956497,159662000
2019-01-03,76.000504,76.900002,74.855499,75.014,139512000
2019-01-04,76.5,79.699997,75.915497,78.769501,183652000
2019-01-07,80.115501,81.727997,79.459503,81.475502,159864000
2019-01-08,83.234497,83.830498,80.830498,82.829002,177628000


# Recipe 1: Writing Data to Relational Databases

In the *Reading data from relational database* recipe in **Chapter 3**, *Reading Time Series Data from Databases*, you installed `sqlalchemy` and `psycopg` for the read engine. For this recipe, you will be using these two libraries again.  


## Writing Data to PostgreSQL

In [7]:
#!conda install sqlalchemy psycopg -y

In [8]:
# import configparser
# config = configparser.ConfigParser()
# config.read('database.cfg')

# params = dict(config['POSTGRESQL'])
import pandas as pd
import sqlalchemy
import psycopg

print(f"""
{pd.__version__}
{sqlalchemy.__version__}
{psycopg.__version__}
""")


2.3.3
2.0.43
3.2.10



In [9]:
amzn_hist = get_stock_data('AMZN', '2019-01-01', '2023-12-31')
print(amzn_hist.head())

                 Open       High        Low      Close     Volume
Date                                                             
2019-01-02  73.260002  77.667999  73.046501  76.956497  159662000
2019-01-03  76.000504  76.900002  74.855499  75.014000  139512000
2019-01-04  76.500000  79.699997  75.915497  78.769501  183652000
2019-01-07  80.115501  81.727997  79.459503  81.475502  159864000
2019-01-08  83.234497  83.830498  80.830498  82.829002  177628000


In [10]:
amzn_hist.info()

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 1258 entries, 2019-01-02 to 2023-12-29
Data columns (total 5 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   Open    1258 non-null   float64
 1   High    1258 non-null   float64
 2   Low     1258 non-null   float64
 3   Close   1258 non-null   float64
 4   Volume  1258 non-null   int64  
dtypes: float64(4), int64(1)
memory usage: 59.0 KB


In [11]:
amzn_hist.reset_index().dtypes

Date      datetime64[ns]
Open             float64
High             float64
Low              float64
Close            float64
Volume             int64
dtype: object

In [12]:
from sqlalchemy import create_engine, URL
from configparser import ConfigParser

config = ConfigParser()
config.read('database.cfg')
params = dict(config['POSTGRESQL'])
params

{'host': '127.0.0.1',
 'username': 'postgres',
 'password': 'password',
 'port': '5432'}

In [13]:
url = URL.create('postgresql+psycopg', **params)
print(url)

postgresql+psycopg://postgres:***@127.0.0.1:5432


In [14]:
URL

sqlalchemy.engine.url.URL

In [15]:
# conn = engine.connect()
# cursor = conn.execute('select * from MSFT;')
# cursor.fetchone()
engine = create_engine(url)
print(engine)

Engine(postgresql+psycopg://postgres:***@127.0.0.1:5432)


In [16]:
amzn_hist.shape

(1258, 5)

In [17]:
amzn_hist.to_sql('amzn',
            engine,
            index=True,
            index_label='Date',
            if_exists='replace')

-1

In [18]:
with engine.connect() as connection:
    amzn_hist.to_sql('amzn',
                    connection,
                    index=True,
                    index_label='Date',
                    if_exists='replace')


In [19]:
amzn_hist.shape

(1258, 5)

In [20]:
# from sqlalchemy import text

# with engine.connect() as connection:
#     result = connection.execute(text("select username from users"))
#     for row in result:
#         print("username:", row.username)

from sqlalchemy import text

query = """
SELECT EXISTS (
   SELECT FROM information_schema.tables 
   WHERE  table_schema = 'public'
   AND    table_name   = 'amzn'
   );"""

with engine.connect() as conn:
    result = conn.execute(text(query))
    print(result.fetchone())

(True,)


In [21]:
query = """
SELECT column_name, data_type 
FROM information_schema.columns
WHERE table_name = 'amzn';
"""
with engine.connect() as conn:
    result = conn.execute(text(query))
    for row in result:
        print(row)

('Date', 'timestamp without time zone')
('Open', 'double precision')
('High', 'double precision')
('Low', 'double precision')
('Close', 'double precision')
('Volume', 'bigint')


In [22]:
text

<function sqlalchemy.sql._elements_constructors.text(text: 'str') -> 'TextClause'>

In [23]:

query = "select count(*) from amzn;"

with engine.connect() as conn:
    result = conn.execute(text(query))
    print(result.fetchone())

(1258,)


In [24]:
amzn_hist_2024 = get_stock_data('AMZN', '2024-01-01', '2024-12-31')
print(amzn_hist_2024.head())

                  Open        High         Low       Close    Volume
Date                                                                
2024-01-02  151.539993  152.380005  148.389999  149.929993  47339400
2024-01-03  149.199997  151.050003  148.330002  148.470001  49425500
2024-01-04  145.589996  147.380005  144.050003  144.570007  56039800
2024-01-05  144.690002  146.589996  144.529999  145.240005  45153100
2024-01-08  146.740005  149.399994  146.149994  149.100006  46757100


In [25]:
print(amzn_hist_2024.shape)

(251, 5)


In [26]:
amzn_hist.info()

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 1258 entries, 2019-01-02 to 2023-12-29
Data columns (total 5 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   Open    1258 non-null   float64
 1   High    1258 non-null   float64
 2   Low     1258 non-null   float64
 3   Close   1258 non-null   float64
 4   Volume  1258 non-null   int64  
dtypes: float64(4), int64(1)
memory usage: 59.0 KB


In [27]:
amzn_hist_2024.info()

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 251 entries, 2024-01-02 to 2024-12-30
Data columns (total 5 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   Open    251 non-null    float64
 1   High    251 non-null    float64
 2   Low     251 non-null    float64
 3   Close   251 non-null    float64
 4   Volume  251 non-null    int64  
dtypes: float64(4), int64(1)
memory usage: 11.8 KB


In [28]:
with engine.connect() as connection:
    amzn_hist_2024.to_sql('amzn',
                    connection,
                    index=True,
                    index_label='Date',
                    if_exists='append')

In [29]:
query = "select count(*) from amzn;"

with engine.connect() as conn:
    result = conn.execute(text(query))
    print(result.fetchone())

(1509,)


**Chunking**

In [30]:
with engine.connect() as connection:
    amzn_hist.to_sql('amzn',
                    connection,
                    chunksize=500,
                    if_exists='append')

## There is more

### Writing to AWS Redshift SQLAlchemy

In [135]:
# !conda install -c conda-forge psycopg2 sqlalchemy-redshift -y

In [137]:
# !pip install redshift_connector -q
# !pip install sqlalchemy-redshift -q
# !pip install psycopg2-binary -q
#!pip install sqlalchemy --force

In [32]:
import pandas as pd
import sqlalchemy
import psycopg2

print(f"""
{pd.__version__}
{sqlalchemy.__version__}
{psycopg2.__version__}
""")


2.3.3
2.0.43
2.9.10 (dt dec pq3 ext lo64)



In [33]:
from configparser import ConfigParser
config = ConfigParser()
config.read('database.cfg')
config.sections()
params = dict(config['AWS'])
#params

In [34]:
from sqlalchemy import create_engine, URL

url = URL.create('redshift+psycopg2', **params)
url

redshift+psycopg2://awsuser:***@redshift-cluster-1.cltc17lacqp7.us-east-1.redshift.amazonaws.com:5439/dev

In [36]:
aws_engine = create_engine(url)
aws_engine

  aws_engine = create_engine(url)


Engine(redshift+psycopg2://awsuser:***@redshift-cluster-1.cltc17lacqp7.us-east-1.redshift.amazonaws.com:5439/dev)

In [3]:
import yfinance as yf

amzn = yf.Ticker("AMZN")
amzn_hist = amzn.history(period="5y")
amzn_hist = amzn_hist[['Open', 'High', 'Low', 'Close', 'Volume']]
amzn_hist.head()

Unnamed: 0_level_0,Open,High,Low,Close,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2020-10-07 00:00:00-04:00,156.75,160.0,156.619507,159.7845,86188000
2020-10-08 00:00:00-04:00,161.249496,161.664505,158.749496,159.527496,63482000
2020-10-09 00:00:00-04:00,160.5,164.449493,159.891495,164.332504,98158000
2020-10-12 00:00:00-04:00,167.496994,174.811996,166.977493,172.1465,167284000
2020-10-13 00:00:00-04:00,173.399506,174.619003,171.210999,172.181503,114894000


In [4]:
amzn_hist.tail()

Unnamed: 0_level_0,Open,High,Low,Close,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2025-10-01 00:00:00-04:00,217.360001,222.149994,216.610001,220.630005,43933800
2025-10-02 00:00:00-04:00,221.009995,222.809998,218.949997,222.410004,41258600
2025-10-03 00:00:00-04:00,223.440002,224.199997,219.339996,219.509995,43639000
2025-10-06 00:00:00-04:00,221.0,221.729996,216.029999,220.899994,43599500
2025-10-07 00:00:00-04:00,220.880005,222.889999,220.169998,220.610001,17727244


In [5]:
amzn_hist.shape

(1256, 5)

In [6]:
amzn_hist = amzn_hist.reset_index()
amzn_hist.head()

Unnamed: 0,Date,Open,High,Low,Close,Volume
0,2020-10-07 00:00:00-04:00,156.75,160.0,156.619507,159.7845,86188000
1,2020-10-08 00:00:00-04:00,161.249496,161.664505,158.749496,159.527496,63482000
2,2020-10-09 00:00:00-04:00,160.5,164.449493,159.891495,164.332504,98158000
3,2020-10-12 00:00:00-04:00,167.496994,174.811996,166.977493,172.1465,167284000
4,2020-10-13 00:00:00-04:00,173.399506,174.619003,171.210999,172.181503,114894000


In [41]:
amzn_hist.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1256 entries, 0 to 1255
Data columns (total 6 columns):
 #   Column  Non-Null Count  Dtype                           
---  ------  --------------  -----                           
 0   Date    1256 non-null   datetime64[ns, America/New_York]
 1   Open    1256 non-null   float64                         
 2   High    1256 non-null   float64                         
 3   Low     1256 non-null   float64                         
 4   Close   1256 non-null   float64                         
 5   Volume  1256 non-null   int64                           
dtypes: datetime64[ns, America/New_York](1), float64(4), int64(1)
memory usage: 59.0 KB


In [42]:
with aws_engine.connect() as conn:
    print("Connection successful")

Connection successful


In [43]:
from sqlalchemy import text

with aws_engine.connect() as conn:
    # Manually drop table using Redshift-compatible syntax
    conn.execute(text("DROP TABLE IF EXISTS public.amzn CASCADE"))
    conn.commit()
    
    # Now create fresh table
    amzn_hist.to_sql('amzn', 
                    con=conn, 
                    schema='public',
                    index=False,
                    if_exists='replace')

In [44]:
from sqlalchemy import text

query = "select count(*) from amzn;"
with aws_engine.connect() as conn:
    result = conn.execute(text(query))
    print(result.fetchone())

(1256,)


### Writing to AWS Redshift using Redshift_Connector 

In [45]:
import redshift_connector

from configparser import ConfigParser
config = ConfigParser()
config.read('database.cfg')
config.sections()
params2 = dict(config['AWS2'])

conn = redshift_connector.connect(**params2)

In [46]:
cursor = conn.cursor()
cursor.write_dataframe(amzn_hist, 'amzn')
conn.commit()

In [47]:
conn.close()

In [48]:
from sqlalchemy import text

query = "select count(*) from amzn;"
with aws_engine.connect() as conn:
    result = conn.execute(text(query))
    print(result.fetchone())

(2512,)


### Writing to AWS Redshift using AWSWrangler

In [50]:
#!pip install 'awswrangler[redshift]' -q

In [2]:
# note here index=False because 
import redshift_connector
import awswrangler as wr
from configparser import ConfigParser

wr.__version__

'3.13.0'

In [7]:
config = ConfigParser()
config.read('database.cfg')
config.sections()
params2 = dict(config['AWS2'])

In [10]:
conn = redshift_connector.connect(**params2)
wr.redshift.to_sql(
    df=amzn_hist,
    table='amzn',
    schema='public',
    con=conn,
    mode='overwrite'
)

In [None]:
conn = redshift_connector.connect(**params)
wr.redshift.to_sql(
    df=amzn_hist,
    table='amzn',
    schema='public',
    con=conn,
    mode='overwrite'
)

# Recipe 2: Storing Data to MongoDB

In the *Reading data from a document database* recipe in **Chapter 3**, *Reading Time Series Data from Databases*, we installed `pymongo`. For this recipe, you will be using that same
library again.

* To install using Conda, run the following:

```
conda install -c anaconda pymongo -y
```

* To install using pip, run the following:

```
python -m pip install pymongo
```

In [16]:
import pandas as pd
from pymongo import MongoClient

In [17]:
client = MongoClient('mongodb://localhost:27017',
                    username='admin',
                    password='password')

In [18]:
db = client['stock_data']

In [19]:
ts = db.create_collection(
    name="daily_data",
    timeseries={
        "timeField": "Date",
        "metaField": "symbol",
        "granularity": "hours"
    }
)


In [22]:
amzn_hist = get_stock_data('AMZN', '2019-01-01', '2024-12-31')
amzn_hist.head()

Unnamed: 0_level_0,Open,High,Low,Close,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2019-01-02,73.260002,77.667999,73.046501,76.956497,159662000
2019-01-03,76.000504,76.900002,74.855499,75.014,139512000
2019-01-04,76.5,79.699997,75.915497,78.769501,183652000
2019-01-07,80.115501,81.727997,79.459503,81.475502,159864000
2019-01-08,83.234497,83.830498,80.830498,82.829002,177628000


In [23]:
amzn_hist['symbol'] = "AMZN"
amzn_hist = amzn_hist.reset_index()
amzn_hist['Date'] = pd.to_datetime(amzn_hist['Date'])
amzn_records = amzn_hist.to_dict(orient='records')

amzn_records[0:2]

[{'Date': Timestamp('2019-01-02 00:00:00'),
  'Open': 73.26000213623047,
  'High': 77.66799926757812,
  'Low': 73.04650115966797,
  'Close': 76.95649719238281,
  'Volume': 159662000,
  'symbol': 'AMZN'},
 {'Date': Timestamp('2019-01-03 00:00:00'),
  'Open': 76.00050354003906,
  'High': 76.9000015258789,
  'Low': 74.85549926757812,
  'Close': 75.01399993896484,
  'Volume': 139512000,
  'symbol': 'AMZN'}]

In [24]:
len(amzn_records)

1509

In [25]:
amzn_hist = amzn_hist.reset_index()
amzn_records = []
for idx, row in amzn_hist.iterrows():
    doc = {
        "Date": pd.to_datetime(row['Date']),
        "symbol": "AMZN", 
        "High": row['High'],
        "Low": row['Low'],
        "Close": row['Close'],
        "Open": row['Open'],
        "Volume": row['Volume']
    }
    amzn_records.append(doc)
amzn_records[0:2]

[{'Date': Timestamp('2019-01-02 00:00:00'),
  'symbol': 'AMZN',
  'High': 77.66799926757812,
  'Low': 73.04650115966797,
  'Close': 76.95649719238281,
  'Open': 73.26000213623047,
  'Volume': 159662000},
 {'Date': Timestamp('2019-01-03 00:00:00'),
  'symbol': 'AMZN',
  'High': 76.9000015258789,
  'Low': 74.85549926757812,
  'Close': 75.01399993896484,
  'Open': 76.00050354003906,
  'Volume': 139512000}]

In [26]:
len(amzn_records)

1509

In [27]:
result = ts.insert_many(amzn_records)

In [28]:
result.acknowledged

True

In [29]:
client.list_database_names()

['admin', 'config', 'local', 'stock_data']

In [30]:
db.list_collection_names()

['daily_data', 'system.buckets.daily_data', 'system.views']

In [31]:
msft_hist = get_stock_data('MSFT', '2019-01-01', '2024-12-31')

msft_hist['symbol'] = "MSFT"
msft_hist = msft_hist.reset_index()
msft_hist['Date'] = pd.to_datetime(msft_hist['Date'])
msft_records = msft_hist.to_dict(orient='records')

result = ts.insert_many(msft_records)

In [32]:
ts.count_documents({})

3018

In [34]:
db.list_collection_names()

['daily_data', 'system.buckets.daily_data', 'system.views']

In [35]:
ts.find_one()

{'Date': datetime.datetime(2019, 1, 2, 0, 0),
 'symbol': 'AMZN',
 '_id': ObjectId('68e548d2b38011f8ebb22938'),
 'Close': 76.95649719238281,
 'High': 77.66799926757812,
 'Low': 73.04650115966797,
 'Open': 73.26000213623047,
 'Volume': 159662000}

In [36]:
# filter documents that are greater than August 1, 2020
# and retrieve the first record
import datetime
ts.find_one({'Date': {'$gt': datetime.datetime(2020, 8,1)}})

{'Date': datetime.datetime(2020, 8, 3, 0, 0),
 'symbol': 'AMZN',
 'High': 159.1999969482422,
 'Low': 155.1999969482422,
 'Open': 159.02549743652344,
 'Volume': 101494000,
 '_id': ObjectId('68e548d2b38011f8ebb22ac7'),
 'Close': 155.59449768066406}

In [37]:
ts.count_documents({})

3018

In [38]:
from datetime import datetime

# Define date range (adjust dates as needed)
start_date = datetime(2024, 1, 1)
end_date = datetime(2024, 1, 31)

# Query for MSFT stock data within the date range
results = ts.find({
    "symbol": "MSFT",
    "Date": {"$gte": start_date, "$lte": end_date}
})

msft_df = (pd.DataFrame(results)
             .set_index('Date')
             .drop(columns=['_id', 'symbol']))
print(msft_df.head())

                  High         Low        Open    Volume       Close
Date                                                                
2024-01-02  371.070940  362.058226  369.057139  25258600  366.105560
2024-01-03  368.464820  363.775843  364.269419  23083500  365.838989
2024-01-04  368.306880  362.453068  365.908104  20901500  363.213165
2024-01-05  367.280232  361.791663  364.229932  21004600  363.025604
2024-01-08  370.379953  364.269471  364.555724  23134000  369.876495


In [39]:
msft_avg_close = ts.aggregate([
    {"$group": 
         {"_id": "$symbol", 
          "avgClose": 
                  {"$avg": "$Close"}}
    }
])

for doc in msft_avg_close:
    print(doc)

{'_id': 'MSFT', 'avgClose': 260.00717730361}
{'_id': 'AMZN', 'avgClose': 137.1042866286574}


In [40]:
type(result)

pymongo.results.InsertManyResult

### InsertOneResult

In [41]:
one_record = amzn_records[0]
one_record

{'Date': Timestamp('2019-01-02 00:00:00'),
 'symbol': 'AMZN',
 'High': 77.66799926757812,
 'Low': 73.04650115966797,
 'Close': 76.95649719238281,
 'Open': 73.26000213623047,
 'Volume': 159662000,
 '_id': ObjectId('68e548d2b38011f8ebb22938')}

In [42]:
result_id = ts.insert_one(one_record)

In [43]:
result_id

InsertOneResult(ObjectId('68e548d2b38011f8ebb22938'), acknowledged=True)

In [44]:
result_id.inserted_id

ObjectId('68e548d2b38011f8ebb22938')

## There is more

In [45]:
amzn = yf.Ticker("AMZN")
amzn_hist = amzn.history(period="5y")
amzn_hist = amzn_hist[['Open', 
                       'High', 
                       'Low', 
                       'Close', 
                       'Volume']].reset_index()
amzn_hist['Date'] = pd.to_datetime(amzn_hist['Date'])
amzn_hist.head()

Unnamed: 0,Date,Open,High,Low,Close,Volume
0,2020-10-07 00:00:00-04:00,156.75,160.0,156.619507,159.7845,86188000
1,2020-10-08 00:00:00-04:00,161.249496,161.664505,158.749496,159.527496,63482000
2,2020-10-09 00:00:00-04:00,160.5,164.449493,159.891495,164.332504,98158000
3,2020-10-12 00:00:00-04:00,167.496994,174.811996,166.977493,172.1465,167284000
4,2020-10-13 00:00:00-04:00,173.399506,174.619003,171.210999,172.181503,114894000


### MongoDB Bucketing
* Bucketing strategy page 135

In [46]:
db = client['stock_data']
db.list_collection_names()

['daily_data', 'system.buckets.daily_data', 'system.views']

In [47]:
db = client['stock_data']
bucket = db.create_collection(name='stock_bucket')

In [48]:
bucket = db["stock_bucket"]

In [49]:
amzn_hist = get_stock_data('AMZN', start='2019-01-01', end='2025-03-31')
amzn_hist = amzn_hist[['Open', 
                       'High', 
                       'Low', 
                       'Close', 
                       'Volume']].reset_index()

amzn_hist['month'] = amzn_hist['Date'].dt.month
amzn_hist['year'] = amzn_hist['Date'].dt.year
print(amzn_hist.tail())

           Date        Open        High         Low       Close    Volume  \
1564 2025-03-24  200.000000  203.639999  199.949997  203.259995  41625400   
1565 2025-03-25  203.600006  206.210007  203.220001  205.710007  31171200   
1566 2025-03-26  205.839996  206.009995  199.929993  201.130005  32855300   
1567 2025-03-27  200.889999  203.789993  199.279999  201.360001  27317700   
1568 2025-03-28  198.419998  199.259995  191.880005  192.720001  52548200   

      month  year  
1564      3  2025  
1565      3  2025  
1566      3  2025  
1567      3  2025  
1568      3  2025  


In [50]:
print(amzn_hist['month'].unique())
print(amzn_hist['year'].unique())

[ 1  2  3  4  5  6  7  8  9 10 11 12]
[2019 2020 2021 2022 2023 2024 2025]


In [51]:
len(amzn_hist.groupby(['year', 'month']))

75

In [52]:
# number of unique year/month combinations in the data
unique_combinations = amzn_hist.groupby(['year', 'month'])
print(f"Buckets to create: {len(unique_combinations)}")

Buckets to create: 75


In [53]:
# Group by year and month, then process each group
for (year, month), group_data in amzn_hist.groupby(['year', 'month']):
    record = {
        'month': int(month),
        'year': int(year),
        'symbol': 'AMZN',
        'price': [float(price) for price in group_data['Close'].values]
    }
    bucket.insert_one(record)

In [54]:
bucket.count_documents({})

75

In [55]:
print('without bucketing: ', 
      db.daily_data.count_documents({}))
print('with bucketing: ', 
      db.stock_bucket.count_documents({}))

without bucketing:  3019
with bucketing:  75


In [56]:
results = pd.DataFrame(bucket.find({'year':2025, 'month': 1}))
results['price'].to_dict()[0]

[220.22000122070312,
 224.19000244140625,
 227.61000061035156,
 222.11000061035156,
 222.1300048828125,
 218.94000244140625,
 218.4600067138672,
 217.75999450683594,
 223.35000610351562,
 220.66000366210938,
 225.94000244140625,
 230.7100067138672,
 235.00999450683594,
 235.4199981689453,
 234.85000610351562,
 235.4199981689453,
 238.14999389648438,
 237.07000732421875,
 234.63999938964844,
 237.67999267578125]

In [57]:
len(bucket.find_one({'month': 1, 'year':2025})['price'])

20

In [58]:
db['daily_data'].find_one({})

{'Date': datetime.datetime(2019, 1, 2, 0, 0),
 'symbol': 'AMZN',
 '_id': ObjectId('68e548d2b38011f8ebb22938'),
 'Close': 76.95649719238281,
 'High': 77.66799926757812,
 'Low': 73.04650115966797,
 'Open': 73.26000213623047,
 'Volume': 159662000}

# Recipe 3: Storing Data to Time Series Database (InfluxDB)

In [59]:
from influxdb_client_3 import InfluxDBClient3

import pandas as pd
from  pathlib import Path

In [60]:
path = Path('../../datasets/Ch5/ExtraSensory/')

In [61]:
file = '0A986513-7828-4D53-AA1F-E02D6DF9561B.features_labels.csv.gz'

In [62]:
columns = ['timestamp',
           'watch_acceleration:magnitude_stats:mean']

df = pd.read_csv(path.joinpath(file),
                usecols=columns,
                compression='gzip')
df = df.bfill()
df.columns = ['timestamp','wacc']
df.shape

(3960, 2)

In [63]:
df['timestamp'] = pd.to_datetime(df['timestamp'],
                                  origin='unix',
                                  unit='s',
                                  utc=True)


In [64]:
print(df.head())

                  timestamp         wacc
0 2015-12-08 19:06:37+00:00   995.369977
1 2015-12-08 19:07:37+00:00   995.369977
2 2015-12-08 19:08:37+00:00   995.369977
3 2015-12-08 19:09:37+00:00   996.406005
4 2015-12-08 19:10:55+00:00  1034.180063


In [65]:
df.set_index('timestamp', inplace=True)

In [66]:
print(df.head())

                                  wacc
timestamp                             
2015-12-08 19:06:37+00:00   995.369977
2015-12-08 19:07:37+00:00   995.369977
2015-12-08 19:08:37+00:00   995.369977
2015-12-08 19:09:37+00:00   996.406005
2015-12-08 19:10:55+00:00  1034.180063


In [67]:
n = len(df)
half = n // 2

batch1 = df.iloc[:half, :].copy()
batch2 = df.iloc[half:, :].copy()


In [68]:
token= 'apiv3_Ee_uB-Z50N3NjVHS65kurr-11I1v8SRyd2Fe_-rriqlkdr4vXSA-H5BoiY_z8HOj83cGRVG0dFf_Sb4hAYhOWQ'

client = InfluxDBClient3(host="http://localhost:8181",
                         token=token)

In [69]:
client

<influxdb_client_3.InfluxDBClient3 at 0x147001be0>

In [70]:
client.write(batch1, 
            database='extrasensory',
            data_frame_measurement_name='watch_acc')

In [71]:
query = "SELECT * FROM watch_acc"

sensor_df = client.query(database='extrasensory', 
                         query=query, 
                         language="sql", 
                         mode='pandas')
print(sensor_df.shape)

(3960, 2)


In [72]:
client.write(batch2,
             database='extrasensory',
             data_frame_measurement_name="watch_acc")

In [73]:
query = "SELECT * FROM watch_acc"

sensor_df = client.query(database='extrasensory', 
                         query=query, 
                         language="sql", 
                         mode='pandas')
print(sensor_df.shape)

(3960, 2)


In [74]:
print(sensor_df.head())


                 time         wacc
0 2015-12-08 19:06:37   995.369977
1 2015-12-08 19:07:37   995.369977
2 2015-12-08 19:08:37   995.369977
3 2015-12-08 19:09:37   996.406005
4 2015-12-08 19:10:55  1034.180063


In [75]:
sensor_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3960 entries, 0 to 3959
Data columns (total 2 columns):
 #   Column  Non-Null Count  Dtype         
---  ------  --------------  -----         
 0   time    3960 non-null   datetime64[ns]
 1   wacc    3960 non-null   float64       
dtypes: datetime64[ns](1), float64(1)
memory usage: 62.0 KB


In [76]:
client.close()

In [77]:
?client.write

[31mSignature:[39m client.write(record=[38;5;28;01mNone[39;00m, database=[38;5;28;01mNone[39;00m, **kwargs)
[31mDocstring:[39m
Write data to InfluxDB.

:param record: The data point(s) to write.
:type record: object or list of objects
:param database: The database to write to. If not provided, uses the database provided during initialization.
:type database: str
:param kwargs: Additional arguments to pass to the write API.
[31mFile:[39m      ~/Repos/MyWork/Write/Time-Series-Analysis-with-Python-Cookbook-Second-Edition/code/Ch5/dev1/lib/python3.13/site-packages/influxdb_client_3/__init__.py
[31mType:[39m      method

In [42]:
?client.write_file

[31mSignature:[39m
client.write_file(
    file,
    measurement_name=[38;5;28;01mNone[39;00m,
    tag_columns=[38;5;28;01mNone[39;00m,
    timestamp_column=[33m'time'[39m,
    database=[38;5;28;01mNone[39;00m,
    file_parser_options=[38;5;28;01mNone[39;00m,
    **kwargs,
)
[31mDocstring:[39m
Write data from a file to InfluxDB.

:param file: The file to write.
:type file: str
:param measurement_name: The name of the measurement.
:type measurement_name: str
:param tag_columns: Tag columns.
:type tag_columns: list
:param timestamp_column: Timestamp column name. Defaults to 'time'.
:type timestamp_column: str
:param database: The database to write to. If not provided, uses the database provided during initialization.
:type database: str
:param file_parser_options: Function for providing additional arguments for the file parser.
:type file_parser_options: callable
:param kwargs: Additional arguments to pass to the write API.
[31mFile:[39m      /opt/anaconda3/envs/dbs/lib/py

### There is more

In [78]:
client = InfluxDBClient3(host="http://localhost:8181",
                         token=token)

In [79]:
client._write_api.write(bucket='extrasensory', 
                        record=df, 
                        data_frame_measurement_name='wacc')

In [80]:
query = "SELECT * FROM wacc"

sensor_df = client.query(database='extrasensory', 
                         query=query, 
                         language="sql",
                        mode='pandas')
sensor_df.shape

(3960, 2)

In [81]:
sensor_df.head()

Unnamed: 0,time,wacc
0,2015-12-08 19:06:37,995.369977
1,2015-12-08 19:07:37,995.369977
2,2015-12-08 19:08:37,995.369977
3,2015-12-08 19:09:37,996.406005
4,2015-12-08 19:10:55,1034.180063


**With Clause**

In [82]:
with InfluxDBClient3(
    token=token,
    host="http://localhost:8181",
    database="sensor") as client:

    client.write(
        df,
        data_frame_measurement_name="wacc")


In [83]:
client = InfluxDBClient3(host="http://localhost:8181",
                         database="sensor",
                         token=token)

query = "SELECT * FROM wacc"

sensor_df = client.query(query=query, language="sql", mode='pandas')
print(sensor_df.head())

                 time         wacc
0 2015-12-08 19:06:37   995.369977
1 2015-12-08 19:07:37   995.369977
2 2015-12-08 19:08:37   995.369977
3 2015-12-08 19:09:37   996.406005
4 2015-12-08 19:10:55  1034.180063


# Recipe 4: Storing Data in Snowflake

To connect to Snowflake, you will need to install the Snowflake Python connector.

* To install using `Conda`, run the following:

```
conda install -c conda-forge snowflake-sqlalchemy snowflake-connector-python
```
* To install using `pip`, run the following:

```
pip install "snowflake-connector-python[pandas]"
pip install --upgrade snowflake-sqlalchemy
```

### Using `write_pandas`

In [89]:
import pandas as pd
from snowflake.connector.pandas_tools import pd_writer
from configparser import ConfigParser
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine


config = ConfigParser()
config.read('database.cfg')
config.sections()
params = dict(config['SNOWFLAKE'])


In [90]:
amzn_hist = get_stock_data('AMZN', '2019-01-01', '2025-03-31')
amzn_hist.head()

Unnamed: 0_level_0,Open,High,Low,Close,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2019-01-02,73.260002,77.667999,73.046501,76.956497,159662000
2019-01-03,76.000504,76.900002,74.855499,75.014,139512000
2019-01-04,76.5,79.699997,75.915497,78.769501,183652000
2019-01-07,80.115501,81.727997,79.459503,81.475502,159864000
2019-01-08,83.234497,83.830498,80.830498,82.829002,177628000


In [91]:
amzn_hist = amzn_hist.reset_index()

In [92]:
amzn_hist.shape

(1569, 6)

In [93]:
from snowflake import connector
from snowflake.connector.pandas_tools import pd_writer, write_pandas

con = connector.connect(**params)
cursor = con.cursor()

success, nchunks, nrows, copy_into = write_pandas(
                                            con, 
                                            amzn_hist, 
                                            auto_create_table=True,
                                            table_name='AMAZON', 
                                            table_type='temporary',
                                            overwrite=True)


In [94]:
print('success: ', success)
print('number of chunks: ', nchunks)
print('number of rows: ', nrows)
print('COPY INTO output', copy_into)

success:  True
number of chunks:  1
number of rows:  1569
COPY INTO output [('snowpark_temp_stage_rdq85ilzyo/file0.txt', 'LOADED', 1569, 1569, 1, 0, None, None, None, None)]


In [95]:
cursor.execute('SELECT count(*) FROM AMAZON;')
count = cursor.fetchone()[0]
print(count)

1569


## Using SQL ALchemy 

In [96]:
import pandas as pd
from snowflake.connector.pandas_tools import pd_writer
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine


In [97]:
url = URL(**params)
engine = create_engine(url)

In [98]:
amzn_hist.to_sql(
    'amazon',
    engine,
    index=False,
    if_exists='replace'
)

1569

In [99]:
%%time
try:
    amzn_hist.to_sql(
    'amazon_alchemy',
    engine,
    index=False,
    if_exists='replace'
)
except:
    print('failed to write')

CPU times: user 92.7 ms, sys: 12 ms, total: 105 ms
Wall time: 11.9 s


In [100]:
pd.read_sql_table('amazon_alchemy', 
                  con=engine).info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1569 entries, 0 to 1568
Data columns (total 6 columns):
 #   Column  Non-Null Count  Dtype         
---  ------  --------------  -----         
 0   Date    1569 non-null   datetime64[ns]
 1   Open    1569 non-null   float64       
 2   High    1569 non-null   float64       
 3   Low     1569 non-null   float64       
 4   Close   1569 non-null   float64       
 5   Volume  1569 non-null   float64       
dtypes: datetime64[ns](1), float64(5)
memory usage: 73.7 KB


In [101]:
%%time
try:
    amzn_hist.to_sql(
    'amazon_alchemy',
    engine,
    index=False,
    if_exists='replace',
    method=pd_writer
)
except:
    print('failed to write')

CPU times: user 153 ms, sys: 24.6 ms, total: 178 ms
Wall time: 10.3 s


In [102]:
query = 'SELECT * FROM AMAZON;'
try:
    snow_df = pd.read_sql(query, engine, index_col='Date')
    snow_df.info()
except:
    print('failed to query')

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 1569 entries, 2019-01-02 to 2025-03-28
Data columns (total 5 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   Open    1569 non-null   float64
 1   High    1569 non-null   float64
 2   Low     1569 non-null   float64
 3   Close   1569 non-null   float64
 4   Volume  1569 non-null   int64  
dtypes: float64(4), int64(1)
memory usage: 73.5 KB


In [103]:
snow_df.head()

Unnamed: 0_level_0,Open,High,Low,Close,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2019-01-02,73.260002,77.667999,73.046501,76.956497,159662000
2019-01-03,76.000504,76.900002,74.855499,75.014,139512000
2019-01-04,76.5,79.699997,75.915497,78.769501,183652000
2019-01-07,80.115501,81.727997,79.459503,81.475502,159864000
2019-01-08,83.234497,83.830498,80.830498,82.829002,177628000


## Using Snowpark

In [104]:
from snowflake.snowpark import Session
import pandas as pd

In [105]:
session = Session.builder.configs(params).create()

In [106]:
amzn_snowpark_df = session.create_dataframe(amzn_hist)

In [107]:
amzn_snowpark_df

<snowflake.snowpark.table.Table at 0x173ef63c0>

In [108]:
# example using overwrite

amzn_snowpark_df.write.mode("overwrite").save_as_table("amazon_snowpark")

In [109]:
amzn_df = session.table("amazon_snowpark")

In [110]:
df = amzn_df.to_pandas()
df.head()

Unnamed: 0,Date,Open,High,Low,Close,Volume
0,2019-01-02,73.260002,77.667999,73.046501,76.956497,159662000
1,2019-01-03,76.000504,76.900002,74.855499,75.014,139512000
2,2019-01-04,76.5,79.699997,75.915497,78.769501,183652000
3,2019-01-07,80.115501,81.727997,79.459503,81.475502,159864000
4,2019-01-08,83.234497,83.830498,80.830498,82.829002,177628000


In [111]:
df.shape

(1569, 6)

## There is more
### Using `write_pandas`

In [112]:

snowpark_df = session.write_pandas(amzn_hist, 
                               table_name="amazon_temp", 
                               auto_create_table=True, 
                               table_type="temp")

In [113]:
snowpark_df

<snowflake.snowpark.table.Table at 0x173f19810>