In [None]:
!pip install sqlalchemy-aurora-data-api


In [None]:
import json
import logging
import os
import tempfile
from pprint import pprint

import boto3
import pandas as pd
import sagemaker
from sqlalchemy import create_engine
from sqlalchemy import exc
from sqlalchemy import text


# DAO (data-access helpers: Aurora via the RDS Data API, parquet files on S3)

In [None]:
def get_engine(db_config, echo=False):
    """Create a SQLAlchemy engine that talks to Aurora through the RDS Data API.

    Requires the ``sqlalchemy-aurora-data-api`` dialect
    (https://pypi.org/project/sqlalchemy-aurora-data-api/).

    One-time cluster setup (shell):
      * enable the Data API (you may have to run it twice):
        aws rds modify-db-cluster --db-cluster-identifier DB_CLUSTER_NAME --enable-http-endpoint --apply-immediately
      * store the DB credentials in Secrets Manager:
        aws secretsmanager create-secret --name rds-db-credentials/MY_DB
        aws secretsmanager put-secret-value --secret-id rds-db-credentials/MY_DB --secret-string "$(jq -n '.username=env.PGUSER | .password=env.PGPASSWORD')"
      * smoke-test the Data API:
        aws rds-data execute-statement --resource-arn RESOURCE_ARN --secret-arn SECRET_ARN --sql "select * from pg_catalog.pg_tables"
        (other engines: ``show tables;`` or ``SELECT table_name FROM user_tables;``)
      * the database named in ``db_url`` (e.g. ``demo``) must already exist:
        aws rds-data execute-statement ... --sql "CREATE DATABASE IF NOT EXISTS demo;"

    Args:
        db_config: mapping with keys ``db_url`` (SQLAlchemy URL),
            ``cluster_arn`` and ``secret_arn``.
        echo: forwarded to ``create_engine``; True makes SQLAlchemy log statements.

    Returns:
        The SQLAlchemy engine returned by ``create_engine``.
    """
    # Same logger object as the notebook-level `logger`; looked up here so the
    # function does not depend on the logging cell having been run first.
    log = logging.getLogger(__name__)

    log.info('Creating engine')
    engine = create_engine(
        db_config["db_url"],
        echo=echo,
        # The Data API dialect authenticates with the cluster + secret ARNs
        # instead of host/user/password in the URL.
        connect_args=dict(
            aurora_cluster_arn=db_config["cluster_arn"],
            secret_arn=db_config["secret_arn"],
        ),
    )
    log.info('Engine created')
    return engine


In [None]:
def store(df, engine, schema=None, table_name='transactions'):
    """Append ``df`` to a SQL table inside a single transaction.

    Args:
        df: DataFrame to write. Its index is written as a column too
            (``DataFrame.to_sql`` default ``index=True``).
        engine: SQLAlchemy engine (e.g. from ``get_engine``); anything whose
            ``begin()`` context manager yields a connection pandas accepts works.
        schema: optional database schema; ``None`` uses the connection default.
        table_name: target table, ``'transactions'`` by default.

    Returns:
        Whatever ``DataFrame.to_sql`` returns (rows written, or ``None`` when
        the driver does not report it).

    Raises:
        Any database error raised by ``to_sql``; the transaction opened by
        ``engine.begin()`` is rolled back before the error propagates.
    """
    log = logging.getLogger(__name__)
    log.info('Entering store')

    # engine.begin() commits when the block exits normally and rolls back if
    # to_sql raises, instead of relying on driver/pandas autocommit behaviour.
    with engine.begin() as conn:
        log.info('Connected')
        log.info(f'Dumping data {df.shape[0]} rows')
        n_written = df.to_sql(table_name, con=conn, schema=schema, if_exists='append')

    return n_written
        # TODO error maagemebt



