# Project Title
## Data Engineering Capstone Project

### Project Summary
* The goal of this project is to evaluate the effect of temperature on immigrants' movements in the USA during April 2016.
* Apache Spark is used to extract and transform the raw data and to build a data warehouse in Parquet file format.
* A star schema is used for the database so that analytical queries can be handled efficiently.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### Scope 
-- Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.
* This project extracts raw data from the two sources described below.
* It creates a data model of immigrants' movements in the US consisting of one fact table that references two dimension tables.

#### Describe and Gather Data 
-- Describe the data sets you're using. Where did it come from? What type of information is included? 

##### I94 Immigration Data: 
* This data comes from the [US National Tourism and Trade Office](https://travel.trade.gov/research/reports/i94/historical/2016.html).
* The dataset includes information on individual incoming immigrants and their ports of entry.
* Data dictionary: I94_SAS_Labels_Descriptions.SAS
* Sample file: immigration_data_sample.csv
* Columns:
    * `i94yr`: 4-digit year
    * `i94mon`: numeric month
    * `i94addr`: where the immigrant resides in the USA

##### World Temperature Data:
* This dataset comes from [Kaggle](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).
* The dataset includes temperature information for cities worldwide. This project only uses the data for US cities.
* Columns:
    * `dt`
    * `AverageTemperature`
    * `City`
    * `Country`

In [1]:
# do all imports and installs here
import pandas as pd
import logging
logging.getLogger().setLevel(logging.INFO)
import datetime
import pyspark.sql.types as T
import pyspark.sql.functions as F
import pyspark.sql.dataframe as psd
import pyspark.sql.session as pss
import re
from pyspark.sql import SparkSession
from utils.city_code import city_code
import os

In [2]:
# define reader class to handle reading and writing data
class Reader:
    """[Reader class loads raw data for further usage. It is written with the Singleton pattern to prevent loading data
    multiple times, as the process is very slow.]

    Returns:
        [type]: [the instance of class]
    """
    _instance = None
    
    def __init__(self, *args, **kwargs):
        pass
    
    def __new__(cls, *args, **kwargs):
        """[initializes a new instance of class if not existed. It initializes parameters, creates a spark session, loads data]

        Returns:
            [type]: [inputpath, output path, spark session, and data]
        """
        if not cls._instance:
            # create instance (object.__new__ accepts no extra arguments)
            spark_reader = super().__new__(cls)
            # fit parameters and data
            spark_reader.fit()
            cls._instance = spark_reader
        
        return cls._instance
        
    def parameters(self) -> dict:
        """[initializes necessary parameters]

        Returns:
            [dict]: [paths to read and write data, keyed by data source name]
        """
        # data sources
        paths = {
                "i94immigration" : "./datasources/i94immigration",
                "worldtemperature" : "./datasources/worldtemperature"
                }
        return paths
        
    def create_spark_session(self) -> pss.SparkSession: 
        """[initialize a spark session]

        Returns:
            [pyspark.sql.session.SparkSession]: [created spark session]
        """
        # build spark session
        spark = SparkSession \
            .builder \
            .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11") \
            .enableHiveSupport() \
            .getOrCreate()

        return spark

    def from_anyformat_to_parquet(self):
        # Read raw data and load them into parquet files
        if not os.path.exists(self.paths['i94immigration']):
            logging.info("start transferring i94 immigration raw data")
            df_immigration_spark = self.spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
            df_immigration_spark.write.parquet(self.paths['i94immigration'])
            logging.info("i94 immigration raw data are written into parquet files")
        else:
            logging.info("i94 immigration raw data are are already written into parquet files")
            
        if not os.path.exists(self.paths['worldtemperature']):
            logging.info("start transferring world temperature raw data")
            df_worldtemperature_spark = self.spark.read.csv('../../data2/GlobalLandTemperaturesByCity.csv', header=True)
            df_worldtemperature_spark.write.parquet(self.paths['worldtemperature'])
            logging.info("world temperature raw data are written into parquet files")
        else:
            logging.info("world temperature raw data are are already written into parquet files")
            
    
    def load_i94immigration_data(self) -> psd.DataFrame:
        """[loads i94 immigration data]

        Returns:
            [pyspark.sql.dataframe.DataFrame]: [i94 immigration data as spark dataframe]
        """
        # read immigration data
        immigration_df = self.spark.read.parquet(self.paths['i94immigration'])

        return immigration_df


    def load_worldtemperature_data(self) -> psd.DataFrame:
        """[loads world temperature data]

        Returns:
            [pyspark.sql.dataframe.DataFrame]: [world temperature data as spark dataframe]
        """
        # read world temperature data
        temperature_df = self.spark.read.parquet(self.paths['worldtemperature'])

        return temperature_df
    
    def fit(self):
        """[fits necessary parameters, creates spark session, and loads i94immigration and worldtemperature data.]

        Returns:
            [type]: []
        """

        logging.info("geting root paths to read and write data")
        self.paths = self.parameters()
        
        logging.info("getting spark session")
        self.spark = self.create_spark_session()
        
        logging.info("stage raw data into parquet files if not already exist!!!")
        self.from_anyformat_to_parquet()
        
        logging.info("Start fitting i94immigration data to reader class ...")
        self.immigration_df = self.load_i94immigration_data()
        print("End fitting i94immigration data to reader class!")
        
        print("Start fitting worldtemperature data to reader class ...")
        self.temperature_df = self.load_worldtemperature_data()
        print("End fitting worldtemperature data to reader class!")
        
        return True
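
The Singleton idea used by `Reader` can be shown without Spark. The following is a minimal sketch, assuming a hypothetical `LazyLoader` class whose `fit` method stands in for the slow loading work; it is not the notebook's class, only an illustration of why a second call reuses the first instance.

```python
class LazyLoader:
    """Hypothetical stand-in for Reader: loads once, then reuses the instance."""

    _instance = None
    load_count = 0  # how many times the expensive load actually ran

    def __new__(cls):
        if cls._instance is None:
            instance = super().__new__(cls)
            instance.fit()
            cls._instance = instance
        return cls._instance

    def fit(self):
        # Placeholder for the slow work (Spark session, parquet reads).
        type(self).load_count += 1
        self.data = {"rows": [1, 2, 3]}


first = LazyLoader()
second = LazyLoader()
print(first is second)        # both names refer to the same object
print(LazyLoader.load_count)  # the load ran a single time
```

One consequence of this design is that any state set in `fit` is shared by every caller, so later cells should treat the loaded data as read-only.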

In [3]:
reader = Reader()

INFO:root:geting root paths to read and write data
INFO:root:getting spark session
INFO:root:stage raw data into parquet files if not already exist!!!
INFO:root:i94 immigration raw data are are already written into parquet files
INFO:root:world temperature raw data are are already written into parquet files
INFO:root:Start fitting i94immigration data to reader class ...


End fitting i94immigration data to reader class!
Start fitting worldtemperature data to reader class ...
End fitting worldtemperature data to reader class!


In [4]:
reader.immigration_df.show(5)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

In [5]:
reader.temperature_df.show(5)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows



### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

#### Data Wrangling: Immigration Data

In [7]:
def filter_immigration_data():
    # select needed columns
    immigration_usa_df = reader.immigration_df.select(['i94yr', 'i94mon', 'i94addr', 'i94port'])
    return immigration_usa_df

In [8]:
immigration_usa_df = filter_immigration_data()
immigration_usa_df.printSchema()

root
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- i94port: string (nullable = true)



In [9]:
immigration_usa_df.limit(10).toPandas()

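|   | i94yr | i94mon | i94addr | i94port |
|---|-------|--------|---------|---------|
| 0 | 2016.0 | 4.0 | CA | LOS |
| 1 | 2016.0 | 4.0 | NV | LOS |
| 2 | 2016.0 | 4.0 | WA | LOS |
| 3 | 2016.0 | 4.0 | WA | LOS |
| 4 | 2016.0 | 4.0 | WA | LOS |
| 5 | 2016.0 | 4.0 | HI | HHW |
| 6 | 2016.0 | 4.0 | HI | HHW |
| 7 | 2016.0 | 4.0 | HI | HHW |
| 8 | 2016.0 | 4.0 | FL | HOU |
| 9 | 2016.0 | 4.0 | CA | LOS |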


In [16]:
# create view for immigration data
immigration_usa_df.createOrReplaceTempView('immigration_usa_table')

In [14]:
# count occurrences of each distinct value of a column in a given spark sql table (count(col) skips nulls, so a null group reports 0)
def value_counts(table, col):
    return reader.spark.sql(f"""
    select {col}, count({col}) as count
    from {table}
    group by {col}
    order by count
    """).show(20)

In [15]:
value_counts('immigration_usa_table', 'i94yr')
value_counts('immigration_usa_table', 'i94mon')
value_counts('immigration_usa_table', 'i94addr')
value_counts('immigration_usa_table', 'i94port')

+------+-------+
| i94yr|  count|
+------+-------+
|2016.0|3096313|
+------+-------+

+------+-------+
|i94mon|  count|
+------+-------+
|   4.0|3096313|
+------+-------+

+-------+-----+
|i94addr|count|
+-------+-----+
|   null|    0|
|     KF|    1|
|     52|    1|
|     71|    1|
|     S6|    1|
|     85|    1|
|     UL|    1|
|     RU|    1|
|     VL|    1|
|     RA|    1|
|     UR|    1|
|     ZN|    1|
|     TC|    1|
|     PD|    1|
|     YH|    1|
|     EX|    1|
|     RF|    1|
|     RO|    1|
|     73|    1|
|     FC|    1|
+-------+-----+
only showing top 20 rows

+-------+-----+
|i94port|count|
+-------+-----+
|    PHF|    1|
|    NC8|    1|
|    CNC|    1|
|    VNB|    1|
|    COO|    1|
|    ERC|    1|
|    CPX|    1|
|    PCF|    1|
|    LWT|    1|
|    NIG|    1|
|    MAI|    1|
|    RIO|    1|
|    HNN|    1|
|    YIP|    1|
|    ANA|    1|
|    SCH|    1|
|    BWM|    1|
|    REN|    1|
|    MND|    1|
|    BHX|    2|
+-------+-----+
only showing top 20 rows



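* The counts above show that the 'i94yr' and 'i94mon' columns each hold a single value (2016 and 4), so they are clean, whereas the 'i94addr' and 'i94port' columns contain many codes that appear only once or twice, including values such as '52' or '71' that do not look like valid codes.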

In [17]:
def get_valid_ports():
    # Extract valid ports into a dictionary mapping port code to port description
    re_obj = re.compile(r"'(.*)'\s=\s'(.*)'") # alternative: '(\S{3})'\s=\s'(.*),\s{1}(\S{2}).*
    validPorts = {}
    with open('utils/port_code.py') as f:
         for data in f:
             match = re_obj.search(data)
             if match:
                 validPorts[match[1]] = match[2].strip()
    return validPorts
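
To make the parsing step concrete, here is a small sketch that applies the same regular expression to a few in-memory lines. The sample lines are hypothetical; the actual layout of `utils/port_code.py` is assumed to be `'CODE' = 'DESCRIPTION'` with whitespace around the equals sign.

```python
import re

# Hypothetical sample lines; the real utils/port_code.py layout is assumed.
SAMPLE_LINES = [
    "'ALC'\t=\t'ALCAN, AK             '",
    "'LOS'\t=\t'LOS ANGELES, CA       '",
    "# a comment line that should be skipped",
]

PORT_PATTERN = re.compile(r"'(.*)'\s=\s'(.*)'")


def parse_ports(lines):
    """Return {port_code: description} for every line that matches the pattern."""
    ports = {}
    for line in lines:
        match = PORT_PATTERN.search(line)
        if match:
            ports[match[1]] = match[2].strip()
    return ports


ports = parse_ports(SAMPLE_LINES)
print(ports)
```

Lines that do not match are silently ignored, which is convenient for comments but also hides malformed entries, so a count of skipped lines may be worth logging.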

* We assumed that all immigrants came to the US on the first day of a given month.
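
As an illustration of this assumption, the sketch below builds the first-of-month date from the floating-point `i94yr` and `i94mon` values seen in the raw data. The helper name `arrival_month_start` is hypothetical and only mirrors what the SQL expression in the next cell does.

```python
import datetime


def arrival_month_start(i94yr: float, i94mon: float) -> datetime.date:
    """Map a (year, month) pair to the first day of that month."""
    return datetime.date(int(i94yr), int(i94mon), 1)


print(arrival_month_start(2016.0, 4.0))  # 2016-04-01
```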

In [22]:
def clean_immigration_data(validPorts):
    # select needed columns
    immigration_usa_df = reader.immigration_df.select(['i94yr', 'i94mon', 'i94addr', 'i94port'])  

    # build a SQL IN-list of valid i94addr codes (keys of city_code)
    valid_city_code = list(set(city_code.keys()))
    str_valid_city_code = str(valid_city_code).replace('[', '(').replace(']', ')')

    # build a SQL IN-list of valid i94port codes (keys of validPorts)
    valid_port_code = list(set(validPorts.keys()))
    str_valid_port_code = str(valid_port_code).replace('[', '(').replace(']', ')')


    clean_immigration_usa_df = reader.spark.sql(f"""
    select date(concat(cast(i94yr as int), '-', cast(i94mon as int), '-01')) as dt, cast(i94addr as varchar(2)), cast(i94port as varchar(3))
    from immigration_usa_table
    where i94yr is not null and i94mon is not null and i94addr is not null and i94port is not null and
    i94addr in {str_valid_city_code} and i94port in {str_valid_port_code} 
    """)
    
    return clean_immigration_usa_df
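
The cleaning function turns a Python list into a SQL `IN (...)` list by rewriting the brackets of its string form. A slightly more explicit way to build the same kind of list is sketched below; `sql_in_list` is a hypothetical helper, and it assumes the codes are short strings that should never contain a quote character.

```python
def sql_in_list(codes):
    """Render codes as a SQL tuple literal, e.g. ('CA', 'NV')."""
    cleaned = sorted({str(code) for code in codes})
    if not cleaned:
        raise ValueError("at least one code is required")
    for code in cleaned:
        if "'" in code:
            raise ValueError(f"unexpected quote in code {code!r}")
    return "(" + ", ".join(f"'{code}'" for code in cleaned) + ")"


print(sql_in_list(["NV", "CA", "CA"]))  # ('CA', 'NV')
```

Sorting and de-duplicating makes the generated query text stable between runs, which helps when comparing logs.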

In [23]:
validPorts = get_valid_ports()
clean_immigration_usa_df = clean_immigration_data(validPorts)

In [24]:
clean_immigration_usa_df.printSchema()

root
 |-- dt: date (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- i94port: string (nullable = true)



In [25]:
# create a view of clean immigration data
clean_immigration_usa_df.createOrReplaceTempView('clean_immigration_usa_table')

In [258]:
value_counts('clean_immigration_usa_table', 'i94addr')
value_counts('clean_immigration_usa_table', 'i94port')

+-------+-----+
|i94addr|count|
+-------+-----+
|     99|   52|
|     VI|  226|
|     WY|  460|
|     SD|  557|
|     WV|  808|
|     ND| 1225|
|     MT| 1339|
|     VT| 1477|
|     AK| 1604|
|     ID| 1752|
|     MS| 1771|
|     NM| 1994|
|     ME| 2361|
|     NH| 2817|
|     AR| 2873|
|     DE| 3111|
|     KS| 3224|
|     OK| 3239|
|     RI| 3289|
|     IA| 3391|
+-------+-----+
only showing top 20 rows

+-------+-----+
|i94port|count|
+-------+-----+
|    FRI|    1|
|    VNB|    1|
|    HNN|    1|
|    NC8|    1|
|    SPO|    1|
|    RIO|    1|
|    CPX|    1|
|    MGM|    1|
|    BWM|    1|
|    NIG|    1|
|    ANA|    1|
|    LWT|    1|
|    PHF|    1|
|    YIP|    1|
|    MND|    1|
|    PSM|    2|
|    ADW|    2|
|    NOO|    2|
|    SGJ|    2|
|    MTH|    2|
+-------+-----+
only showing top 20 rows



#### Data Wrangling: Temperature Data

* We only focus on data from the USA.

* As the immigration data is from April 2016, we select temperature data for the same month in a year as close to 2016 as possible; the filter below uses April 2013.

In [4]:
def filter_temperature_data():
    temperature_df = reader.temperature_df
    temperature_usa_df = temperature_df.where(temperature_df.Country == 'United States')
    temperature_usa_df = temperature_usa_df.select(['dt', 'Country', 'City', 'AverageTemperature'])

    temperature_usa_df = temperature_usa_df.withColumn('dt', F.to_date('dt', 'yyyy-MM-dd'))
    temperature_usa_df = temperature_usa_df.where((temperature_usa_df['dt'] >= '2013-04-01') & (temperature_usa_df['dt'] < '2013-05-01'))
    
    return temperature_usa_df
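
The choice of April 2013 is hard-coded in the filter above. A sketch of how the closest matching month could be picked programmatically is shown here; the function `closest_same_month` and the list of available dates are hypothetical and only illustrate the selection rule.

```python
import datetime


def closest_same_month(available, target):
    """Return the latest available date with target's month that is not after target."""
    candidates = [d for d in available if d.month == target.month and d <= target]
    return max(candidates) if candidates else None


available = [
    datetime.date(2012, 4, 1),
    datetime.date(2013, 4, 1),
    datetime.date(2013, 9, 1),
]
print(closest_same_month(available, datetime.date(2016, 4, 1)))  # 2013-04-01
```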

In [5]:
temperature_usa_df = filter_temperature_data()

In [6]:
temperature_usa_df.groupBy('dt').count().show(100)

+----------+-----+
|        dt|count|
+----------+-----+
|2013-04-01|  257|
+----------+-----+



In [7]:
temperature_usa_df.printSchema()

root
 |-- dt: date (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)



In [8]:
temperature_usa_df.limit(10).toPandas()

|   | dt | Country | City | AverageTemperature |
|---|----|---------|------|--------------------|
| 0 | 2013-04-01 | United States | Abilene | 15.752999999999998 |
| 1 | 2013-04-01 | United States | Akron | 9.691 |
| 2 | 2013-04-01 | United States | Albuquerque | 11.555 |
| 3 | 2013-04-01 | United States | Alexandria | 12.425 |
| 4 | 2013-04-01 | United States | Allentown | 9.723 |
| 5 | 2013-04-01 | United States | Amarillo | 12.954 |
| 6 | 2013-04-01 | United States | Anaheim | 15.380999999999998 |
| 7 | 2013-04-01 | United States | Anchorage | -6.421 |
| 8 | 2013-04-01 | United States | Ann Arbor | 6.819 |
| 9 | 2013-04-01 | United States | Antioch | 15.996 |


In [9]:
temperature_usa_df.select(['AverageTemperature']).describe().show()

+-------+------------------+
|summary|AverageTemperature|
+-------+------------------+
|  count|               257|
|   mean| 13.75025680933851|
| stddev| 5.253205757503348|
|    min|            -0.591|
|    max| 9.722999999999999|
+-------+------------------+



In [None]:
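temperature_usa_df.createOrReplaceTempView('temperature_usa_table')  # the view must exist before it is queried
for col in temperature_usa_df.columns: value_counts('temperature_usa_table', col)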

* Add an i94port column to the temperature dataframe. It is mapped from the valid port codes used to clean the immigration dataframe.

In [10]:
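@F.udf
def get_port(city):
    # return the first port code whose description contains the city name
    if city is None:
        return None
    for key in validPorts:
        if city.lower() in validPorts[key].lower():
            return key
    return None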
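
The matching logic inside the UDF can be tried out without Spark. This is a sketch under the assumption that port descriptions look like `'CITY, STATE'`; the dictionary below is a hypothetical subset, and `find_port` is an illustrative name rather than a function from this notebook.

```python
from typing import Dict, Optional

# Hypothetical subset of the dictionary produced by get_valid_ports().
VALID_PORTS = {
    "LOS": "LOS ANGELES, CA",
    "HOU": "HOUSTON, TX",
    "ANC": "ANCHORAGE, AK",
}


def find_port(city: Optional[str], valid_ports: Dict[str, str]) -> Optional[str]:
    """Return the first port code whose description contains the city name."""
    if not city:
        return None
    needle = city.lower()
    for code, description in valid_ports.items():
        if needle in description.lower():
            return code
    return None


print(find_port("Houston", VALID_PORTS))  # HOU
print(find_port("Akron", VALID_PORTS))    # None
print(find_port(None, VALID_PORTS))       # None
```

Because this is a substring test where the first match wins, a short city name can match an unrelated port description, so the resulting mapping is worth spot-checking.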

In [13]:
def clean_temerature_usa_data(temperature_usa_df):
    temperature_usa_df = temperature_usa_df.withColumn('i94port', get_port(temperature_usa_df['City']))
    temperature_usa_df.createOrReplaceTempView('temperature_usa_table')

    clean_temperature_usa_df = reader.spark.sql("""
    select date(dt) as dt, cast(Country as varchar(13)), cast(City as string), round(AverageTemperature, 2) as AverageTemperature, cast(i94port as varchar(3))
    from temperature_usa_table
    where dt is not null and Country is not null and City is not null and  AverageTemperature is not null and i94port is not null
    """)
    
    return clean_temperature_usa_df
    

In [14]:
clean_temperature_usa_df = clean_temerature_usa_data(temperature_usa_df)

In [328]:
clean_temperature_usa_df.show(5)

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model
* **Data model**:
    * A star schema is used as the data model of this project.
    * It is a relational model that contains one fact table, named fact_table, surrounded by two dimension tables, named dim_immigration_table and dim_temperature_table.
    * It suits analytical queries, and users can analyze the data with only a few joins.


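* **Fact table** - the immigration dimension table joined with the temperature dimension table on i94port (the join in Step 4 does not use dt, because the immigration dates fall in 2016 and the temperature dates in 2013). Columns:
    * dt: arrival date (first day of the arrival month),
    * i94port: 3-character code of the destination US city,
    * AverageTemperature: average temperature of the destination city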


* **Immigration dimension table**:
    * dt: arrival date (first day of the arrival month),
    * i94addr: where the immigrant resides in the USA (2-character code),
    * i94port: 3-character code of the destination US city


* **Temperature dimension table**:
    * dt: timestamp
    * AverageTemperature: average temperature
    * City: city name
    * Country: country name
    * i94port: 3 character code of destination city (extracted from i94-immigration data)
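
To illustrate how the fact table relates to the two dimension tables, the following sketch performs the same kind of inner join on `i94port` with plain Python dictionaries. The rows and the temperature value are hypothetical, and the lookup assumes one temperature row per port, which the real data does not guarantee.

```python
# Hypothetical rows shaped like the two dimension tables.
immigration_rows = [
    {"dt": "2016-04-01", "i94addr": "CA", "i94port": "LOS"},
    {"dt": "2016-04-01", "i94addr": "HI", "i94port": "HHW"},
    {"dt": "2016-04-01", "i94addr": "NV", "i94port": "LOS"},
]
temperature_rows = [
    {"dt": "2013-04-01", "City": "Los Angeles", "i94port": "LOS", "AverageTemperature": 15.0},
]


def build_fact(immigration, temperature):
    """Inner join on i94port, keeping dt, i94port and AverageTemperature."""
    temp_by_port = {row["i94port"]: row["AverageTemperature"] for row in temperature}
    return [
        {
            "dt": row["dt"],
            "i94port": row["i94port"],
            "AverageTemperature": temp_by_port[row["i94port"]],
        }
        for row in immigration
        if row["i94port"] in temp_by_port
    ]


fact = build_fact(immigration_rows, temperature_rows)
print(len(fact))  # rows whose port has a temperature record
```

Rows for ports without a temperature record are dropped by the inner join, which is why the fact table can be smaller than the immigration table.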



#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model
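
The cells in Step 4 apply a filter stage, then a clean stage, then a load stage for each source before building the fact table. A generic way to chain such stages is sketched below; `run_pipeline` and the toy stages are hypothetical and operate on plain lists of dictionaries instead of Spark dataframes.

```python
def run_pipeline(rows, steps):
    """Apply each (name, function) step in order and report row counts."""
    for name, step in steps:
        rows = step(rows)
        print(f"{name}: {len(rows)} rows")
    return rows


# Toy stand-ins for the filter and clean stages (hypothetical logic).
steps = [
    ("filter", lambda rows: [r for r in rows if r.get("Country") == "United States"]),
    ("clean", lambda rows: [r for r in rows if r.get("AverageTemperature") is not None]),
]

raw_rows = [
    {"Country": "United States", "City": "Akron", "AverageTemperature": 9.691},
    {"Country": "United States", "City": "Abilene", "AverageTemperature": None},
    {"Country": "Denmark", "City": "Århus", "AverageTemperature": 6.068},
]
result = run_pipeline(raw_rows, steps)
```

Printing the row count after each stage gives a cheap completeness check before any data is written.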

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [284]:
# filter
immigration_usa_df = filter_immigration_data()

# create view for immigration data
immigration_usa_df.createOrReplaceTempView('immigration_usa_table')

# clean
validPorts = get_valid_ports()
clean_immigration_usa_df = clean_immigration_data(validPorts)

# load
clean_immigration_usa_df.write.mode("overwrite").partitionBy("i94port").parquet("/output_data/immigration.parquet")

In [331]:
# filter
temperature_usa_df = filter_temperature_data()

# clean
clean_temperature_usa_df = clean_temerature_usa_data(temperature_usa_df)

# load
clean_temperature_usa_df.write.mode("overwrite").partitionBy("i94port").parquet("/output_data/temprature.parquet")

In [None]:
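# create a view of the clean immigration data
clean_immigration_usa_df.createOrReplaceTempView('clean_immigration_usa_table')
# create a view of the clean temperature data
clean_temperature_usa_df.createOrReplaceTempView('clean_temperature_usa_table')
# build the fact table by joining both views on i94port (uses the session held by the reader)
fact_df = reader.spark.sql('''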
select  im.dt                  AS dt,
        im.i94port             AS i94port,
        t.AverageTemperature   AS AverageTemperature
from clean_immigration_usa_table AS im
JOIN clean_temperature_usa_table AS t 
ON im.i94port = t.i94port
''')

# create
fact_df.createOrReplaceTempView('fact_table')

# load
fact_df.write.mode("overwrite").partitionBy("i94port").parquet("/output_data/fact.parquet")


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [370]:
# Perform quality checks here
from typing import List
import logging
logging.getLogger().setLevel(logging.INFO)
def data_quality_check(tables: List[str]):
    for table in tables:
        logging.info(f'Running data quality check on table {table}')

        logging.info(f'Getting number of entries in table {table}')
        records = reader.spark.sql(f"""select count(*) as entries from {table}""").toPandas().entries.tolist()
        logging.info(f'Table {table} has {records} numbers of entries.')

        if not records or len(records) < 1 or records[0] < 1:
            logging.error(f"Data quality check failed for table {table}")
            raise ValueError(f"Data quality check failed for table {table}")

        logging.info(f"Data quality check passed for table {table}")
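
The row-count check can be exercised without a Spark session by passing the counting function in as a parameter. The sketch below assumes a hypothetical `check_not_empty` helper and a dictionary of fake counts; in the notebook the counting function would wrap the `select count(*)` query.

```python
from typing import Callable, Dict, Iterable


def check_not_empty(tables: Iterable[str], count_rows: Callable[[str], int]) -> Dict[str, int]:
    """Raise ValueError for the first table with no rows, else return all counts."""
    counts = {}
    for table in tables:
        row_count = count_rows(table)
        if row_count < 1:
            raise ValueError(f"Data quality check failed for table {table}")
        counts[table] = row_count
    return counts


# Fake counts standing in for the Spark query (hypothetical values).
fake_counts = {"fact_table": 3, "empty_table": 0}
print(check_not_empty(["fact_table"], fake_counts.__getitem__))
```

Separating the counting from the checking makes the failure path easy to test, which the Spark-bound version cannot do without real tables.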

In [378]:
data_quality_check(['clean_immigration_usa_table', 'clean_temperature_usa_table', 'fact_table'])

INFO:root:Running data quality check on table clean_immigration_usa_table
INFO:root:Getting number of entries in table clean_immigration_usa_table
INFO:root:Table clean_immigration_usa_table has [2917199] numbers of entries.
INFO:root:Data quality check passed for table clean_immigration_usa_table
INFO:root:Running data quality check on table clean_temperature_usa_table
INFO:root:Getting number of entries in table clean_temperature_usa_table
INFO:root:Table clean_temperature_usa_table has [117] numbers of entries.
INFO:root:Data quality check passed for table clean_temperature_usa_table
INFO:root:Running data quality check on table fact_table
INFO:root:Getting number of entries in table fact_table
INFO:root:Table fact_table has [2454343] numbers of entries.
INFO:root:Data quality check passed for table fact_table


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
    * The data was increased by 100x.
    * The data populates a dashboard that must be updated on a daily basis by 7am every day.
    * The database needed to be accessed by 100+ people.