# 0.2-agifford-TestReadWriteToPostgres
This notebook tests reading and writing to a postgres database and templates out functions and methods to formalize the process for creating features datasets.

In [None]:
from dotenv import find_dotenv, load_dotenv
import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, DateTime, Float
from sqlalchemy.orm import sessionmaker

import pandas as pd
import numpy as np
import os
import json
load_dotenv(find_dotenv())

In [None]:
FEATURE_STORE_HOST = os.getenv("FEATURE_STORE_URI", "localhost")
FEATURE_STORE_PORT = os.getenv("FEATURE_STORE_PORT", "5432")
FEATURE_STORE_PW = os.getenv("FEATURE_STORE_PW")

Make connection to database.

In [None]:
DATABASE_URI = f'postgresql+psycopg2://postgres:{FEATURE_STORE_PW}@{FEATURE_STORE_HOST}:{FEATURE_STORE_PORT}/feature_store'
engine = sa.create_engine(
    DATABASE_URI, 
    executemany_mode='values',
    executemany_values_page_size=10000, 
    executemany_batch_page_size=500
)
Base = declarative_base()

In [None]:
def recreate_database():
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)

Create model of table to make/insert into.

In [None]:
# this table model will house ALL data, with a column that specifies whether the data is
# training, validation, or testing
# we can select the appropriate rows of data for modeling by knowing the columns:
# `featurize_id`: the id of the featurization run
# `dataset_group`: training, validation, or testing
class NaiveFreqFeats(Base):
    __tablename__ = 'naive_frequency_features'
    id = Column(Integer, primary_key=True)
    featurize_id = Column(Integer)
    file = Column(String)
    dataset_group = Column(String)
    added_datetime = Column(DateTime)
    window_size = Column(Integer)
    t_index = Column(Integer)
    label = Column(String)
    label_group = Column(String)
    accel_x_0 = Column(Float)
    accel_x_1 = Column(Float)
    accel_x_2 = Column(Float)
    accel_x_4 = Column(Float)
    accel_x_5 = Column(Float)
    accel_x_7 = Column(Float)
    accel_x_9 = Column(Float)
    accel_x_12 = Column(Float)
    accel_x_16 = Column(Float)
    accel_y_0 = Column(Float)
    accel_y_1 = Column(Float)
    accel_y_2 = Column(Float)
    accel_y_3 = Column(Float)
    accel_y_4 = Column(Float)
    accel_y_5 = Column(Float)
    accel_y_6 = Column(Float)
    accel_y_7 = Column(Float)
    accel_y_8 = Column(Float)
    accel_y_9 = Column(Float)
    accel_y_10 = Column(Float)
    accel_y_11 = Column(Float)
    accel_y_12 = Column(Float)
    accel_z_0 = Column(Float)
    accel_z_1 = Column(Float)
    accel_z_2 = Column(Float)
    accel_z_3 = Column(Float)
    accel_z_4 = Column(Float)
    accel_z_5 = Column(Float)
    accel_z_8 = Column(Float)
    accel_z_12 = Column(Float)
    accel_z_14 = Column(Float)
    gyro_x_0 = Column(Float)
    gyro_x_1 = Column(Float)
    gyro_x_2 = Column(Float)
    gyro_x_3 = Column(Float)
    gyro_x_4 = Column(Float)
    gyro_x_5 = Column(Float)
    gyro_x_6 = Column(Float)
    gyro_x_7 = Column(Float)
    gyro_x_8 = Column(Float)
    gyro_x_9 = Column(Float)
    gyro_x_10 = Column(Float)
    gyro_x_11 = Column(Float)
    gyro_x_12 = Column(Float)
    gyro_x_13 = Column(Float)
    gyro_y_0 = Column(Float)
    gyro_y_1 = Column(Float)
    gyro_y_2 = Column(Float)
    gyro_y_3 = Column(Float)
    gyro_y_4 = Column(Float)
    gyro_y_5 = Column(Float)
    gyro_y_6 = Column(Float)
    gyro_y_7 = Column(Float)
    gyro_y_8 = Column(Float)
    gyro_y_10 = Column(Float)
    gyro_y_11 = Column(Float)
    gyro_z_0 = Column(Float)
    gyro_z_1 = Column(Float)
    gyro_z_3 = Column(Float)
    gyro_z_4 = Column(Float)
    gyro_z_5 = Column(Float)
    gyro_z_8 = Column(Float)
    gyro_z_14 = Column(Float)
    gyro_z_17 = Column(Float)
    gyro_z_18 = Column(Float)
    
    def __repr__(self):
        return (
            f"<NaiveFreqFeats(featurize_id='{self.featurize_id}', file='{self.file}', "
            f"dataset_group={self.dataset_group}, added_datetime={self.added_datetime}), "
            f"window_size={self.window_size},  t_index={self.t_index}>"
        )

