In [None]:
import datetime as dt
import os
from contextlib import closing
from io import StringIO

import pandas as pd
import psycopg2
from psycopg2 import sql

In [None]:
# Connection settings for the local Postgres instance. Host, user and password
# can be overridden through the standard libpq environment variables so real
# credentials never have to be written into the notebook; the defaults keep
# the original local-development setup working unchanged.
host = os.environ.get("PGHOST", "127.0.0.1")
user = os.environ.get("PGUSER", "postgres")
pw = os.environ.get("PGPASSWORD", "postgres")
# Maintenance database we connect to while dropping/creating `jaffle_db`
# (PostgreSQL cannot drop the database you are currently connected to).
system_db = "postgres"
# Target database that receives the raw seed tables.
jaffle_db = "jaffle_shop"

In [None]:
def reset_db():
    """Drop and recreate the ``jaffle_db`` database, then create its ``raw`` schema.

    DROP/CREATE DATABASE run from a connection to ``system_db`` because
    PostgreSQL cannot drop the database a session is connected to. Both
    connections use autocommit because DROP/CREATE DATABASE cannot run inside
    a transaction block. The ``raw`` schema is then created from a second
    connection to the freshly created ``jaffle_db``.

    Note: DROP DATABASE fails if any other session (e.g. a SQL client or dbt)
    is still connected to ``jaffle_db``.
    """
    # Compose the database name as an identifier rather than hardcoding it, so
    # the reset always targets whatever `jaffle_db` is configured to.
    db_name = sql.Identifier(jaffle_db)

    # closing() guarantees the connection is released even if a statement fails.
    with closing(psycopg2.connect(dbname=system_db, user=user, password=pw, host=host)) as conn:
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute(sql.SQL("DROP DATABASE IF EXISTS {};").format(db_name))
            cur.execute(sql.SQL("CREATE DATABASE {};").format(db_name))

    with closing(psycopg2.connect(dbname=jaffle_db, user=user, password=pw, host=host)) as conn:
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute("CREATE SCHEMA IF NOT EXISTS raw;")

In [None]:
def create_tables():
    """Drop and recreate the raw landing tables in the ``raw`` schema of ``jaffle_db``.

    Each table is dropped first, so every run starts from an empty table with
    the column definitions below. Every table ends with an ``_etl_loaded_at``
    TIMESTAMP column for the load time. Expects the ``raw`` schema to exist
    already.
    """
    # Column definitions per table, in creation order (dicts preserve insertion
    # order). NOTE(review): the CSV load uses COPY, which maps fields by
    # position, so each column order here must match its seed CSV's column
    # order plus the trailing _etl_loaded_at column.
    table_columns = {
        "raw_customers": """
            id INT PRIMARY KEY,
            first_name VARCHAR,
            last_name VARCHAR,
            _etl_loaded_at TIMESTAMP""",
        "raw_orders": """
            id INT PRIMARY KEY,
            user_id INT,
            order_date DATE,
            status VARCHAR,
            _etl_loaded_at TIMESTAMP""",
        "raw_payments": """
            id INT PRIMARY KEY,
            order_id INT,
            payment_method VARCHAR,
            status VARCHAR,
            amount INT,
            created DATE,
            _etl_loaded_at TIMESTAMP""",
        "raw_customer_addresses": """
            id INT PRIMARY KEY,
            user_id INT,
            city VARCHAR,
            updated_at DATE,
            _etl_loaded_at TIMESTAMP""",
        "raw_analytics": """
            id INT PRIMARY KEY,
            customer_id INT,
            event VARCHAR,
            timestamp TIMESTAMP,
            _etl_loaded_at TIMESTAMP""",
    }

    # closing() releases the connection even if one of the DDL statements fails.
    with closing(psycopg2.connect(dbname=jaffle_db, user=user, password=pw, host=host)) as conn:
        conn.autocommit = True
        with conn.cursor() as cur:
            for table, columns in table_columns.items():
                # Table names are fixed literals above, never user input, so
                # interpolating them into the DDL is safe.
                cur.execute(f"DROP TABLE IF EXISTS raw.{table};")
                cur.execute(f"CREATE TABLE raw.{table} ({columns}\n);")