In [None]:
def query(query, con=None, parse_dates=('created_at', 'updated_at')):
    """Run a row-returning SQL statement and return the result as a DataFrame.

    Args:
        query: SQL text. It must return rows (SELECT); DML such as DELETE
            should be executed on a connection from ``engine.begin()`` instead.
        con: connection or engine handed to ``pd.read_sql``. ``None`` (the
            default) uses the notebook-global ``engine``.
        parse_dates: column names to convert to datetimes. Listed columns
            that the result does not contain (e.g. for ``SELECT min(dt)``)
            are skipped by pandas. Pass ``None`` or ``()`` to disable.

    Returns:
        ``pandas.DataFrame`` holding the result set.
    """
    if con is None:
        con = engine  # notebook-global created by get_engine(...)
    return pd.read_sql(
        query,
        con=con,
        parse_dates=list(parse_dates) if parse_dates else None,
    )

In [None]:
def export_to_s3(df, name, s3_prefix=None):
    """Write ``df`` as gzip-compressed parquet and upload it to S3.

    The object is uploaded as ``{s3_prefix}/{name}.parquet.gzip``, the same
    location ``read_from_s3`` downloads from for a given ``name``.

    Args:
        df: DataFrame to export.
        name: object base name without extension, e.g. ``"seasons-2022-09-01"``.
        s3_prefix: destination S3 prefix; ``None`` (the default) uses the
            notebook-global ``base_uri``.

    Returns:
        The S3 URI reported by ``sagemaker.s3.S3Uploader.upload``.
    """
    if s3_prefix is None:
        s3_prefix = base_uri
    # Stage the file in a temporary directory that is removed after the upload.
    with tempfile.TemporaryDirectory() as tmpdirname:
        local_path = os.path.join(tmpdirname, f"{name}.parquet.gzip")
        df.to_parquet(local_path, compression='gzip')
        return sagemaker.s3.S3Uploader.upload(
            local_path=local_path,
            desired_s3_uri=s3_prefix,
        )

In [None]:
def read_from_s3(name, s3_prefix=None):
    """Download ``{s3_prefix}/{name}.parquet.gzip`` from S3 and load it.

    Args:
        name: object base name without extension, as passed to ``export_to_s3``.
        s3_prefix: source S3 prefix; ``None`` (the default) uses the
            notebook-global ``base_uri``.

    Returns:
        ``pandas.DataFrame`` read from the parquet file.

    Raises:
        FileNotFoundError: nothing was downloaded for that name.
    """
    if s3_prefix is None:
        s3_prefix = base_uri
    file_name = f"{name}.parquet.gzip"
    with tempfile.TemporaryDirectory() as tmpdirname:
        # S3Downloader.download treats its second argument as a *directory*
        # and stores the object there under its own basename, so pass the
        # temp dir and read the file from inside it.
        sagemaker.s3.S3Downloader.download(f"{s3_prefix}/{file_name}", tmpdirname)
        local_path = os.path.join(tmpdirname, file_name)
        if not os.path.isfile(local_path):
            raise FileNotFoundError(f"{s3_prefix}/{file_name} was not found on S3")
        # Load fully before the temporary directory is deleted.
        df = pd.read_parquet(local_path)
    return df

In [None]:
config_file_name = "../secure.config.json"
with open(config_file_name, encoding="utf-8") as f:
    config = json.load(f)

db_config = config["db"]

# get_engine() reads exactly these keys; fail here with a clear message rather
# than with a KeyError inside engine creation.
_required_db_keys = {"db_url", "cluster_arn", "secret_arn"}
_missing_db_keys = _required_db_keys - set(db_config)
if _missing_db_keys:
    raise KeyError(f"{config_file_name}: 'db' section is missing {sorted(_missing_db_keys)}")

# Show only the key names: the values (DB URL, cluster and secret ARNs) come
# from a "secure" config file and should not be persisted in notebook outputs.
pprint(sorted(db_config))

In [None]:

logger = logging.getLogger(__name__)  # notebook logger used by the DAO helpers
logging.basicConfig()  # give the root logger a default stream handler

In [None]:
# Keep output quiet: only ERROR and above from SQLAlchemy's engine logger and
# from this notebook's own logger (so the helpers' info messages are hidden).
logging.getLogger("sqlalchemy.engine.Engine").setLevel(logging.ERROR)
logger.setLevel(logging.ERROR)

In [None]:
# One SageMaker session, also used below to look up the default bucket.
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name

In [None]:
# Notebook-global engine, used by query() when no explicit connection is given.
engine = get_engine(db_config=db_config, echo=False)