Make table in database, drop previous version of database if already exists.

In [None]:
# WARNING!: DO NOT RUN THIS FUNCTION IF YOU HAVE FEATURIZED DATA IN YOUR DATABASE THAT
# YOU STILL WANT TO KEEP. THIS FUNCTION WILL DROP THE TABLE AND RECREATE AN EMPTY VERSION
recreate_database()

Get the feature columns by measurement for the naive_frequency_features table

In [None]:
with open("../../src/features/frequency_features.json", "r", encoding="utf-8") as infile:
    FEATURES = json.load(infile)
    
cols = ["accel_x", "accel_y", "accel_z", "gyro_x", "gyro_y", "gyro_z"]
feat_cols = [
col + "_" + str(int(freq_feat)) for col in cols for freq_feat in FEATURES[col]
]

Insert a single row of data into the table.

In [None]:
row = NaiveFreqFeats(
    featurize_id=1,
    file="testing.PARQUET",
    dataset_group="train",
    added_datetime=pd.to_datetime("now", utc=True),
    window_size=151,
    t_index=1,
    label="foo",
    label_group="foo group",
    **{
        col: 1. for col in feat_cols
    }
)

In [None]:
Session = sessionmaker(bind=engine)

with Session() as session:
    session.add(row)
    session.commit()

Check if a particular featurize_id exists in the database

In [None]:
metadata = sa.schema.MetaData(bind=engine)
table = sa.Table("naive_frequency_features", metadata, autoload=True)
with Session() as session:
    featurize_ids = session.query(table.c.featurize_id).distinct().all()


In [None]:
id_to_search = 1
any([r[0]==id_to_search for r in featurize_ids])

Delete existing dataset by matching `featurize_id`.

In [None]:
with Session() as session:
    session.query(table).filter(table.c.featurize_id==1).delete()
    session.commit()
    # session.commit()

In [None]:
with Session() as session:
    featurize_ids = session.query(table.c.featurize_id).distinct().all()
featurize_ids

Delete the test data and recreate the empty table.

In [None]:
# WARNING!: DO NOT RUN THIS FUNCTION IF YOU HAVE FEATURIZED DATA IN YOUR DATABASE THAT
# YOU STILL WANT TO KEEP. THIS FUNCTION WILL DROP THE TABLE AND RECREATE AN EMPTY VERSION
recreate_database()

Run a bulk insert of data into the table.

In [None]:
with Session() as session:
    for chunk in range(0, 100000, 10000):
        session.bulk_save_objects(
            [
                NaiveFreqFeats(
                    featurize_id=1,
                    file="testing.PARQUET",
                    dataset_group="train",
                    added_datetime=pd.to_datetime("now", utc=True),
                    window_size=151,
                    t_index=i,
                    label="foo",
                    label_group="foo group",
                    **{
                        col: np.random.random() for col in feat_cols
                    }
                )
                for i in range(chunk, min(chunk + 10000, 100000))
            ],
            return_defaults=False
        )
    session.commit()

Delete the test data and recreate the empty table.

In [None]:
# WARNING!: DO NOT RUN THIS FUNCTION IF YOU HAVE FEATURIZED DATA IN YOUR DATABASE THAT
# YOU STILL WANT TO KEEP. THIS FUNCTION WILL DROP THE TABLE AND RECREATE AN EMPTY VERSION
recreate_database()

Try bulk insert with data from a pandas dataframe.

In [None]:
metadata = sa.schema.MetaData(bind=engine)
table = sa.Table("naive_frequency_features", metadata, autoload=True)

data = dict(
    featurize_id=[1 for _ in range(10000)],
    file=["testing.PARQUET" for _ in range(10000)],
    dataset_group=["train" for _ in range(10000)],
    added_datetime=[pd.to_datetime("now", utc=True) for _ in range(10000)],
    window_size=[151  for _ in range(10000)],
    t_index=[_  for _ in range(10000)],
    label=["foo" for _ in range(10000)],
    label_group=["foo group" for _ in range(10000)],
    **{
        col: [np.random.random() for _ in range(10000)] for col in feat_cols
    }
)
df = pd.DataFrame(data=data)
records = df.to_dict(orient="records")

with Session() as session:
    session.execute(table.insert(), records)
    session.commit()

Delete the test data and recreate the empty table.

In [None]:
# WARNING!: DO NOT RUN THIS FUNCTION IF YOU HAVE FEATURIZED DATA IN YOUR DATABASE THAT
# YOU STILL WANT TO KEEP. THIS FUNCTION WILL DROP THE TABLE AND RECREATE AN EMPTY VERSION
recreate_database()