# Project Title
### Data Engineering Capstone Project

#### Project Summary
This data engineering project collects S&P 500 stock data and builds a data warehouse on Amazon Redshift for further analysis. The pipeline copies the data into staging tables, loads it into a snowflake-schema database, and runs a data quality check after all the data is loaded.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

#### Files and folders
1. raw contains the data before cleaning
2. cleaned_data contains the data after cleaning
3. dwh.cfg contains the configuration for the Amazon services

#### Order in which to run the files
1. infrastructure_as_code.ipynb creates and deletes the cluster
2. create_tables.py creates the tables in the database
3. etl.py loads the data from S3 into the database
4. data_quality.py checks whether the data was loaded successfully


In [1]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql import functions as F
import boto3
# from IPython.display import display

### Step 1: Scope the Project and Gather Data

#### Scope 
I used data from S&P 500 stocks. The final solution is a data warehouse to analyze stock indicators. Data is loaded into a database on Amazon Redshift.

#### Describe and Gather Data
The S&P 500 stock data comes from Kaggle. It includes daily data for stocks in the United States.

In [2]:
# Read the S&P 500 companies file
pd.set_option('display.max_columns', None)
df1 = pd.read_csv('raw/sp500_companies.csv') 
df1.head()

Unnamed: 0,Exchange,Symbol,Shortname,Longname,Sector,Industry,Currentprice,Marketcap,Ebitda,Revenuegrowth,City,State,Country,Fulltimeemployees,Longbusinesssummary,Weight
0,NMS,AAPL,Apple Inc.,Apple Inc.,Technology,Consumer Electronics,138.2,2267364458496,129557000000.0,0.019,Cupertino,CA,United States,154000.0,"Apple Inc. designs, manufactures, and markets ...",0.071062
1,NMS,MSFT,Microsoft Corporation,Microsoft Corporation,Technology,Software—Infrastructure,232.9,1748608483328,97983000000.0,0.124,Redmond,WA,United States,221000.0,"Microsoft Corporation develops, licenses, and ...",0.054803
2,NMS,GOOGL,Alphabet Inc.,Alphabet Inc.,Communication Services,Internet Content & Information,95.65,1250738569216,96887000000.0,0.126,Mountain View,CA,United States,174014.0,Alphabet Inc. provides various products and pl...,0.039199
3,NMS,GOOG,Alphabet Inc.,Alphabet Inc.,Communication Services,Internet Content & Information,96.15,1250738438144,96887000000.0,0.126,Mountain View,CA,United States,174014.0,Alphabet Inc. provides various products and pl...,0.039199
4,NMS,AMZN,"Amazon.com, Inc.","Amazon.com, Inc.",Consumer Cyclical,Internet Retail,113.0,1151198822400,52620000000.0,0.072,Seattle,WA,United States,1523000.0,"Amazon.com, Inc. engages in the retail sale of...",0.03608


In [3]:
df1.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 492 entries, 0 to 491
Data columns (total 16 columns):
Exchange               492 non-null object
Symbol                 492 non-null object
Shortname              492 non-null object
Longname               492 non-null object
Sector                 492 non-null object
Industry               492 non-null object
Currentprice           492 non-null float64
Marketcap              492 non-null int64
Ebitda                 462 non-null float64
Revenuegrowth          491 non-null float64
City                   492 non-null object
State                  473 non-null object
Country                492 non-null object
Fulltimeemployees      486 non-null float64
Longbusinesssummary    492 non-null object
Weight                 492 non-null float64
dtypes: float64(5), int64(1), object(10)
memory usage: 61.6+ KB


In [4]:
df1.isna().sum()

Exchange                0
Symbol                  0
Shortname               0
Longname                0
Sector                  0
Industry                0
Currentprice            0
Marketcap               0
Ebitda                 30
Revenuegrowth           1
City                    0
State                  19
Country                 0
Fulltimeemployees       6
Longbusinesssummary     0
Weight                  0
dtype: int64

In [5]:
# Read the S&P 500 index file

df2 = pd.read_csv('raw/sp500_index.csv') 
df2.head(5)

Unnamed: 0,Date,S&P500
0,2012-10-01,1444.49
1,2012-10-02,1445.75
2,2012-10-03,1450.99
3,2012-10-04,1461.4
4,2012-10-05,1460.93


In [6]:
df2.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2517 entries, 0 to 2516
Data columns (total 2 columns):
Date      2517 non-null object
S&P500    2517 non-null float64
dtypes: float64(1), object(1)
memory usage: 39.4+ KB


In [7]:
df2.isna().sum()

Date      0
S&P500    0
dtype: int64

In [8]:

df3 = pd.read_csv('raw/sp500_stocks.csv') 
df3.head()

Unnamed: 0,Date,Symbol,Adj_Close,Close,High,Low,Open,Volume
0,12/31/2009,MMM,,,,,,
1,1/4/2010,MMM,59.318886,83.019997,83.449997,82.669998,83.089996,3043700.0
2,1/5/2010,MMM,58.947342,82.5,83.230003,81.699997,82.800003,2847000.0
3,1/6/2010,MMM,59.783295,83.669998,84.599998,83.510002,83.879997,5268500.0
4,1/7/2010,MMM,59.826176,83.730003,83.760002,82.120003,83.32,4470100.0


In [9]:
df3.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1048575 entries, 0 to 1048574
Data columns (total 8 columns):
Date         1048575 non-null object
Symbol       1048575 non-null object
Adj_Close    998131 non-null float64
Close        998131 non-null float64
High         998131 non-null float64
Low          998131 non-null float64
Open         998131 non-null float64
Volume       998131 non-null float64
dtypes: float64(6), object(2)
memory usage: 64.0+ MB


In [10]:
df3.isna().sum()

Date             0
Symbol           0
Adj_Close    50444
Close        50444
High         50444
Low          50444
Open         50444
Volume       50444
dtype: int64

In [11]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

# One-off conversion of the raw CSV to Parquet, partitioned by symbol:
# df = spark.read.option("header", True).csv("sp500_stocks.csv")
# df.write.partitionBy("Symbol").parquet("sp500_stocks.parquet", mode="overwrite")

In [12]:
df = spark.read.parquet("raw/sp500_stocks.parquet")
df.show(5, truncate=False)

+----------+-----------+-----------+-----------+-----------+-----------+--------+------+
|Date      |Adj_Close  |Close      |High       |Low        |Open       |Volume  |Symbol|
+----------+-----------+-----------+-----------+-----------+-----------+--------+------+
|12/31/2009|null       |null       |null       |null       |null       |null    |HON   |
|1/4/2010  |29.38503838|38.46846008|38.49706268|37.63903046|37.76296616|7750615 |HON   |
|1/5/2010  |29.44331551|38.54473114|38.57332993|38.22058487|38.28731918|6796106 |HON   |
|1/6/2010  |29.44331551|38.54473114|38.64960098|38.35405731|38.55426407|6209345 |HON   |
|1/7/2010  |30.01133537|39.28835678|39.43136215|38.21105194|38.55426407|10266533|HON   |
+----------+-----------+-----------+-----------+-----------+-----------+--------+------+
only showing top 5 rows



In [13]:
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Adj_Close: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Symbol: string (nullable = true)



In [14]:
df.count()

1048575

### Step 2: Explore and Assess the Data
#### Explore the Data 
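The companies file has missing values in Ebitda (30), Revenuegrowth (1), State (19), and Fulltimeemployees (6). The stocks file has 50,444 rows with no price or volume data. The index file has no missing values, and none of the three files contains duplicate rows.

#### Cleaning Steps
1. Drop duplicate rows from all three datasets.
2. Fill missing Ebitda and Revenuegrowth values with the column mean.
3. Fill missing price and volume values in the stocks data with the column mean.
4. Write the cleaned data to cleaned_data.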
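
The missing-value and duplicate checks used in this step can be wrapped in two small helpers. This is only a sketch: the function names `summarize_quality` and `count_duplicate_rows` are hypothetical, and the sample frame is a hand-made stand-in for raw/sp500_companies.csv.

```python
import pandas as pd


def summarize_quality(frame: pd.DataFrame) -> pd.DataFrame:
    """Return one row per column with its null count, null share, and distinct count."""
    return pd.DataFrame({
        "nulls": frame.isna().sum(),
        "null_pct": (frame.isna().mean() * 100).round(2),
        "distinct": frame.nunique(),
    })


def count_duplicate_rows(frame: pd.DataFrame) -> int:
    """Count rows that are exact copies of an earlier row."""
    return int(frame.duplicated().sum())


# Hand-made sample (an assumption, not the real file): one repeated row, one null
sample = pd.DataFrame({
    "Symbol": ["AAPL", "MSFT", "MSFT", "GOOG"],
    "Ebitda": [129557000000.0, 97983000000.0, 97983000000.0, None],
})

report = summarize_quality(sample)
duplicates = count_duplicate_rows(sample)
print(report)
print("duplicate rows:", duplicates)
```

Running the same two helpers on df1 and df2 before and after cleaning gives a quick way to confirm that the cleaning steps had the intended effect.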

In [15]:
df1.nunique()

Exchange                 4
Symbol                 492
Shortname              489
Longname               489
Sector                  11
Industry               113
Currentprice           489
Marketcap              492
Ebitda                 455
Revenuegrowth          317
City                   229
State                   38
Country                  7
Fulltimeemployees      388
Longbusinesssummary    489
Weight                 492
dtype: int64

In [16]:
# Drop all duplicate rows
df1 = df1.drop_duplicates()
df1.nunique()

Exchange                 4
Symbol                 492
Shortname              489
Longname               489
Sector                  11
Industry               113
Currentprice           489
Marketcap              492
Ebitda                 455
Revenuegrowth          317
City                   229
State                   38
Country                  7
Fulltimeemployees      388
Longbusinesssummary    489
Weight                 492
dtype: int64

In [17]:
# Fill missing values with the column mean
mean_Ebitda = df1['Ebitda'].mean()
mean_Revenuegrowth = df1['Revenuegrowth'].mean()
df1['Ebitda'] = df1['Ebitda'].fillna(mean_Ebitda)
df1['Revenuegrowth'] = df1['Revenuegrowth'].fillna(mean_Revenuegrowth)
df1.isna().sum()

Exchange                0
Symbol                  0
Shortname               0
Longname                0
Sector                  0
Industry                0
Currentprice            0
Marketcap               0
Ebitda                  0
Revenuegrowth           0
City                    0
State                  19
Country                 0
Fulltimeemployees       6
Longbusinesssummary     0
Weight                  0
dtype: int64

In [18]:
df2.nunique()

Date      2517
S&P500    2505
dtype: int64

In [19]:
# Drop all duplicate rows
df2 = df2.drop_duplicates()
df2.nunique()

Date      2517
S&P500    2505
dtype: int64

In [20]:
df.distinct().count()

1048575

In [21]:
df = df.dropDuplicates()
df.distinct().count()

1048575

In [22]:
# Count null values
df.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+----+---------+-----+-----+-----+-----+------+------+
|Date|Adj_Close|Close| High|  Low| Open|Volume|Symbol|
+----+---------+-----+-----+-----+-----+------+------+
|   0|    50444|50444|50444|50444|50444| 50444|     0|
+----+---------+-----+-----+-----+-----+------+------+



In [23]:
# Replace nulls in each price/volume column with that column's own mean
numeric_cols = ['Adj_Close', 'Close', 'High', 'Low', 'Open', 'Volume']

# F.mean skips nulls; the string columns are cast to double for the average
means = df.select(
    [F.mean(F.col(c).cast('double')).alias(c) for c in numeric_cols]
).first().asDict()

# The columns are stored as strings, so fill with the string form of each mean
df = df.fillna({c: str(v) for c, v in means.items()})

In [24]:
# Count null values
# df.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

In [25]:
df1.to_csv('cleaned_data/sp500_companies.csv', index=False)

In [26]:
df2.to_csv('cleaned_data/sp500_index.csv', index=False)

In [27]:
df.write.parquet("cleaned_data/sp500_stocks.parquet", mode="overwrite")

In [30]:
df = spark.read.parquet("cleaned_data/sp500_stocks.parquet")
df.show(5, truncate=False)

+----------+-----------+-----------+-----------+-----------+-----------+-------+------+
|Date      |Adj_Close  |Close      |High       |Low        |Open       |Volume |Symbol|
+----------+-----------+-----------+-----------+-----------+-----------+-------+------+
|11/19/2010|37.28019333|47.45873642|47.46826935|46.83904648|47.32526398|4916457|HON   |
|2/7/2011  |42.78460312|54.4659996 |54.74247742|54.3134613 |54.59947205|3067121|HON   |
|2/11/2011 |43.42116165|55.27636337|55.36216736|54.41833115|54.6471405 |2425293|HON   |
|3/15/2011 |41.83667374|52.95014191|53.35055923|51.66309357|51.9109726 |5028795|HON   |
|11/15/2011|41.80090332|52.22558212|52.62599945|51.50102234|51.90143585|4782091|HON   |
+----------+-----------+-----------+-----------+-----------+-----------+-------+------+
only showing top 5 rows



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
I use a snowflake schema for this OLAP model. The fact table contains the main measures for analyzing changes in the market. Through the symbol reference, we can look up company details in the dim_company table. From there, dim_location and dim_exchange give the company's location and the exchange where the stock is traded, respectively. Finally, dim_index holds the value of the S&P 500 index on each date.
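
The snowflake layout described above can be sketched with an in-memory SQLite database standing in for Redshift, so that it runs locally. The table names follow the text, but every column name, the surrogate keys `exchange_id` and `location_id`, and the fact row's price and volume numbers are assumptions made for illustration. The real definitions live in create_tables.py.

```python
import sqlite3

# Hypothetical DDL: table names come from the text, column names are assumptions
DDL = """
CREATE TABLE dim_exchange (exchange_id INTEGER PRIMARY KEY, exchange_name VARCHAR(255));
CREATE TABLE dim_location (location_id INTEGER PRIMARY KEY, city VARCHAR(255),
                           state VARCHAR(255), country VARCHAR(255));
CREATE TABLE dim_company (symbol VARCHAR(255) PRIMARY KEY, short_name VARCHAR(255),
                          sector VARCHAR(255),
                          exchange_id INTEGER REFERENCES dim_exchange (exchange_id),
                          location_id INTEGER REFERENCES dim_location (location_id));
CREATE TABLE dim_index (trade_date DATE PRIMARY KEY, sp500 FLOAT);
CREATE TABLE fact_stocks (trade_date DATE,
                          symbol VARCHAR(255) REFERENCES dim_company (symbol),
                          open_price FLOAT, close_price FLOAT, volume FLOAT);
"""

conn = sqlite3.connect(":memory:")  # SQLite stands in for Redshift here
conn.executescript(DDL)
conn.execute("INSERT INTO dim_exchange VALUES (1, 'NMS')")
conn.execute("INSERT INTO dim_location VALUES (1, 'Cupertino', 'CA', 'United States')")
conn.execute("INSERT INTO dim_company VALUES ('AAPL', 'Apple Inc.', 'Technology', 1, 1)")
conn.execute("INSERT INTO dim_index VALUES ('2012-10-01', 1444.49)")
# Made-up price and volume values, for illustration only
conn.execute("INSERT INTO fact_stocks VALUES ('2012-10-01', 'AAPL', 100.0, 101.0, 1000.0)")

# Walk from the fact table out through the dimensions
row = conn.execute("""
    SELECT f.symbol, c.sector, l.city, e.exchange_name, i.sp500
    FROM fact_stocks AS f
    JOIN dim_company AS c ON c.symbol = f.symbol
    JOIN dim_location AS l ON l.location_id = c.location_id
    JOIN dim_exchange AS e ON e.exchange_id = c.exchange_id
    JOIN dim_index AS i ON i.trade_date = f.trade_date
""").fetchone()
print(row)
```

The point of the snowflake shape is visible in the query: location and exchange are reached through dim_company rather than directly from the fact table.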

#### 3.2 Mapping Out Data Pipelines
Clean data >> upload data to S3 >> create table >> load data into stage table >> load data into dim and fact >> check data quality
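
The "load data into stage table" step is typically a Redshift COPY from S3. The helper below is a minimal sketch of how such a statement could be assembled. The function name `build_copy_sql`, the staging table name, the bucket, and the IAM role ARN are all placeholders, not values from this project.

```python
def build_copy_sql(table: str, s3_path: str, iam_role_arn: str, file_format: str) -> str:
    """Build a Redshift COPY statement that loads one staging table from S3."""
    if file_format == "parquet":
        format_clause = "FORMAT AS PARQUET"
    elif file_format == "csv":
        # Skip the header row written by DataFrame.to_csv
        format_clause = "FORMAT AS CSV IGNOREHEADER 1"
    else:
        raise ValueError(f"unsupported format: {file_format}")
    return (
        f"COPY {table}\n"
        f"FROM '{s3_path}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        f"{format_clause};"
    )


# Placeholder names: replace with the real staging table, bucket, and role
sql = build_copy_sql(
    table="staging_stocks",
    s3_path="s3://example-bucket/cleaned_data/sp500_stocks.parquet",
    iam_role_arn="arn:aws:iam::123456789012:role/example-redshift-role",
    file_format="parquet",
)
print(sql)
```

The resulting string can be passed to `cur.execute` on a psycopg2 cursor connected to the cluster.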



### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [2]:
# After creating a Redshift cluster in Infrastructure_as_code.ipynb, run:
# !python create_tables.py
# !python etl.py

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
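
The kinds of checks listed above can be sketched as small functions that take any DB-API cursor. Redshift treats primary key and unique constraints as informational and does not enforce them, so a duplicate-key query is a useful complement to a row count. The function names are hypothetical, SQLite stands in for the cluster so the example runs locally, and the table and column names are interpolated directly, so they should only ever come from trusted code.

```python
import sqlite3


def row_count(cur, table: str) -> int:
    """Return the number of rows in a table."""
    cur.execute(f"SELECT COUNT(*) FROM {table}")
    return cur.fetchone()[0]


def duplicate_keys(cur, table: str, key: str) -> list:
    """Return (key, count) pairs for key values that appear more than once."""
    cur.execute(
        f"SELECT {key}, COUNT(*) FROM {table} GROUP BY {key} HAVING COUNT(*) > 1"
    )
    return cur.fetchall()


# Local stand-in for the warehouse, with a deliberately duplicated symbol
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE dim_company (symbol VARCHAR(255))")
cur.executemany("INSERT INTO dim_company VALUES (?)", [("AAPL",), ("MSFT",), ("MSFT",)])

has_rows = row_count(cur, "dim_company") > 0   # table is not empty
complete = row_count(cur, "dim_company") == 3  # matches the expected source count
dupes = duplicate_keys(cur, "dim_company", "symbol")
print(has_rows, complete, dupes)
```

With a psycopg2 cursor the same functions run unchanged against Redshift, and the expected count would come from the cleaned source file.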
 
Run Quality Checks

In [24]:
# Perform quality checks here
import configparser
import psycopg2
config = configparser.ConfigParser()
config.read('dwh.cfg')

conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
cur = conn.cursor()

In [25]:
dq_checks=[
    {'test_sql': "SELECT COUNT(*) FROM fact_stocks", 'expected_result': 0, 'table':'fact_stocks'},
    {'test_sql': "SELECT COUNT(*) FROM dim_index", 'expected_result': 0, 'table':'dim_index'},
    {'test_sql': "SELECT COUNT(*) FROM dim_company", 'expected_result': 0, 'table':'dim_company'},
    {'test_sql': "SELECT COUNT(*) FROM dim_location", 'expected_result': 0, 'table':'dim_location'},
    {'test_sql': "SELECT COUNT(*) FROM dim_exchange", 'expected_result': 0, 'table':'dim_exchange'},
]

In [29]:
for dq_check in dq_checks:
    cur.execute(dq_check['test_sql'])
    records = cur.fetchone()

    # 'expected_result' holds the row count that signals a failure (an empty table)
    if dq_check['expected_result'] == records[0]:
        print(f"Data quality check failed: table {dq_check['table']} is empty")
    else:
        print(f"Data quality check pass on table {dq_check['table']}")


Data quality check pass on table fact_stocks
Data quality check pass on table dim_index
Data quality check pass on table dim_company
Data quality check pass on table dim_location
Data quality check pass on table dim_exchange


In [18]:
import boto3
import configparser
import psycopg2

config = configparser.ConfigParser()
config.read('dwh.cfg')

KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')
DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")


redshift = boto3.client('redshift',region_name = 'us-west-2', aws_access_key_id = KEY, aws_secret_access_key = SECRET)

In [31]:
conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
cur = conn.cursor()
cur.execute("SELECT * FROM PG_TABLE_DEF WHERE schemaname = 'public';")
records = cur.fetchone()
records[2:4]


('symbol', 'character varying(255)')

In [3]:
# Run this after etl process to check data quality
# !python data_quality.py

#### 4.3 Data dictionary 



| Field Name          | Data Type | Description                                                                                                      | Example                                       |
|---------------------|-----------|------------------------------------------------------------------------------------------------------------------|-----------------------------------------------|
| Exchange            | VARCHAR   | Exchange where the stock is traded                                                                               | NMS                                           |
| Symbol              | VARCHAR   | Stock symbol                                                                                                     | AAPL                                          |
| Short_name          | VARCHAR   | Company short name                                                                                               | Apple Inc.                                    |
| Long_name           | VARCHAR   | Company long name                                                                                                | Apple Inc.                                    |
| Sector              | VARCHAR   | Sector where the company operates                                                                                | Technology                                    |
| Industry            | VARCHAR   | Industry, within a sector, where the company operates                                                            | Consumer Electronics                          |
| Current_price       | FLOAT     | Current stock price                                                                                              | 138.20                                        |
| Marketcap           | FLOAT     | Current marketcap                                                                                                | 2.26736E+12                                   |
| Ebitda              | FLOAT     | Earnings before interest, taxes, depreciation and amortization                                                   | 1.30E+11                                      |
| Revenue_growth      | FLOAT     | Revenue growth                                                                                                   | 0.019                                         |
| Date                | DATE      | Date                                                                                                             | 2012-10-01                                    |
| S&P500              | FLOAT     | Value of the S&P 500 index                                                                                       | 1444.49                                       |
| Symbol              | VARCHAR   | Company Symbol/Ticker                                                                                            | MMM                                           |
| Adj_close_price     | FLOAT     | Similar to the price at market closure, yet also takes into account company actions such as dividends and splits | 59.318886                                     |
| Close_price         | FLOAT     | Price at market closure                                                                                          | 83.019997                                     |
| High_price          | FLOAT     | Maximum value of period                                                                                          | 83.449997                                     |
| Low_price           | FLOAT     | Minimum value of period                                                                                          | 82.669998                                     |
| Open_price          | FLOAT     | Price at market opening                                                                                          | 83.089996                                     |
| Volume              | FLOAT     | Volume traded                                                                                                    | 3043700.0                                     |
| City                | VARCHAR   | City                                                                                                             | Cupertino                                     |
| State               | VARCHAR   | State                                                                                                            | CA                                            |
| Country             | VARCHAR   | Country                                                                                                          | United States                                 |
| Fulltimeemployees   | FLOAT     | Number of full-time employees                                                                                    | 154000.0                                      |
| Longbusinesssummary | VARCHAR   | Long description of the business                                                                                 | Apple Inc. designs, manufactures, and markets |
| Weight              | FLOAT     | Weight of the company in the S&P 500 index                                                                       | 0.071062                                      |

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

#### Reason
Amazon Redshift is a managed data warehouse, so analytics can run and scale without us having to manage the warehouse infrastructure. It is easy to connect to and to query with familiar SQL.
It is well suited to OLAP workloads and data warehouse models such as the one used in this project.

![image](schema.jpg)

#### Daily
The data should be updated daily, because stock prices change every trading day.

#### Scale up
 * The data was increased by 100x: Convert the data to Parquet and store it in S3. This follows the data lake concept, in which Spark can ingest and analyze the data quickly. Consider using EMR if needed.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day: Use EMR and set up Airflow on it to schedule the pipeline so that it finishes before 7am.
 * The database needed to be accessed by 100+ people: Use EMR and set up Spark on it.
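
For the 100x scenario, saving partitioned Parquet output to S3 amounts to uploading every file under the output directory while keeping its relative path as the object key. The sketch below assumes a client with the `upload_file(Filename, Bucket, Key)` method of a boto3 S3 client. The helper name `upload_directory`, the bucket, and the prefix are placeholders, and a recording stand-in replaces the real client so that the example runs without AWS credentials.

```python
import tempfile
from pathlib import Path


def upload_directory(client, local_dir: str, bucket: str, prefix: str) -> list:
    """Upload every file under local_dir to the bucket, keeping relative paths as keys."""
    uploaded = []
    root = Path(local_dir)
    for path in sorted(root.rglob("*")):
        if path.is_file():
            key = f"{prefix}/{path.relative_to(root).as_posix()}"
            client.upload_file(str(path), bucket, key)
            uploaded.append(key)
    return uploaded


class RecordingClient:
    """Stand-in that mimics the upload_file signature of a boto3 S3 client."""

    def __init__(self):
        self.calls = []

    def upload_file(self, Filename, Bucket, Key):
        self.calls.append((Bucket, Key))


# Build a tiny fake partition directory and "upload" it
with tempfile.TemporaryDirectory() as tmp:
    partition = Path(tmp) / "Symbol=AAPL"
    partition.mkdir()
    (partition / "part-0000.parquet").write_bytes(b"")
    fake = RecordingClient()
    keys = upload_directory(fake, tmp, "example-bucket", "cleaned_data/sp500_stocks.parquet")

print(keys)
```

In real use, `boto3.client('s3')` would be passed in place of `RecordingClient()`.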

#### Example user query
    SELECT f.symbol, AVG(f.open_price) AS average_open_price
    FROM fact_stocks AS f
    JOIN dim_company AS c
    ON f.symbol = c.symbol
    WHERE c.sector = 'Technology'
    GROUP BY f.symbol;
 