In [None]:
def copy_from_stringio(conn, df, table):
    """Bulk-load ``df`` into ``table`` using PostgreSQL ``COPY ... FROM STDIN``.

    The frame is serialised to CSV in an in-memory buffer (header row
    included, index excluded) and streamed through ``cursor.copy_expert``,
    so no temporary file is written.

    Args:
        conn: An open psycopg2 connection. It is committed on success and
            rolled back on failure; it is NOT closed here.
        df: DataFrame to load. Its column order must match the column order
            of ``table``: COPY maps fields by position, and ``HEADER`` only
            skips the first line, it does not match columns by name.
        table: Target table name, optionally schema-qualified
            (e.g. ``"raw.raw_orders"``). It is interpolated directly into the
            SQL text, so it must come from trusted code, never user input.

    Returns:
        0 on success, 1 on failure. Failures are printed and rolled back
        rather than raised.
    """
    # Serialise the frame into an in-memory CSV buffer and rewind it for reading.
    buffer = StringIO()
    df.to_csv(buffer, index=False, header=True)
    buffer.seek(0)

    # The cursor context manager closes the cursor on every exit path,
    # including when rollback() itself raises.
    with conn.cursor() as cursor:
        try:
            cursor.copy_expert(f"COPY {table} FROM STDIN WITH CSV HEADER DELIMITER as ','", buffer)
            conn.commit()
        except Exception as error:
            # Deliberate best-effort: report, roll back so the connection stays
            # usable for the next table, and signal failure via the return code.
            # (Exception already covers psycopg2.DatabaseError.)
            print("Error: %s" % error)
            conn.rollback()
            return 1
    print(f"Uploading {table} done")
    return 0

In [None]:
def upload_data(seeds_dir="jaffle_shop/seeds"):
    """Load each raw seed CSV into its matching ``raw.<name>`` table in ``jaffle_db``.

    For every seed ``<name>``, reads ``<seeds_dir>/<name>.csv``, stamps each
    row with an ``_etl_loaded_at`` column set to the current time (taken
    separately per table), and bulk-loads the frame with
    ``copy_from_stringio``.

    Args:
        seeds_dir: Directory containing the seed CSV files. Defaults to the
            original hardcoded location.

    Returns:
        List of target tables whose load reported failure (empty when every
        table loaded). Individual load errors are printed, not raised.
    """
    # Load order matches the original notebook.
    seed_names = (
        "raw_payments",
        "raw_orders",
        "raw_customers",
        "raw_customer_addresses",
        "raw_analytics",
    )
    failed = []
    # closing() releases the connection even if a CSV is missing or malformed.
    with closing(psycopg2.connect(dbname=jaffle_db, user=user, password=pw, host=host)) as conn:
        for name in seed_names:
            table = f"raw.{name}"
            frame = pd.read_csv(os.path.join(seeds_dir, f"{name}.csv")).assign(
                _etl_loaded_at=dt.datetime.now()
            )
            # copy_from_stringio returns a truthy value (1) on failure.
            if copy_from_stringio(conn, frame, table):
                failed.append(table)
    return failed

In [None]:
def full_reset():
    """Rebuild jaffle_shop end to end, announcing each stage before running it.

    Stages, in order: reset the database and schema, recreate the raw tables,
    then upload the seed data.
    """
    pipeline = (
        ("Resetting DB...", reset_db),
        ("Recreating Tables...", create_tables),
        ("Uploading Data...", upload_data),
    )
    for banner, stage in pipeline:
        print(banner)
        stage()

In [None]:
# WARNING: destructive. Drops and recreates the jaffle_shop database, then
# reloads the seed CSVs, so every "Run All" wipes whatever is currently in it.
full_reset()