# New Schemas/Tables

Starting from an existing data source, create new schemas and tables, then insert the new data.

When creating new schemas and tables (and when inserting data), we must `commit` the changes on the `psycopg2` connection; otherwise they are discarded when the connection is closed.

In [None]:
import psycopg2
import pandas as pd
from aws_secrets import get_secret

In [None]:
def create_rw_conn(secrets, *, sslmode='prefer',
                   sslrootcert="[full path]rds-combined-ca-bundle.pem"):
    """Open a read/write psycopg2 connection from a secrets mapping.

    Parameters
    ----------
    secrets : mapping
        Must provide 'ENDPOINT', 'PORT', 'USER', 'PASSWORD' and 'DATABASE'
        (looked up in that order; a missing key raises KeyError).
    sslmode : str, keyword-only
        libpq SSL mode. The default 'prefer' encrypts when the server
        supports it but does not verify the server certificate. Pass
        'verify-full' (or 'verify-ca') to make ``sslrootcert`` matter.
    sslrootcert : str, keyword-only
        Path to the CA bundle. The default is a placeholder path that must
        be filled in before certificate verification can work.

    Returns
    -------
    psycopg2 connection
        Opened in psycopg2's default (non-autocommit) mode, so callers have
        to ``commit()`` their changes.
    """
    # pass the secrets straight through to psycopg2 as keyword arguments
    conn = psycopg2.connect(
        host=secrets['ENDPOINT'],
        port=secrets['PORT'],
        user=secrets['USER'],
        password=secrets['PASSWORD'],
        database=secrets['DATABASE'],
        sslmode=sslmode,
        sslrootcert=sslrootcert,
    )
    return conn


def fetch_results(sql, conn):
    """Run a query on ``conn`` and return its full result set as a DataFrame.

    Parameters
    ----------
    sql : str
        Query to execute. It must produce a result set (e.g. a SELECT).
    conn : psycopg2 connection
        Connection to run the query on.

    Returns
    -------
    pandas.DataFrame
        One row per fetched record, with columns named after
        ``cursor.description``.

    Raises
    ------
    Whatever ``cursor.execute`` raises. The connection is rolled back first,
    so later statements on it keep working. TypeError is raised if the
    statement returns no result set (``cursor.description`` is None).
    """
    # the context manager closes the cursor even when something below raises
    with conn.cursor() as cur:
        try:
            cur.execute(sql)
        except Exception:
            # PostgreSQL aborts the whole transaction on an error; roll back
            # so the connection is usable again before surfacing the error
            conn.rollback()
            raise
        columns = [col[0] for col in cur.description]
        rows = cur.fetchall()
    return pd.DataFrame(rows, columns=columns)


def exec_sql(sql, conn, commit_changes=False):
    """Execute a statement that returns no result set (DDL / DML).

    Parameters
    ----------
    sql : str
        Statement to run.
    conn : psycopg2 connection
        Connection to run it on.
    commit_changes : bool
        Commit right after executing. psycopg2 opens a transaction
        implicitly, so changes are discarded unless they are committed.

    Returns
    -------
    str
        The literal "Success :)" once the statement (and optional commit)
        has completed.

    Raises
    ------
    Whatever ``cursor.execute`` / ``conn.commit`` raise. The connection is
    rolled back first: PostgreSQL aborts the whole transaction on an error,
    and without a rollback every later statement on ``conn`` would fail.
    """
    # the context manager closes the cursor on both success and failure
    with conn.cursor() as cur:
        try:
            cur.execute(sql)
            if commit_changes:
                conn.commit()  # <- We MUST commit to reflect the inserted data
        except Exception:
            conn.rollback()
            raise
    return "Success :)"

In [None]:
# fetch the DB credentials once and open the first read/write connection;
# `secrets` is reused by the reconnect cells further down
secrets = get_secret()
conn = create_rw_conn(secrets)

## Load new data

In [None]:
# read fips_code as text so pandas does not parse it as a number
new_weather_data = pd.read_csv(
    "new-weather-data.csv",
    dtype={'fips_code': str},
)
new_weather_data

In [None]:
# rows per raw state value: shows which codes the remap below must cover
new_weather_data.state.value_counts()

In [None]:
# Remap two-letter state codes to the full names used as schema names below.
state_names = {'CA': 'california', 'TX': 'texas', 'NY': 'newyork'}
# `replace` leaves values that are not keys untouched, whereas `map` turns
# them into NaN. With `replace`, re-running this cell is harmless: names
# that were already remapped pass through unchanged.
new_weather_data['state'] = new_weather_data['state'].replace(state_names)
# Any value missing from the mapping would otherwise be dropped silently by
# the later groupby, so fail loudly instead.
unmapped = set(new_weather_data['state']) - set(state_names.values())
if unmapped:
    raise ValueError(
        "unmapped state values: {}".format(sorted(map(str, unmapped))))
new_weather_data.head()

In [None]:
# Inverse of the remap cell above (full names -> two-letter codes); uncomment to undo that remap.
# new_weather_data['state'] = new_weather_data['state'].map({'california': 'CA', 'texas': 'TX', 'newyork': 'NY'})

### Create new schemas and tables

