In [None]:
# Patch asyncio via nest_asyncio before anything else runs.
# NOTE(review): presumably needed so code executed by the %run cell below can
# start an event loop inside Jupyter's already-running loop — confirm it is still required.
import nest_asyncio
nest_asyncio.apply()

In [1]:
# Run dependencies
# Executes transformation.ipynb in this kernel. Later cells use names that are
# never defined in this notebook (create_engine, connection_string, os, pd,
# datetime and the dataframes being loaded) — presumably they all come from
# that notebook; verify before running this one on a fresh kernel.
%run transformation.ipynb

500 columns processed...
1000 columns processed...
1500 columns processed...
500 columns processed...
1000 columns processed...
1500 columns processed...


# Loading

In [None]:
# *******************************************LOADING*******************************************
# psycopg2 is imported here and used directly by the bulk COPY cell further down.
import psycopg2

# Build the SQLAlchemy engine shared by the to_sql / ALTER TABLE cells.
# create_engine and connection_string are not defined in this notebook —
# presumably they are provided by transformation.ipynb; verify.
_db_url = f"postgresql://{connection_string}"
engine = create_engine(_db_url)

In [None]:
# Load into SQL
# Each dataframe is appended (if_exists='append') to the table named beside it,
# without writing the dataframe index as a column.
_tables_to_load = [
    ("holiday", holiday_df),
    ("walmart", walmart),
    ("stock", stock),
    ("state_id", state_id),
    ("store", store),
    ("marketshare", marketShare_final),
]

for _table, _frame in _tables_to_load:
    _frame.to_sql(name=_table, con=engine, if_exists='append', index=False)

In [None]:
# Add primary keys
# (table, key column as written in the SQL) — the column text is inserted verbatim,
# so "ID" keeps its double quotes and the other names stay unquoted.
# NOTE(review): PostgreSQL folds unquoted identifiers to lowercase, so Date and CITY
# are looked up as date / city — confirm that matches the column names in the tables.
_primary_keys = [
    ('store', 'id'),
    ('walmart', '"ID"'),
    ('holiday', 'Date'),
    ('marketshare', 'CITY'),
    ('state_id', 'state_id'),
    ('stock', 'Date'),
]

for _table, _key_column in _primary_keys:
    engine.execute('ALTER TABLE "%s" ADD PRIMARY KEY (%s)' % (_table, _key_column))

In [None]:
# Create csv files
# Write each dataframe to Resources/clean without its index column.
# The bulk-load cell below reads every file found in that folder.
_csv_exports = [
    (df_sales, 'Resources/clean/d3_sales.csv'),
    (df_price_changes, 'Resources/clean/d3_price_changes.csv'),
    (df_sales_categories, 'Resources/clean/d3_categories.csv'),
    (df_sales_stores, 'Resources/clean/d3_stores.csv'),
    (df_sales_items, 'Resources/clean/d3_items.csv'),
    (df_ecomm, 'Resources/clean/d7_ecomm.csv'),
]

for _frame, _csv_path in _csv_exports:
    _frame.to_csv(_csv_path, index=False)

In [None]:
import getpass
import io
import os
import re

# This cell loops through the folder of cleaned .csv files and loads each one into PostgreSQL.
# Per the original author's note, this is over 20x faster than sqlalchemy + df.to_sql for long tables.
#
# Each file is read into a dataframe, re-serialised into an in-memory buffer (io.StringIO),
# and streamed to the server with 'COPY [table] FROM STDIN'.
#
# pd, datetime and psycopg2 are expected to be in scope already (psycopg2 is imported in the
# engine cell above; pd and datetime presumably come from transformation.ipynb — verify).

folder_name = os.path.join('Resources/clean')

conn_host = 'otto.db.elephantsql.com'
conn_dbname = 'ofiglsqd'
conn_user = 'ofiglsqd'

# Reuse a password already defined in the kernel; otherwise prompt for it instead of
# failing with a NameError when the connection is opened.
try:
    conn_pass
except NameError:
    conn_pass = getpass.getpass(prompt='password: ')


def _force_boolean_columns(create_stmt, bool_columns):
    """Return ``create_stmt`` with the SQL type of each listed column set to BOOLEAN.

    Only an exact, double-quoted column name followed by its type keyword is rewritten,
    so the table name or a longer column name containing the same text is left alone,
    and a column that is last in the list (no trailing comma) is handled correctly.

    Parameters:
        create_stmt: CREATE TABLE statement as produced by pd.io.sql.get_schema.
        bool_columns: iterable of dataframe column labels to retype.

    Returns:
        The modified CREATE TABLE statement (unchanged if no column matches).
    """
    for column in bool_columns:
        quoted = '"' + str(column).replace('"', '""') + '"'
        pattern = re.escape(quoted) + r'\s+\w+'
        # a callable replacement keeps backslashes in the column name literal
        create_stmt = re.sub(pattern, lambda _m, q=quoted: q + ' BOOLEAN', create_stmt, count=1)
    return create_stmt


