## Using Python to Perform Extract-Transform-Load (ETL Processing)
Modern Data Warehousing and Analytics solutions frequently use languages like Python or Scala to extract data from numerous sources, including relational database management systems, NoSQL database systems, real-time streaming endpoints and Data Lakes.  These languages can then be used to perform many types of transformation before then loading the data into a variety of destinations including file systems and data warehouses. This data can then be consumed by data scientists or business analysts.

In this lab you will recreate the **Northwind_DW** dimensional database from Lab 2; however, you'll take an entirely different approach. Instead of extracting, transforming and loading the date entirely on the database system only using SQL data definition language (DDL) and data manipulation language (DML) statements, here you will learn to interact with the RDBMS from a remote client running Python. You will learn to fetch data into Pandas DataFrames, perform all the necessary transformations in-memory on the client, and then push the newly transformed DataFrame back to the RDBMS using a Pandas function that will create the table and fill it with data with a single operation.

### Prerequisites:
This notebook uses the SqlAlchemy database connectivity library to connect to MySQL databases; therefore, you must have first installed that libary into your python environment by executing the following command in a Terminal window.

- `python -m pip install sqlalchemy`

#### Import the Necessary Libraries

In [None]:
import os
import numpy
import pandas as pd
from sqlalchemy import create_engine

#### Declare & Assign Connection Variables for the MySQL Server & Databases with which You'll be Working 

In [None]:
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "Passw0rd123"

src_dbname = "northwind"
dst_dbname = "northwind_dw2"

#### Define Functions for Getting Data From and Setting Data Into Databases

In [None]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

#### Create the New Data Warehouse database, and to Use it, Switch the Connection Context.
Clearly, you won't get very far without having a database to work with. Here we demonstrate how we can *drop* a database if it already exists, and then *create* the new **northwind_dw2** database and *use* it as the target of all subsequent operations.

In [None]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)
connection = sqlEngine.connect()

connection.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
connection.execute(f"CREATE DATABASE `{dst_dbname}`;")
connection.execute(f"USE {dst_dbname};")

connection.close()

### 1.0. Create & Populate the Dimension Tables
In any extract-transform-load (ETL) process used to populate a multi-dimensional data warehouse database it is necessary to populate the Dimension tables before attempting to populate the Fact table(s). This is because rows in the Fact table(s) will reference surrogate primary key values from the Dimension tables. If the primary key values in the Dimension tables either do not exist, or do not reflect the current state of the dimension, then the attempt to load the Fact table(s) will fail.


#### 1.1. Extract Data from the Source Database Tables
Fetch data for each dimension table (e.g., customers, employees, products, shippers) from the **northwind** database using the **get_dataframe()** function.

In [None]:
sql_customers = "SELECT * FROM northwind.customers;"
df_customers = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customers)
df_customers.head(2)

In [None]:
# Employees

In [None]:
# Products

In [None]:
# Shippers

#### 1.2. Create the Date Dimension Table
At this point, we have to **execute the script from Lab 2c** that creates and populates a **Date Dimension** table.  Be certain to target this script to the new data warehouse database we just created **(northwind_dw2)**.  Later in this notebook we will integrate the **dim_date** table with the fact table by performing **lookup operations** to retrieve the surrogate primary keys from the **dim_date** table that correspond with each **date** typed column in the fact table (e.g., order_date).

#### 1.3. Perform Any Necessary Transformations
Pandas DataFrames enable extensive data modification capabilities. Here we will start by simply dropping features (columns) that we don't believe provide any real value to our analytics solution. Examples include columns having a high percentage of NULL values, columns having large amounts of free-text, and columns having binary large object (BLOB) data such as images or other documents. Then, we will rename the primary key column from the source (id) to serve as the business key for future lookup operations. Finally, we will *insert* a new primary key column that contains and ever-increasing numeric value.  It should be named after the entity (e.g., customer, product) followed by "**_key**" to conform with data warehouse design standards.

In [None]:
# 1. Create a List that enumerates the names of each column you wish to remove (drop) from the Pandas DataFrame
drop_cols = ['email_address','home_phone','mobile_phone','web_page','notes','attachments']
df_customers.drop(drop_cols, axis=1, inplace=True)

# 2. Rename the "id" column to reflect the entity as it will serve as the business key for lookup operations
df_customers.rename(columns={"id":"customer_id"}, inplace=True)

# 3. Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_customers.insert(0, "customer_key", range(1, df_customers.shape[0]+1))

# 4. Display the first 2 rows of the dataframe to validate your work
df_customers.head(2)

In [None]:
# Employees

In [None]:
# Products

In [None]:
# Shippers

#### 1.4. Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables
Here we demonstrate how an iterable data structure can be created containing the values needed to correctly create and populate the new dimension tables. If you inspect this code listing carefully, you'll notice that it's a **list** containing a **set** *(or vector)* for each dimension table. Each **set** then contains the *table_name* we need to assign to the table, the *pandas DataFrame* we crafted to define & populate the table, and the name we need to assign to the *primary_key* column.  With this *list of sets* defined, we can then call our **set_dataframe( )** function from within a **for *loop*** to create each *dimension* table.

