# ETL Worfklow
---

## Setup


Importing my libraries of choice

In [1]:
import os
import pyspark.sql
import requests

Getting our SQL Server credientials ahead of time

In [2]:
db_user = os.environ.get('DB_USER')
db_password = os.environ.get('DB_PASSWORD')

Creating a spark session 

In [3]:
spark_sesh = pyspark.sql.SparkSession.builder.appName('Credit Card ETL').getOrCreate()

In [4]:
spark_sesh

## Extract 
---

I'm going to extract data from two datasets pertaining to the same bank, which are:


- The Credit Card Dataset
    - cdw_sapp_customer.JSON
    - cdw_sapp_branch.JSON 
    - cdw_sapp_credit.JSON

<br>

- The Bank Loan Application Dataset
    - Loan Application API Endpoint

📝 Notes: <br>
- The customer.JSON file and branch.JSON file contains information about the bank customer and bank branch, respectively. 
- The credit.JSON file contains information about credit card transactions
- The Loan Application is with respect to loans for purchasing homes, and includes information like whether or not the individuals were approved, gender, maritual status, and income.

Extracting the JSON Files with sparksession.read.load()


In [5]:
# JSON Files
branch_df = spark_sesh.read.load('../../data/credit_card_dataset/cdw_sapp_branch.json', format='json')  # 👀 don't forget the to specify the format
customer_df = spark_sesh.read.load('../../data/credit_card_dataset/cdw_sapp_custmer.json', format='json')
credit_df = spark_sesh.read.load('../../data/credit_card_dataset/cdw_sapp_credit.json', format='json')


Defining an API endpoint


In [6]:
LOAN_API_ENDPOINT = "https://raw.githubusercontent.com/platformps/LoanDataset/main/loan_data.json"


Creating a function to create a dataframe from an API Endpoint: <br>
 
The function takes an API Endpoint URL and the current live spark session, <br>
checks to see if the HTTP request is OK and returns a pyspark.sql DataFrame.

In [7]:
# API 

def api_check(api_endpoint: str, spark_session: pyspark.sql.SparkSession) -> pyspark.sql.DataFrame:  # param :type -> output-type
    api = requests.get(LOAN_API_ENDPOINT)
    print(f"HTTP Status Code: {api.status_code}")
    if api.status_code == 200:
        api_df = spark_session.createDataFrame(api.json())
        return api_df



In [8]:
loan_df = api_check(LOAN_API_ENDPOINT, spark_sesh)

HTTP Status Code: 200


Let's check our DataFrames and make sure they are intact and ready for transformation

In [9]:
dataframe_dict = {}

dataframe_dict['branch'] = branch_df  # assign val to dict
dataframe_dict['credit'] = credit_df
dataframe_dict['customer'] = customer_df
dataframe_dict['loan'] = loan_df

In [10]:
for name, dataframe in dataframe_dict.items():  # k:v in (k, v)
    print(name)
    dataframe.printSchema()

branch
root
 |-- BRANCH_CITY: string (nullable = true)
 |-- BRANCH_CODE: long (nullable = true)
 |-- BRANCH_NAME: string (nullable = true)
 |-- BRANCH_PHONE: string (nullable = true)
 |-- BRANCH_STATE: string (nullable = true)
 |-- BRANCH_STREET: string (nullable = true)
 |-- BRANCH_ZIP: long (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)

