## Using Python to Perform Extract-Transform-Load (ETL Processing)
Modern Data Warehousing and Analytics solutions frequently use languages like Python or Scala to extract data from numerous sources, including relational database management systems, NoSQL database systems, real-time streaming endpoints and Data Lakes.  These languages can then be used to perform many types of transformation before then loading the data into a variety of destinations including file systems and data warehouses. This data can then be consumed by data scientists or business analysts.

In this lab you will recreate the **Northwind_DW** dimensional database from Lab 2; however, you'll take an entirely different approach. Instead of extracting, transforming and loading the date entirely on the database system entirely using SQL data definition language (DDL) and data manipulation language (DML) statements, here you will learn to interact with the RDBMS from a remote client running Python. You will learn to fetch data into Pandas DataFrames, perform all the necessary transformations in-memory on the client, and then push the newly transformed DataFrame back to the RDBMS using a Pandas function that will create the table and fill it with data with a single operation.

### Prerequisites:
#### Import the Necessary Libraries

In [1]:
import os
import numpy
import pandas as pd
from sqlalchemy import create_engine

In [2]:
#### Declare & Assign Connection Variables for the MySQL Server & Databases with which You'll be Working 

In [3]:
host_name = "localhost"
host_ip = "127.0.0.1"
port = "3306"
user_id = "root"
pwd = "Passw0rd123"

src_dbname = "sakila"
dst_dbname = "sakila_dw"

#### Define Functions for Getting Data From and Setting Data Into Databases

In [4]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

In [5]:
#### Create the New Data Warehouse database, and to Use it, Switch the Connection Context.
#Clearly, you won't get very far without having a database to work with. Here we demonstrate how we can *drop* a database if it already exists, and then *create* the new **northwind_dw2** database and *use* it as the target of all subsequent operations.

In [6]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x2076b53e3d0>

### 1.0. Create & Populate the Dimension Tables
#### 1.1. Extract Data from the Source Database Tables

In [7]:
sql_customer = "SELECT * FROM sakila.customer;"
df_customer = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customer)
df_customer.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


In [8]:
sql_payment = "SELECT * FROM sakila.payment;"
df_payment = get_dataframe(user_id, pwd, host_name, src_dbname, sql_payment)
df_payment.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update
0,1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
1,2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30


#### 1.2. Perform Any Necessary Transformations
Pandas DataFrames enable extensive data modification capabilities. Here we will start by simply dropping features (columns) that we don't believe provide any real value to our analytics solution. Examples include columns having a high percentage of NULL values, columns having large amounts of free-text, and columns having binary large object (BLOB) data such as images or other documents. Then, we will rename the primary key column (id) to conform with data warehouse design standards.

In [9]:
drop_cols = ['last_update']
df_customer.drop(drop_cols, axis=1, inplace=True)
df_customer.rename(columns={"customer_id":"customer_key"}, inplace=True)

df_customer.head(2)

Unnamed: 0,customer_key,store_id,first_name,last_name,email,address_id,active,create_date
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36


In [10]:
drop_cols = ['last_update']
df_payment.drop(drop_cols, axis=1, inplace=True)
df_payment.rename(columns={"payment_id":"payment_key","customer_id":"customer_key"}, inplace=True)

df_payment.head(2)

Unnamed: 0,payment_key,customer_key,staff_id,rental_id,amount,payment_date
0,1,1,1,76,2.99,2005-05-25 11:30:37
1,2,1,1,573,0.99,2005-05-28 10:35:23


#### 1.4. Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables
Here I demonstrate how we can create an iterable data structure containing the values needed to correctly create and populate the new dimension tables. If you inspect this code listing carefully, you'll notice that it's a **list** containing a **set** *(or vector)* for each dimension table. Each **set** then contains the *table_name* we need to assign to the table, the *pandas DataFrame* we crafted to define & populate the table, and the name we need to assign to the *primary_key* column.  With this *list of sets* defined, we can then call our **set_dataframe( )** function from within a **for *loop*** to create each *dimension* table.

In [11]:
db_operation = "insert"

tables = [('dim_customer', df_customer, 'customer_key'),
          ('dim_payment', df_payment, 'payment_key')
         ]

In [12]:
for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

### 2.0. Create & Populate the Fact Table
Here we will learn two approaches to creating the *fact_orders* fact table. The first approach demonstrates that a carefully crafted SQL SELECT statement can be used to perform this task... *but what fun would that be.* Seriously though, this approach is quick and effect if you already have the query, but what if you didn't have the opportunity to view and work with the data beforehand?  What's more, you may be required to combine data from multiple sources, some of which may not be relational database management systems. Then, a simple SQL query won't do!  You would need to load the data from the various sources (e.g., database tables, CSV or JSON files, NoSQL document collections, API stream data) and then combine them into a single dataframe that you could then use to create a new database table. For this reason we'll see how we can retrieve the data, but we won't bother to use it for creating a new table... we already know how to do that using the **set_dataframe( )** function anyway.

