# Project Title
### Data Engineering Capstone Project
I am doing this project based on data that I have put together. I will describe the data and its sources in Step 1.

#### Project Summary
InvestSure is an investment company that manages the retirement accounts of its customers' employees. It receives dumps of many data elements in CSV format from its transactional systems and has been loading this data into Excel for analysis. However, the data has now grown to a size where this approach is no longer viable. InvestSure has therefore hired me as a Data Engineer to analyze and cleanse this data, build a conceptual model for analytical use, and load the data from the source files into the analytical tables. InvestSure has also asked me to provide typical queries that they could run on this analytical model to gain insights into the data.

The project follows the steps listed below:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [74]:
# Do all imports and installs here
import pandas as pd
import re
import os
import sys
import glob
import configparser
from datetime import datetime, timedelta, date
from dateutil import parser
from pyspark.sql import SparkSession
from pyspark.sql import types as t
from pyspark.sql.functions import udf, col, monotonically_increasing_id, to_date, when
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, dayofweek, unix_timestamp, from_unixtime
from pyspark.sql.types import *

# To suppress numeric values from being returned in exponential format
pd.options.display.float_format = '{:20,.2f}'.format

# suppress warnings from final output
import warnings
warnings.simplefilter('ignore')

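# Read project data configuration entries
# Using a context manager closes the config file handle once it has been read
config = configparser.ConfigParser()
with open('capstone_project_data.cfg') as config_file:
    config.read_file(config_file)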
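The notebook never shows the configuration file itself, so here is a minimal sketch of what `capstone_project_data.cfg` might contain. The `LOCAL` section and the key names are the ones referenced in later cells; every path is a placeholder assumption, and `sample_config` and `required_keys` are hypothetical names used only for this illustration.

```python
import configparser

# Hypothetical file contents: the section and key names come from the notebook,
# but every path below is a placeholder, not the project's real location.
SAMPLE_CFG = """
[LOCAL]
INPUT_DATA_CUSTOMER = data/customer.csv
INPUT_DATA_CONTACT = data/contact.csv
INPUT_DATA_PRODUCT = data/product.json
INPUT_DATA_SECTOR = data/sec_codes.csv
INPUT_DATA_STATE = data/state.csv
INPUT_DATA_TXN = data/txn.csv
OUTPUT_DATA_CUSTOMER = output/customer
OUTPUT_DATA_CONTACT = output/contact
OUTPUT_DATA_PRODUCT = output/product
OUTPUT_DATA_STATE = output/state
OUTPUT_DATA_CALENDAR = output/calendar
OUTPUT_DATA_TXN = output/txn
"""

sample_config = configparser.ConfigParser()
sample_config.read_string(SAMPLE_CFG)

# Fail early if a key that later cells rely on is missing from the file
required_keys = [
    'INPUT_DATA_CUSTOMER', 'INPUT_DATA_CONTACT', 'INPUT_DATA_PRODUCT',
    'INPUT_DATA_SECTOR', 'INPUT_DATA_STATE', 'INPUT_DATA_TXN',
    'OUTPUT_DATA_CUSTOMER', 'OUTPUT_DATA_CONTACT', 'OUTPUT_DATA_PRODUCT',
    'OUTPUT_DATA_STATE', 'OUTPUT_DATA_CALENDAR', 'OUTPUT_DATA_TXN',
]
missing = [key for key in required_keys if key not in sample_config['LOCAL']]
if missing:
    raise KeyError('Missing config entries: ' + ', '.join(missing))
```

Checking the keys up front is a design choice: a typo in the config file then surfaces immediately instead of halfway through the pipeline.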


### Step 1: Scope the Project and Gather Data

#### Scope 
The scope of this project is to analyze the data provided by transactional systems, cleanse the data if needed and load it into an analytical data model to facilitate querying the data.

#### Describe and Gather Data 
In this section, I will describe the data including its source.

- txn.csv (fact)
    - Structure
        - txn_id
        - txn_date
        - contact_id
        - product_id
        - sales
        - redemptions
    - Source: InvestSure's transactional system captured by its trading application
    - Feed frequency: Daily

- customer.csv (dimension)
    - Structure
        - customer_id
        - customer_name
        - sector
    - Source: InvestSure's CRM system
    - Feed Frequency: Daily

- contact.csv (dimension)
    - Structure
        - contact_id
        - first_name
        - last_name
        - city
        - state_code
        - zip
        - country
        - latitude
        - longitude
        - customer_id
        - status
        - opportunity
    - Source: InvestSure's CRM system
    - Feed Frequency: Daily

- product.json (dimension)
    - Structure
        - product_id
        - product_name
        - tna
        - ms_rating
        - exp_ratio
        - market_cap
    - Source: Yahoo Finance
    - Feed Frequency: Daily

- sec_codes.csv (mapping table)
    - Structure
        - code
        - description
    - Source: Yahoo Finance provides description but InvestSure's systems use abbreviated codes
    - Feed Frequency: On demand and when new customers are added

- state.csv
    - Structure
        - state_code
        - state
        - region
    - Source: US Census Bureau
    - Feed Frequency: one time


#### Describe and Gather Data 
In the next few cells, I will gather the data and display samples for a first look at the data.

In [75]:
# Loading the input data into Pandas dataframes
pd_df_customer = pd.read_csv(config['LOCAL']['INPUT_DATA_CUSTOMER'], encoding = 'ISO-8859-1')
pd_df_contact = pd.read_csv(config['LOCAL']['INPUT_DATA_CONTACT'], encoding = 'ISO-8859-1')
pd_df_product = pd.read_json(config['LOCAL']['INPUT_DATA_PRODUCT'])
pd_df_sec_codes = pd.read_csv(config['LOCAL']['INPUT_DATA_SECTOR'], encoding = 'ISO-8859-1')
pd_df_state = pd.read_csv(config['LOCAL']['INPUT_DATA_STATE'], encoding = 'ISO-8859-1')
pd_df_txn = pd.read_csv(config['LOCAL']['INPUT_DATA_TXN'], encoding = 'ISO-8859-1')

In [76]:
# Now let us look at some samples of the data that I loaded
pd_df_customer.head()

Unnamed: 0,customer_id,customer_name,sector
0,450056063,B.W.E CUSTOM CONSTRUCTION LLC,HC
1,450056064,SERGEY NIZHEGORODTSEV PUBLISHING LLC,CS
2,450056066,SUNRISE ANDOVER LLC,RE
3,450056067,474 CENTRAL BOULEVARD LLC,FS
4,450056068,ELITE FINISHES LLC,TECH


In [77]:
pd_df_contact.head()

Unnamed: 0,contact_id,first_name,last_name,city,state_code,zip,country,latitude,longitude,customer_id,status,opportunity
0,100000339,Lyndy,Chachas,Omaha,NE,68130,USA,41.23,-96.18,450058148,Active,50000
1,100001423,Watts,Eifenstadt,Weston,FL,33326,USA,26.1,-80.36,450059017,Active,50000
2,100001837,Jingfeng,Lopina,Hunt Valley,MD,21030,USA,39.5,-76.67,450059076,Active,125000
3,100002544,Gaynell,Vivrett,Beloit,WI,53511,USA,42.5,-89.04,450057762,Active,16000
4,100002551,Peregrino,Valles,New York,NY,10036,USA,40.76,-73.98,450055995,Active,150000


In [78]:
pd_df_product.head()

Unnamed: 0,exp_ratio,market_cap,ms_rating,product_id,product_name,tna
0,0.05,Giant,5,VFIAX,Vanguard 500 Index Admiral,163456368456
1,0.05,Giant,4,VTSAX,Vanguard Total Stock Mkt Idx Adm,136131758268
2,0.04,Giant,5,VINIX,Vanguard Institutional Index I,110407917518
3,0.16,Giant,4,VTSMX,Vanguard Total Stock Mkt Idx Inv,98869371846
4,0.02,Giant,5,VIIIX,Vanguard Institutional Index Instl Pl,93192353649


In [79]:
pd_df_sec_codes.head()

Unnamed: 0,code,description
0,FS,Financial Services
1,RE,Real Estate
2,HC,Healthcare
3,UT,Utilities
4,CS,Communication Services


In [80]:
pd_df_state.head()

Unnamed: 0,state_code,state,region
0,AL,Alabama,Southern
1,AK,Alaska,Pacific
2,AZ,Arizona,Pacific
3,AR,Arkansas,Southern
4,CA,California,Pacific


In [81]:
pd_df_txn.head()

Unnamed: 0,txn_id,txn_date,contact_id,product_id,sales,redemptions
0,422909780,1/2/2015,992808564,VIVAX,46892.19,0.0
1,422909781,1/2/2015,261785827,SOPAX,0.0,-33424.78
2,422909782,1/2/2015,389127962,BAICX,14230.85,0.0
3,422909783,1/2/2015,101692476,SGROX,94046.93,0.0
4,422909784,1/2/2015,327754553,FXSIX,22038.86,0.0


### Step 2: Explore and Assess the Data
#### Step 2.1: Explore the Data 
Most of the data from the sources is clean as it comes from application systems which have curated the data well. However, there are a few issues with the data:

- Transaction Data: 
    - This has two columns, one for sales of mutual funds and another for redemptions. For any given transaction, only one of these values is non-zero, which means that a transaction is either a sale or a redemption. While this makes it easy to sum sales and redemptions independently, it is not a good model for analysis: getting Net Sales for any period requires aggregating both columns and then combining the results, and inferring the transaction type requires inspecting both columns every time.
    - Also, the TXN_DATE column is a string representation of a date and needs to be converted to a date type so that time series analysis can be done on this data.

- Customer Data: Each customer in the customer data belongs to a sector. Unfortunately, the CRM system maintains a sector code (an abbreviation of the sector) against each customer and has given us a mapping of those codes to names. This degree of normalization is not necessary for our analytical purposes as the only place where we are using this map is in customer data. 
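The claim that only one of the two amount columns is non-zero per transaction is worth verifying before relying on it. Below is a minimal sketch in plain Python (so it runs without a Spark session) using the five rows from the `pd_df_txn.head()` output; the helper name `rows_with_both_amounts` is hypothetical, and a real check would of course run over the full dataset.

```python
# (txn_id, sales, redemptions) copied from the pd_df_txn.head() sample output
sample_txns = [
    (422909780, 46892.19, 0.0),
    (422909781, 0.0, -33424.78),
    (422909782, 14230.85, 0.0),
    (422909783, 94046.93, 0.0),
    (422909784, 22038.86, 0.0),
]


def rows_with_both_amounts(rows):
    """Return the txn_ids where sales and redemptions are both non-zero."""
    return [txn_id for txn_id, sales, redemptions in rows
            if sales != 0 and redemptions != 0]


# An empty list means the "either a sale or a redemption" assumption holds
violations = rows_with_both_amounts(sample_txns)

# With the two-column layout, Net Sales needs two aggregations plus a combine step
total_sales = sum(sales for _, sales, _ in sample_txns)
total_redemptions = sum(redemptions for _, _, redemptions in sample_txns)
net_sales = round(total_sales + total_redemptions, 2)
```

Because redemptions are stored as negative numbers, adding the two totals gives the net figure; a single signed amount column would reduce this to one sum.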


#### Step 2.2: Set up the first part of the data pipeline
In order to perform the data cleaning listed above, I would first like to load the data as is into Spark Dataframes. This will be the first step in setting up a data pipeline. Although this step truly belongs in Step 4: Run ETL to Model the Data, I feel that doing it at this stage is crucial.

In [98]:
# Create a Spark Session
spark = SparkSession \
    .builder \
    .config('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:2.7.0') \
    .getOrCreate()


In [99]:
# Read CUSTOMER data
df_customer = spark.read.format('csv').options(header='true', inferSchema='true').load(config['LOCAL']['INPUT_DATA_CUSTOMER'])

df_customer.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- sector: string (nullable = true)



In [100]:
# Display sample rows from CUSTOMER data
df_customer.show(n=5)

+-----------+--------------------+------+
|customer_id|       customer_name|sector|
+-----------+--------------------+------+
|  450056063|B.W.E CUSTOM CONS...|    HC|
|  450056064|SERGEY NIZHEGOROD...|    CS|
|  450056066| SUNRISE ANDOVER LLC|    RE|
|  450056067|474 CENTRAL BOULE...|    FS|
|  450056068|  ELITE FINISHES LLC|  TECH|
+-----------+--------------------+------+
only showing top 5 rows



In [101]:
# Read CONTACT data
df_contact = spark.read.format('csv').options(header='true', inferSchema='true').load(config['LOCAL']['INPUT_DATA_CONTACT'])

df_contact.printSchema()


root
 |-- contact_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- opportunity: integer (nullable = true)



In [102]:
# Display sample rows from CONTACT data
df_contact.show(n=5)

+----------+----------+----------+-----------+----------+-----+-------+--------+---------+-----------+------+-----------+
|contact_id|first_name| last_name|       city|state_code|  zip|country|latitude|longitude|customer_id|status|opportunity|
+----------+----------+----------+-----------+----------+-----+-------+--------+---------+-----------+------+-----------+
| 100000339|     Lyndy|   Chachas|      Omaha|        NE|68130|    USA|41.22962| -96.1815|  450058148|Active|      50000|
| 100001423|     Watts|Eifenstadt|     Weston|        FL|33326|    USA|26.09966|-80.36497|  450059017|Active|      50000|
| 100001837|  Jingfeng|    Lopina|Hunt Valley|        MD|21030|    USA|39.50004|-76.66566|  450059076|Active|     125000|
| 100002544|   Gaynell|   Vivrett|     Beloit|        WI|53511|    USA|42.49625|-89.03702|  450057762|Active|      16000|
| 100002551| Peregrino|    Valles|   New York|        NY|10036|    USA|40.75841|-73.98155|  450055995|Active|     150000|
+----------+----------+-

In [103]:
# Read PRODUCT data (note that this data is in JSON format)
df_product = spark.read.format('json').options(header='true', inferSchema='true', multiline='true').load(config['LOCAL']['INPUT_DATA_PRODUCT'])

df_product.printSchema()

root
 |-- exp_ratio: double (nullable = true)
 |-- market_cap: string (nullable = true)
 |-- ms_rating: long (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- tna: long (nullable = true)



In [104]:
# Display sample rows from PRODUCT data
df_product.show(n=5)

+---------+----------+---------+----------+--------------------+------------+
|exp_ratio|market_cap|ms_rating|product_id|        product_name|         tna|
+---------+----------+---------+----------+--------------------+------------+
|     0.05|     Giant|        5|     VFIAX|Vanguard 500 Inde...|163456368456|
|     0.05|     Giant|        4|     VTSAX|Vanguard Total St...|136131758268|
|     0.04|     Giant|        5|     VINIX|Vanguard Institut...|110407917518|
|     0.16|     Giant|        4|     VTSMX|Vanguard Total St...| 98869371846|
|     0.02|     Giant|        5|     VIIIX|Vanguard Institut...| 93192353649|
+---------+----------+---------+----------+--------------------+------------+
only showing top 5 rows



In [105]:
# Read SECTOR CODES data
df_sec_codes = spark.read.format('csv').options(header='true', inferSchema='true').load(config['LOCAL']['INPUT_DATA_SECTOR'])

df_sec_codes.printSchema()


root
 |-- code: string (nullable = true)
 |-- description: string (nullable = true)



In [106]:
# Display sample rows from SECTOR CODE data
df_sec_codes.show(n=5)

+----+--------------------+
|code|         description|
+----+--------------------+
|  FS|  Financial Services|
|  RE|         Real Estate|
|  HC|          Healthcare|
|  UT|           Utilities|
|  CS|Communication Ser...|
+----+--------------------+
only showing top 5 rows



In [107]:
# Read STATE data
df_state = spark.read.format('csv').options(header='true', inferSchema='true').load(config['LOCAL']['INPUT_DATA_STATE'])

df_state.printSchema()


root
 |-- state_code: string (nullable = true)
 |-- state: string (nullable = true)
 |-- region: string (nullable = true)



In [108]:
# Display sample rows from STATE data
df_state.show(n=5)

+----------+----------+--------+
|state_code|     state|  region|
+----------+----------+--------+
|        AL|   Alabama|Southern|
|        AK|    Alaska| Pacific|
|        AZ|   Arizona| Pacific|
|        AR|  Arkansas|Southern|
|        CA|California| Pacific|
+----------+----------+--------+
only showing top 5 rows



In [109]:
# Read TRANSACTION data
df_txn = spark.read.format('csv').options(header='true', inferSchema='true').load(config['LOCAL']['INPUT_DATA_TXN'])

df_txn.printSchema()

root
 |-- txn_id: integer (nullable = true)
 |-- txn_date: string (nullable = true)
 |-- contact_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- sales: double (nullable = true)
 |-- redemptions: double (nullable = true)



In [110]:
# Display sample rows from TRANSACTION data
df_txn.show(n=5)

+---------+--------+----------+----------+---------+-----------+
|   txn_id|txn_date|contact_id|product_id|    sales|redemptions|
+---------+--------+----------+----------+---------+-----------+
|422909780|1/2/2015| 992808564|     VIVAX|46892.193|        0.0|
|422909781|1/2/2015| 261785827|     SOPAX|      0.0| -33424.776|
|422909782|1/2/2015| 389127962|     BAICX|14230.848|        0.0|
|422909783|1/2/2015| 101692476|     SGROX|94046.931|        0.0|
|422909784|1/2/2015| 327754553|     FXSIX|22038.856|        0.0|
+---------+--------+----------+----------+---------+-----------+
only showing top 5 rows



#### Step 2.3: Cleaning Steps
- Transaction Data: 
    - I propose to merge the two columns (sales and redemptions) into one and call it "txn_amount" and add a new column called "txn_type" which will have the value SALE or REDEMPTION. This makes analyses much easier.
    - I am also planning to convert the string representation of TXN_DATE into a date representation so that we can run time series analysis on this data

- Customer Data: I am planning to replace the sector code in customer data with the sector name, as that will be more meaningful for analyses and will avoid an extra join every time we want to do sector analysis.

- Calendar Data: Finally, I would like to break down the transaction date into its constituent elements (such as year, month, quarter, etc.) and store them in their own table. This will help us do time series analyses on the transaction data.
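As a rough illustration of the calendar breakdown, the sketch below derives the components for a single date string in plain Python. It assumes the `M/D/YYYY` layout seen in the sample data, follows Spark's conventions for `weekofyear` (ISO 8601 week) and `dayofweek` (1 = Sunday through 7 = Saturday), and includes a quarter, which the list above names. The function name `calendar_components` is hypothetical.

```python
from datetime import datetime


def calendar_components(date_string):
    """Split an M/D/YYYY date string into calendar attributes."""
    parsed = datetime.strptime(date_string, '%m/%d/%Y').date()
    return {
        'calendar_date': parsed.isoformat(),
        'day': parsed.day,
        'month': parsed.month,
        'quarter': (parsed.month - 1) // 3 + 1,
        'year': parsed.year,
        # ISO 8601 week number, the same definition Spark's weekofyear uses
        'week': parsed.isocalendar()[1],
        # isoweekday() is 1=Monday..7=Sunday; shift it to 1=Sunday..7=Saturday
        'weekday': parsed.isoweekday() % 7 + 1,
    }


first_txn_date = calendar_components('1/2/2015')
```

Deriving the quarter from the month keeps the calendar table self-contained, so queries by quarter do not need to recompute it.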


In [111]:
# Clean up Transaction table by combining sales and redemptions into a new column called txn_amount, adding a txn_type and dropping the original sales and redemptions columns
# We will begin by defining a UDF to determine txn_type
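def get_txn_type(value):
    # Missing amounts cannot be classified, so pass the null through
    if value is None:
        return None
    # Negative amounts are redemptions; zero and positive amounts are treated as sales
    if value < 0:
        return 'REDEMPTION'
    return 'SALE'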
    
# Convert to a UDF Function by passing in the function and return type of function
udf_get_txn_type = udf(get_txn_type, StringType())

# Add txn_amount column by adding up values in sales and redemptions columns
# This is ok because only one of the columns has a value for any given transaction
df_txn = df_txn.withColumn('txn_amount', col('sales') + col('redemptions'))

# Add txn_type column using the UDF to determine based on the txn_amount
df_txn = df_txn.withColumn('txn_type', udf_get_txn_type(col('txn_amount')))

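# Convert the string representation of TXN_DATE to a timestamp
# from_unixtime() alone returns a formatted string, so cast the epoch seconds to a timestamp instead
# The 'M/d/yyyy' pattern matches non-padded values such as 1/2/2015
df_txn = df_txn.withColumn('txn_date', unix_timestamp(col('txn_date'), 'M/d/yyyy').cast('timestamp'))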

# Finally, let us drop the sales and redemptions columns
columns_to_drop = ['sales', 'redemptions']
df_txn = df_txn.drop(*columns_to_drop)

df_txn.show()


+---------+-------------------+----------+----------+----------+----------+
|   txn_id|           txn_date|contact_id|product_id|txn_amount|  txn_type|
+---------+-------------------+----------+----------+----------+----------+
|422909780|2015-01-02 00:00:00| 992808564|     VIVAX| 46892.193|      SALE|
|422909781|2015-01-02 00:00:00| 261785827|     SOPAX|-33424.776|REDEMPTION|
|422909782|2015-01-02 00:00:00| 389127962|     BAICX| 14230.848|      SALE|
|422909783|2015-01-02 00:00:00| 101692476|     SGROX| 94046.931|      SALE|
|422909784|2015-01-02 00:00:00| 327754553|     FXSIX| 22038.856|      SALE|
|422909785|2015-01-02 00:00:00| 589519954|     FKUTX|   3853.98|      SALE|
|422909786|2015-01-02 00:00:00| 120792413|     AMECX|-53742.108|REDEMPTION|
|422909787|2015-01-02 00:00:00| 648471340|     NICSX|-33163.295|REDEMPTION|
|422909788|2015-01-02 00:00:00| 483200396|     REREX| 39700.248|      SALE|
|422909789|2015-01-02 00:00:00| 719279386|     SHRAX|  30871.19|      SALE|
|422909790|2

In [112]:
# Replace the sector code in customer data with the sector name from df_sec_codes by joining them
df_customer = df_sec_codes.join(df_customer, df_customer.sector == df_sec_codes.code, 'inner') \
    .distinct() \
    .select(col('customer_id'), col('customer_name'), col('description').alias('sector'))
df_customer.show()


+-----------+--------------------+--------------------+
|customer_id|       customer_name|              sector|
+-----------+--------------------+--------------------+
|  450056095|PASSION FOR CARE LLC|Communication Ser...|
|  450056875|MUSSZETT THE POET...|  Financial Services|
|  450056528|SHOP CITY DREAMS LLC|         Real Estate|
|  450056203|DUNCAN'S CONSTRUC...|Communication Ser...|
|  450059400|RICH RHEE INTERAC...|          Healthcare|
|  450057029|273 NORTH 1ST STR...|          Technology|
|  450057381|ARTYS WINDOW TREA...|         Real Estate|
|  450057827|      OMBRE PRET LLC|  Financial Services|
|  450058574|ALFA DIGESTIVE DI...|Communication Ser...|
|  450059023|322 RAMAPO VALLEY...|          Healthcare|
|  450058304|M.I.T.O.S CONSULT...|         Real Estate|
|  450055988|INNERSOULDEVELOPM...|           Utilities|
|  450056384|     VETS PA 360 LLC|          Healthcare|
|  600429450|MONMOUTH TRUCK - ...|          Healthcare|
|  450056015|J&T FORMULA SERVI...|  Financial Se

In [113]:
# Extract date components such as day, month, year into its own dataframe
# Let us begin with getting a distinct list of dates from df_txn
# Selecting the column first means distinct() only compares dates, not whole transaction rows
df_calendar = df_txn \
    .select(col('txn_date').alias('calendar_date')) \
    .distinct()

# Extract date components into separate columns
df_calendar = df_calendar.withColumn('day', dayofmonth('calendar_date'))
df_calendar = df_calendar.withColumn('month', month('calendar_date'))
df_calendar = df_calendar.withColumn('year', year('calendar_date'))
df_calendar = df_calendar.withColumn('week', weekofyear('calendar_date'))
df_calendar = df_calendar.withColumn('weekday', dayofweek('calendar_date'))

df_calendar.show()


+-------------------+---+-----+----+----+-------+
|      calendar_date|day|month|year|week|weekday|
+-------------------+---+-----+----+----+-------+
|2015-04-20 00:00:00| 20|    4|2015|  17|      2|
|2015-05-26 00:00:00| 26|    5|2015|  22|      3|
|2015-06-23 00:00:00| 23|    6|2015|  26|      3|
|2016-04-29 00:00:00| 29|    4|2016|  17|      6|
|2017-05-08 00:00:00|  8|    5|2017|  19|      2|
|2019-05-24 00:00:00| 24|    5|2019|  21|      6|
|2020-01-24 00:00:00| 24|    1|2020|   4|      6|
|2020-05-28 00:00:00| 28|    5|2020|  22|      5|
|2016-05-04 00:00:00|  4|    5|2016|  18|      4|
|2016-10-05 00:00:00|  5|   10|2016|  40|      4|
|2015-03-31 00:00:00| 31|    3|2015|  14|      3|
|2017-02-22 00:00:00| 22|    2|2017|   8|      4|
|2017-04-18 00:00:00| 18|    4|2017|  16|      3|
|2017-04-21 00:00:00| 21|    4|2017|  16|      6|
|2017-05-18 00:00:00| 18|    5|2017|  20|      5|
|2017-10-27 00:00:00| 27|   10|2017|  43|      6|
|2018-05-16 00:00:00| 16|    5|2018|  20|      4|


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
My conceptual data model will be a star schema with a single fact (transactions) and multiple dimensions that are referenced by that fact. The entities in this conceptual model are as follows:

- Transaction (Fact)
    - Contact (Dimension) linked to Transaction by CONTACT_ID
    
    - Customer (Dimension) linked to Contact by CUSTOMER_ID and indirectly linked to Transaction through the Contact
    
    - Product (Dimension) linked to Transaction by PRODUCT_ID
    
    - State (Dimension) linked to Contact by STATE_CODE  and indirectly linked to Transaction through Contact
    
    - Time (Dimension) linked to Transaction by TXN_DATE=>CALENDAR_DATE

The conceptual model is captured in an ER Diagram here: https://r766466c839826xjupyterlnnfq3jud.udacity-student-workspaces.com/lab/tree/images/DB_ERD_Capstone_Project_InvestSure.png

The data elements of each of these entities are listed below:

- Transaction (fact)
    - txn_id
    - txn_date
    - contact_id
    - product_id
    - txn_amount
    - txn_type
    
- Customer (dimension)
    - customer_id
    - customer_name
    - sector
    
- Contact (dimension)
    - contact_id
    - first_name
    - last_name
    - city
    - state_code
    - zip
    - country
    - latitude
    - longitude
    - customer_id
    - status
    - opportunity
    
- State (dimension)
    - state_code
    - state
    - region
    
- Product (dimension)
    - product_id
    - product_name
    - exp_ratio
    - market_cap
    - ms_rating
    - tna

- Calendar (dimension)
    - calendar_date
    - day
    - month
    - year
    - week
    - weekday
    
All the data will be stored in the destination in *parquet* format. Additionally, the Transaction and Calendar data will be partitioned by the Year and Month of the date.
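To show how the fact and dimensions are meant to be traversed, here is a small sketch that builds a cut-down version of the model in an in-memory SQLite database and computes net sales by sector. This is only an illustration of the join path (Transaction to Contact to Customer); the rows are made-up values, the tables carry only a subset of the columns listed above, and SQLite stands in for the parquet tables purely so the example is self-contained.

```python
import sqlite3

conn = sqlite3.connect(':memory:')

# Subset of the model's columns, just enough to follow the join path
conn.executescript("""
CREATE TABLE customer (customer_id INTEGER PRIMARY KEY, customer_name TEXT, sector TEXT);
CREATE TABLE contact  (contact_id INTEGER PRIMARY KEY, customer_id INTEGER, state_code TEXT);
CREATE TABLE txn      (txn_id INTEGER PRIMARY KEY, txn_date TEXT, contact_id INTEGER,
                       product_id TEXT, txn_amount REAL, txn_type TEXT);
""")

# Illustrative rows only; none of these values come from the project data
conn.executemany('INSERT INTO customer VALUES (?, ?, ?)', [
    (1, 'CUSTOMER A', 'Healthcare'),
    (2, 'CUSTOMER B', 'Real Estate'),
])
conn.executemany('INSERT INTO contact VALUES (?, ?, ?)', [
    (10, 1, 'NE'),
    (20, 2, 'FL'),
])
conn.executemany('INSERT INTO txn VALUES (?, ?, ?, ?, ?, ?)', [
    (100, '2015-01-02', 10, 'VFIAX', 500.0, 'SALE'),
    (101, '2015-01-02', 10, 'VFIAX', -200.0, 'REDEMPTION'),
    (102, '2015-01-05', 20, 'VTSAX', 300.0, 'SALE'),
])

# Net sales by sector: the fact reaches Customer indirectly through Contact
net_sales_by_sector = conn.execute("""
    SELECT c.sector, SUM(t.txn_amount) AS net_sales
    FROM txn t
    JOIN contact ct ON ct.contact_id = t.contact_id
    JOIN customer c ON c.customer_id = ct.customer_id
    GROUP BY c.sector
    ORDER BY c.sector
""").fetchall()
conn.close()
```

With a single signed `txn_amount` column, net sales is one `SUM`, and the sector name is available directly on the customer row without a further lookup.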


#### 3.2 Mapping Out Data Pipelines
The code snippets above have already loaded all the source data into Spark Dataframes. Additionally, the data cleaning steps above have combined and transformed this data as per suggestions in the Cleaning Steps into structures that could be persisted into tables and stored in the destination as per our Conceptual Model discussed in this section.

The flow of data pipeline is as shown below:
1. Load source data without modifications into Spark dataframes (completed in Step 2.2)
2. Clean data in these dataframes (completed as per suggestions in Step 2.3)
3. Load data from the dataframes into *parquet* tables doing the necessary transformations as defined in Step 2.3

We will be doing Step 3.2.3 in the cells under Step 4.1 - one section for each target table

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [114]:
# Create CUSTOMER table
customer_columns = ['customer_id', 'customer_name', 'sector']

# Write CUSTOMER table to parquet file
customer_table = df_customer.select(customer_columns)
customer_table.write.mode('overwrite').parquet(config['LOCAL']['OUTPUT_DATA_CUSTOMER'])

In [115]:
# Create CONTACT table
contact_columns = ['contact_id', 'first_name', 'last_name', 'city', 'state_code', 'zip', 'country', 'latitude', 'longitude', 'customer_id', 'status', 'opportunity']

# Write CONTACT table to parquet file
contact_table = df_contact.select(contact_columns)
contact_table.write.mode('overwrite').parquet(config['LOCAL']['OUTPUT_DATA_CONTACT'])

In [116]:
# Create STATE table
state_columns = ['state_code', 'state', 'region']

# Write STATE table to parquet file
state_table = df_state.select(state_columns)
state_table.write.mode('overwrite').parquet(config['LOCAL']['OUTPUT_DATA_STATE'])

In [117]:
# Create PRODUCT table
product_columns = ['product_id', 'product_name', 'exp_ratio', 'market_cap', 'ms_rating', 'tna']

# Write PRODUCT table to parquet file
product_table = df_product.select(product_columns)
product_table.write.mode('overwrite').parquet(config['LOCAL']['OUTPUT_DATA_PRODUCT'])


In [118]:
# Create CALENDAR table
calendar_columns = ['calendar_date', 'day', 'month', 'year', 'week', 'weekday']

# Write CALENDAR table to parquet file
calendar_table = df_calendar.select(calendar_columns)
calendar_table.write.mode('overwrite').partitionBy('year', 'month').parquet(config['LOCAL']['OUTPUT_DATA_CALENDAR'])


In [119]:
# Create txn table
txn_columns = ['txn_id', 'txn_date', 'contact_id', 'product_id', 'txn_amount', 'txn_type']

# Write TXN table to parquet file
txn_table = df_txn.join(df_calendar, df_calendar.calendar_date == df_txn.txn_date, 'inner') \
    .select(col('txn_id'), col('txn_date'), col('contact_id'), col('product_id'), col('txn_amount'), col('txn_type'), col('year'), col('month'))
txn_table.write.mode('overwrite').partitionBy('year', 'month').parquet(config['LOCAL']['OUTPUT_DATA_TXN'])
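Spark writes a partitioned parquet table as nested directories named `year=<value>/month=<value>`, so the partitioning can also be confirmed programmatically rather than by eye. The sketch below is one possible way to do that for a table on the local filesystem; `list_partitions` is a hypothetical helper, and the temporary directory merely imitates the layout for demonstration.

```python
import os
import re
import tempfile


def list_partitions(table_path):
    """Return sorted (year, month) pairs found as year=Y/month=M directories."""
    partitions = []
    for year_dir in os.listdir(table_path):
        year_match = re.fullmatch(r'year=(\d+)', year_dir)
        year_path = os.path.join(table_path, year_dir)
        # Skip marker files such as _SUCCESS and anything not named year=...
        if not year_match or not os.path.isdir(year_path):
            continue
        for month_dir in os.listdir(year_path):
            month_match = re.fullmatch(r'month=(\d+)', month_dir)
            if month_match and os.path.isdir(os.path.join(year_path, month_dir)):
                partitions.append((int(year_match.group(1)), int(month_match.group(1))))
    return sorted(partitions)


# Demonstration against a throwaway directory that imitates the expected layout
with tempfile.TemporaryDirectory() as demo_path:
    os.makedirs(os.path.join(demo_path, 'year=2015', 'month=1'))
    os.makedirs(os.path.join(demo_path, 'year=2015', 'month=12'))
    open(os.path.join(demo_path, '_SUCCESS'), 'w').close()
    found = list_partitions(demo_path)
```

Against the real output, the same function could be pointed at `config['LOCAL']['OUTPUT_DATA_TXN']` and the result compared with the distinct year and month pairs in the calendar data.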


#### 4.2 Data Quality Checks
I would like to implement the following data quality checks:
- Completeness Checks
    - Match row counts between source tables and analytical tables (after accounting for duplicate removal)
    - Confirm that CALENDAR and TXN tables have been partitioned by Year and Month (visual inspection)
    
- Data Integrity Checks
    - All foreign key values in analytical tables are matched by corresponding primary key values in the parent tables
 
These quality checks will be run by the code in the cells in this section
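The data integrity check can be sketched as a search for orphan keys: foreign key values in a child table that have no matching primary key in the parent table. The version below is plain Python over key lists so that it runs anywhere; `find_orphan_keys` is a hypothetical helper and the key values are illustrative, not taken from the project data.

```python
def find_orphan_keys(child_keys, parent_keys):
    """Return the sorted child key values that have no match among the parent keys."""
    parents = set(parent_keys)
    # Nulls are skipped here; whether a null foreign key is acceptable is a separate decision
    return sorted({key for key in child_keys
                   if key is not None and key not in parents})


# Illustrative values: customer_id 3 is referenced by a contact but has no customer row
contact_customer_ids = [1, 2, 2, 3, None]
customer_ids = [1, 2]
orphans = find_orphan_keys(contact_customer_ids, customer_ids)
```

On the Spark dataframes themselves, a `left_anti` join of the child on the parent key should return the same orphan rows, for example joining the contact data to the customer data on `customer_id` and asserting that the result is empty. The same pattern would apply to `contact_id` and `product_id` on the transaction table.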

In [135]:
# Completeness Checks - matching row counts

# CUSTOMER table
assert len(pd_df_customer.index) == spark.read.parquet(config['LOCAL']['OUTPUT_DATA_CUSTOMER']).count()
print('Row counts for CUSTOMER table matched between source and analytical tables')


Row counts for CUSTOMER table matched between source and analytical tables


In [136]:
# CONTACT table
assert len(pd_df_contact.index) == spark.read.parquet(config['LOCAL']['OUTPUT_DATA_CONTACT']).count()
print('Row counts for CONTACT table matched between source and analytical tables')


Row counts for CONTACT table matched between source and analytical tables


In [137]:
# PRODUCT table
assert len(pd_df_product.index) == spark.read.parquet(config['LOCAL']['OUTPUT_DATA_PRODUCT']).count()
print('Row counts for PRODUCT table matched between source and analytical tables')


Row counts for PRODUCT table matched between source and analytical tables


In [138]:
# STATE table
assert len(pd_df_state.index) == spark.read.parquet(config['LOCAL']['OUTPUT_DATA_STATE']).count()
print('Row counts for STATE table matched between source and analytical tables')


Row counts for STATE table matched between source and analytical tables


In [141]:
# CALENDAR table
assert len(pd.unique(pd_df_txn['txn_date'])) == spark.read.parquet(config['LOCAL']['OUTPUT_DATA_CALENDAR']).count()
print('Row counts for CALENDAR table matched between source and analytical tables')


Row counts for CALENDAR table matched between source and analytical tables


In [142]:
# TXN table
assert len(pd_df_txn.index) == spark.read.parquet(config['LOCAL']['OUTPUT_DATA_TXN']).count()
print('Row counts for TXN table matched between source and analytical tables')


Row counts for TXN table matched between source and analytical tables


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.