# loop through the .csv files in the output folder; other directory entries
# (e.g. checkpoint folders) are skipped, and files are handled in sorted order
for file in sorted(os.listdir(folder_name)):

    stem, extension = os.path.splitext(file)
    if extension.lower() != '.csv':
        continue

    print('\n\n' + str(datetime.utcnow()) + ' ' + str(file) + ' to be loaded')

    print(str(datetime.utcnow()) + ' reading file to dataframe...')

    # read .csv file into dataframe
    df = pd.read_csv(os.path.join(folder_name, file), na_values=['nan','NA','NaN'])

    print(str(datetime.utcnow()) + ' completed')

    # df.info() prints its report itself and returns None, so it is not wrapped in print()
    df.info(memory_usage='deep')

    # table name is the file name without extension, lower-cased, '-' replaced by '_'
    table_name = stem.lower().replace('-','_')
    # quote the name for COPY so it refers to the same table as the quoted name
    # in the generated CREATE TABLE statement
    quoted_table = '"' + table_name.replace('"', '""') + '"'

    # psycopg2's connection context manager only ends the transaction, it does not close
    # the connection, so the connection is closed explicitly once the file is done
    conn = psycopg2.connect(host=conn_host, dbname=conn_dbname, user=conn_user, password=conn_pass)
    try:
        conn.autocommit = True

        output = io.StringIO()

        print(str(datetime.utcnow()) + ' reading file to memory using StringIO...')

        # same delimiter / quoting as the COPY statement below; no header or index
        df.to_csv(output, sep='|', header=False, index=False)
        output.seek(0)

        print(str(datetime.utcnow()) + ' completed')

        print(str(datetime.utcnow()) + ' generating the create table statement...')

        qry = pd.io.sql.get_schema(df, table_name, con=conn)

        # make the statement safe to re-run when the table already exists
        qry = qry.replace('CREATE TABLE', 'CREATE TABLE IF NOT EXISTS', 1)

        # columns pandas infers as boolean are declared BOOLEAN in the DDL
        bool_columns = [
            key for key in df.columns
            if pd.api.types.infer_dtype(df[key], skipna=True) == 'boolean'
        ]
        qry = _force_boolean_columns(qry, bool_columns)

        # best-effort per file: a failure is reported and the loop moves on to the next file
        try:
            with conn.cursor() as cur:
                print(str(datetime.utcnow()) + ' completed')
                print(qry)

                print(str(datetime.utcnow()) + ' executing the create table statement...')
                cur.execute(qry)
                print(str(datetime.utcnow()) + ' completed')

                print(str(datetime.utcnow()) + ' loading table to database...')
                cur.copy_expert("""COPY %s FROM STDIN WITH (FORMAT csv, DELIMITER '|', QUOTE '"')""" % quoted_table, output)
                print(str(datetime.utcnow()) + ' completed')

        except Exception as e:
            print('Error:\n' + str(e))
    finally:
        conn.close()


In [None]:
# Add primary key
# (table, key column as written in the SQL); the column text is inserted verbatim,
# so "index" is double-quoted for d3_sales and left unquoted for d3_price_changes.
_primary_keys = [
    ('d3_sales', '"index"'),
    ('d3_stores', 'store_id'),
    ('d3_price_changes', 'index'),
]

for _table, _key_column in _primary_keys:
    engine.execute('ALTER TABLE "%s" ADD PRIMARY KEY (%s)' % (_table, _key_column))


In [None]:
# Add primary key
# (table, key column) pairs for the remaining bulk-loaded tables.
_primary_keys = [
    ('d3_categories', 'cat_id'),
    ('d3_items', 'item_id'),
    ('d7_ecomm', 'ecomm_id'),
]

for _table, _key_column in _primary_keys:
    engine.execute('ALTER TABLE "%s" ADD PRIMARY KEY (%s)' % (_table, _key_column))

In [None]:
# Add financial data tables
# Unlike the earlier to_sql cell, these are written with index=True, so each
# dataframe's index is stored as a column of its table.
_financial_tables = [
    ('revenue', revenue_df),
    ('net_income', netincome_df),
    ('earnings', earnings_df),
]

for _table, _frame in _financial_tables:
    _frame.to_sql(name=_table, con=engine, if_exists='append', index=True)

In [None]:
# Add primary key
# All three financial tables are keyed on their date column.
# NOTE(review): presumably "date" is the dataframe index written by index=True above — confirm.
for _table in ('earnings', 'net_income', 'revenue'):
    engine.execute('ALTER TABLE "%s" ADD PRIMARY KEY (date)' % _table)

In [None]:
# Check tables in database
# Displays the table names visible through the engine as the cell output.
# NOTE(review): Engine.table_names() is deprecated in SQLAlchemy 1.4 and removed in 2.0
# (as is engine.execute used above); sqlalchemy.inspect(engine).get_table_names() is the
# documented replacement — confirm the installed version before upgrading.
engine.table_names()

In [None]:
# Close connection
# Releases the connections held by the SQLAlchemy engine created in the loading cell.
engine.dispose()