#### 2.1. First, you could simply use the SQL SELECT statement you authored in Lab 2 

In [13]:
df_fact_payment = pd.merge(df_customer, df_payment, on='customer_key', how='inner')
df_fact_payment.insert(0, "step", range(1, df_fact_payment.shape[0]+1))

df_fact_payment.head(2)

Unnamed: 0,step,customer_key,store_id,first_name,last_name,email,address_id,active,create_date,payment_key,staff_id,rental_id,amount,payment_date
0,1,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,1,1,76,2.99,2005-05-25 11:30:37
1,2,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2,1,573,0.99,2005-05-28 10:35:23


##### 2.2.5. Get the Data from the Date Dimension Table.
First, fetch the Surrogate Primary Key (date_key) and the Business Key (full_date) from the Date Dimension table using the **get_dataframe()** function. Also, be certain to cast the **full_date** column to the **datetime64** data type using the **.astype()** function that is native to Pandas DataFrame columns.

In [14]:
df_fact_payment.create_date = df_fact_payment.create_date.astype('datetime64')
df_fact_payment.payment_date = df_fact_payment.payment_date.astype('datetime64')
df_fact_payment.head(2)

Unnamed: 0,step,customer_key,store_id,first_name,last_name,email,address_id,active,create_date,payment_key,staff_id,rental_id,amount,payment_date
0,1,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,1,1,76,2.99,2005-05-25 11:30:37
1,2,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2,1,573,0.99,2005-05-28 10:35:23


In [15]:
# Rename Foreign Key Columns
df_fact_payment.rename(columns={"create_date" : "created_date_key",
                               "payment_date" : "paid_date_key",
                               "step" : "fact_payment_key",
                               "store_id" : "store_key",
                               "first_name" : "customer_first_name",
                               "last_name" : "customer_last_name",
                               "email" : "customer_email",
                               "staff_id" : "staff_key",
                               "rental_id" : "rental_key",
                               "amount" : "paid_amount"}, inplace=True)


# Reorder the columns
ordered_columns = ["customer_key",
                  "store_key",
                  "staff_key",
                  "rental_key",
                  "created_date_key",
                  "paid_date_key",
                  "customer_first_name",
                  "customer_last_name",
                  "customer_email",
                  "paid_amount"
                  ]
df_fact_payment = df_fact_payment[ordered_columns]
df_fact_payment.insert(0, "fact_payment_key", range(1, df_fact_payment.shape[0]+1))
df_fact_payment.head(5)

Unnamed: 0,fact_payment_key,customer_key,store_key,staff_key,rental_key,created_date_key,paid_date_key,customer_first_name,customer_last_name,customer_email,paid_amount
0,1,1,1,1,76,2006-02-14 22:04:36,2005-05-25 11:30:37,MARY,SMITH,MARY.SMITH@sakilacustomer.org,2.99
1,2,1,1,1,573,2006-02-14 22:04:36,2005-05-28 10:35:23,MARY,SMITH,MARY.SMITH@sakilacustomer.org,0.99
2,3,1,1,1,1185,2006-02-14 22:04:36,2005-06-15 00:54:12,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5.99
3,4,1,1,2,1422,2006-02-14 22:04:36,2005-06-15 18:02:53,MARY,SMITH,MARY.SMITH@sakilacustomer.org,0.99
4,5,1,1,2,1476,2006-02-14 22:04:36,2005-06-15 21:08:46,MARY,SMITH,MARY.SMITH@sakilacustomer.org,9.99


In [16]:
table_name = "fact_payment"
primary_key = "fact_payment_key"
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, df_fact_payment, table_name, primary_key, db_operation)

In [17]:
sql_test2 = """
    SELECT customer.`customer_email` AS `customer_email`,
        SUM(payment.`paid_amount`) AS `paid_amount`
    FROM `{0}`.`fact_payment` AS payment
    INNER JOIN `{0}`.`fact_payment` AS customer
    ON payment.customer_key = customer.customer_key
    GROUP BY customer.`customer_email`
""".format(dst_dbname)

df_test2 = get_dataframe(user_id, pwd, host_name, src_dbname, sql_test2)

In [18]:
df_test2.head()

Unnamed: 0,customer_email,paid_amount
0,MARY.SMITH@sakilacustomer.org,3797.76
1,PATRICIA.JOHNSON@sakilacustomer.org,3475.71
2,LINDA.WILLIAMS@sakilacustomer.org,3529.24
3,BARBARA.JONES@sakilacustomer.org,1799.16
4,ELIZABETH.BROWN@sakilacustomer.org,5495.56