credit
root
 |-- BRANCH_CODE: long (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_SSN: long (nullable = true)
 |-- DAY: long (nullable = true)
 |-- MONTH: long (nullable = true)
 |-- TRANSACTION_ID: long (nullable = true)
 |-- TRANSACTION_TYPE: string (nullable = true)
 |-- TRANSACTION_VALUE: double (nullable = true)
 |-- YEAR: long (nullable = true)

customer
root
 |-- APT_NO: string (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- CUST_PHONE: long (nullable = tru

## Transform
---

We are given a mapping document, which tells us what kind of transformations we should make on the data before loading it onto the server. <br>
Let's take a look:

<div align='center'>
<h3>Tables</h3>
<h3>Customer Table</h3>
<img src='../../images/customer_mapping_doc.png' width=1200px>
<h3>Branch Table</h3>
<img src='../../images/branch_mapping_doc.png' width=1200px>
<h3>Credit-Card Table</h3>
<img src='../../images/creditcard_mapping_doc.png' width=1200px>
</div>

📝 Notes: <br>
We are not given any specifics for the loan information dataframe, so I'm not going to touch that dataframe or modify any data types. <br>

Furthermore, if you're following along, now would be a good time to split up your <br> views into several windows if you're using VS code or another similar IDE to <br> enhance your workflow, like this: <br>
<img src='../../images/workflow_example.png' width="1200px">

### Transforming the Customer Table 


Let's Start with Customer Table: <br>

📝Note: <br>
Just to make sure I'm following along and not making mistakes, <br>
Im using a small pandas dataframe to store the mapping document, so I can see what changes need to be made iterate through them, using pandas as a checklist of sorts.

This is completely optional, and not required at all, but I prefer it.

In [58]:
import pandas as pd
pd.set_option('display.max_colwidth', None) # ensuring pandas wont truncate col


In [59]:
customer_map = pd.read_clipboard()
customer_map


Unnamed: 0,Source Column Names,Mapping Logic,Target Table,Target Field names,Target DataType,Description
0,SSN,Direct Move,CDW_SAPP_CUSTOMER,SSN,VARCHAR,Social Security Number of the customer (National ID)
1,FIRST_NAME,Convert the Name to Title Case,CDW_SAPP_CUSTOMER,FIRST_NAME,VARCHAR,First Name of the Customer
2,MIDDLE_NAME,Convert the middle name in lower case,CDW_SAPP_CUSTOMER,MIDDLE_NAME,VARCHAR,Middle Name of the customer
3,LAST_NAME,Convert the Last Name in Title Case,CDW_SAPP_CUSTOMER,LAST_NAME,VARCHAR,Last Name of the customer
4,CREDIT_CARD_NO,Direct_move,CDW_SAPP_CUSTOMER,Credit_card_no,VARCHAR,Credit card number of customer
5,"STREET_NAME,APT_NO","Concatenate Apartment no and Street name of customer's Residence with comma as a seperator (Street, Apartment)",CDW_SAPP_CUSTOMER,FULL_STREET_ADDRESS,VARCHAR,Apartment no and Street name of customer's Residence
6,CUST_CITY,Direct Move,CDW_SAPP_CUSTOMER,CUST_CITY,VARCHAR,Customer’s Current City
7,CUST_STATE,Direct Move,CDW_SAPP_CUSTOMER,CUST_STATE,VARCHAR,Customer’s State code
8,CUST_COUNTRY,Direct move,CDW_SAPP_CUSTOMER,CUST_COUNTRY,VARCHAR,Customer’s country code
9,CUST_ZIP,Direct move,CDW_SAPP_CUSTOMER,CUST_ZIP,VARCHAR,Zip code of Customer's Country


In [60]:
customer_map['Mapping Logic'] = customer_map['Mapping Logic'].apply(lambda x: str(x).lower())
customer_map

Unnamed: 0,Source Column Names,Mapping Logic,Target Table,Target Field names,Target DataType,Description
0,SSN,direct move,CDW_SAPP_CUSTOMER,SSN,VARCHAR,Social Security Number of the customer (National ID)
1,FIRST_NAME,convert the name to title case,CDW_SAPP_CUSTOMER,FIRST_NAME,VARCHAR,First Name of the Customer
2,MIDDLE_NAME,convert the middle name in lower case,CDW_SAPP_CUSTOMER,MIDDLE_NAME,VARCHAR,Middle Name of the customer
3,LAST_NAME,convert the last name in title case,CDW_SAPP_CUSTOMER,LAST_NAME,VARCHAR,Last Name of the customer
4,CREDIT_CARD_NO,direct_move,CDW_SAPP_CUSTOMER,Credit_card_no,VARCHAR,Credit card number of customer
5,"STREET_NAME,APT_NO","concatenate apartment no and street name of customer's residence with comma as a seperator (street, apartment)",CDW_SAPP_CUSTOMER,FULL_STREET_ADDRESS,VARCHAR,Apartment no and Street name of customer's Residence
6,CUST_CITY,direct move,CDW_SAPP_CUSTOMER,CUST_CITY,VARCHAR,Customer’s Current City
7,CUST_STATE,direct move,CDW_SAPP_CUSTOMER,CUST_STATE,VARCHAR,Customer’s State code
8,CUST_COUNTRY,direct move,CDW_SAPP_CUSTOMER,CUST_COUNTRY,VARCHAR,Customer’s country code
9,CUST_ZIP,direct move,CDW_SAPP_CUSTOMER,CUST_ZIP,VARCHAR,Zip code of Customer's Country


In [61]:
customer_map['Mapping Logic'] = customer_map['Mapping Logic'].replace('direct_move', 'direct move')

In [62]:
customer_map['Target Field names'] = customer_map['Target Field names'].apply(lambda x: str(x).upper())
customer_map.head(2)

Unnamed: 0,Source Column Names,Mapping Logic,Target Table,Target Field names,Target DataType,Description
0,SSN,direct move,CDW_SAPP_CUSTOMER,SSN,VARCHAR,Social Security Number of the customer (National ID)
1,FIRST_NAME,convert the name to title case,CDW_SAPP_CUSTOMER,FIRST_NAME,VARCHAR,First Name of the Customer


Now I can see exactly what needs to be transformed. 

In [63]:
customer_transform_guide = customer_map[(customer_map['Mapping Logic'] != 'direct move')]
customer_transform_guide

Unnamed: 0,Source Column Names,Mapping Logic,Target Table,Target Field names,Target DataType,Description
1,FIRST_NAME,convert the name to title case,CDW_SAPP_CUSTOMER,FIRST_NAME,VARCHAR,First Name of the Customer
2,MIDDLE_NAME,convert the middle name in lower case,CDW_SAPP_CUSTOMER,MIDDLE_NAME,VARCHAR,Middle Name of the customer
3,LAST_NAME,convert the last name in title case,CDW_SAPP_CUSTOMER,LAST_NAME,VARCHAR,Last Name of the customer
5,"STREET_NAME,APT_NO","concatenate apartment no and street name of customer's residence with comma as a seperator (street, apartment)",CDW_SAPP_CUSTOMER,FULL_STREET_ADDRESS,VARCHAR,Apartment no and Street name of customer's Residence
10,CUST_PHONE,change the format of phone number to (xxx)xxx-xxxx,CDW_SAPP_CUSTOMER,CUST_PHONE,VARCHAR,Contact Number of the customer


In [64]:
customer_transform_guide.set_index('Source Column Names', inplace=True)

In [65]:
customer_transform_guide.filter(like='NAME', axis=0)

Unnamed: 0_level_0,Mapping Logic,Target Table,Target Field names,Target DataType,Description
Source Column Names,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
FIRST_NAME,convert the name to title case,CDW_SAPP_CUSTOMER,FIRST_NAME,VARCHAR,First Name of the Customer
MIDDLE_NAME,convert the middle name in lower case,CDW_SAPP_CUSTOMER,MIDDLE_NAME,VARCHAR,Middle Name of the customer
LAST_NAME,convert the last name in title case,CDW_SAPP_CUSTOMER,LAST_NAME,VARCHAR,Last Name of the customer
"STREET_NAME,APT_NO","concatenate apartment no and street name of customer's residence with comma as a seperator (street, apartment)",CDW_SAPP_CUSTOMER,FULL_STREET_ADDRESS,VARCHAR,Apartment no and Street name of customer's Residence


#### Transforming the Customers' Name

In [370]:
customer_df_backup = customer_df


In [23]:
import pyspark.sql.functions as F

In [371]:
customer_df = customer_df.withColumns({'FIRST_NAME':F.initcap(customer_df['FIRST_NAME']), 'LAST_NAME':F.initcap(customer_df['LAST_NAME'])})

In [372]:
customer_df = customer_df.withColumn('MIDDLE_NAME', F.lower(customer_df['MIDDLE_NAME']))

Confirming Transformation is correct

In [373]:
customer_df['FIRST_NAME', 'LAST_NAME', 'MIDDLE_NAME'].show(2)

+----------+---------+-----------+
|FIRST_NAME|LAST_NAME|MIDDLE_NAME|
+----------+---------+-----------+
|      Alec|   Hooper|         wm|
|      Etta|   Holman|    brendan|
+----------+---------+-----------+
only showing top 2 rows



#### Transforming the Address

The easiest way to do this is to generate a SQL temp-table that is attached to the current spark session. <br>
I'm operating on the temp-table with SQL instead of Python here and its much easier and cleaner.

In [374]:
customer_df.createOrReplaceGlobalTempView('customer')

In [375]:
customer_df = spark_sesh.sql("""
SELECT
    *,
    CONCAT(STREET_NAME, ", ", APT_NO) AS FULL_STREET_ADDRESS
FROM
    global_temp.customer
""")

📝Notes: <br>
It might be *expensive* to concatenate the columns apt_no and street_name in one dataframe and then include them *again*<br>
by using the `select *` statement and dropping the extraneous columns, but with this current dataset the operations are pretty fast, and I'd rather save time using `*` and `.drop()` than including each and every column manually. <br>

Either way, we still have more transformations left and pyspark's dataframes are immutable, so we generate a new copy of a dataframe every time.

In [376]:
customer_df[['APT_NO', 'STREET_NAME', 'FULL_STREET_ADDRESS']].show(2, truncate=False)

+------+-----------------+----------------------+
|APT_NO|STREET_NAME      |FULL_STREET_ADDRESS   |
+------+-----------------+----------------------+
|656   |Main Street North|Main Street North, 656|
|829   |Redwood Drive    |Redwood Drive, 829    |
+------+-----------------+----------------------+
only showing top 2 rows



In [377]:
customer_df = customer_df.drop('APT_NO', 'STREET_NAME')

The address is now concatenated and the other two columns are dropped.

In [378]:
customer_df[['FULL_STREET_ADDRESS']].show(2, truncate=False)

+----------------------+
|FULL_STREET_ADDRESS   |
+----------------------+
|Main Street North, 656|
|Redwood Drive, 829    |
+----------------------+
only showing top 2 rows



In [379]:
customer_df.printSchema()

root
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- CUST_PHONE: long (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_ZIP: string (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- SSN: long (nullable = true)
 |-- FULL_STREET_ADDRESS: string (nullable = true)



#### Transforming the  Customers' Phone number

In [66]:
customer_transform_guide.filter(like='PHONE', axis=0)

Unnamed: 0_level_0,Mapping Logic,Target Table,Target Field names,Target DataType,Description
Source Column Names,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
CUST_PHONE,change the format of phone number to (xxx)xxx-xxxx,CDW_SAPP_CUSTOMER,CUST_PHONE,VARCHAR,Contact Number of the customer


In [50]:
customer_df[['CUST_PHONE']].show(2)

+----------+
|CUST_PHONE|
+----------+
|   1237818|
|   1238933|
+----------+
only showing top 2 rows



Here we are only required to add an artificial area code and the parenthesis and a dash, so I'll do just that.


Using Pyspark.sql's  user-defined function this time:

In [380]:
phone_refactor = F.udf(lambda phone: "(555)" + str(phone)[:3] + "-" + str(phone)[3:])
# phone[0, 3) + phone[3:]
# note the slice includes the end of the string with [:]

In [381]:
customer_df = customer_df.withColumn('CUST_PHONE', phone_refactor(customer_df['CUST_PHONE']))

In [382]:
customer_df[['CUST_PHONE']].show(2)

+-------------+
|   CUST_PHONE|
+-------------+
|(555)123-7818|
|(555)123-8933|
+-------------+
only showing top 2 rows



Checking the types, you can see I transformed the phone number to a more-appropriate string datatype while modifying the phone number <br>
(It was previously long int type).

In [54]:
customer_df.dtypes

[('CREDIT_CARD_NO', 'string'),
 ('CUST_CITY', 'string'),
 ('CUST_COUNTRY', 'string'),
 ('CUST_EMAIL', 'string'),
 ('CUST_PHONE', 'string'),
 ('CUST_STATE', 'string'),
 ('CUST_ZIP', 'string'),
 ('FIRST_NAME', 'string'),
 ('LAST_NAME', 'string'),
 ('LAST_UPDATED', 'string'),
 ('MIDDLE_NAME', 'string'),
 ('SSN', 'bigint'),
 ('FULL_STREET_ADDRESS', 'string')]

In [383]:
customer_df = customer_df.withColumn('SSN', customer_df['SSN'].cast('string'))

In [76]:
customer_df[['LAST_UPDATED']].show(2, truncate=False)

+-----------------------------+
|LAST_UPDATED                 |
+-----------------------------+
|2018-04-21T12:49:02.000-04:00|
|2018-04-21T12:49:02.000-04:00|
+-----------------------------+
only showing top 2 rows



In [384]:
customer_df= customer_df.withColumn('LAST_UPDATED', customer_df['LAST_UPDATED'].cast('timestamp'))

In [385]:
customer_df[['LAST_UPDATED']].show(2, truncate=False)

+-------------------+
|LAST_UPDATED       |
+-------------------+
|2018-04-21 12:49:02|
|2018-04-21 12:49:02|
+-------------------+
only showing top 2 rows



#### Confirming Types for Customer One Last Time

Here I'm just comparing the expected vs actual datatypes and making sure they are at least somewhat similar  - pyspark.sql does not have all of the exact datatypes that most sql database engines have. 
<br><br>
Also, not to mention pyspark.sql datatypes don't really have the ability to set constraints on what goes into their columns. <br>
For example, I created a database where CUST_STATE is VARCHAR(2), and while VARCHAR(2) and VARCHAR(32) take up the exact <br>
same amount of disk space, VARCHAR(2) can potentially help save us from some data-entry headaches later down the line.

To look at my SQL table creation, click <a href='https://github.com/The-Captain-William/Per-Scholas-Capstone/blob/main/main/ETL/ETL_Part_1_building_the_database.ipynb'>here</a>. 

In [142]:
def compare_types(mapping_requirements: pd.DataFrame, column_names: str, dataframe: pyspark.sql.DataFrame) -> pd.DataFrame:
    """
    Mapping Requirements -> mapping requirements in the form of a dataframe\n
    Column Names -> Name of the column on the Mapping Requirements df that contains the column names\n
    Dataframe -> actual dataframe we are transforming, must be pyspark.sql df (for now)\n
    """
    mapping_column_names = ", ".join(name for name in mapping_requirements.columns)
    pyspark_df_column_names = ", ".join(name[0] for name in dataframe.dtypes)
    types_df = pd.DataFrame(dataframe.dtypes)  # make df of d-types
    dataframe_var_name_col = 'Current Column Names'
    dataframe_var_name_type = 'Current Column Types'
    types_df.columns = [dataframe_var_name_col, dataframe_var_name_type]  # set column names
    mapping_df = pd.merge(mapping_requirements, types_df, left_on=column_names, right_on=dataframe_var_name_col, how='outer')  # pd merge
    # print(f"Mapping Document Total Columns: {len(mapping_requirements.columns)}")
    # print(f"Mapping Document Columns: {mapping_column_names}")
    # print("")
    # print(f"DataFrame Total Columns: {len(dataframe.dtypes)}")
    # print(f"DataFrame Column Names: {pyspark_df_column_names}")
    return mapping_df 

In [386]:
compare_types(customer_map, 'Target Field names', customer_df)\
    [['Source Column Names', 'Mapping Logic', 'Target Field names', 'Current Column Names', 'Target DataType', 'Current Column Types',]]

Unnamed: 0,Source Column Names,Mapping Logic,Target Field names,Current Column Names,Target DataType,Current Column Types
0,SSN,direct move,SSN,SSN,VARCHAR,string
1,FIRST_NAME,convert the name to title case,FIRST_NAME,FIRST_NAME,VARCHAR,string
2,MIDDLE_NAME,convert the middle name in lower case,MIDDLE_NAME,MIDDLE_NAME,VARCHAR,string
3,LAST_NAME,convert the last name in title case,LAST_NAME,LAST_NAME,VARCHAR,string
4,CREDIT_CARD_NO,direct move,CREDIT_CARD_NO,CREDIT_CARD_NO,VARCHAR,string
5,"STREET_NAME,APT_NO","concatenate apartment no and street name of customer's residence with comma as a seperator (street, apartment)",FULL_STREET_ADDRESS,FULL_STREET_ADDRESS,VARCHAR,string
6,CUST_CITY,direct move,CUST_CITY,CUST_CITY,VARCHAR,string
7,CUST_STATE,direct move,CUST_STATE,CUST_STATE,VARCHAR,string
8,CUST_COUNTRY,direct move,CUST_COUNTRY,CUST_COUNTRY,VARCHAR,string
9,CUST_ZIP,direct move,CUST_ZIP,CUST_ZIP,VARCHAR,string


### Transforming the Branch Table

Let's take a look at the branch table:

In [170]:
branch_map = pd.read_clipboard()
", ".join(name for name in branch_map.columns)

'Source File Column Names, Mapping Logic, Target Table, Target Field names, Target DataType, Description'

In [172]:
branch_map = compare_types(branch_map, 'Target Field names', branch_df)
branch_map

Unnamed: 0,Source File Column Names,Mapping Logic,Target Table,Target Field names,Target DataType,Description,Current Column Names_x,Current Column Types_x,Current Column Names_y,Current Column Types_y
0,BRANCH_CODE,Direct move,cdw_sapp_branch,BRANCH_CODE,INT,Uniquely identifies a branch of the retail store,BRANCH_CODE,bigint,BRANCH_CODE,bigint
1,BRANCH_NAME,Direct move,cdw_sapp_branch,BRANCH_NAME,VARCHAR,Name of the Branch,BRANCH_NAME,string,BRANCH_NAME,string
2,BRANCH_STREET,Direct move,cdw_sapp_branch,BRANCH_STREET,VARCHAR,Street Address,BRANCH_STREET,string,BRANCH_STREET,string
3,BRANCH_CITY,Direct move,cdw_sapp_branch,BRANCH_CITY,VARCHAR,City name where the branch is located,BRANCH_CITY,string,BRANCH_CITY,string
4,BRANCH_STATE,Direct move,cdw_sapp_branch,BRANCH_STATE,VARCHAR,State name where the branch is located,BRANCH_STATE,string,BRANCH_STATE,string
5,BRANCH_ZIP,If the source value is null load default (99999) value else Direct move,cdw_sapp_branch,BRANCH_ZIP,VARCHAR,Zip postal code(default value 999999),BRANCH_ZIP,bigint,BRANCH_ZIP,bigint
6,BRANCH_PHONE,Change the format of phone number to (XXX)XXX-XXXX,cdw_sapp_branch,BRANCH_PHONE,VARCHAR,Phone number of the branch,BRANCH_PHONE,string,BRANCH_PHONE,string
7,LAST_UPDATED,Direct move,cdw_sapp_branch,LAST_UPDATED,TIMESTAMP,Record inserted / modification date.,LAST_UPDATED,string,LAST_UPDATED,string


In [191]:
branch_map['Mapping Logic'] = branch_map['Mapping Logic'].apply(lambda x: str(x).strip().lower())
branch_map[branch_map['Mapping Logic'] != 'direct move']

Unnamed: 0,Source File Column Names,Mapping Logic,Target Table,Target Field names,Target DataType,Description,Current Column Names_x,Current Column Types_x,Current Column Names_y,Current Column Types_y
5,BRANCH_ZIP,if the source value is null load default (99999) value else direct move,cdw_sapp_branch,BRANCH_ZIP,VARCHAR,Zip postal code(default value 999999),BRANCH_ZIP,bigint,BRANCH_ZIP,bigint
6,BRANCH_PHONE,change the format of phone number to (xxx)xxx-xxxx,cdw_sapp_branch,BRANCH_PHONE,VARCHAR,Phone number of the branch,BRANCH_PHONE,string,BRANCH_PHONE,string


In [254]:
branch_df_backup = branch_df

#### Transforming the Branch Phone Number

In [196]:
branch_df[['BRANCH_PHONE']].show(3)

+------------+
|BRANCH_PHONE|
+------------+
|  1234565276|
|  1234618993|
|  1234985926|
+------------+
only showing top 3 rows



In [197]:
len('1234565276')

10

In [198]:
branch_phone_refactor= F.udf(lambda phone_number: "(" + phone_number[:3] + ")" + phone_number[3:])
branch_df = branch_df.withColumn('BRANCH_PHONE', branch_phone_refactor(branch_df['BRANCH_PHONE']))
branch_df[['BRANCH_PHONE']].show(3)

+------------+
|BRANCH_PHONE|
+------------+
|(123)4565276|
|(123)4618993|
|(123)4985926|
+------------+
only showing top 3 rows



#### Transforming the Branch Zip

In [203]:
branch_map[['Mapping Logic', 'Source File Column Names']].iloc[[5]]

Unnamed: 0,Mapping Logic,Source File Column Names
5,if the source value is null load default (99999) value else direct move,BRANCH_ZIP


I don't think there are any NaN or Null values but I'm checking anyway

In [204]:
branch_df[['BRANCH_ZIP']].show(10)

+----------+
|BRANCH_ZIP|
+----------+
|     55044|
|     60142|
|     11419|
|     32068|
|     19406|
|      7501|
|     14534|
|      6109|
|     44070|
|      8844|
+----------+
only showing top 10 rows



In [226]:
branch_df[['BRANCH_ZIP']].createOrReplaceGlobalTempView('zip')

In [237]:
spark_sesh.sql("select * from global_temp.zip where branch_zip = null").show()

+----------+
|BRANCH_ZIP|
+----------+
+----------+



In [239]:
branch_df[['BRANCH_ZIP']].describe().show()

+-------+------------------+
|summary|        BRANCH_ZIP|
+-------+------------------+
|  count|               115|
|   mean|  38975.2347826087|
| stddev|23938.156819564818|
|    min|              2155|
|    max|             98908|
+-------+------------------+



In [245]:
branch_df = branch_df.fillna({'BRANCH_ZIP':99999})  # fillna can take dict {col:replacement value} and can take multiple k:v pairs
branch_df[['BRANCH_ZIP']].describe().show()

# this operation is potentially expensive
# https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.fillna.html?highlight=fillna

+-------+------------------+
|summary|        BRANCH_ZIP|
+-------+------------------+
|  count|               115|
|   mean|  38975.2347826087|
| stddev|23938.156819564818|
|    min|              2155|
|    max|             98908|
+-------+------------------+



#### Casting Correct Types for the Branch Zip

In [253]:
#branch_map.columns
branch_map[['Target Field names', 'Current Column Names_x', 'Target DataType', 'Current Column Types_y']]

Unnamed: 0,Target Field names,Current Column Names_x,Target DataType,Current Column Types_y
0,BRANCH_CODE,BRANCH_CODE,INT,bigint
1,BRANCH_NAME,BRANCH_NAME,VARCHAR,string
2,BRANCH_STREET,BRANCH_STREET,VARCHAR,string
3,BRANCH_CITY,BRANCH_CITY,VARCHAR,string
4,BRANCH_STATE,BRANCH_STATE,VARCHAR,string
5,BRANCH_ZIP,BRANCH_ZIP,VARCHAR,bigint
6,BRANCH_PHONE,BRANCH_PHONE,VARCHAR,string
7,LAST_UPDATED,LAST_UPDATED,TIMESTAMP,string


This time I'm using the `withColumns` method and using a dictionary instead of the `withColumn` method, <br>
Considering the bulk of the transformations are over with.

In [255]:
branch_df = branch_df.withColumns({'BRANCH_ZIP':branch_df['BRANCH_ZIP'].cast('string'),
                                   'LAST_UPDATED':branch_df['LAST_UPDATED'].cast('timestamp')})
branch_df.printSchema()

root
 |-- BRANCH_CITY: string (nullable = true)
 |-- BRANCH_CODE: long (nullable = true)
 |-- BRANCH_NAME: string (nullable = true)
 |-- BRANCH_PHONE: string (nullable = true)
 |-- BRANCH_STATE: string (nullable = true)
 |-- BRANCH_STREET: string (nullable = true)
 |-- BRANCH_ZIP: string (nullable = false)
 |-- LAST_UPDATED: timestamp (nullable = true)



In [257]:
branch_df[['LAST_UPDATED']].show(2)

+-------------------+
|       LAST_UPDATED|
+-------------------+
|2018-04-18 16:51:47|
|2018-04-18 16:51:47|
+-------------------+
only showing top 2 rows



### Transforming the Credit Card Table

Again, I'm just going to make a mapping table in pandas so I don't make mistakes. 

In [272]:
credit_map = pd.read_clipboard()
credit_map

Unnamed: 0,Source File Column Names,Mapping Logic,Target Field names,Target DataType,Description
0,CREDIT_CARD_NO,Direct Move,CUST_CC_NO,VARCHAR,Credit card number of customer
1,"DAY, MONTH, YEAR","Convert DAY, MONTH, and YEAR into a TIMEID (YYYYMMDD)",TIMEID,DATE,Surrogate key of the period(time) table
2,CUST_SSN,Direct Move,CUST_SSN,VARCHAR,Surrogate key of the customer table. Used to uniquely identify a row.
3,BRANCH_CODE,Direct Move,BRANCH_CODE,INT,Surrogate key of the branch table
4,TRANSACTION_TYPE,Direct Move,TRANSACTION_TYPE,VARCHAR,Transaction Type
5,TRANSACTION_VALUE,Direct Move,TRANSACTION_VALUE,DOUBLE,Customer’s Current City
6,TRANSACTION_ID,Direct move,TRANSACTION_ID,INT,Uniquely identifies a transaction


In [275]:
credit_df.dtypes

[('BRANCH_CODE', 'bigint'),
 ('CREDIT_CARD_NO', 'string'),
 ('CUST_SSN', 'bigint'),
 ('DAY', 'bigint'),
 ('MONTH', 'bigint'),
 ('TRANSACTION_ID', 'bigint'),
 ('TRANSACTION_TYPE', 'string'),
 ('TRANSACTION_VALUE', 'double'),
 ('YEAR', 'bigint')]

In [351]:
credit_df_backup = credit_df
#credit_df = credit_df_backup # 🔨 break in-case of emergency 🚨


#### Creating a DATE Column for the Transaction Table from three seperate columns


In [326]:
credit_df[['YEAR', 'MONTH', 'DAY']].show(4)

+----+-----+---+
|YEAR|MONTH|DAY|
+----+-----+---+
|2018|    2| 14|
|2018|    3| 20|
|2018|    7|  8|
|2018|    4| 19|
+----+-----+---+
only showing top 4 rows



I want to create strings of the y/m/d and then add a zero if the number is 1 through 9. <br>
This will make it easier for the last step when I want to convert the string to a date. 

In [328]:
add_zeroes_bby = F.udf(lambda num: "0" + str(num) if num <= 9 else str(num))

In [329]:
credit_df = credit_df.withColumns(
    {"YEAR":add_zeroes_bby(credit_df["YEAR"]),
     "MONTH":add_zeroes_bby(credit_df["MONTH"]),
     "DAY":add_zeroes_bby(credit_df["DAY"])}
)

In [331]:
credit_df.createOrReplaceGlobalTempView('credit')

In [333]:
credit_df[['YEAR', 'MONTH', 'DAY']].show(4)

+----+-----+---+
|YEAR|MONTH|DAY|
+----+-----+---+
|2018|   02| 14|
|2018|   03| 20|
|2018|   07| 08|
|2018|   04| 19|
+----+-----+---+
only showing top 4 rows



Making a temp table

In [336]:
credit_df = spark_sesh.sql("""
    SELECT 
        *, 
        CONCAT(YEAR, "-", MONTH, "-", DAY) AS TIMEID
    FROM
        global_temp.credit
""")

In [339]:
credit_df[['TIMEID']].show(2)

+----------+
|    TIMEID|
+----------+
|2018-02-14|
|2018-03-20|
+----------+
only showing top 2 rows



In [None]:
credit_df = credit_df.drop('YEAR', 'MONTH', 'DAY')


### Comparing Types and Mapping Table to Current Customer Table

The mapping table is a little janky and needs some transformation itself, lol.<br>

In [349]:
credit_map['Target Field names'] = credit_map['Target Field names'].apply(lambda x: str(x).strip())

You can see here everything looks pretty good, except we need to:
- Change CREDIT_CARD_NO to CUST_CC_NO
- Change CUST_SSN to a a string type
- Change our newly created TIMEID column type from string to date

In [350]:
compare_types(credit_map, 'Target Field names', credit_df)

Unnamed: 0,Source File Column Names,Mapping Logic,Target Field names,Target DataType,Description,Current Column Names,Current Column Types
0,CREDIT_CARD_NO,Direct Move,CUST_CC_NO,VARCHAR,Credit card number of customer,,
1,"DAY, MONTH, YEAR","Convert DAY, MONTH, and YEAR into a TIMEID (YYYYMMDD)",TIMEID,DATE,Surrogate key of the period(time) table,TIMEID,string
2,CUST_SSN,Direct Move,CUST_SSN,VARCHAR,Surrogate key of the customer table. Used to uniquely identify a row.,CUST_SSN,bigint
3,BRANCH_CODE,Direct Move,BRANCH_CODE,INT,Surrogate key of the branch table,BRANCH_CODE,bigint
4,TRANSACTION_TYPE,Direct Move,TRANSACTION_TYPE,VARCHAR,Transaction Type,TRANSACTION_TYPE,string
5,TRANSACTION_VALUE,Direct Move,TRANSACTION_VALUE,DOUBLE,Customer’s Current City,TRANSACTION_VALUE,double
6,TRANSACTION_ID,Direct move,TRANSACTION_ID,INT,Uniquely identifies a transaction,TRANSACTION_ID,bigint
7,,,,,,CREDIT_CARD_NO,string


Casting datatypes with our friend `withColumns`

In [364]:
credit_df = credit_df.withColumns(
    {"CREDIT_CARD_NO": credit_df['CREDIT_CARD_NO'].cast('string'),
     "TIMEID":credit_df['TIMEID'].cast('date'),
     "CUST_SSN":credit_df['CUST_SSN'].cast('string')}
)



Renaming the `CREDIT_CARD_NO` column 

In [365]:
credit_df = credit_df.withColumnRenamed('CREDIT_CARD_NO', 'CUST_CC_NO')

In [366]:
credit_df.printSchema()

root
 |-- BRANCH_CODE: long (nullable = true)
 |-- CUST_CC_NO: string (nullable = true)
 |-- CUST_SSN: string (nullable = true)
 |-- TRANSACTION_ID: long (nullable = true)
 |-- TRANSACTION_TYPE: string (nullable = true)
 |-- TRANSACTION_VALUE: double (nullable = true)
 |-- TIMEID: date (nullable = true)



In [367]:
compare_types(credit_map, 'Target Field names', credit_df)

Unnamed: 0,Source File Column Names,Mapping Logic,Target Field names,Target DataType,Description,Current Column Names,Current Column Types
0,CREDIT_CARD_NO,Direct Move,CUST_CC_NO,VARCHAR,Credit card number of customer,CUST_CC_NO,string
1,"DAY, MONTH, YEAR","Convert DAY, MONTH, and YEAR into a TIMEID (YYYYMMDD)",TIMEID,DATE,Surrogate key of the period(time) table,TIMEID,date
2,CUST_SSN,Direct Move,CUST_SSN,VARCHAR,Surrogate key of the customer table. Used to uniquely identify a row.,CUST_SSN,string
3,BRANCH_CODE,Direct Move,BRANCH_CODE,INT,Surrogate key of the branch table,BRANCH_CODE,bigint
4,TRANSACTION_TYPE,Direct Move,TRANSACTION_TYPE,VARCHAR,Transaction Type,TRANSACTION_TYPE,string
5,TRANSACTION_VALUE,Direct Move,TRANSACTION_VALUE,DOUBLE,Customer’s Current City,TRANSACTION_VALUE,double
6,TRANSACTION_ID,Direct move,TRANSACTION_ID,INT,Uniquely identifies a transaction,TRANSACTION_ID,bigint


In [369]:
credit_df[['TIMEID']].show(2)

+----------+
|    TIMEID|
+----------+
|2018-02-14|
|2018-03-20|
+----------+
only showing top 2 rows



We are DONE transforming, its time to Load!

## Load

In [392]:
customer_df_backup = customer_df
branch_df_backup = branch_df
credit_df_backup = credit_df

Here Im assigning dataframes to a dictionary and passing them as such. If the dataset was really huge I'd probably pass them a different way.

In [394]:
dataframe_dict = {}
dataframe_dict['cdw_sapp_customer'] = customer_df
dataframe_dict['cdw_sapp_branch'] = branch_df
dataframe_dict['cdw_sapp_credit_card'] = credit_df
dataframe_dict['cdw_sapp_loan_application'] = loan_df

In [None]:
from pyspark.sql import DataFrameWriter 


In [407]:
def push_to_server(dictionary: dict, mode='append'):
    for table_name, data_frame in dictionary.items():
        data_frame.write.jdbc(
            url='jdbc:mysql://localhost:3306/db_capstone',
            table=table_name,
            mode=mode,
            properties={"user":db_user, "password":db_password, "driver":"com.mysql.cj.jdbc.Driver"}
)



In [408]:
push_to_server(dataframe_dict, 'append')

📝 Notes:
- Be mindful that 'overwrite' will not just overwrite the data in your SQL server, but the data TYPES as well, <br>
causing potential performance losses and wasted disk space if your SQL database tables have already been optimized. 

Here is a snapshot of the final result!
<div>
<img src='../../images/loading_pyspark_dataframes.png'>
</div>

Let's not forget to close our pyspark connection!

In [409]:
spark_sesh.stop()