In [None]:
db_operation = "insert"

tables = [('dim_customers', df_customers, 'customer_key'),
          ('dim_employees', df_employees, 'employee_key'),
          ('dim_products', df_products, 'product_key'),
          ('dim_shippers', df_shippers, 'shipper_key')]

In [None]:
for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

### 2.0. Create & Populate the Fact Table
Here we will learn two approaches to creating the *fact_orders* fact table. The first approach demonstrates that a carefully crafted SQL SELECT statement can be used to perform this task... *but what fun would that be.* Seriously though, this approach is quick and effect if you already have the query, but what if you didn't have the opportunity to view and work with the data beforehand?  What's more, you may be required to combine data from multiple sources, some of which may not be relational database management systems. Then, a simple SQL query won't do!  You would need to load the data from the various sources (e.g., database tables, CSV or JSON files, NoSQL document collections, API stream data) and then combine them into a single dataframe that you could then use to create a new database table. For this reason we'll see how we can retrieve the data, but we won't bother to use it for creating a new table... we already know how to do that using the **set_dataframe( )** function anyway.

### First, you *could* simply use the SQL SELECT statement you authored in Lab 2 
Just as we could create a new table using the SQL "CREATE TABLE AS SELECT..." or CTAS construct, it is possible to create new tables simply by crafting a new SQL result set and using it to populate a new Pandas DataFrame.  **However, this wouldn't demonstrate the power inherent to Pandas DataFrames.**

In [None]:
sql_fact_orders = """
    SELECT o.id AS order_id,
        od.id AS order_detail_id,
        o.customer_id,
        o.employee_id,
        od.product_id,
        o.shipper_id,
        o.order_date,
        o.paid_date,
        o.shipped_date,
        o.payment_type,
        od.quantity,
        od.unit_price,
        od.discount,
        o.shipping_fee,
        o.taxes,
        o.tax_rate,
        os.status_name AS order_status,
        ods.status_name AS order_details_status
    FROM northwind.orders AS o
    INNER JOIN northwind.orders_status AS os
    ON o.status_id = os.id
    RIGHT OUTER JOIN northwind.order_details AS od
    ON o.id = od.order_id
    INNER JOIN northwind.order_details_status AS ods
    ON od.status_id = ods.id;
"""

df_fact_orders = get_dataframe(user_id, pwd, host_name, src_dbname, sql_fact_orders)
df_fact_orders.head(2)

### Instead, implement the solution using Pandas DataFrames to craft the table
This is where we get to the point of this lab... *transforming in-memory data.*   First, we'll query the source **northwind** database to fill a *dataframe* for each of the source tables we need to create our *fact_orders* fact table; orders, orders_status, order_details and order_details_status. Then, we'll learn how to *join* those *dataframes* using the <a href="https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.merge.html">**merge( )**</a> method of the Pandas DataFrame.  We'll make any additional changes that we expect to see reflected in the *fact* table in our new MySQL database, including the addition of **foreign key references** to the dimension tables, and then we'll push the *dataframe* back to the MySQL server to create and populate the new *fact* table.

#### 2.1. Get all the data from each of the four tables involved

In [None]:
sql_orders = "SELECT * FROM northwind.orders;"
df_orders = get_dataframe(user_id, pwd, host_name, src_dbname, sql_orders)
df_orders.rename(columns={"id":"order_id"}, inplace=True)
df_orders.head(2)

In [None]:
# 1. SELECT all columns from the northwind.orders_status table to create the "df_order_status" dataframe
# 2. Rename the "id" column to "status_id"
# 3. Display the first two rows of the DataFrame to validate your work

In [None]:
# 1. SELECT all columns from the northwind.order_details table to create the "df_order_details" dataframe
# 2. Rename the "id" column to "order_detail_id"
# 3. Display the first two rows of the DataFrame to validate your work

In [None]:
# 1. SELECT all columns from the northwind.order_details_status table to create the "df_order_details_status" dataframe
# 2. Rename the "id" column to "status_id"
# 3. Display the first two rows of the DataFrame to validate your work

#### 2.2. Get the order_status column.
Here we use the dataframe's <a href="https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.merge.html">**merge(** *'left dataframe', 'right dataframe', on='key column', how='left' | 'right' | 'inner'* **)**</a> method to **inner join** the *orders* and the *orders_status* dataframes **on** the *status_id* column.  We then use the dataframe's **rename( )** method to rename the *status_name* column to *order_status*, and use the dataframe's **drop( )** method to remove the *status_id* column.

In [None]:
df_orders = pd.merge(df_orders, df_orders_status, on='status_id', how='inner')
df_orders.rename(columns={"status_name":"order_status"}, inplace=True)
df_orders.drop(['status_id'], axis=1, inplace=True)
df_orders.head(2)