In [None]:
# DDL template: fill {state} with str.format before executing.
# NOTE: {state} is pasted in as a bare (unquoted) identifier, so it must be a
# trusted, plain SQL name; PostgreSQL folds unquoted identifiers to lower case.
sql_create_schema = """
    CREATE SCHEMA IF NOT EXISTS {state};
    """

In [None]:
# DDL template for the per-state table; fill {state} with str.format.
# fips_code is UNIQUE because the upsert below relies on ON CONFLICT (fips_code).
# IF NOT EXISTS means an already-existing table is left as-is: editing the
# column definitions here does not alter a table created earlier.
# NOTE(review): county_name varchar(20) rejects longer names — confirm the
# data never exceeds 20 characters.
sql_create_table = """
    CREATE TABLE IF NOT EXISTS {state}.weather_county (
                id serial PRIMARY KEY,
                fips_code varchar(10) UNIQUE,
                county_name varchar(20),
                temperature numeric NOT NULL,
                fahrenheit bool DEFAULT true,
                updated_at timestamp default now(),
                created_at timestamp default now()
            );
"""

In [None]:
# one group per state value (groupby drops NaN keys by default)
grouped = new_weather_data.groupby('state')

In [None]:
import re

for state, _ in grouped:
    # state is formatted straight into the DDL as an unquoted identifier, so
    # refuse anything that is not a plain SQL name (guards against injection
    # through unexpected values in the CSV's state column)
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", str(state)):
        raise ValueError("unsafe schema name: {!r}".format(state))
    # don't forget to commit changes!!
    exec_sql(sql_create_schema.format(state=state), conn=conn, commit_changes=True)
    exec_sql(sql_create_table.format(state=state), conn=conn, commit_changes=True)
    print("[{}] schema + table created".format(state))

### Check that the new schemas/tables were created successfully

In [None]:
# List every table in schemas owned by the connected role. The new
# <state>.weather_county tables should appear here. `current_user` replaces a
# hard-coded owner name, so the check works for whoever runs the notebook.
sql = """
    SELECT s.catalog_name, s.schema_owner, s.schema_name, t.table_name
    FROM information_schema.schemata s
    JOIN information_schema.tables t
      ON s.catalog_name = t.table_catalog AND s.schema_name = t.table_schema
    WHERE s.schema_owner = current_user
    ORDER BY s.schema_name, t.table_name;
    """

fetch_results(sql=sql, conn=conn)

In [None]:
# Close this connection; the upsert section below opens a fresh one.
conn.close()

### Insert new data / update existing data (upsert)

In [None]:
from psycopg2.extras import execute_values

In [None]:
# reopen a read/write connection with the credentials fetched earlier
conn = create_rw_conn(secrets)

In [None]:
# Upsert template: fill {state} with str.format. The remaining %s is expanded
# by psycopg2's execute_values into the VALUES rows
# (fips_code, county_name, temperature).
# When fips_code already exists, only temperature and updated_at are updated.
# NOTE(review): county_name is not updated on conflict — confirm that is intended.
sql_insert = """    
    INSERT INTO {state}.weather_county(fips_code, county_name, temperature)
    VALUES %s
    ON CONFLICT (fips_code) DO UPDATE
    SET
        temperature=excluded.temperature,
        updated_at=NOW()
    ;"""

In [None]:
# test: uncomment to restrict the upsert to the Texas rows only.
# NOTE: this overwrites new_weather_data itself; reload the CSV to undo it.
#new_weather_data = new_weather_data.loc[new_weather_data['state'] == 'texas']
#new_weather_data

In [None]:
# regroup in case new_weather_data was changed (e.g. by the test filter above)
grouped = new_weather_data.groupby(by='state')

# The reconnect cell above already opened `conn`. Close it before opening
# another one so that session is not leaked; closing an already-closed
# psycopg2 connection is harmless.
conn.close()
conn = create_rw_conn(secrets=secrets)

In [None]:
import re

for state, df in grouped:
    # state is formatted into the SQL as an unquoted identifier, so only
    # accept plain SQL names (guards against injection via the CSV)
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", str(state)):
        raise ValueError("unsafe schema name: {!r}".format(state))
    print("[{}] upsert...".format(state))
    # select only the necessary columns, in the order sql_insert lists them,
    # as a list of plain row tuples for `execute_values`
    data = list(
        df[['fips_code', 'county_name', 'temperature']]
        .itertuples(index=False, name=None)
    )
    # NOTE(review): a fips_code repeated within one state's rows makes Postgres
    # reject the batch ("ON CONFLICT DO UPDATE command cannot affect row a
    # second time") — confirm codes are unique per state in the CSV.
    try:
        with conn.cursor() as cur:  # closes the cursor even on error
            execute_values(cur, sql_insert.format(state=state), data)
        conn.commit()  # <- We MUST commit to reflect the inserted data
    except Exception:
        # roll back the aborted transaction so `conn` stays usable (e.g. for
        # the review query below) before surfacing the error
        conn.rollback()
        raise
    print("[{}] changes were commited...".format(state))

### Review data

In [None]:
# spot-check the upserted rows for one state
sql = "SELECT * FROM california.weather_county;"
df = fetch_results(sql, conn)
df.head(10)