In [None]:
# All daily parquet exports live under this prefix of the session's default bucket.
default_bucket = sagemaker_session.default_bucket()
base_uri = "s3://" + default_bucket + "/labbench/seasons"
print(f"{base_uri=}")

# end of initialization

# Get one day of data

In [None]:
query(
    query="SELECT * FROM transactions "
          "where dt like '2023-01-01%'"
)

# Earliest and latest transaction dates

In [None]:
query(query="SELECT min(dt) FROM transactions")  # earliest dt in the table

In [None]:
query(query="SELECT max(dt) FROM transactions")  # latest dt in the table

# Generate the list of days to inspect

In [None]:
start_date, end_date = '2022-08-17', '2023-01-16'
# One Timestamp per calendar day, both endpoints included.
days = pd.date_range(start=start_date, end=end_date, freq='D')

In [None]:
days  # display the date range built above (one Timestamp per day)

In [None]:
# Optional: uncomment to check only the two endpoint days (the same values as
# start_date / end_date above) instead of the full range. The loops below use
# str(day)[0:10], which works for these strings as well as for Timestamps.
#days = ['2022-08-17', '2023-01-16']

In [None]:
# Row count per day. The aggregate is aliased and read positionally because the
# column name pandas sees for a bare count(*) depends on the SQL dialect
# (e.g. "count(*)" on MySQL but "count" on PostgreSQL).
for day in days:
    day_filter = str(day)[0:10]
    df_n = query(f"SELECT count(*) AS n_rows FROM transactions where dt like '{day_filter}%'")
    print(f"{day_filter}: {df_n.iat[0, 0]}")

# Export one day as parquet to S3, then read it back from S3

In [None]:
period = "2022-09-01"  # the single day exported below as a smoke test
name = "seasons-" + period

In [None]:
export_df = query(query=f"SELECT * FROM transactions where dt like '{period}%'")
export_to_s3(df=export_df, name=name)

In [None]:
read_from_s3(name=name)  # should match export_df from the previous cell

# Export the DB to S3, one parquet file per day

In [None]:
# Re-create the full daily range before exporting, in case `days` was narrowed
# by the optional endpoints-only cell above.
start_date = '2022-08-17'
end_date = '2023-01-16'
days = pd.date_range(start=start_date, end=end_date, freq='D')

In [None]:
# Export every day in `days` as its own parquet object on S3.
# Days whose query returns no rows are skipped rather than uploaded: after the
# purge at the end of this notebook, a re-run would otherwise overwrite the
# archived files with empty ones. A skipped day has no S3 object to read back.
for day in days:
    day_filter = str(day)[0:10]
    name = f"seasons-{day_filter}"
    export_df = query(f"SELECT * FROM transactions where dt like '{day_filter}%'")
    if export_df.empty:
        print(f"{day_filter=} skipped: no rows")
        continue
    export_to_s3(export_df, name)
    print(f"{day_filter=} exported")

In [None]:
# Round-trip check: download each daily export and report its shape.
for day in days:
    day_filter = str(day)[:10]
    name = "seasons-" + day_filter
    df = read_from_s3(name=name)
    print(f"{day_filter=} {df.shape}")


In [None]:
# Spot-check content: the first 10 rows of every daily export.
for day in days:
    day_filter = str(day)[:10]
    name = "seasons-" + day_filter
    df = read_from_s3(name=name)
    print(df.head(n=10))


# Purge 2022 rows from the DB

**Destructive:** run only after the daily exports above have been verified.

In [None]:
# A DELETE returns no rows, so it cannot go through query()/pd.read_sql, and it
# must be committed. engine.begin() commits on success and rolls back on error.
# The half-open range removes all of 2022 but keeps rows at exactly
# 2023-01-01 00:00:00, which BETWEEN ... AND '2023-01-01' would also delete.
with engine.begin() as conn:
    purge_result = conn.execute(
        text("DELETE FROM transactions WHERE dt >= '2022-01-01' AND dt < '2023-01-01'")
    )
# NOTE(review): rowcount depends on the driver reporting it; -1 means unknown.
print(f"deleted rows: {purge_result.rowcount}")

In [None]:
query(query="SELECT min(dt) FROM transactions")  # should now be in 2023