# Project Title
### Data Engineering Capstone Project
This project uses a dataset that I assembled myself. Step 1 describes the data and its sources.

#### Project Summary
InvestSure is an investment company that manages the retirement accounts of its customers' employees. It receives data extracts in CSV format from its transactional systems and has been loading them into Excel for analysis. The data has now grown too large for that approach to remain viable. InvestSure has therefore hired me as a Data Engineer to analyze and cleanse this data, build a conceptual model for analytical use, and load the data from the source files into the analytical tables. InvestSure has also asked me to provide typical queries that it can run against this analytical model to gain insights from the data.

The project follows the steps listed below:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

```python
# Do all imports and installs here
import pandas as pd
import re
import os
import sys
import glob
import configparser
from datetime import datetime, timedelta, date
from dateutil import parser
from pyspark.sql import SparkSession
from pyspark.sql import types as t
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear

# Display floats in fixed-point rather than exponential (scientific) notation
pd.options.display.float_format = '{:20,.2f}'.format

# Suppress warnings from the final output
import warnings
warnings.simplefilter("ignore")

# Read project data configuration entries (the with-block closes the file afterwards)
config = configparser.ConfigParser()
with open('capstone_project_data.cfg') as cfg_file:
    config.read_file(cfg_file)
```
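The cell above reads every file location from `capstone_project_data.cfg`, but the file itself is not shown. The following sketch shows the layout that the lookups in this notebook expect. It assumes a `[LOCAL]` section with one key per dataset. The key names come from the cells below. The `src_data/...` and `output/...` paths are hypothetical.

```python
import configparser

# Hypothetical contents of capstone_project_data.cfg. The section and key
# names match the lookups used in this notebook; the paths are assumptions.
SAMPLE_CFG = """
[LOCAL]
INPUT_DATA_CUSTOMER = src_data/customer.csv
INPUT_DATA_CONTACT = src_data/contact.csv
INPUT_DATA_PRODUCT = src_data/product.json
INPUT_DATA_SECTOR = src_data/sec_codes.csv
INPUT_DATA_STATE = src_data/state.csv
INPUT_DATA_TXN = src_data/txn.csv
OUTPUT_DATA_CUSTOMER = output/customer.parquet
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_CFG)

# Fail early if a key the pipeline relies on is missing.
REQUIRED_KEYS = [
    "INPUT_DATA_CUSTOMER", "INPUT_DATA_CONTACT", "INPUT_DATA_PRODUCT",
    "INPUT_DATA_SECTOR", "INPUT_DATA_STATE", "INPUT_DATA_TXN",
    "OUTPUT_DATA_CUSTOMER",
]
missing = [key for key in REQUIRED_KEYS if key not in config["LOCAL"]]
if missing:
    raise KeyError(f"capstone_project_data.cfg is missing: {missing}")

print(config["LOCAL"]["INPUT_DATA_TXN"])  # src_data/txn.csv
```

`configparser` lower-cases option names by default. This is harmless here because lookups such as `config['LOCAL']['INPUT_DATA_TXN']` are normalized the same way.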




### Step 1: Scope the Project and Gather Data

#### Scope 
The scope of this project is to analyze the data provided by InvestSure's source systems, cleanse it where needed, and load it into an analytical data model that makes the data easy to query.

#### Describe and Gather Data 
In this section, I will describe the data including its source.

- txn.csv (fact)
    - Structure
        - txn_id
        - txn_date
        - contact_id
        - product_id
        - sales
        - redemptions
    - Source: InvestSure's transactional system captured by its trading application
    - Feed Frequency: Daily

- customer.csv (dimension)
    - Structure
        - customer_id
        - customer_name
        - sector
    - Source: InvestSure's CRM system
    - Feed Frequency: Daily

- contact.csv (dimension)
    - Structure
        - contact_id
        - first_name
        - last_name
        - city
        - state_code
        - zip
        - country
        - latitude
        - longitude
        - customer_id
        - status
        - opportunity
    - Source: InvestSure's CRM system
    - Feed Frequency: Daily

- product.json (dimension)
    - Structure
        - product_id
        - product_name
        - tna
        - ms_rating
        - exp_ratio
        - market_cap
    - Source: Yahoo Finance
    - Feed Frequency: Daily

- sec_codes.csv (mapping table)
    - Structure
        - code
        - description
    - Source: Yahoo Finance provides the sector descriptions, while InvestSure's systems use abbreviated codes; this table maps each code to its description
    - Feed Frequency: On demand and when new customers are added

- state.csv
    - Structure
        - state_code
        - state
        - region
    - Source: US Census Bureau
    - Feed Frequency: One time


```python
# Create a Spark Session
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()
```
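Most of these inputs arrive as daily CSV feeds. A cheap safeguard before reading them is to check each file's header against the structures listed in Step 1. The following is a minimal standard-library sketch. `EXPECTED_COLUMNS` and `check_header` are hypothetical names that are not part of the original notebook. `product.json` is left out because it has no header row.

```python
import csv
import io

# Column lists copied from the structures described in Step 1.
EXPECTED_COLUMNS = {
    "txn.csv": ["txn_id", "txn_date", "contact_id", "product_id", "sales", "redemptions"],
    "customer.csv": ["customer_id", "customer_name", "sector"],
    "contact.csv": ["contact_id", "first_name", "last_name", "city", "state_code", "zip",
                    "country", "latitude", "longitude", "customer_id", "status", "opportunity"],
    "sec_codes.csv": ["code", "description"],
    "state.csv": ["state_code", "state", "region"],
}

def check_header(file_name, handle):
    """Return (missing, unexpected) column names for one CSV feed."""
    header = next(csv.reader(handle))
    expected = EXPECTED_COLUMNS[file_name]
    missing = [c for c in expected if c not in header]
    unexpected = [c for c in header if c not in expected]
    return missing, unexpected

# Example: a txn.csv feed whose header lacks the redemptions column.
feed = io.StringIO("txn_id,txn_date,contact_id,product_id,sales\n"
                   "422909780,1/2/15,992808564,VIVAX,46892.193\n")
print(check_header("txn.csv", feed))  # (['redemptions'], [])
```

In the notebook, `check_header` could be called on each path from `config['LOCAL']`, opened with `open(path, newline='')`, before the corresponding `spark.read` call.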


```python
# Read CUSTOMER data
df_customer = spark.read.format('csv').options(header='true', inferSchema='true').load(config['LOCAL']['INPUT_DATA_CUSTOMER'])

df_customer.printSchema()
```

```
root
 |-- customer_id: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- sector: string (nullable = true)
```



```python
# Display sample rows from CUSTOMER data
df_customer.show(n=5)
```

```
+-----------+--------------------+------+
|customer_id|       customer_name|sector|
+-----------+--------------------+------+
|  450056063|B.W.E CUSTOM CONS...|    HC|
|  450056064|SERGEY NIZHEGOROD...|    CS|
|  450056066| SUNRISE ANDOVER LLC|    RE|
|  450056067|474 CENTRAL BOULE...|    FS|
|  450056068|  ELITE FINISHES LLC|  TECH|
+-----------+--------------------+------+
only showing top 5 rows
```



```python
# Read CONTACT data
df_contact = spark.read.format('csv').options(header='true', inferSchema='true').load(config['LOCAL']['INPUT_DATA_CONTACT'])

df_contact.printSchema()
```

```
root
 |-- contact_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- opportunity: integer (nullable = true)
```



```python
# Display sample rows from CONTACT data
df_contact.show(n=5)
```

```
+----------+----------+----------+-----------+----------+-----+-------+--------+---------+-----------+------+-----------+
|contact_id|first_name| last_name|       city|state_code|  zip|country|latitude|longitude|customer_id|status|opportunity|
+----------+----------+----------+-----------+----------+-----+-------+--------+---------+-----------+------+-----------+
| 100000339|     Lyndy|   Chachas|      Omaha|        NE|68130|    USA|41.22962| -96.1815|  450058148|Active|      50000|
| 100001423|     Watts|Eifenstadt|     Weston|        FL|33326|    USA|26.09966|-80.36497|  450059017|Active|      50000|
| 100001837|  Jingfeng|    Lopina|Hunt Valley|        MD|21030|    USA|39.50004|-76.66566|  450059076|Active|     125000|
| 100002544|   Gaynell|   Vivrett|     Beloit|        WI|53511|    USA|42.49625|-89.03702|  450057762|Active|      16000|
| 100002551| Peregrino|    Valles|   New York|        NY|10036|    USA|40.75841|-73.98155|  450055995|Active|     150000|
+----------+----------+----------+-----------+----------+-----+-------+--------+---------+-----------+------+-----------+
```

```python
# Read PRODUCT data. product.json is a JSON document that spans multiple lines, so
# multiLine is required. The CSV-only options header and inferSchema are dropped:
# Spark infers the schema of JSON input on its own when none is supplied.
df_product = spark.read.option('multiLine', 'true').json(config['LOCAL']['INPUT_DATA_PRODUCT'])

df_product.printSchema()
```

```
root
 |-- exp_ratio: double (nullable = true)
 |-- market_cap: string (nullable = true)
 |-- ms_rating: long (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- tna: long (nullable = true)
```



```python
# Display sample rows from PRODUCT data
df_product.show(n=5)
```

```
+---------+----------+---------+----------+--------------------+------------+
|exp_ratio|market_cap|ms_rating|product_id|        product_name|         tna|
+---------+----------+---------+----------+--------------------+------------+
|     0.05|     Giant|        5|     VFIAX|Vanguard 500 Inde...|163456368456|
|     0.05|     Giant|        4|     VTSAX|Vanguard Total St...|136131758268|
|     0.04|     Giant|        5|     VINIX|Vanguard Institut...|110407917518|
|     0.16|     Giant|        4|     VTSMX|Vanguard Total St...| 98869371846|
|     0.02|     Giant|        5|     VIIIX|Vanguard Institut...| 93192353649|
+---------+----------+---------+----------+--------------------+------------+
only showing top 5 rows
```



```python
# Read SECTOR CODES data
df_sec_codes = spark.read.format('csv').options(header='true', inferSchema='true').load(config['LOCAL']['INPUT_DATA_SECTOR'])

df_sec_codes.printSchema()
```

```
root
 |-- code: string (nullable = true)
 |-- description: string (nullable = true)
```



```python
# Display sample rows from SECTOR CODE data
df_sec_codes.show(n=5)
```

```
+----+--------------------+
|code|         description|
+----+--------------------+
|  FS|  Financial Services|
|  RE|         Real Estate|
|  HC|          Healthcare|
|  UT|           Utilities|
|  CS|Communication Ser...|
+----+--------------------+
only showing top 5 rows
```



```python
# Read STATE data
df_state = spark.read.format('csv').options(header='true', inferSchema='true').load(config['LOCAL']['INPUT_DATA_STATE'])

df_state.printSchema()
```

```
root
 |-- state_code: string (nullable = true)
 |-- state: string (nullable = true)
 |-- region: string (nullable = true)
```



```python
# Display sample rows from STATE data
df_state.show(n=5)
```

```
+----------+----------+--------+
|state_code|     state|  region|
+----------+----------+--------+
|        AL|   Alabama|Southern|
|        AK|    Alaska| Pacific|
|        AZ|   Arizona| Pacific|
|        AR|  Arkansas|Southern|
|        CA|California| Pacific|
+----------+----------+--------+
only showing top 5 rows
```



```python
# Read TRANSACTION data
df_txn = spark.read.format('csv').options(header='true', inferSchema='true').load(config['LOCAL']['INPUT_DATA_TXN'])

df_txn.printSchema()
```

```
root
 |-- txn_id: integer (nullable = true)
 |-- txn_date: string (nullable = true)
 |-- contact_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- sales: double (nullable = true)
 |-- redemptions: double (nullable = true)
```



```python
# Display sample rows from TRANSACTION data
df_txn.show(n=5)
```

```
+---------+--------+----------+----------+---------+-----------+
|   txn_id|txn_date|contact_id|product_id|    sales|redemptions|
+---------+--------+----------+----------+---------+-----------+
|422909780|  1/2/15| 992808564|     VIVAX|46892.193|        0.0|
|422909781|  1/2/15| 261785827|     SOPAX|      0.0| -33424.776|
|422909782|  1/2/15| 389127962|     BAICX|14230.848|        0.0|
|422909783|  1/2/15| 101692476|     SGROX|94046.931|        0.0|
|422909784|  1/2/15| 327754553|     FXSIX|22038.856|        0.0|
+---------+--------+----------+----------+---------+-----------+
only showing top 5 rows
```



```python
# Extract columns to create the CUSTOMER table
customer_columns = ["customer_id", "customer_name", "sector"]
customer_table = df_customer.select(customer_columns)

# Write CUSTOMER table to parquet; overwrite mode lets the cell be re-run safely
customer_table.write.mode("overwrite").parquet(config['LOCAL']['OUTPUT_DATA_CUSTOMER'])
```

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.
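Here is a minimal pandas sketch of the two checks named above. pandas is already imported in this notebook. The sketch runs on a small made-up sample modeled on the txn.csv rows shown in Step 1. A duplicate `txn_id` and missing `product_id` values are introduced deliberately. The data and variable names are hypothetical.

```python
import pandas as pd

# Hypothetical sample modeled on txn.csv: txn_id 422909782 is duplicated and
# its product_id is missing, purely to give the checks something to find.
txn = pd.DataFrame({
    "txn_id": [422909780, 422909781, 422909782, 422909782],
    "txn_date": ["1/2/15", "1/2/15", "1/2/15", "1/2/15"],
    "contact_id": [992808564, 261785827, 389127962, 389127962],
    "product_id": ["VIVAX", "SOPAX", None, None],
    "sales": [46892.193, 0.0, 14230.848, 14230.848],
    "redemptions": [0.0, -33424.776, 0.0, 0.0],
})

# Missing values per column (only columns with at least one gap are shown).
null_counts = txn.isna().sum()
print(null_counts[null_counts > 0])

# All rows whose primary key appears more than once.
dup_keys = txn[txn.duplicated(subset="txn_id", keep=False)]
print(len(dup_keys))  # 2
```

On the Spark DataFrames, the equivalent checks are `df_txn.filter(col('product_id').isNull()).count()` and `df_txn.groupBy('txn_id').count().filter('count > 1')`.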

#### Cleaning Steps
Document steps necessary to clean the data
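The transaction sample in Step 1 shows `txn_date` loaded as a string such as `1/2/15`. The following minimal pandas sketch covers two likely cleaning steps: dropping duplicate transactions and parsing the date. It assumes US month/day/two-digit-year order, so `1/2/15` becomes 2 January 2015. The `clean_txn` function and the sample rows are hypothetical.

```python
import pandas as pd

# Hypothetical raw sample modeled on txn.csv, with one duplicated transaction.
raw = pd.DataFrame({
    "txn_id": [422909780, 422909781, 422909781],
    "txn_date": ["1/2/15", "1/2/15", "1/2/15"],
    "sales": [46892.193, 0.0, 0.0],
    "redemptions": [0.0, -33424.776, -33424.776],
})

def clean_txn(df):
    """Drop duplicate transactions and parse txn_date into a real date."""
    out = df.drop_duplicates(subset="txn_id").copy()
    # Assumption: month/day/two-digit-year order, e.g. "1/2/15" -> 2015-01-02.
    out["txn_date"] = pd.to_datetime(out["txn_date"], format="%m/%d/%y")
    return out

clean = clean_txn(raw)
print(len(clean))                        # 2
print(clean["txn_date"].iloc[0].date())  # 2015-01-02
```

In Spark, the same steps could be written as `df_txn.dropDuplicates(['txn_id']).withColumn('txn_date', to_date(col('txn_date'), 'M/d/yy'))`, with `to_date` imported from `pyspark.sql.functions`.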

```python
# Perform cleaning tasks here
```





### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model
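One possible model is a star schema that is consistent with the fact/dimension labels in Step 1. Here `txn` is the fact table, and `customer`, `contact` and `product` are dimensions. The `sec_codes` and `state` mapping tables are folded into the customer and contact dimensions. The following minimal pandas sketch uses made-up rows to show the sector fold-in and a typical net-flow query. The names `dim_customer`, `sector_name` and `net_flow` are hypothetical. Net flow assumes redemptions are stored as negative amounts, as in the txn sample.

```python
import pandas as pd

# Tiny hypothetical extracts of the source tables.
sec_codes = pd.DataFrame({"code": ["HC", "RE"], "description": ["Healthcare", "Real Estate"]})
customer = pd.DataFrame({"customer_id": [450056063, 450056066], "sector": ["HC", "RE"]})
contact = pd.DataFrame({"contact_id": [1, 2, 3], "customer_id": [450056063, 450056066, 450056066]})
txn = pd.DataFrame({
    "txn_id": [10, 11, 12],
    "contact_id": [1, 2, 3],
    "sales": [100.0, 50.0, 0.0],
    "redemptions": [0.0, 0.0, -20.0],
})

# Customer dimension with the sector description folded in (one join fewer per query).
dim_customer = (
    customer.merge(sec_codes, left_on="sector", right_on="code", how="left")
    .drop(columns="code")
    .rename(columns={"description": "sector_name"})
)

# Typical query: net flows (sales plus negative redemptions) by sector.
fact = txn.merge(contact, on="contact_id").merge(dim_customer, on="customer_id")
fact["net_flow"] = fact["sales"] + fact["redemptions"]
net_by_sector = fact.groupby("sector_name")["net_flow"].sum()
print(net_by_sector.to_dict())  # {'Healthcare': 100.0, 'Real Estate': 30.0}
```

Folding the small mapping tables into the dimensions trades a little redundancy for simpler analyst queries. That is the usual reason to prefer a star over a snowflake layout for reporting.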

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.
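One table often built at this step is a date dimension derived from `txn_date`. The Spark functions imported at the top of the notebook (`year`, `month`, `dayofmonth`, `weekofyear`) cover the same attributes. The following minimal pandas sketch assumes `txn_date` has already been parsed to a date. `build_dim_date` and its column names are hypothetical.

```python
import pandas as pd

# Hypothetical parsed transaction dates; the fact table repeats dates.
txn_dates = pd.Series(pd.to_datetime(["2015-01-02", "2015-01-02", "2015-01-05"]))

def build_dim_date(dates):
    """One row per distinct date with calendar attributes for grouping."""
    unique_dates = dates.drop_duplicates().sort_values().reset_index(drop=True)
    return pd.DataFrame({
        "date": unique_dates,
        "year": unique_dates.dt.year,
        "month": unique_dates.dt.month,
        "day": unique_dates.dt.day,
        # ISO 8601 week number, the same convention as Spark's weekofyear().
        "week": unique_dates.dt.isocalendar().week.astype(int),
        "weekday": unique_dates.dt.day_name(),
    })

dim_date = build_dim_date(txn_dates)
print(dim_date)
```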

```python
# Write code here
```

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

```python
# Perform quality checks here
```
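Here is a minimal pandas sketch of the integrity and source/count checks listed above, run on hypothetical data. `run_quality_checks` is an illustrative name. `source_row_count` is assumed to be the source row count after cleaning, so that dropped duplicates do not trigger a false failure.

```python
import pandas as pd

def run_quality_checks(fact, dim_contact, source_row_count):
    """Return a list of failure messages; an empty list means every check passed."""
    failures = []
    # Source/count check: the loaded table should hold every cleaned source row.
    if len(fact) != source_row_count:
        failures.append(f"row count {len(fact)} != source count {source_row_count}")
    # Unique key: txn_id must be present and unique.
    if fact["txn_id"].isna().any() or fact["txn_id"].duplicated().any():
        failures.append("txn_id is not a unique, non-null key")
    # Referential integrity: every contact_id must exist in the contact dimension.
    orphans = ~fact["contact_id"].isin(dim_contact["contact_id"])
    if orphans.any():
        failures.append(f"{int(orphans.sum())} transaction(s) reference unknown contacts")
    return failures

# Hypothetical data: contact 3 is missing from the contact dimension.
dim_contact = pd.DataFrame({"contact_id": [1, 2]})
fact = pd.DataFrame({"txn_id": [10, 11, 12], "contact_id": [1, 2, 3]})
print(run_quality_checks(fact, dim_contact, source_row_count=3))
# ['1 transaction(s) reference unknown contacts']
```

A pipeline run would typically raise an error when the returned list is non-empty, so that a failed load never reaches the analytical tables.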

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.
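A data dictionary can be kept in code next to the pipeline and rendered to Markdown for the notebook. The following minimal sketch covers the txn table only. `TXN_FIELDS` and `dictionary_markdown` are hypothetical names. The source is taken from Step 1, and the descriptions are placeholders to confirm with InvestSure.

```python
# Hypothetical data-dictionary entries for the txn fact table. The types assume
# txn_date has been parsed to a date during cleaning.
TXN_FIELDS = [
    ("txn_id", "integer", "Transaction identifier (primary key)"),
    ("txn_date", "date", "Date of the transaction"),
    ("contact_id", "integer", "Contact who made the transaction; joins to contact"),
    ("product_id", "string", "Product traded; joins to product"),
    ("sales", "double", "Sales amount"),
    ("redemptions", "double", "Redemption amount (negative in the sample data)"),
]

def dictionary_markdown(table, source, fields):
    """Render one table's data dictionary as a Markdown table."""
    lines = [f"**{table}** (source: {source})", "",
             "| Field | Type | Description |", "|---|---|---|"]
    lines += [f"| {name} | {dtype} | {desc} |" for name, dtype, desc in fields]
    return "\n".join(lines)

print(dictionary_markdown("txn", "InvestSure trading application", TXN_FIELDS))
```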

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
    * The data increases by 100x.
    * The data populates a dashboard that must be refreshed by 7am every day.
    * The database needs to be accessed by 100+ people.