### Data Engineering Capstone Project

#### Project Summary
In this project we explore, process, and store three datasets:
   * Bitcoin-USD price data
   * US cities demographics
   * Immigration data

The first two datasets are CSV files; the third is in SAS (`sas7bdat`) format.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [21]:
# Do all imports and create the Spark session here
import datetime

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.functions import isnan, when, count, col

# The spark-sas7bdat package lets Spark read the SAS immigration file
spark = (
    SparkSession.builder
    .config("spark.jars.repositories", "https://repos.spark-packages.org/")
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project we load three datasets, explore them, fix and clean them where needed, and finally load them into tables and check their quality.
Tools used: Pandas for exploration and cleaning, Spark as the data store.

#### Describe and Gather Data 
I'm going to load and explore three datasets:
   1. Bitcoin-USD data from September 2014 to March 2022
   2. US cities demographics
   3. Immigration data

In [2]:
# Read in "Bitcoin-USD" data
bitcoin_df = pd.read_csv("BTC-USD.csv")
bitcoin_df.head()

Unnamed: 0,Date,Open,High,Low,Close,Adj Close,Volume
0,2014-09-17,465.864014,468.174011,452.421997,457.334015,457.334015,21056800
1,2014-09-18,456.859985,456.859985,413.104004,424.440002,424.440002,34483200
2,2014-09-19,424.102997,427.834991,384.532013,394.79599,394.79599,37919700
3,2014-09-20,394.673004,423.29599,389.882996,408.903992,408.903992,36863600
4,2014-09-21,408.084991,412.425995,393.181,398.821014,398.821014,26580100


In [3]:
bitcoin_df.describe()

Unnamed: 0,Open,High,Low,Close,Adj Close,Volume
count,2747.0,2747.0,2747.0,2747.0,2747.0,2747.0
mean,11668.600272,11981.034949,11325.596907,11682.892098,11682.892098,14847040000.0
std,16323.683853,16759.568657,15825.584507,16330.191582,16330.191582,19948190000.0
min,176.897003,211.731003,171.509995,178.102997,178.102997,5914570.0
25%,609.122009,611.894501,606.309478,609.234009,609.234009,81612850.0
50%,6371.850098,6500.870117,6285.629883,6376.709961,6376.709961,5227550000.0
75%,10728.271485,10992.468751,10412.890137,10755.395019,10755.395019,25005170000.0
max,67549.734375,68789.625,66382.0625,67566.828125,67566.828125,350967900000.0


In [4]:
# Read in "US cities demographics" data here
demography_df = pd.read_csv("us-cities-demographics.csv", sep=';')
demography_df.columns = ['city', 'state', 'median_age', 'male_pop', 'female_pop', 'total_pop', 'num_vetarans', 'foreign_born', 'avg_household_size', 'state_code', 'race', 'count']
demography_df.head()

Unnamed: 0,city,state,median_age,male_pop,female_pop,total_pop,num_vetarans,foreign_born,avg_household_size,state_code,race,count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [5]:
# Loading full immigration data
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
df_spark.show(2)

In [7]:
# selecting needed columns only
immigration_df = df_spark[['cicid', 'i94yr', 'i94mon', 'i94port', 'i94addr', 'arrdate', 'depdate', 'i94mode', 'i94visa']]
immigration_df.show(5)

+-----+------+------+-------+-------+-------+-------+-------+-------+
|cicid| i94yr|i94mon|i94port|i94addr|arrdate|depdate|i94mode|i94visa|
+-----+------+------+-------+-------+-------+-------+-------+-------+
|  6.0|2016.0|   4.0|    XXX|   null|20573.0|   null|   null|    2.0|
|  7.0|2016.0|   4.0|    ATL|     AL|20551.0|   null|    1.0|    3.0|
| 15.0|2016.0|   4.0|    WAS|     MI|20545.0|20691.0|    1.0|    2.0|
| 16.0|2016.0|   4.0|    NYC|     MA|20545.0|20567.0|    1.0|    2.0|
| 17.0|2016.0|   4.0|    NYC|     MA|20545.0|20567.0|    1.0|    2.0|
+-----+------+------+-------+-------+-------+-------+-------+-------+
only showing top 5 rows



### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.
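
As a minimal sketch of such checks, assuming the data is already in a pandas DataFrame, the hypothetical helper `quality_report` below counts missing values per column and fully duplicated rows. The sample frame is made up for illustration and is not taken from the project data.

```python
import pandas as pd


def quality_report(df: pd.DataFrame) -> dict:
    """Summarise missing values and duplicate rows for one DataFrame."""
    return {
        "rows": len(df),
        "missing_per_column": df.isna().sum().to_dict(),
        "duplicate_rows": int(df.duplicated().sum()),
    }


# Made-up sample: one missing city and one fully duplicated row.
sample = pd.DataFrame(
    {
        "city": ["A", "B", "B", None],
        "total_pop": [100, 200, 200, 50],
    }
)
report = quality_report(sample)
print(report)
```

Running the same helper on `bitcoin_df` and `demography_df` before cleaning would show which columns need attention.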

#### Cleaning Steps

    1. Transform all column titles to lowercase and drop the 'Adj Close' column, then convert to a Spark DataFrame
    2. Fix data types in the demography dataset, then convert to a Spark DataFrame
    3. Rename columns in the immigration dataset
    
##### 1. Transform all column titles to lowercase and drop the 'Adj Close' column


In [8]:
def cleanBitcoin(df):
    # Copy the column subset so later assignments do not write to a slice
    # of the original frame (the cause of pandas' SettingWithCopyWarning)
    df = df[['Date', 'Open', 'High', 'Low', 'Close', 'Volume']].copy()
    df.columns = ['date', 'open', 'high', 'low', 'close', 'volume']
    # Normalise dates to ISO "YYYY-MM-DD" strings
    df['date'] = pd.to_datetime(df['date']).dt.strftime("%Y-%m-%d")
    # Volumes exceed the 32-bit range, so use an explicit 64-bit integer type
    df['volume'] = df['volume'].astype('int64')
    return df

bitcoin_df = cleanBitcoin(bitcoin_df)
sparkDF = spark.createDataFrame(bitcoin_df)
sparkDF.show(2)


Unnamed: 0,date,open,high,low,close,volume
0,2014-09-17,465.864014,468.174011,452.421997,457.334015,21056800
1,2014-09-18,456.859985,456.859985,413.104004,424.440002,34483200
2,2014-09-19,424.102997,427.834991,384.532013,394.79599,37919700
3,2014-09-20,394.673004,423.29599,389.882996,408.903992,36863600
4,2014-09-21,408.084991,412.425995,393.181,398.821014,26580100


##### 2. Fixing data types for demography dataset

In [10]:
def cleanDemography(df):
    # Drop incomplete rows; copy so the cast below does not trigger SettingWithCopyWarning
    df = df.dropna().copy()
    # male_pop is loaded as float (e.g. 40601.0); cast it to int once NaNs are gone
    df['male_pop'] = df['male_pop'].astype('int')
    return df

demography_df = cleanDemography(demography_df)
sparkDF2 = spark.createDataFrame(demography_df)
sparkDF2.show(2)


+-------------+-------------+----------+--------+----------+---------+------------+------------+------------------+----------+------------------+-----+
|         city|        state|median_age|male_pop|female_pop|total_pop|num_vetarans|foreign_born|avg_household_size|state_code|              race|count|
+-------------+-------------+----------+--------+----------+---------+------------+------------+------------------+----------+------------------+-----+
|Silver Spring|     Maryland|      33.8|   40601|   41862.0|    82463|      1562.0|     30908.0|               2.6|        MD|Hispanic or Latino|25924|
|       Quincy|Massachusetts|      41.0|   44129|   49500.0|    93629|      4147.0|     32935.0|              2.39|        MA|             White|58723|
+-------------+-------------+----------+--------+----------+---------+------------+------------+------------------+----------+------------------+-----+
only showing top 2 rows



##### 3. Immigration Dataset

In [12]:
def cleanImmigration(df):
    # Map each original I94 column name to a more readable one
    oldcolumns = ['cicid', 'i94yr', 'i94mon', 'i94port', 'i94addr', 'arrdate', 'depdate', 'i94mode', 'i94visa']
    newcolumns = ['cic_id', 'year', 'month', 'city_code', 'state_code', 'arrive_date', 'departure_date', 'mode', 'visa']
    for old_name, new_name in zip(oldcolumns, newcolumns):
        df = df.withColumnRenamed(old_name, new_name)
    return df

immigration_df = cleanImmigration(immigration_df)
immigration_df.show(2)

+------+------+-----+---------+----------+-----------+--------------+----+----+
|cic_id|  year|month|city_code|state_code|arrive_date|departure_date|mode|visa|
+------+------+-----+---------+----------+-----------+--------------+----+----+
|   6.0|2016.0|  4.0|      XXX|      null|    20573.0|          null|null| 2.0|
|   7.0|2016.0|  4.0|      ATL|        AL|    20551.0|          null| 1.0| 3.0|
+------+------+-----+---------+----------+-----------+--------------+----+----+
only showing top 2 rows



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

The model uses the immigration data as the fact table, with demography as a related dimension table. The bitcoin data is only loosely related, but it can be attached to the others using the date as a foreign key.


| Table       | columns                                                                                                                                      | description                         | type            |
|-------------|----------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------|-----------------|
| bitcoin     | date - open - high - low - close - volume                                                                                                    | stores bitcoin price data           | dimension table |
| demography  | city - state - median_age - male_pop - female_pop - total_pop - num_vetarans - foreign_born - avg_household_size - state_code - race - count | stores demographics data for cities | dimension table |
| immigration | cic_id - year - month - city_code - state_code - arrive_date - departure_date - mode - visa                                                  | stores immigration data             | fact table      |

##### The data dictionaries are as follows:

|date|open|high|low|close|volume|
|----|----|----|---|-----|------|
|the date of record |open price |highest price of day |lowest price |close price |volume of trading |

|city|state|median_age|male_pop|female_pop|total_pop|num_vetarans|foreign_born|avg_household_size|state_code|race|count|
|----|-----|----------|--------|----------|---------|------------|------------|------------------|----------|----|-----|
|city name |state name |median age |male population |female population |total population |number of veterans |foreign born |average household size |state code |race |count for the listed race |

|cic_id|year|month|city_code|state_code|arrive_date|departure_date|mode|visa|
|------|----|-----|---------|----------|-----------|--------------|----|---|
|cic id number | year | month | city code | state code |arrive date |departure date |mode | visa type|
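
To use the date as the joining key, `arrive_date` and `departure_date` first have to match the `YYYY-MM-DD` strings stored in the bitcoin table. Values such as 20545.0 are SAS numeric dates, which count days since 1 January 1960. A minimal sketch of the conversion follows; the helper name `sas_to_iso` is hypothetical and not part of the notebook, and in Spark it could be wrapped in a UDF.

```python
from datetime import date, timedelta
from typing import Optional

# SAS stores dates as the number of days since 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)


def sas_to_iso(sas_days: Optional[float]) -> Optional[str]:
    """Convert a SAS numeric date to a 'YYYY-MM-DD' string; pass None through."""
    if sas_days is None:
        return None
    return (SAS_EPOCH + timedelta(days=int(sas_days))).isoformat()


print(sas_to_iso(20545.0))  # 2016-04-01
print(sas_to_iso(20573.0))  # 2016-04-29
```

Both sample values come from the immigration rows shown earlier and fall in April 2016, which matches the source file.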


#### 3.2 Mapping Out Data Pipelines

    1. Assume all datasets are stored in the same directory.
    2. Clean up the datasets.
    3. Write them to Parquet with Spark.
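
The first step can be guarded with a small pre-flight check. The sketch below is only an illustration: the helper `missing_sources` is hypothetical, and it assumes the three files sit together in one directory under the file names read earlier in this notebook.

```python
from pathlib import Path

# File names as read earlier in the notebook; the single shared
# directory is the assumption stated in step 1.
SOURCES = {
    "bitcoin": "BTC-USD.csv",
    "demography": "us-cities-demographics.csv",
    "immigration": "i94_apr16_sub.sas7bdat",
}


def missing_sources(data_dir: str) -> list:
    """Return the names of datasets whose file is not present in data_dir."""
    base = Path(data_dir)
    return [name for name, fname in SOURCES.items() if not (base / fname).is_file()]


missing = missing_sources(".")
if missing:
    print("Missing datasets:", ", ".join(missing))
else:
    print("All source files found.")
```

Stopping early when a file is missing avoids writing a partial set of Parquet tables.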


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model


In [13]:
# Writing three datasets into spark parquet
sparkDF.write.mode('overwrite').parquet("datasets_sas/bitcoin")
sparkDF2.write.mode('overwrite').parquet("datasets_sas/demography")
immigration_df.write.mode('overwrite').parquet("datasets_sas/immigration")

#### 4.2 Data Quality Check

In [15]:
from pathlib import Path
sasfiles = Path('datasets_sas/')

# Checks for empty records
for file_dir in sasfiles.iterdir():
    if file_dir.is_dir():
        path = str(file_dir)
        df = spark.read.parquet(path)
        record_num = df.count()
        if record_num <= 0:
            raise ValueError("This table is empty!")
        else:
            print(path.split('/')[-1] + f" is not empty: with total {record_num} records.")

demography is not empty: with total 2875 records.
bitcoin is not empty: with total 2747 records.
immigration is not empty: with total 3096313 records.


In [24]:
# Checks for NaN values count
for file_dir in sasfiles.iterdir():
    if file_dir.is_dir():
        path = str(file_dir)
        df = spark.read.parquet(path)
        # show() only prints and returns None, so keep the counts DataFrame and total it separately.
        # Note: isnan() flags NaN only; nulls are not counted here.
        nan_counts = df.select([count(when(isnan(c), c)).alias(c) for c in df.columns])
        nan_counts.show()
        nan_total = sum(nan_counts.first().asDict().values())
        print(path.split('/')[-1] + f" has {nan_total} NaN values.\n")

+----+-----+----------+--------+----------+---------+------------+------------+------------------+----------+----+-----+
|city|state|median_age|male_pop|female_pop|total_pop|num_vetarans|foreign_born|avg_household_size|state_code|race|count|
+----+-----+----------+--------+----------+---------+------------+------------+------------------+----------+----+-----+
|   0|    0|         0|       0|         0|        0|           0|           0|                 0|         0|   0|    0|
+----+-----+----------+--------+----------+---------+------------+------------+------------------+----------+----+-----+

demography has 0 NaN values.

+----+----+----+---+-----+------+
|date|open|high|low|close|volume|
+----+----+----+---+-----+------+
|   0|   0|   0|  0|    0|     0|
+----+----+----+---+-----+------+

bitcoin has 0 NaN values.

+------+----+-----+---------+----------+-----------+--------------+----+----+
|cic_id|year|month|city_code|state_code|arrive_date|depa

### Step 5: Complete Project Write Up


* For this project I selected Jupyter as the visualisation and processing tool and Spark as the data store.
* The datasets should be updated monthly, since the immigration dataset is updated at that interval.
* If the data grew a hundredfold, we should move to a NoSQL database and distributed storage such as S3.
* If the data had to serve 100 or more users, it could be hosted on AWS Redshift, which allows a maximum of 500 connections and a concurrency level of 50 per cluster.
* For scheduled pipelines, a tool like Airflow can be used. Its web view also lets non-programmers check whether a pipeline ran successfully.
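
A monthly refresh needs to know which immigration file to load. The sketch below is an assumption rather than a documented convention: it extrapolates the naming pattern from the single file used in this notebook (`i94_apr16_sub.sas7bdat`), and the helper `monthly_i94_file` is hypothetical.

```python
from datetime import date

# Lowercase English abbreviations, spelled out to avoid locale-dependent strftime output.
MONTH_ABBR = ["jan", "feb", "mar", "apr", "may", "jun",
              "jul", "aug", "sep", "oct", "nov", "dec"]


def monthly_i94_file(month: date) -> str:
    """Build the assumed I94 file name for the month that contains the given date."""
    return f"i94_{MONTH_ABBR[month.month - 1]}{month.year % 100:02d}_sub.sas7bdat"


print(monthly_i94_file(date(2016, 4, 1)))  # i94_apr16_sub.sas7bdat
```

A scheduler such as Airflow could pass the run date to this helper and hand the resulting file name to the Spark reader.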
