# Database ETL Tools

This notebook contains Database ETL examples.  

In this notebook we will only demo the **Redshift** functions. Similar functions exist in this package for other database types as well:

* MySQL
* Oracle
* Postgres
* Teradata
* SqlServer

# Connecting to a Database

In [3]:
import os
import pandas as pd
import pprint

import warnings
warnings.filterwarnings('ignore')

There are two functions that offer connectivity to Redshift:
* **conn_rs_pg**: uses the Postgres **psycopg2** package
* **conn_rs_sa**: uses the **sqlalchemy** package


To run the ETL examples you will need a **cursor** and a **conn** object; each function returns both.

For the remaining Redshift exercises we will use the **conn_rs_pg** function, but examples of both are shown below.

In [4]:
#import the Redshift connection class
from pymagic.db_conn_tools import Redshift

#sqlalchemy
cursor_rs, conn_rs = Redshift.conn_rs_sa(
    host=os.environ['example_rs_host'],
    db=os.environ['example_rs_db'],
    user=os.environ['example_rs_user'],
    pwd=os.environ['example_rs_pwd'],
    port=os.environ['example_rs_port']
)

# psycopg2 (overwrites the sqlalchemy objects above; used for the rest of this notebook)
cursor_rs, conn_rs = Redshift.conn_rs_pg(
    host=os.environ['example_rs_host'],
    db=os.environ['example_rs_db'],
    user=os.environ['example_rs_user'],
    pwd=os.environ['example_rs_pwd'],
    port=os.environ['example_rs_port']
)
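The (cursor, conn) pair follows the standard DB-API 2.0 pattern. The connection manages the session and its transactions, and the cursor executes statements and fetches results. Below is a minimal sketch of a helper that returns both. It uses the standard library's `sqlite3` as a stand-in for Redshift so it runs anywhere. The name `conn_pair` is hypothetical and not part of pymagic. A psycopg2 version would call `psycopg2.connect(host=..., dbname=..., user=..., password=..., port=...)` instead.

```python
import sqlite3


def conn_pair(database=":memory:"):
    """Hypothetical helper: open a connection and return (cursor, conn).

    sqlite3 stands in for Redshift here so the sketch is runnable
    without a server.
    """
    conn = sqlite3.connect(database)
    cursor = conn.cursor()
    return cursor, conn


cursor_demo, conn_demo = conn_pair()
cursor_demo.execute("select 'hello world'")
print(cursor_demo.fetchone())  # ('hello world',)
conn_demo.close()
```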

# Running a Query

The **run_query_rs** function executes a SQL statement on the server.
All of the ETL CRUD operations below use it under the hood.
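We have not seen pymagic's source, so the following is only a minimal sketch of what a `run_query_rs`-style helper could do. It executes one statement, commits it, rolls back on failure, and prints the elapsed time. The name `run_query` is hypothetical. `sqlite3` stands in for Redshift, and the timer reports seconds, which may not be the unit pymagic uses for its "Runtime" output.

```python
import sqlite3
import time


def run_query(conn, sql):
    """Hypothetical run_query_rs-style helper.

    Executes one statement and commits it. On failure the transaction
    is rolled back and the error is re-raised.
    """
    start = time.perf_counter()
    cursor = conn.cursor()
    try:
        cursor.execute(sql)
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cursor.close()
    elapsed = time.perf_counter() - start
    print(f"Runtime: {elapsed:.6f} s")
    return elapsed


conn = sqlite3.connect(":memory:")
run_query(conn, "create table t (x integer)")
run_query(conn, "insert into t values (1)")
print(conn.execute("select count(*) from t").fetchone())  # (1,)
```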

In [5]:
# import the Redshift ETL class
# (this rebinds the name Redshift, replacing the connection class imported above)
from pymagic.db_etl_tools import Redshift

In [6]:
Redshift.run_query_rs(
    conn=conn_rs,
    sql="select 'hello world'"
)

Runtime: 0.0020340333333333334


# Creating a Table from Pandas DataFrame

For the following exercises we will start with a somewhat more advanced task: creating tables with data to use in the later examples.

Let's read in some data to play with.

In [7]:
import seaborn as sns

df = sns.load_dataset('flights')
df['month'] = df['month'].astype(str)
df['year_month'] = df['year'].astype(str) + "_" + df['month']

In [8]:
df.head()

| | year | month | passengers | year_month |
|---|---|---|---|---|
| 0 | 1949 | January | 112 | 1949_January |
| 1 | 1949 | February | 118 | 1949_February |
| 2 | 1949 | March | 132 | 1949_March |
| 3 | 1949 | April | 129 | 1949_April |
| 4 | 1949 | May | 121 | 1949_May |


In [9]:
sql = Redshift.make_df_tbl_rs(
    df=df,
    tbl_name="flights"
);

In [10]:
pprint.pprint(sql)

('CREATE TABLE flights ( year INTEGER, month VARCHAR(9), passengers INTEGER, '
 'year_month VARCHAR(14) )')
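The generated statement is consistent with a simple rule: integer columns become `INTEGER`, and string columns become `VARCHAR(n)`, where `n` is the longest value in the column ("September" has 9 characters and "1949_September" has 14). That rule is our inference from this output, not something confirmed from pymagic's source. The minimal sketch below implements it over plain Python lists. The name `make_create_table` and the `FLOAT` fallback for non-integer numbers are assumptions.

```python
def make_create_table(columns, tbl_name):
    """Hypothetical sketch: build a CREATE TABLE statement from column data.

    columns maps column name -> list of values. All-int columns become
    INTEGER, other numeric columns become FLOAT, and everything else
    becomes VARCHAR(n) with n = the longest value as a string.
    """
    defs = []
    for name, values in columns.items():
        if all(isinstance(v, int) and not isinstance(v, bool) for v in values):
            col_type = "INTEGER"
        elif all(isinstance(v, (int, float)) and not isinstance(v, bool) for v in values):
            col_type = "FLOAT"
        else:
            width = max(len(str(v)) for v in values)
            col_type = f"VARCHAR({width})"
        defs.append(f"{name} {col_type}")
    return f"CREATE TABLE {tbl_name} ( {', '.join(defs)} )"


# sample rows taken from the tables shown in this notebook
sample = {
    "year": [1949, 1960],
    "month": ["January", "September"],
    "passengers": [112, 508],
    "year_month": ["1949_January", "1960_September"],
}
print(make_create_table(sample, "flights"))
# CREATE TABLE flights ( year INTEGER, month VARCHAR(9), passengers INTEGER, year_month VARCHAR(14) )
```

Sizing `VARCHAR` to the current maximum keeps the table tight, but later inserts with longer strings would fail, so a real tool might add headroom.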


In [11]:
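try:
    # if the table already exists, drop it
    Redshift.run_query_rs(
        sql="drop table flights",
        conn=conn_rs
    )
except Exception:
    # drop failed (most likely the table does not exist yet);
    # roll back the aborted transaction so the connection is usable again
    conn_rs.rollback()

# create the table
Redshift.run_query_rs(
    sql=sql,
    conn=conn_rs
)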

Runtime: 0.021303766666666668
Runtime: 0.002461483333333333


# Inserting from Pandas DataFrame

Now that we've created a table in the database based on our Pandas DataFrame, let's actually insert data into it.
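How **insert_df_rs** builds its statements isn't shown here. One common way to bulk-insert rows is a parameterized `INSERT` passed to the DB-API `executemany`, which lets the driver handle quoting. The sketch below assumes that approach. The helper name `insert_rows` is hypothetical, and `sqlite3` stands in for Redshift. Note that `sqlite3` uses `?` placeholders where psycopg2 uses `%s`. To get plain row tuples from a DataFrame, `df.itertuples(index=False, name=None)` works.

```python
import sqlite3


def insert_rows(cursor, conn, rows, col_names, tbl_name):
    """Hypothetical sketch: bulk-insert a sequence of row tuples.

    Builds one parameterized INSERT and runs it once per row via
    executemany, then commits. Returns the number of rows inserted.
    """
    placeholders = ", ".join("?" for _ in col_names)
    sql = f"INSERT INTO {tbl_name} ({', '.join(col_names)}) VALUES ({placeholders})"
    cursor.executemany(sql, rows)
    conn.commit()
    return cursor.rowcount


conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute(
    "CREATE TABLE flights (year INTEGER, month VARCHAR(9), "
    "passengers INTEGER, year_month VARCHAR(14))"
)
rows = [(1949, "January", 112, "1949_January"), (1949, "February", 118, "1949_February")]
print(insert_rows(cursor, conn, rows, ["year", "month", "passengers", "year_month"], "flights"))  # 2
```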

In [12]:
Redshift.insert_df_rs(
    cursor=cursor_rs,
    conn=conn_rs,
    df=df,
    tbl_name="flights"
)

Runtime: 0.054683


# Reading Data

Reading data from the database uses Pandas' **read_sql_query** function, which takes the SQL string and the database connection object as parameters.

Let's read the data we loaded back in.

In [13]:
df = pd.read_sql_query(
    sql="select * from flights",
    con=conn_rs
)

In [14]:
df.tail()

| | year | month | passengers | year_month |
|---|---|---|---|---|
| 139 | 1960 | August | 606 | 1960_August |
| 140 | 1960 | September | 508 | 1960_September |
| 141 | 1960 | October | 461 | 1960_October |
| 142 | 1960 | November | 390 | 1960_November |
| 143 | 1960 | December | 432 | 1960_December |
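In simplified form, `read_sql_query` runs the query on the connection, fetches the rows, and labels the columns using the names the driver reports. The stdlib-only sketch below shows that idea, with `cursor.description` supplying the column names. `read_rows` is a hypothetical name and `sqlite3` stands in for Redshift. Passing the result to `pd.DataFrame(...)` would give a frame. Recent pandas versions officially support SQLAlchemy connectables and `sqlite3` connections, and emit a `UserWarning` for other raw DB-API connections such as psycopg2's.

```python
import sqlite3


def read_rows(conn, sql):
    """Hypothetical, pandas-free stand-in for pd.read_sql_query.

    Runs the query and returns a list of dicts keyed by column name,
    using the names reported in cursor.description.
    """
    cursor = conn.cursor()
    try:
        cursor.execute(sql)
        col_names = [desc[0] for desc in cursor.description]
        return [dict(zip(col_names, row)) for row in cursor.fetchall()]
    finally:
        cursor.close()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE flights (year INTEGER, month TEXT, passengers INTEGER)")
conn.execute("INSERT INTO flights VALUES (1960, 'December', 432)")
print(read_rows(conn, "select * from flights"))
# [{'year': 1960, 'month': 'December', 'passengers': 432}]
```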


# Value Inserts

Sometimes we just want to INSERT a single row of values into a table.

In [15]:
sql = Redshift.insert_val_rs(
    col_list=["year","month","passengers","year_month"],
    val_list=[1961, "January", 487,"1961_January"],
    tbl_name="flights"
)

In [16]:
pprint.pprint(sql)

('\n'
 '        INSERT INTO flights \n'
 '        (\n'
 '            year, month, passengers, year_month\n'
 '        ) values (\n'
 "            1961, 'January', 487, '1961_January'\n"
 '        )\n'
 '        ')
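The statement above writes numbers as-is and wraps strings in single quotes. Below is a minimal sketch of a builder that follows that pattern and also doubles embedded single quotes, which is the standard SQL escape. The name `make_insert_values` is hypothetical and not pymagic's implementation. For values from untrusted sources, a parameterized query is the safer choice.

```python
def make_insert_values(col_list, val_list, tbl_name):
    """Hypothetical sketch of an insert_val_rs-style statement builder.

    Numbers are written as-is, None becomes NULL, and everything else is
    single-quoted with embedded single quotes doubled.
    """
    if len(col_list) != len(val_list):
        raise ValueError("col_list and val_list must have the same length")

    def literal(value):
        if value is None:
            return "NULL"
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            return str(value)
        return "'" + str(value).replace("'", "''") + "'"

    cols = ", ".join(col_list)
    vals = ", ".join(literal(v) for v in val_list)
    return f"INSERT INTO {tbl_name} ({cols}) VALUES ({vals})"


print(make_insert_values(
    ["year", "month", "passengers", "year_month"],
    [1961, "January", 487, "1961_January"],
    "flights",
))
# INSERT INTO flights (year, month, passengers, year_month) VALUES (1961, 'January', 487, '1961_January')
```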


In [17]:
Redshift.run_query_rs(
    sql=sql,
    conn=conn_rs
)

Runtime: 0.0022647833333333334


In [18]:
df = pd.read_sql_query(
    sql="select * from flights",
    con=conn_rs
)

In [19]:
df.tail()

| | year | month | passengers | year_month |
|---|---|---|---|---|
| 140 | 1960 | September | 508 | 1960_September |
| 141 | 1960 | October | 461 | 1960_October |
| 142 | 1960 | November | 390 | 1960_November |
| 143 | 1960 | December | 432 | 1960_December |
| 144 | 1961 | January | 487 | 1961_January |


# Upserts

While an UPSERT with a given row of values might be useful from time to time, it is more common to have a "stage" table act as the source for new data to be inserted/updated on a target table.

For this example we will create a new table, our target table.

This table will have some "outdated" values and also will not have the "latest" data from our "stage" table.

The goal is to use the stage table to correct the "outdated" values in the target table and to insert the latest rows that the target table is missing.
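The stage-to-target pattern boils down to two steps. First, UPDATE target rows that match a stage row on the join keys but have different values. Second, INSERT stage rows with no match in the target, using a LEFT JOIN anti-join. The self-contained sketch below runs both steps in `sqlite3` on a few toy rows (the table names `stage` and `target` are hypothetical). It uses a correlated subquery for the UPDATE because older SQLite versions lack `UPDATE ... FROM`, which Redshift does support.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE stage (year INTEGER, month TEXT, passengers INTEGER);
CREATE TABLE target (year INTEGER, month TEXT, passengers INTEGER);
-- toy rows: stage holds the correct, latest data
INSERT INTO stage VALUES (1959, 'May', 420), (1959, 'June', 472), (1960, 'May', 472);
-- target has an outdated value for May 1959 and is missing 1960
INSERT INTO target VALUES (1959, 'May', 336), (1959, 'June', 472);
""")

# Step 1: update matched target rows whose passengers value differs
conn.execute("""
UPDATE target
SET passengers = (
    SELECT s.passengers FROM stage s
    WHERE s.year = target.year AND s.month = target.month
)
WHERE EXISTS (
    SELECT 1 FROM stage s
    WHERE s.year = target.year AND s.month = target.month
      AND s.passengers != target.passengers
)
""")

# Step 2: insert stage rows with no matching target row (anti-join)
conn.execute("""
INSERT INTO target (year, month, passengers)
SELECT s.year, s.month, s.passengers
FROM stage s
LEFT JOIN target t ON t.year = s.year AND t.month = s.month
WHERE t.year IS NULL
""")
conn.commit()

print(sorted(conn.execute("SELECT * FROM target").fetchall()))
# [(1959, 'June', 472), (1959, 'May', 420), (1960, 'May', 472)]
```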

In [20]:
# create the 'outdated' target table:
# - it only has data through 1959
# - its passenger counts for months ending in 'y' are wrong
#   (set to 80% of the true value)

sql = '''
CREATE TABLE flights_tgt AS
SELECT
    src.year,
    src.month,
    CAST(
        CASE WHEN RIGHT(src.month, 1) = 'y'
             THEN src.passengers * 0.8
             ELSE src.passengers
        END
    AS INTEGER) AS passengers,
    src.year_month
FROM flights src
WHERE src.year < 1960
'''

try:
    # if the target table already exists, drop it
    Redshift.run_query_rs(
        sql="drop table flights_tgt",
        conn=conn_rs
    )
except Exception:
    # drop failed (most likely the table does not exist yet);
    # roll back the aborted transaction so the connection is usable again
    conn_rs.rollback()

Redshift.run_query_rs(
    sql=sql,
    conn=conn_rs
)

Runtime: 0.0017045833333333333
Runtime: 0.003262066666666667


In [21]:
df_tgt = pd.read_sql_query(
    sql="select * from flights_tgt",
    con=conn_rs
)

df_tgt.tail()

| | year | month | passengers | year_month |
|---|---|---|---|---|
| 127 | 1959 | March | 406 | 1959_March |
| 128 | 1959 | May | 336 | 1959_May |
| 129 | 1959 | July | 438 | 1959_July |
| 130 | 1959 | September | 463 | 1959_September |
| 131 | 1959 | November | 362 | 1959_November |


In [22]:
sql_update, sql_insert = Redshift.upsert_tbl_rs(
    src_tbl="flights", 
    tgt_tbl="flights_tgt", 
    src_join_cols=["year","month","year_month"], 
    src_insert_cols=["year","month","passengers","year_month"],
    src_update_cols=["passengers"], 
    update_compare_cols=["passengers"]
)

**upsert_tbl_rs** returns our two UPSERT SQL statements: an UPDATE and an INSERT.

In [23]:
sql_update

'                 /* Update records*/                  UPDATE flights_tgt                  SET passengers = s.passengers                 FROM flights s                  WHERE flights_tgt.year = s.year AND  flights_tgt.month = s.month AND  flights_tgt.year_month = s.year_month  AND s.passengers != flights_tgt.passengers                 '

In [24]:
sql_insert

'             /* Insert records*/             INSERT INTO flights_tgt             SELECT s.year, s.month, s.passengers, s.year_month             FROM flights s              LEFT JOIN flights_tgt t              ON t.year = s.year AND  t.month = s.month AND  t.year_month = s.year_month              WHERE t.year IS NULL AND  t.month IS NULL AND  t.year_month IS NULL              '
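Both statements follow a mechanical pattern from the function's parameters. The join columns drive the UPDATE's `WHERE` and the INSERT's `LEFT JOIN ... IS NULL` anti-join. The update columns drive the `SET` clause, and the compare columns restrict the UPDATE to rows that actually changed. The sketch below reproduces that shape. `make_upsert_sql` is a hypothetical name, not pymagic's code. Combining several compare columns with `OR` (update when any of them differs) is our assumption, since this example only has one. Also note that `!=` never matches when either side is NULL, so NULL changes would need explicit handling.

```python
def make_upsert_sql(src_tbl, tgt_tbl, join_cols, insert_cols, update_cols, compare_cols):
    """Hypothetical sketch mapping upsert parameters to UPDATE and INSERT SQL."""
    # UPDATE ... FROM, joined on the key columns, only where values differ
    update_set = ", ".join(f"{c} = s.{c}" for c in update_cols)
    update_join = " AND ".join(f"{tgt_tbl}.{c} = s.{c}" for c in join_cols)
    update_diff = " OR ".join(f"s.{c} != {tgt_tbl}.{c}" for c in compare_cols)
    sql_update = (
        f"UPDATE {tgt_tbl} SET {update_set} FROM {src_tbl} s "
        f"WHERE {update_join} AND ({update_diff})"
    )
    # INSERT ... SELECT of source rows with no matching target row
    select_cols = ", ".join(f"s.{c}" for c in insert_cols)
    insert_join = " AND ".join(f"t.{c} = s.{c}" for c in join_cols)
    missing = " AND ".join(f"t.{c} IS NULL" for c in join_cols)
    sql_insert = (
        f"INSERT INTO {tgt_tbl} SELECT {select_cols} FROM {src_tbl} s "
        f"LEFT JOIN {tgt_tbl} t ON {insert_join} WHERE {missing}"
    )
    return sql_update, sql_insert


sql_update_demo, sql_insert_demo = make_upsert_sql(
    src_tbl="flights",
    tgt_tbl="flights_tgt",
    join_cols=["year", "month", "year_month"],
    insert_cols=["year", "month", "passengers", "year_month"],
    update_cols=["passengers"],
    compare_cols=["passengers"],
)
print(sql_update_demo)
print(sql_insert_demo)
```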

Next we run them in order, the UPDATE first and then the INSERT, to fix our 'outdated' records and add our 'missing' records.

In [25]:
Redshift.run_query_rs(
    sql=sql_update,
    conn=conn_rs
)

Runtime: 0.00237265


In [26]:
Redshift.run_query_rs(
    sql=sql_insert,
    conn=conn_rs
)

Runtime: 0.0032544333333333333


Now we can see that the two tables contain the same data. The UPSERT was successful.

In [27]:
df_tgt = pd.read_sql_query(
    sql="select * from flights_tgt",
    con=conn_rs
)

df_src = pd.read_sql_query(
    sql="select * from flights",
    con=conn_rs
)


In [28]:
(
    df_src.sort_values(by=["year", "month"]).reset_index(drop=True)
    == df_tgt.sort_values(by=["year", "month"]).reset_index(drop=True)
).all()

year          True
month         True
passengers    True
year_month    True
dtype: bool
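The same check can also run inside the database without pulling both tables into pandas. The sketch compares row counts and then runs `A EXCEPT B` and `B EXCEPT A`. If the counts match and both differences are empty, the tables hold the same set of rows. `EXCEPT` compares distinct rows, so the count check is only a partial guard against duplicates. Redshift supports `EXCEPT` (and `MINUS` as a synonym). `tables_match` is a hypothetical name, and `sqlite3` stands in for Redshift.

```python
import sqlite3


def tables_match(conn, tbl_a, tbl_b):
    """Hypothetical check that two tables hold the same rows."""
    cursor = conn.cursor()
    try:
        # different row counts means the tables cannot match
        counts = []
        for tbl in (tbl_a, tbl_b):
            cursor.execute(f"SELECT COUNT(*) FROM {tbl}")
            counts.append(cursor.fetchone()[0])
        if counts[0] != counts[1]:
            return False
        # any row in one table but not the other means a mismatch
        for left, right in ((tbl_a, tbl_b), (tbl_b, tbl_a)):
            cursor.execute(f"SELECT * FROM {left} EXCEPT SELECT * FROM {right}")
            if cursor.fetchone() is not None:
                return False
        return True
    finally:
        cursor.close()


conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE a (year INTEGER, month TEXT);
CREATE TABLE b (year INTEGER, month TEXT);
CREATE TABLE c (year INTEGER, month TEXT);
INSERT INTO a VALUES (1960, 'May'), (1961, 'January');
INSERT INTO b VALUES (1961, 'January'), (1960, 'May');
INSERT INTO c VALUES (1960, 'May'), (1961, 'February');
""")
print(tables_match(conn, "a", "b"))  # True
print(tables_match(conn, "a", "c"))  # False
```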