#### 2.3. Get the order_details_status column.
Here we **repeat the sequence of operations we used in the previous step** to *inner join* the *order_details* and *order_details_status* dataframes for the sake of including the *order_details_status* column in place of the *status_id* column.

#### 2.4. Join the Orders and OrderDetails DataFrames
In this step we can now easily join the *orders* and *order_details* dataframes. Since each **order** (the *left* dataframe) can have many **order details** (the *right* dataframe), we'll need to implement a **right** *outer join* **on** the *order_id* column.

In [None]:
df_fact_orders.shape

#### 2.5. Lookup the Primary Keys from the Dimension Tables
Just as we did in **Lab 1**, we need to establish **foreign key relationships** between the newly-crafted **Fact table** and each of the **Dimension tables**.

##### 2.5.1. First, fetch the Surrogate Primary Key and the Business Key from each of the Dimension tables using the **get_dataframe()** function.

In [None]:
# Select 'customer_key' and 'customer_id' from northwind_dw2.dim_customers

In [None]:
# Employees

In [None]:
# Products

In [None]:
# Shippers

##### 2.5.2. Next, using the Business Keys, lookup the corresponding Surrogate Primary Key values in the Dimension tables

In [None]:
# 1. Modify 'df_fact_orders' by merging it with 'df_dim_customers' on the 'customer_id' column
# 2. Drop the 'customer_id' column
# 3. Display the first 2 rows of the dataframe to validate your work

In [None]:
# Repeat for the Employees dimension

In [None]:
# Repeat for the Product dimension

In [None]:
# Repeat for the Shipper dimension

##### 2.5.3. Lookup the DateKeys from the Date Dimension Table.
First, fetch the Surrogate Primary Key (date_key) and the Business Key (full_date) from the Date Dimension table using the **get_dataframe()** function. Be certain to cast the **full_date** column to the **datetime64[ns]** data type using the **.astype()** function that is native to Pandas DataFrame columns. Also, extract the **date** portion using the **.dt.date** attribute of the **datetime64[ns]** datatype.

In [None]:
sql_dim_date = "SELECT date_key, full_date FROM northwind_dw.dim_date;"
df_dim_date = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_date)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date
df_dim_date.head(2)

Next, for each **date** typed column in the Fact table, lookup the corresponding Primary Key column. Be certain to cast each **date** column to the **datetime64[ns]** data type using the **.astype()** function that's native to Pandas DataFrame columns. Also, extract the **date** portion using the **.dt.date** attribute.

In [None]:
# Lookup the Surrogate Primary Key (date_key) that Corresponds to the "order_date" Column.
df_dim_order_date = df_dim_date.rename(columns={"date_key" : "order_date_key", "full_date" : "order_date"})
df_fact_orders.order_date = df_fact_orders.order_date.astype('datetime64[ns]').dt.date

df_fact_orders = pd.merge(df_fact_orders, df_dim_order_date, on='order_date', how='left')
df_fact_orders.drop(['order_date'], axis=1, inplace=True)
df_fact_orders.head(2)

In [None]:
# Lookup the Surrogate Primary Key (date_key) that Corresponds to the "paid_date" Column.

In [None]:
# Lookup the Surrogate Primary Key (date_key) that Corresponds to the "shipped_date" Column.

#### 2.6. Perform any Additional Transformations
In this step we can prepare the DataFrame so that it defines exactly what we want to see created in the database.  Issues may include dropping unwanted columns, reordering the columns, and in our case, creating a new column to serve as the primary key.

In [None]:
# 1. Drop the columns of no particular interest
# 2. Reorder the remaining columns
# 3. Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
# 4. Display the first 2 rows of the dataframe to validate your work

#### 2.7. Write the DataFrame Back to the Database

In [None]:
table_name = "fact_orders"
primary_key = "fact_order_key"
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, df_fact_orders, table_name, primary_key, db_operation)

### 3.0. Demonstrate that the New Data Warehouse Exists and Contains the Correct Data
To demonstrate the viability of your solution, author a SQL SELECT statement that returns:
- Each Customer’s Last Name
- The total amount of the order quantity associated with each customer
- The total amount of the order unit price associated with each customer

**NOTE:** *Remember that a string typed variable whose value is contained by triple-quotes (""" ... """) can preserve multi-line formatting, and that a string variable has an intrinsic **.format()** function that accepts ordered parameters that will replace tokens (e.g., {0}) in the formatted string.*  

In [None]:
sql_test = """

""".format(dst_dbname)

df_test = get_dataframe(user_id, pwd, host_name, src_dbname, sql_test)
df_test.head()

### 3.1 Extra Credit: Author a Query that Returns the Total Shipping Fee per Shipper

In [None]:
sql_test2 = """

""".format(dst_dbname)

df_test2 = get_dataframe(user_id, pwd, host_name, src_dbname, sql_test2)
df_test2.head()