# BITCOIN AND ETHEREUM FORECASTING

### Data Engineering Capstone Project

#### Project Summary

This project defines the pipeline to load historical data of Bitcoin and Ethereum blockchains and create a Data Lake. The process includes data formatting, cleaning, and transformation. 

The project follows these steps:

* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [22]:
# Import required libraries
import pandas as pd
import re
import boto3
import zipfile
from pyspark.sql import SparkSession
import os
import glob
import configparser
from datetime import datetime, timedelta, date
from pyspark.sql import types as t
from pyspark.sql.functions import udf, col, monotonically_increasing_id, to_date, to_timestamp, isnan, when, count
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, minute

import matplotlib.pyplot as plt
import seaborn as sns

### Configure global variables

In [2]:
# Configure Java and Hadoop global variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"

In [3]:
# Read configuration file
config = configparser.ConfigParser()
config.read_file(open('dl.cfg'))

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

# NOTE: Use these if using AWS S3 as a storage
INPUT_DATA_AWS                = config['AWS']['INPUT_DATA_AWS']
OUTPUT_DATA_AWS               = config['AWS']['OUTPUT_DATA_AWS']

# NOTE: Use these if using local storage
INPUT_DATA_LOCAL              = config['LOCAL']['INPUT_DATA_LOCAL']
OUTPUT_DATA_LOCAL             = config['LOCAL']['OUTPUT_DATA_LOCAL']

# Common configuration parameters
DATA_LOCATION                 = config['COMMON']['DATA_LOCATION']
DATA_STORAGE                  = config['COMMON']['DATA_STORAGE']
INPUT_DATA_BTC_DIRECTORY      = config['COMMON']['INPUT_DATA_BTC_DIRECTORY']
INPUT_DATA_BTC_ZIP_FILENAME   = config['COMMON']['INPUT_DATA_BTC_ZIP_FILENAME']
INPUT_DATA_BTC_FILENAME       = config['COMMON']['INPUT_DATA_BTC_FILENAME']
INPUT_DATA_ETH_DIRECTORY      = config['COMMON']['INPUT_DATA_ETH_DIRECTORY']
INPUT_DATA_ETH_ZIP_FILENAME   = config['COMMON']['INPUT_DATA_ETH_ZIP_FILENAME']
INPUT_DATA_ETH_FILENAME       = config['COMMON']['INPUT_DATA_ETH_FILENAME']
OUTPUT_DATA_BTC_FILENAME      = config['COMMON']['OUTPUT_DATA_BTC_FILENAME']
OUTPUT_DATA_ETH_FILENAME      = config['COMMON']['OUTPUT_DATA_ETH_FILENAME']
OUTPUT_BTC_TABLE_FILENAME     = config['COMMON']['OUTPUT_BTC_TABLE_FILENAME']
OUTPUT_ETH_TABLE_FILENAME     = config['COMMON']['OUTPUT_ETH_TABLE_FILENAME']
OUTPUT_CRYPTO_TABLE_FILENAME  = config['COMMON']['OUTPUT_CRYPTO_TABLE_FILENAME']
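The cell above expects `dl.cfg` to contain three sections (`AWS`, `LOCAL`, `COMMON`). The sketch below shows one possible layout, parsed from an in-memory string instead of a file. The section and key names come from the cell above; every value is a hypothetical placeholder and not the project's real configuration.

```python
import configparser

# Hypothetical dl.cfg contents: section and key names follow the cell above,
# all values are placeholders chosen only for illustration.
SAMPLE_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = <your-access-key-id>
AWS_SECRET_ACCESS_KEY = <your-secret-access-key>
INPUT_DATA_AWS = s3a://hypothetical-bucket/input/
OUTPUT_DATA_AWS = s3a://hypothetical-bucket/output/

[LOCAL]
INPUT_DATA_LOCAL = data/
OUTPUT_DATA_LOCAL = data/output/

[COMMON]
DATA_LOCATION = local
DATA_STORAGE = parquet
INPUT_DATA_BTC_DIRECTORY = data/btc_data/
INPUT_DATA_BTC_ZIP_FILENAME = data/btc_data.zip
INPUT_DATA_BTC_FILENAME = btc_data.csv
INPUT_DATA_ETH_DIRECTORY = data/eth_data/
INPUT_DATA_ETH_ZIP_FILENAME = data/eth_data.zip
INPUT_DATA_ETH_FILENAME = eth_data.csv
OUTPUT_DATA_BTC_FILENAME = btc_data.parquet
OUTPUT_DATA_ETH_FILENAME = eth_data.parquet
OUTPUT_BTC_TABLE_FILENAME = btc_timeseries.parquet
OUTPUT_ETH_TABLE_FILENAME = eth_timeseries.parquet
OUTPUT_CRYPTO_TABLE_FILENAME = crypto_timeseries.parquet
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_CFG)

# Confirm that the sections and a few keys are readable
print(config.sections())
print(config["COMMON"]["DATA_LOCATION"])
print(config["COMMON"]["OUTPUT_CRYPTO_TABLE_FILENAME"])
```

Keeping the credentials in `dl.cfg` rather than in the notebook means the file can be excluded from version control.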

In [4]:
# Set global configuration variables
if DATA_LOCATION == "local":
    input_data          = INPUT_DATA_LOCAL
    output_data         = OUTPUT_DATA_LOCAL

elif DATA_LOCATION == "aws":
    input_data          = INPUT_DATA_AWS
    output_data         = OUTPUT_DATA_AWS

# The storage format does not depend on the data location, so check it separately
if DATA_STORAGE == "parquet":
    data_storage        = DATA_STORAGE
    
# load variables for BTC data
btc_data_directory      = INPUT_DATA_BTC_DIRECTORY
btc_zip_filename        = INPUT_DATA_BTC_ZIP_FILENAME    
btc_filename            = INPUT_DATA_BTC_FILENAME
btc_table_filename      = OUTPUT_BTC_TABLE_FILENAME

# load variables for ETH data
eth_data_directory      = INPUT_DATA_ETH_DIRECTORY
eth_zip_filename        = INPUT_DATA_ETH_ZIP_FILENAME    
eth_filename            = INPUT_DATA_ETH_FILENAME
eth_table_filename      = OUTPUT_ETH_TABLE_FILENAME

# general variables
crypto_timeseries_table = OUTPUT_CRYPTO_TABLE_FILENAME

#### Unzip data. RUN ONLY if files are compressed. ONLY WORKS FOR LOCAL

In [None]:
# Create btc_data directory
!mkdir -p {btc_data_directory}

In [None]:
# Unzip the BTC archive into the btc_data directory
!unzip {btc_zip_filename} -d {btc_data_directory}

In [None]:
# Create eth_data directory
!mkdir -p {eth_data_directory}

In [None]:
# Unzip the ETH archive into the eth_data directory
!unzip {eth_zip_filename} -d {eth_data_directory}
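The shell cells above depend on the `unzip` command being installed. As an alternative, the extraction can be done with the standard-library `zipfile` module, which the notebook already imports. This is a minimal sketch; `extract_archive` is a hypothetical helper name and not part of the original notebook.

```python
import os
import zipfile


def extract_archive(zip_path, target_dir):
    """Extract every member of zip_path into target_dir and return the member names."""
    os.makedirs(target_dir, exist_ok=True)  # create the directory if it is missing
    with zipfile.ZipFile(zip_path) as archive:
        archive.extractall(target_dir)
        return archive.namelist()
```

With the notebook's variables this would be called as `extract_archive(btc_zip_filename, btc_data_directory)` and `extract_archive(eth_zip_filename, eth_data_directory)`.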

## Step 1: Scope the Project and Gather Data

#### Project Scope 

The main goal of this project is to build a data pipeline that processes historical Bitcoin and Ethereum prices (at 1-minute intervals) from CSV files and loads them into the Data Warehouse, where they can be used to train price prediction models based on Machine Learning time series analysis. The pipeline covers data loading, cleaning, transformation, and aggregation.

The ETL pipeline is built with Apache Spark on AWS services, and the PySpark API is used to interact with Spark. pandas and matplotlib are used for the exploratory data analysis (EDA).

#### Describe and Gather Data 

The datasets were obtained from Kaggle, from these repositories:

**Bitcoin Historical Data**

    * Source: https://www.kaggle.com/mczielinski/bitcoin-historical-data
    * Description: Bitcoin data at 1-min intervals from select exchanges, Jan 2012 to March 2021
    * Format: Single CSV file
    * Fields: - Timestamp
              - Open
              - High
              - Low
              - Close
              - Volume_(BTC)
              - Volume_(Currency)
              - Weighted_Price
    * Time period: 2012-01-01 to 2021-03-31

**Ethereum (ETH/USDT) 1m Dataset**

    * Source: https://www.kaggle.com/priteshkeleven/ethereum-ethusdt-1m-dataset
    * Description: Ethereum dataset with 1 minute interval from 17-8-2017 to 03-2-2021
    * Format: CSV for each month
    * Fields: - timestamp
              - open
              - high
              - low
              - close
              - volume
              - close_time
              - quote_av
              - trades
              - tb_base_av
              - tb_quote_av
              - ignore

    * Time period: 17-8-2017 to 03-2-2021

## Step 2: Explore and Assess the Data

### 2.1 Read Bitcoin data

**NOTE**: The original content was a compressed zip file. The files were unzipped and copied to a local directory to run the EDA.

In [5]:
# Read the BTC data from the CSV files and consolidate it into a single dataframe
path = os.path.join(input_data,'btc_data' ,'*.csv')
print(path)
files = glob.glob(path)
l_data = []

for filename in files:
    e_data = pd.read_csv(filename, index_col=None, header=0)
    l_data.append(e_data)

BTC_data = pd.concat(l_data, axis=0, ignore_index=True)    

data/btc_data/*.csv


In [6]:
BTC_data.head(5)

Unnamed: 0,Timestamp,Open,High,Low,Close,Volume_(BTC),Volume_(Currency),Weighted_Price
0,1325317920,4.39,4.39,4.39,4.39,0.455581,2.0,4.39
1,1325317980,,,,,,,
2,1325318040,,,,,,,
3,1325318100,,,,,,,
4,1325318160,,,,,,,


In [7]:
# Show data columns
BTC_data.columns

Index(['Timestamp', 'Open', 'High', 'Low', 'Close', 'Volume_(BTC)',
       'Volume_(Currency)', 'Weighted_Price'],
      dtype='object')

In [8]:
# Count records (rows, columns)
BTC_data.shape

(4857377, 8)

In [9]:
# Count the null values
BTC_data.isnull().sum(axis = 0)

Timestamp                  0
Open                 1243608
High                 1243608
Low                  1243608
Close                1243608
Volume_(BTC)         1243608
Volume_(Currency)    1243608
Weighted_Price       1243608
dtype: int64

### 2.2 Read ETH data

In [10]:
# Read the ETH data from the CSV files and consolidate it into a single dataframe
path = os.path.join(eth_data_directory, '*.csv')
files = glob.glob(path)
l_data = []

for filename in files:
    e_data = pd.read_csv(filename, index_col=None, header=0)
    l_data.append(e_data)

ETH_data = pd.concat(l_data, axis=0, ignore_index=True)         

In [11]:
ETH_data.head(5)

Unnamed: 0,timestamp,open,high,low,close,volume,close_time,quote_av,trades,tb_base_av,tb_quote_av,ignore
0,2020-08-19 00:00:00,421.92,423.56,421.51,423.55,1632.38697,1597795259999,689035.13322,511,998.48569,421432.15264,0.0
1,2020-08-19 00:01:00,423.56,424.27,423.56,423.98,909.17074,1597795319999,385519.232088,382,352.17966,149320.871212,0.0
2,2020-08-19 00:02:00,424.0,424.0,422.96,423.01,712.07169,1597795379999,301541.453675,305,174.70524,74014.364972,0.0
3,2020-08-19 00:03:00,423.0,423.02,422.56,422.68,680.10097,1597795439999,287561.429434,264,228.55088,96630.8249,0.0
4,2020-08-19 00:04:00,422.67,423.07,422.42,422.54,414.13931,1597795499999,175058.889421,315,153.82495,65014.349621,0.0


In [12]:
# Show data columns
ETH_data.columns

Index(['timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time',
       'quote_av', 'trades', 'tb_base_av', 'tb_quote_av', 'ignore'],
      dtype='object')

In [13]:
# Count records (rows, columns)
ETH_data.shape

(1817149, 12)

In [14]:
# Count the null values
ETH_data.isnull().sum(axis = 0)

timestamp      0
open           0
high           0
low            0
close          0
volume         0
close_time     0
quote_av       0
trades         0
tb_base_av     0
tb_quote_av    0
ignore         0
dtype: int64

### 2.3 Data Quality Analysis

#### After verifying the BTC and ETH data, these quality issues were identified:

**Historical BTC data:**

  - Of the 4,857,377 records in the file, 1,243,608 have null values
  - The time is stored as a Unix timestamp (integer seconds) and needs transformation before it can be joined to the ETH dataset


**Historical ETH data:**

   - The 1,817,149 records in the files contain no null values
   - The time is stored as a datetime value and needs transformation before it can be joined to the BTC dataset

- The number of columns differs between the datasets, so the attributes need to be mapped.
- Records are not ordered, so the combined table needs sorting.
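To put the BTC null count in proportion, the short calculation below uses the row counts reported in section 2.1. It assumes that the nulls fall on the same rows in every column, which the `head` output suggests but does not prove.

```python
# Row counts taken from the BTC exploration output in section 2.1
btc_total_rows = 4857377
btc_null_rows = 1243608

# Assumption: a row is either fully populated or empty in every price/volume column
btc_valid_rows = btc_total_rows - btc_null_rows
btc_null_share = btc_null_rows / btc_total_rows

print(f"{btc_valid_rows} rows remain, {btc_null_share:.1%} of rows are dropped")
```

Under that assumption, roughly a quarter of the BTC minutes carry no trade data and are removed during cleaning.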


**Mapping data fields**

    BTC fields                         ETH fields
    - Timestamp                        - timestamp 
    - Open                             - open 
    - High                             - high 
    - Low                              - low 
    - Close                            - close 
    - Volume_(BTC)                     - volume 
    - Volume_(Currency)                - close_time 
    - Weighted_Price                   - quote_av 
                                       - trades 
                                       - tb_base_av 
                                       - tb_quote_av 
                                       - ignore   

### 2.4 Data cleaning required

The input data requires the following processing:

    - Drop null values from staging tables for BTC data (they can't be replaced with zero values)
    - Only use fields that appear in both datasets
    - Timestamps need to be split into year, month, day, hour, minute
    - Data needs to be ordered by date keys
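The timestamp split listed above can be illustrated in plain Python. This is a sketch rather than the pipeline code, which does the same work in Spark. It assumes the BTC timestamps are Unix seconds interpreted in UTC, and that the ETH timestamps use the `YYYY-MM-DD HH:MM:SS` layout seen in the sample rows; both function names are hypothetical.

```python
from datetime import datetime, timezone


def split_epoch(epoch_seconds):
    """Return (year, month, day, hour, minute) in UTC for a Unix timestamp in seconds."""
    ts = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    return ts.year, ts.month, ts.day, ts.hour, ts.minute


def split_datetime_string(text):
    """Return the same key tuple for an ETH-style 'YYYY-MM-DD HH:MM:SS' string."""
    ts = datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
    return ts.year, ts.month, ts.day, ts.hour, ts.minute


print(split_epoch(1325317920))                       # first BTC row in the sample
print(split_datetime_string("2020-08-19 00:01:00"))  # an ETH row from the sample
```

Once both datasets are reduced to the same key tuple, they can be ordered and joined on it. Note that Spark converts timestamps using the session time zone, so the hour component may differ from this UTC sketch if that setting is not UTC.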

## Step 3: Define the Data Model

### 3.1 Conceptual Data Model


![conceptual model](./conceptual_model-spark.png)


The basic model is a consolidated table that works as the source for training the ML models.




    *Table: staging_btc

    *Columns:
        - Timestamp
        - Open
        - High
        - Low
        - Close
        - Volume_btc
        - Volume_currency
        - Weighted_price


    *Table: staging_eth

    *Columns:
        - timestamp
        - open
        - high
        - low
        - close
        - volume
        - close_time
        - quote_av
        - trades
        - tb_base_av
        - tb_quote_av
        - ignore


    *Table: btc_timeseries

    *Columns:
        - timestamp
        - year
        - month
        - day
        - hour
        - minute
        - btc_open
        - btc_high
        - btc_low
        - btc_close
        - btc_volume


    *Table: eth_timeseries

    *Columns:
        - timestamp
        - year (Partition Key)
        - month
        - day
        - hour
        - minute
        - eth_open
        - eth_high
        - eth_low
        - eth_close
        - eth_volume


    *Table: crypto_timeseries

    *Columns:
        - year (Partition Key)
        - month
        - day
        - hour
        - minute
        - btc_open
        - btc_high
        - btc_low
        - btc_close
        - btc_volume
        - eth_open
        - eth_high
        - eth_low
        - eth_close
        - eth_volume

### 3.2 Mapping Out Data Pipelines

  - Define the global variables in the configuration file (dl.cfg)
  - Read data from the CSV files in the input directories into Spark dataframes
  - Save the Spark dataframes to staging parquet files
  - Read the BTC parquet file and drop null values
  - Transform the BTC timestamp into (year, month, day, hour, minute)
  - Transform the ETH timestamp into (year, month, day, hour, minute)
  - Join the BTC and ETH Spark dataframes
  - Save the data to the crypto_timeseries table
  - Run the quality control checks

## Step 4: Run Pipelines to Model the Data 

#### 4.1 Create the data model
Build the data pipelines to create the data model.

#### 4.1.1 Create Spark session

In [15]:
# Create the spark session
spark = SparkSession.builder.getOrCreate()

#### 4.1.2 Read data from CSV files and write Spark DataFrames to parquet files

##### Create the BTC Data Staging file

In [16]:
# read BTC data to spark
btc_data_staging = spark.read.options(header='True', inferSchema='True').csv(INPUT_DATA_BTC_DIRECTORY)
btc_data_staging.printSchema()
btc_data_staging.show(5, truncate=False)

root
 |-- Timestamp: integer (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume_(BTC): double (nullable = true)
 |-- Volume_(Currency): double (nullable = true)
 |-- Weighted_Price: double (nullable = true)

+----------+----+----+----+-----+------------+-----------------+--------------+
|Timestamp |Open|High|Low |Close|Volume_(BTC)|Volume_(Currency)|Weighted_Price|
+----------+----+----+----+-----+------------+-----------------+--------------+
|1325317920|4.39|4.39|4.39|4.39 |0.45558087  |2.0000000193     |4.39          |
|1325317980|NaN |NaN |NaN |NaN  |NaN         |NaN              |NaN           |
|1325318040|NaN |NaN |NaN |NaN  |NaN         |NaN              |NaN           |
|1325318100|NaN |NaN |NaN |NaN  |NaN         |NaN              |NaN           |
|1325318160|NaN |NaN |NaN |NaN  |NaN         |NaN              |NaN           |
+----------+----+----+----+--

In [17]:
# BTC Rows Count 
btc_data_staging.count()

4857377

In [18]:
# Rename columns containing characters not allowed in parquet column names (parentheses) and write the parquet file
btc_data_staging_temp = btc_data_staging.withColumnRenamed("Volume_(BTC)", "Volume_BTC") \
                                        .withColumnRenamed("Volume_(Currency)", "Volume_Currency")

btc_data_staging_temp.printSchema()
btc_data_staging_temp.write.mode("overwrite").parquet(output_data+"btcstaging.parquet")

root
 |-- Timestamp: integer (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume_BTC: double (nullable = true)
 |-- Volume_Currency: double (nullable = true)
 |-- Weighted_Price: double (nullable = true)



##### Create the ETH Data Staging file

In [19]:
# read ETH data to spark
eth_data_staging = spark.read.options(header='True', inferSchema='True').csv('data/eth_data/eth_data/')
eth_data_staging.printSchema()
eth_data_staging.show(5, truncate=False)

root
 |-- timestamp: timestamp (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: double (nullable = true)
 |-- close_time: long (nullable = true)
 |-- quote_av: double (nullable = true)
 |-- trades: integer (nullable = true)
 |-- tb_base_av: double (nullable = true)
 |-- tb_quote_av: double (nullable = true)
 |-- ignore: double (nullable = true)

+----------------------+------+------+------+------+--------+-------------+-------------+------+----------+-------------+---------------+
|timestamp             |open  |high  |low   |close |volume  |close_time   |quote_av     |trades|tb_base_av|tb_quote_av  |ignore         |
+----------------------+------+------+------+------+--------+-------------+-------------+------+----------+-------------+---------------+
|2017-12-13 00:00:20.81|619.4 |621.99|615.11|615.12|34.17637|1513123280809|21085.5846554|42    |22.16912  |13678

In [20]:
# ETH Rows Count 
eth_data_staging.count()

1817149

In [21]:
# Write the parquet file
eth_data_staging.write.mode("overwrite").parquet(output_data+"ethstaging.parquet")

#### 4.1.3 Clean null values from staging BTC

In [None]:
# Read parquet file for BTC data and drop null values
btc_data_staging = spark.read.parquet(output_data+"btcstaging.parquet")
btc_data_staging_temp = btc_data_staging.na.drop()
btc_data_staging_temp.show(5, truncate=False)
btc_data_staging_temp.count()

#### 4.1.4 Format BTC date, create btc_timeseries table and save parquet file

In [None]:
btc_data_staging_temp = btc_data_staging_temp.withColumn('year',year(to_timestamp('Timestamp')))
btc_data_staging_temp = btc_data_staging_temp.withColumn('month',month(to_timestamp('Timestamp')))
btc_data_staging_temp = btc_data_staging_temp.withColumn('day',dayofmonth(to_timestamp('Timestamp')))
btc_data_staging_temp = btc_data_staging_temp.withColumn('hour',hour(to_timestamp('Timestamp')))
btc_data_staging_temp = btc_data_staging_temp.withColumn('minute',minute(to_timestamp('Timestamp')))

In [None]:
btc_data_staging_temp.show(5, truncate=False)

In [None]:
# btc_timeseries table creation

btc_data_staging_temp.createOrReplaceTempView("btc_timeseries")
btc_timeseries_table = spark.sql("""
    SELECT  DISTINCT Timestamp    AS timestamp,
                     year         AS year, 
                     month        AS month, 
                     day          AS day, 
                     hour         AS hour, 
                     minute       AS minute,
                     Open         AS btc_open, 
                     High         AS btc_high, 
                     Low          AS btc_low,
                     Volume_BTC   AS btc_volume                     
    FROM btc_timeseries
    ORDER BY year, month, day, hour, minute
""")
btc_timeseries_table.printSchema()

In [None]:
# Write btc_timeseries_table to parquet file:
btc_timeseries_table.write.mode("overwrite").parquet(output_data+btc_table_filename)

#### 4.1.5 Format ETH date, create eth_timeseries table and save parquet file

In [None]:
# Read parquet file for ETH DATA
eth_data_staging_temp = spark.read.parquet(output_data+"ethstaging.parquet")
eth_data_staging_temp.show(5, truncate=False)
eth_data_staging_temp.count()

In [None]:
eth_data_staging_temp = eth_data_staging_temp.withColumn('year',year(to_timestamp('Timestamp')))
eth_data_staging_temp = eth_data_staging_temp.withColumn('month',month(to_timestamp('Timestamp')))
eth_data_staging_temp = eth_data_staging_temp.withColumn('day',dayofmonth(to_timestamp('Timestamp')))
eth_data_staging_temp = eth_data_staging_temp.withColumn('hour',hour(to_timestamp('Timestamp')))
eth_data_staging_temp = eth_data_staging_temp.withColumn('minute',minute(to_timestamp('Timestamp')))

In [None]:
eth_data_staging_temp.show(5, truncate=False)

In [None]:
# eth_timeseries table creation

eth_data_staging_temp.createOrReplaceTempView("eth_timeseries")
eth_timeseries_table = spark.sql("""
    SELECT  DISTINCT timestamp    AS timestamp,
                     year         AS year, 
                     month        AS month, 
                     day          AS day, 
                     hour         AS hour, 
                     minute       AS minute,
                     open         AS eth_open, 
                     high         AS eth_high, 
                     low          AS eth_low,
                     volume       AS eth_volume                     
    FROM eth_timeseries
    ORDER BY year, month, day, hour, minute
""")
eth_timeseries_table.printSchema()

In [None]:
# Write eth_timeseries_table to parquet file:
eth_timeseries_table.write.mode("overwrite").parquet(output_data+eth_table_filename)

### 4.2 Join BTC and ETH data and create crypto_timeseries table

In [None]:
# Read BTC timeseries table data from parquet file
btc_timserie_table = spark.read.parquet(output_data+btc_table_filename)
btc_timserie_table.show(5, truncate=False)

In [None]:
# Read ETH timeseries table data from parquet file
eth_timserie_table = spark.read.parquet(output_data+eth_table_filename)
eth_timserie_table.show(5, truncate=False)

In [None]:
# Join BTC and ETH tables by year, month, day, hour and minute keys
crypto_timeseries_spark = btc_timserie_table.join(eth_timserie_table, \
                                        on=["year", "month", "day", "hour", "minute"], how="inner")

In [None]:
crypto_timeseries_spark.show(5, truncate=False)

In [None]:
btc_timserie_table.createOrReplaceTempView("BTC_TABLE")
eth_timserie_table.createOrReplaceTempView("ETH_TABLE")

crypto_timeseries_spark = spark.sql("""
                SELECT DISTINCT B.year,
                                B.month,
                                B.day,
                                B.hour,
                                B.minute,
                                B.btc_open,
                                B.btc_high,
                                B.btc_low,
                                B.btc_volume,                                
                                E.eth_open,
                                E.eth_high,
                                E.eth_low,
                                E.eth_volume
                FROM BTC_TABLE B INNER JOIN ETH_TABLE E 
                ON B.year == E.year 
                AND B.month == E.month
                AND B.day == E.day
                AND B.hour == E.hour                
                AND B.minute == E.minute                
                ORDER BY year, month, day, hour, minute
                """)
crypto_timeseries_spark.printSchema()
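The join only produces one row per minute when all five key columns are compared. A join on `year` alone would pair every BTC row with every ETH row from the same year. The plain-Python sketch below illustrates inner-join semantics on the composite key. It is not the Spark implementation; `inner_join_by_minute` is a hypothetical helper and the sample prices are made up.

```python
JOIN_KEYS = ("year", "month", "day", "hour", "minute")


def inner_join_by_minute(btc_rows, eth_rows):
    """Inner-join two lists of dict rows on the composite minute key."""
    eth_by_key = {tuple(row[k] for k in JOIN_KEYS): row for row in eth_rows}
    joined = []
    for btc_row in btc_rows:
        key = tuple(btc_row[k] for k in JOIN_KEYS)
        eth_row = eth_by_key.get(key)
        if eth_row is not None:  # keep only minutes present in both tables
            joined.append({**btc_row, **eth_row})
    return joined


# Hypothetical sample rows; the price values are invented for illustration
btc_rows = [
    {"year": 2020, "month": 8, "day": 19, "hour": 0, "minute": 0, "btc_open": 100.0},
    {"year": 2020, "month": 8, "day": 19, "hour": 0, "minute": 1, "btc_open": 101.0},
]
eth_rows = [
    {"year": 2020, "month": 8, "day": 19, "hour": 0, "minute": 1, "eth_open": 10.0},
]

print(inner_join_by_minute(btc_rows, eth_rows))
```

Only the minute that exists in both inputs survives, so the combined table covers the period where the BTC and ETH datasets overlap.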

In [None]:
# Show the first rows of crypto_timeseries_table
crypto_timeseries_spark.show(5)

In [None]:
# Write crypto_timeseries_table to parquet file:
crypto_timeseries_spark.write.mode("overwrite").parquet(output_data+crypto_timeseries_table)

#### 4.3 Data Quality Checks

**Data quality checks:**
 * Check that the key column (timestamp) of every table has no null values.
 * Check that all tables have more than 0 rows.

In [None]:
# Collect the quality-check results for each staging table
round_ts = datetime.now()
results = { "round_ts": round_ts,
            "staging_btc_count": 0,
            "staging_btc": "",
            "staging_eth_count": 0,
            "staging_eth": ""}

##### 4.3.1 Quality checks for staging_btc table

In [30]:
# Verify that the table is not empty and the key has no null values
btc_data_staging_check = spark.read.parquet(output_data+"btcstaging.parquet")
btc_data_staging_check.select(count(col('timestamp'))).show()
btc_data_staging_check.select(count(when(isnan('timestamp') | col('timestamp').isNull(), 'timestamp'))).show()

+----------------+
|count(timestamp)|
+----------------+
|         4857377|
+----------------+

+-----------------------------------------------------------------------------+
|count(CASE WHEN (isnan(timestamp) OR (timestamp IS NULL)) THEN timestamp END)|
+-----------------------------------------------------------------------------+
|                                                                            0|
+-----------------------------------------------------------------------------+



##### 4.3.2 Quality checks for staging_eth table

In [31]:
# Verify that the table is not empty and the key has no null values
eth_data_staging_check = spark.read.parquet(output_data+"ethstaging.parquet")
eth_data_staging_check.select(count(col('timestamp'))).show()
eth_data_staging_check.select(count(when(col('timestamp').isNull(), 'timestamp'))).show()

+----------------+
|count(timestamp)|
+----------------+
|         1817149|
+----------------+

+-------------------------------------------------------+
|count(CASE WHEN (timestamp IS NULL) THEN timestamp END)|
+-------------------------------------------------------+
|                                                      0|
+-------------------------------------------------------+



In [None]:
# Record the staging checks: a table passes when it has rows and no null keys
for name, df in [("staging_btc", btc_data_staging_check), ("staging_eth", eth_data_staging_check)]:
    row_count = df.count()
    null_key_count = df.filter(col('timestamp').isNull()).count()
    results[name + "_count"] = row_count
    results[name] = "OK" if row_count > 0 and null_key_count == 0 else "NOK"

print(f"RESULTS: {results}")

#### 4.4 Data dictionary








## Step 5: Complete Project Write Up

* _Clearly state the rationale for the choice of tools and technologies for the project._
* _Propose how often the data should be updated and why._
* _Write a description of how you would approach the problem differently under the following scenarios:_
 * _The data was increased by 100x._
 * _The data populates a dashboard that must be updated on a daily basis by 7am every day._
 * _The database needed to be accessed by 100+ people._
 
**Rationale for the tools selection:**
* Python, pandas and Spark were natural choices to process the project's input data since they provide all the necessary (and easy to use) libraries to read, clean, process, and form DB tables.
* Since the data set was still limited, local and server storage was used for storing, reading, and writing the input and output data. 
* Input data could have been stored in AWS without big problems (excluded in this project). 
* Output data could have been easily written to AWS after processing (excluded in this project). Experience has shown that it's better to write parquet files locally first and only after that write them to cloud storage (as a bulk operation) to avoid delays and extra costs caused by AWS S3.

**How often the ETL script should be run:**
* The ETL script should be run on a monthly basis (assuming that new BTC and ETH data becomes available once per month).

**Other scenarios (what to consider in them):**
* Data is 100x: 
    * Input data should be stored in cloud storage e.g. AWS S3
    * Clustered Spark should be used to enable parallel processing of the data.
    * Clustered Cloud DB e.g. AWS Redshift should be used to store the data during the processing (staging and final tables).  
    * Output data (parquet files) should be stored to Cloud storage e.g. AWS S3 for easy access or to a Cloud DB for further analysis. AWS Redshift is very expensive for storing the data, so maybe some SQL DB (e.g. AWS RDS) should be used. 
    
* Data is used in a dashboard and updated every day at 07:00 AM:
    * The ETL script should be refactored to process only the changed input information instead of processing all the input files as it does now, to minimise the time and computing resources used.
    * Output data should be stored and updated in a Cloud DB (e.g. AWS RDS) to make it available at all times for the dashboard.
    * Possibly this "always available" DB (serving the dashboard) would contain only the latest subset of all available data to make it fast performing and easier to manage.

* DB is accessed by 100+ people:
    * Output data should be stored in a Cloud DB (e.g. AWS RDS) to make it "always available" for further analysis. Tools should be provided for the end-users to access the output DB. 
    * Potentially, some new tables could be created to serve the most used queries better.
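The incremental processing mentioned in the dashboard scenario could be based on a stored "watermark", that is, the newest timestamp already loaded. The sketch below shows the idea in plain Python. The function names and the dict-based rows are assumptions for illustration and not part of the current ETL script.

```python
def select_new_rows(rows, last_processed_ts):
    """Return only the rows with a timestamp after the stored watermark."""
    return [row for row in rows if row["timestamp"] > last_processed_ts]


def next_watermark(new_rows, last_processed_ts):
    """Return the newest timestamp seen, or the old watermark if nothing is new."""
    return max((row["timestamp"] for row in new_rows), default=last_processed_ts)


# Timestamps taken from the first BTC sample rows
rows = [{"timestamp": 1325317920}, {"timestamp": 1325317980}, {"timestamp": 1325318040}]
new_rows = select_new_rows(rows, last_processed_ts=1325317920)
print(len(new_rows), next_watermark(new_rows, 1325317920))
```

Each daily run would then read the watermark, process only the newer rows, and store the updated watermark for the next run.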

**Potential further work:** 
* ETL pipeline script could be re-factored
    * make it more modular (split functions to separate files/classes)
    * combine functions to have fewer, more general purpose functions instead of several specific functions per ETL step 
    

    