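# Final Project Code: Load Processed Citi Bike Data into PostgreSQL

This notebook recreates the `citibike_project` schema, rebuilds every table defined in `etl_process/schemas`, and uploads the processed station-traffic, weather, station-info, and IRS datasets.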

In [14]:
# Run from the project's base directory: the `etl_process` paths configured
# below are resolved against the current working directory (Path.cwd()).
# NOTE(review): the `etl_process` package imports presumably rely on this too — confirm.
# Set CITIBIKE_WORKDIR to point at another checkout instead of editing this
# cell; the fallback is the previously hard-coded location.
import os
WORKDIR = os.environ.get("CITIBIKE_WORKDIR", "/home/awesome")
os.chdir(WORKDIR)

# datatypes
import json
import yaml
# database connection
import psycopg2
import psycopg2.extras
import psycopg2.extensions as psql_ext
from psycopg2 import sql
# custom etl functions
from etl_process import utils as etl
# respective datasets
from etl_process.data_processing import station_info as info
# computation
import pandas as pd
# utilities
from pathlib import Path
import itertools
# typing
from typing import Union

# Directory layout, anchored at the working directory selected above.
WORKDIR_PATH = Path.cwd()
_ETL_DIR = WORKDIR_PATH.joinpath('etl_process')
DATA_PATH = _ETL_DIR.joinpath('processed_data')    # processed CSV inputs
SCHEMAS_PATH = _ETL_DIR.joinpath('schemas')        # YAML table definitions

# Postgres schema that holds every table this notebook creates and loads.
PROJECT_SCHEMA = 'citibike_project'

# Set Up Database

### Connect and set up the schema

In [15]:
# Open the PostgreSQL connection (psycopg2) used by every cell below.
# Connection settings are read from the standard libpq environment variables
# (PGDATABASE, PGUSER, PGPASSWORD, PGHOST, PGPORT), so credentials need not be
# committed with the notebook. The fallbacks reproduce the previous
# hard-coded development settings.
# TODO: remove the password fallback once PGPASSWORD is set in every environment.
conn = psycopg2.connect(
    dbname=os.environ.get("PGDATABASE", "new_db"),
    user=os.environ.get("PGUSER", "awesome_user"),
    password=os.environ.get("PGPASSWORD", "awesome_password"),
    host=os.environ.get("PGHOST", "postgres"),
    port=os.environ.get("PGPORT", "5432"),
)

In [16]:
# Destructive: drop PROJECT_SCHEMA and recreate it empty. As the output below
# shows, every table that already existed in the schema is dropped as well.
etl.drop_recreate_schema(conn, PROJECT_SCHEMA)

Table 'irs_codes' dropped.
Table 'nyc_irs' dropped.
Table 'citibike_station_history' dropped.
Table 'station_info' dropped.
Table 'weather_general' dropped.
Table 'weather_precip' dropped.
Table 'weather_wind' dropped.
Table 'weather_pressure' dropped.
All tables in citibike_project dropped successfully.
Dropped Schema citibike_project.
Created Schema citibike_project.


### Drop and recreate all tables

In [17]:
# Collect every table definition from the schema files in SCHEMAS_PATH into a
# single {table_name: table_schema} mapping, then drop and recreate each table.
# Path.iterdir() yields entries in arbitrary, filesystem-dependent order, so the
# files are sorted to make the create order (and the log below) reproducible.
schema_files = sorted(item for item in SCHEMAS_PATH.iterdir() if item.is_file())

# Each file's "tables" entry is a list of {table_name: schema} mappings;
# flatten them across all files.
table_entries = itertools.chain.from_iterable(
    etl.read_yaml_to_dict(schema_file)["tables"] for schema_file in schema_files
)

tables_schemas = {}
for entry in table_entries:
    for table_name, table_schema in entry.items():
        # A plain dict merge would silently keep only the last definition;
        # fail loudly so conflicting schema definitions get noticed.
        if table_name in tables_schemas:
            raise ValueError(
                f"Table {table_name!r} is defined more than once in {SCHEMAS_PATH}"
            )
        tables_schemas[table_name] = table_schema

for table_name, table_schema in tables_schemas.items():
    etl.drop_recreate_table(
        db_schema=PROJECT_SCHEMA,
        table_name=table_name,
        table_schema=table_schema,
        conn=conn,
    )


Dropping citibike_project.irs_codes
Creating citibike_project.irs_codes
Dropping citibike_project.nyc_irs
Creating citibike_project.nyc_irs
Dropping citibike_project.citibike_station_history
Creating citibike_project.citibike_station_history
Dropping citibike_project.station_info
Creating citibike_project.station_info
Dropping citibike_project.weather_general
Creating citibike_project.weather_general
Dropping citibike_project.weather_precip
Creating citibike_project.weather_precip
Dropping citibike_project.weather_wind
Creating citibike_project.weather_wind
Dropping citibike_project.weather_pressure
Creating citibike_project.weather_pressure


# Upload Data

### Aggregated Ride Data

In [None]:
# Upload every aggregated station-traffic file into citibike_station_history.
# Files are sorted so they load in a reproducible order; Path.iterdir() order
# is arbitrary.
traffic_dir = DATA_PATH / "rides" / "station_traffic"
traffic_data = sorted(item for item in traffic_dir.iterdir() if item.is_file())
# An empty directory almost certainly means a wrong path or a skipped
# processing step; fail instead of silently uploading nothing.
if not traffic_data:
    raise FileNotFoundError(f"No station traffic files found in {traffic_dir}")

for traffic_file in traffic_data:
    traffic_df = pd.read_csv(traffic_file)
    etl.upload_dataframe(
        conn=conn,
        dataframe=traffic_df,
        db_schema=PROJECT_SCHEMA,
        table_name="citibike_station_history",
        table_schema=tables_schemas["citibike_station_history"]
    )


### Weather Data

In [None]:
# Load each processed weather CSV into the table that shares its file name.
# NOTE(review): weather_wind and weather_pressure tables are created above but
# not loaded here — confirm whether that is intentional.
weather_dir = DATA_PATH / "weather"
weather_tables = ("weather_general", "weather_precip")
for weather_table in weather_tables:
    weather_df = pd.read_csv(weather_dir / f"{weather_table}.csv")
    etl.upload_dataframe(
        conn=conn,
        dataframe=weather_df,
        db_schema=PROJECT_SCHEMA,
        table_name=weather_table,
        table_schema=tables_schemas[weather_table],
    )

### Station Info

In [None]:
# Build the station-info frame with the project's data_processing helper,
# then load it into the station_info table.
df_station_info = info.get_station_info_data()
station_table = 'station_info'

etl.upload_dataframe(
    conn=conn,
    dataframe=df_station_info,
    db_schema=PROJECT_SCHEMA,
    table_name=station_table,
    table_schema=tables_schemas[station_table],
)

In [None]:
# pd.read_sql_query(f"SELECT * FROM {PROJECT_SCHEMA}.station_info LIMIT 3", conn)

### IRS Data

In [None]:
# Load each processed IRS CSV into the table that shares its file name.
irs_dir = DATA_PATH / "irs"
irs_tables = ("irs_codes", "nyc_irs")
for irs_table in irs_tables:
    irs_df = pd.read_csv(irs_dir / f"{irs_table}.csv")
    etl.upload_dataframe(
        conn=conn,
        dataframe=irs_df,
        db_schema=PROJECT_SCHEMA,
        table_name=irs_table,
        table_schema=tables_schemas[irs_table],
    )

### Close the connection

In [None]:
# Persist anything still pending in the current transaction, then release the
# connection. psycopg2 connections are not autocommit by default, and close()
# discards an uncommitted transaction as if it were rolled back, so commit first
# in case an upload helper above did not. The `closed` check makes this cell
# safe to re-run.
if not conn.closed:
    conn.commit()
    conn.close()