# Data Engineering Capstone Project

The project follows the steps below:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [10]:
# Do all imports and installs here
!pip install pyarrow
!pip install fastparquet
from datetime import timedelta, datetime
import pandas as pd
import boto3
import configparser
from botocore.exceptions import ClientError
#import pyarrow
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import col,udf,date_format
from pyspark.sql import SparkSession
import re






## Step 1: Scope the Project and Gather Data

### Project plan and end goal

The scope of the project is to gain insights into immigration to the United States using data from a few different sources. The source data is linked together through a relational data model, goes through an ETL process and lands in an AWS Redshift analytics database, from where it can be queried for analytics and turned into insights in the form of visual dashboards.

In addition, some of the data may be accessed more frequently than the rest and queried by management for quick, high-level information. For this use case, a copy of that data from AWS Redshift can be combined into a single table in the form of a curated dataset, which is stored and queried using AWS Athena, a much cheaper option that is better suited to this kind of access. This data can then also be quickly turned into user-friendly visual dashboards using AWS QuickSight, which integrates with AWS Athena.

### Data sources


### Tools

The tools that were used in this project are as follows:

* Python
* Spark
* AWS S3
* AWS Redshift
* Apache Airflow

### Describe and Gather Data 

* I94 Immigration Data: This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace. [This](https://travel.trade.gov/research/reports/i94/historical/2016.html) is where the data comes from. The dataset contains personal metadata about immigrants, such as their gender and birth year, and information about their immigration to the United States, such as visa type, the airline they travelled with and the airport they landed at. This is the main dataset, and there is a file for each month of 2016 in the directory `../../data/18-83510-I94-Data-2016/`. Combined, the 12 files contain more than 40 million rows and 28 columns. For most of the work I used only the April 2016 file, which has more than three million records.

* World Temperature Data: [This](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data) dataset comes from Kaggle and contains temperature information for cities in different countries around the world.
* U.S. City Demographic Data: [This](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/) data comes from OpenDataSoft and contains information about the demographics of cities in different US states.
* Airport Code Table: [This](https://datahub.io/core/airport-codes#data) is a simple table of airport codes and their corresponding cities, covering IATA airport codes from around the world.
* I94_SAS_Labels_Descriptions: A SAS file with further details about the immigration data that can be used to gain useful insights. For example, these label descriptions map the codes for an immigrant's country of birth and country of residence to the actual countries.


In [11]:
spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()

# Read in the data here

# Read the April 2016 immigration data (SAS format) from the workspace into a Spark dataframe
immigrationdf_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
# Country and state code labels taken from the I94 SAS labels description file
sas_labels = pd.read_csv('SAS_Labels.csv')

# Read the world temperature data into a Spark dataframe
world_temperature_file_path = '../../data2/GlobalLandTemperaturesByCity.csv'
world_temperature_df = spark.read.format("csv").option("header", "true").load(world_temperature_file_path)

# Read the US city demographics data (semicolon-separated) into a Spark dataframe
us_cities_demographicsdf = spark.read.format("csv").option("sep", ";").option("header", "true").load("us-cities-demographics.csv")

# Read the airport codes into a Spark dataframe
airport = spark.read.format("csv").option("header", "true").load("airport-codes_csv.csv")


In [12]:
def preprocess_sas_labels(sas_labels):
    """
    Clean the SAS label descriptions, which map country codes to actual countries and state codes
    to actual US states, and write them out as two .csv files: one for countries and one for states.

    :param sas_labels: pandas dataframe whose 'Country' and 'State' columns hold entries such as "209 = Qatar"
    :return: None
    """
    # Split each "code = name" entry into a code column and a name column
    # (n=1 keeps any further "=" characters inside the name)
    Countries = sas_labels['Country'].str.split(pat="=", n=1, expand=True)
    States = sas_labels['State'].str.split(pat="=", n=1, expand=True)
    States.columns = ['StateCode', 'StateName']
    Countries.columns = ['CountryCode', 'CountryName']
    Countries['CountryCode'] = Countries['CountryCode'].str.strip()
    States['StateCode'] = States['StateCode'].str.strip()
    # Remove the quotes around each state code; missing values stay missing
    # (vectorised replacement for the row-by-row loop with a bare except)
    States['StateCode'] = States['StateCode'].str[1:-1]
    Countries.to_csv('countries.csv')
    States.to_csv('states.csv')

In [13]:
preprocess_sas_labels(sas_labels)

In [14]:
# Load in the cleaned sas_labels files
df_countries = spark.read.format("csv").option("header", "true").load('countries.csv')
df_states = spark.read.format("csv").option("header", "true").load('states.csv')

In [15]:
# Extract, transform and clean the immigration data
def etl_immigration_data(immigrationdf_spark, df_countries, df_states):
    """
    This is the main function that extracts, transforms and cleans the immigration data using Spark.

    :param immigrationdf_spark: dataframe containing the immigration dataset
    :param df_countries: dataframe containing countries and their codes
    :param df_states: dataframe containing US states and their codes
    :return: the updated immigrationdf_spark dataframe
    """
    # SAS dates count days since 1960-01-01; convert them to YYYY-MM-DD strings
    convert_sas_udf = udf(lambda x: x if x is None else (timedelta(days=x) + datetime(1960, 1, 1)).strftime("%Y-%m-%d"))
    cols = ['cicid', 'i94yr', 'i94mon', 'i94mode', 'i94bir', 'i94visa', 'i94cit', 'i94res']
    # (label code column, immigration column) pairs. A list is used instead of a dict because both
    # country columns join on the same 'CountryCode' key, and a dict silently keeps only the last one.
    country_mappings = [('CountryCode', 'i94cit'), ('CountryCode', 'i94res')]
    state_mappings = [('StateCode', 'i94addr')]

    def convert_sas_date(df, column):
        """
        Convert a column of SAS dates to strings in the format YYYY-MM-DD.

        :param df: Spark dataframe to be processed
        :param column: name of the column in the SAS date format
        :return: updated Spark dataframe with string dates
        """
        return df.withColumn(column, convert_sas_udf(df[column]))

    def data_type_conversion(df, cols):
        """
        Convert the floating point code columns to integers, then cast the country codes to strings
        so that they match the codes in the SAS labels data.

        :param df: dataframe containing immigration information
        :param cols: list of columns to convert to integers
        :return: the input dataframe with updated data types
        """
        for column in cols:
            df = df.withColumn(column, df[column].cast(IntegerType()))
        # Convert the country code columns to string
        df = df.withColumn('i94cit', df['i94cit'].cast(StringType()))
        df = df.withColumn('i94res', df['i94res'].cast(StringType()))
        return df

    def join_sas_label_data(df1, df2, mappings):
        """
        Replace the codes in the immigration dataframe with the meaningful values from the SAS labels data.

        :param df1: dataframe containing immigration information
        :param df2: dataframe mapping codes (country or state) to their names
        :param mappings: list of (df2 code column, df1 column) pairs
        :return: the updated df1 dataframe
        """
        for key, value in mappings:
            # e.g. 'CountryCode' -> ['Country', 'Code'], so the name column is 'CountryName'
            split_list = re.findall('[A-Z][^A-Z]*', key)
            df1 = df1.join(df2, df2[key] == df1[value], "left")
            # Drop the pandas index column, the code column and the original coded column
            df1 = df1.drop('_c0', key, value)
            df1 = df1.withColumnRenamed(split_list[0] + "Name", value)
        return df1

    immigrationdf_spark = data_type_conversion(immigrationdf_spark, cols)
    immigrationdf_spark = join_sas_label_data(immigrationdf_spark, df_countries, country_mappings)
    immigrationdf_spark = join_sas_label_data(immigrationdf_spark, df_states, state_mappings)
    immigrationdf_spark = convert_sas_date(immigrationdf_spark, 'arrdate')
    immigrationdf_spark = convert_sas_date(immigrationdf_spark, 'depdate')
    unwanted_columns = ['visapost', 'occup', 'entdepu', 'insnum', 'dtadfile', 'biryear', 'dtaddto', 'admnum', 'count']
    immigrationdf_spark = immigrationdf_spark.drop(*unwanted_columns)
    return immigrationdf_spark
    

In [16]:
immigrationdf_spark = etl_immigration_data(immigrationdf_spark, df_countries, df_states)
immigrationdf_spark.write.mode("Overwrite").parquet('immigration_data.parquet')
#immigrationdf_spark.write.parquet('immigration_data.parquet')
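The `convert_sas_udf` in the ETL function treats a SAS date as a number of days since 1 January 1960. The same arithmetic in plain Python makes a quick sanity check; the helper name `sas_to_iso` is hypothetical and not part of the notebook.

```python
from datetime import datetime, timedelta

# SAS stores dates as the number of days since 1 January 1960
SAS_EPOCH = datetime(1960, 1, 1)


def sas_to_iso(sas_days):
    """Convert a SAS day count (int, float or None) to a YYYY-MM-DD string, like convert_sas_udf."""
    if sas_days is None:
        return None
    return (SAS_EPOCH + timedelta(days=sas_days)).strftime("%Y-%m-%d")


print(sas_to_iso(20545.0))  # 2016-04-01
print(sas_to_iso(None))     # None
```

As a possible alternative (not tested here), a built-in Spark SQL expression such as `expr("date_add(to_date('1960-01-01'), cast(arrdate as int))")` might avoid the per-row Python UDF overhead, at the cost of returning a date column rather than a string.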

## Step 2: Explore and Assess the Data
#### Explore the Data 
* Columns that held very little information, such as `visapost` in the immigration dataset, and columns that were mostly null, such as `occup` and `entdepu`, were dropped, as they add no value when looking at the bulk of the data and would only complicate the data model.

* Columns that contain values which add nothing for this project, such as `elevation_ft` in the airport dataset, were also dropped.

* The `dt` and temperature columns of the world temperature dataset were dropped, because the temperature readings only cover 1743 to 2013, whereas the main dataset, the immigration dataset, only contains data for 2016. The country, city and location coordinates were kept, however, as it may be possible in future to link them to another dataset with useful information, such as weather or map data for different cities or coordinates.

* The airport dataset was left out completely. Firstly, there are no fields shared between the immigration dataset and the airport dataset, at least not with reliable consistency, and even if the two tables could be joined they would add no additional information. Secondly, while it could be interesting to look at weather data near the airports by joining the world temperature dataset with the airport dataset on their common city field, the weather data only runs until 2013 and the main goal of this project is to gain useful insights about immigration, so this join would add little value.

#### Cleaning Steps

* All SAS dates (days counted from 1 January 1960) were converted to valid dates, so that each immigrant's actual arrival and departure dates can be seen for ease of analytics.
* The `i94cit`, `i94res` and `i94addr` columns contained codes whose meanings are given in the SAS Labels Description file. The contents of this file were copied into a `.csv` file, which was loaded into a Pandas dataframe. Several cleaning steps were performed there, and the clean data was written back to `.csv` files, which were then loaded into Spark. Pandas was used for this step because the cleaning was tedious and low level, and the SAS labels file is so small that using Spark for it would be inefficient.
* The codes and their meanings were initially loaded from the SAS labels `.csv` file as a single column with values like `209 = Qatar`, so they were split into two columns.
* The state codes were also wrapped in quotes marking them as strings. These quotes were removed, as otherwise the codes would not match the codes in the immigration dataset.
* The codes in the immigration dataset were floats (e.g. `209.0`), so they were cast to integers and then to strings.
* All column names containing white space or capital letters were renamed to ensure robustness when the data is copied into the tables in Redshift.
* Joins were performed on the Spark dataframes to link each code in the immigration dataset with its actual meaning; for example, the column `i94cit` now contains values such as `Qatar` or `Germany` instead of `209.0` or `148.0`.
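The splitting, casting and joining steps above can be illustrated on a tiny scale with pandas, whose left join (`merge(how="left")`) keeps every immigration record in the same way as the Spark left join. The label rows and immigration records below are made up for illustration, including the unmatched code `999.0`.

```python
import pandas as pd

# Hypothetical sample rows in the "code = name" format of the SAS labels file
labels = pd.DataFrame({"Country": ["209 = Qatar", "148 = Germany"]})

# Split each entry into a code and a name (n=1 keeps any later "=" inside the name)
countries = labels["Country"].str.split(pat="=", n=1, expand=True)
countries.columns = ["CountryCode", "CountryName"]
countries["CountryCode"] = countries["CountryCode"].str.strip()
countries["CountryName"] = countries["CountryName"].str.strip()

# Hypothetical immigration records: codes arrive as floats, and 999.0 has no label
immigration = pd.DataFrame({"cicid": [1, 2, 3], "i94cit": [209.0, 148.0, 999.0]})
# Cast float -> int -> str so that 209.0 becomes "209" and can match the label codes
immigration["i94cit"] = immigration["i94cit"].astype(int).astype(str)

# Left join keeps every immigration record; unmatched codes get a missing name
joined = immigration.merge(countries, how="left", left_on="i94cit", right_on="CountryCode")
# Replace the code column with the country name, keeping the original column name
joined = joined.drop(columns=["i94cit", "CountryCode"]).rename(columns={"CountryName": "i94cit"})
print(joined)
```

Without the float-to-string cast, `209.0` would never equal the label code `"209"`, and every country name would come back missing.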

In [18]:
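# Keep only the columns useful for this project: drop the date and temperature readings
unwanted_cols = ['dt', 'AverageTemperature', 'AverageTemperatureUncertainty']
world_temperature_df = world_temperature_df.drop(*unwanted_cols)
# Without the date, each city repeats once per monthly reading, so keep a single row per location
world_temperature_df = world_temperature_df.dropDuplicates()
# Lower-case the column names (the loop variable avoids shadowing the imported `col` function)
for column_name in world_temperature_df.columns:
    world_temperature_df = world_temperature_df.withColumnRenamed(column_name, column_name.lower())
world_temperature_df.write.mode("overwrite").parquet('world_temperature_data.parquet')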

In [20]:
# Rename the columns, as white space and capital letters in column names may break the code when inserting values into Redshift tables
old_columns = ['City','State','Median Age','Male Population','Female Population','Total Population','Foreign-born', 'Average Household Size','State Code','Race','Count']
new_columns = ['city','state','medianage','malepopulation','femalepopulation','totalpopulation','foreignborn','averagehouseholdsize','statecode','race', 'countofrace']
unwanted_columns = ['Number of Veterans']
us_cities_demographicsdf = us_cities_demographicsdf.drop(*unwanted_columns)
# Pair each old column name with its new name
for old_name, new_name in zip(old_columns, new_columns):
    us_cities_demographicsdf = us_cities_demographicsdf.withColumnRenamed(old_name, new_name)
us_cities_demographicsdf.write.mode("overwrite").parquet('us_demographic_data.parquet')
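The new column names above are listed by hand. As an alternative that this notebook does not use, a hypothetical helper `normalize_column_name` could derive most of them by lower-casing each name and dropping every character that is not a letter or digit; `Count` would still need its explicit `countofrace` override.

```python
import re


def normalize_column_name(name):
    """Lower-case a column name and drop every character that is not a letter or digit."""
    return re.sub(r"[^0-9a-z]", "", name.lower())


demographic_columns = ["City", "Median Age", "Foreign-born", "Average Household Size", "State Code"]
print([normalize_column_name(c) for c in demographic_columns])
# ['city', 'medianage', 'foreignborn', 'averagehouseholdsize', 'statecode']
```

In Spark, such a helper could be applied to all columns at once with `df.toDF(*[normalize_column_name(c) for c in df.columns])`.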


## Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

<img src="https://github.com/arahman5/udacity-capstone-project/blob/master/model_capstone.PNG">


The diagram above is the entity relationship diagram for this project. Below are the reasons for choosing a relational data model with a star schema:

* The data is structured and consistent: we know how the data is stored in the datasets and what their data types are.
* It adapts easily to changing business requirements and allows flexible queries, as the analytics team will want to run ad-hoc queries to better understand the data.
* The analytics team will want to run aggregations and analytics on the data.
* The ability to do JOINs is very useful here, since the data comes from different sources.
* A star schema uses denormalized dimension tables, which suits this analytics use case: the analytics team can write simple queries and run fast aggregations on the data.
* A star schema keeps relationships simple, with each dimension table joining directly to the fact table, which is easy to implement and works well here given the small number of columns in the tables.
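To make the star-schema reasoning concrete, here is a minimal sketch using Python's built-in `sqlite3` as a stand-in for Redshift. The table names (`fact_immigration`, `dim_state`), columns and rows are hypothetical illustrations, not the project's schema, which is defined by the diagram above. The point is that an ad-hoc aggregation needs only one join from the fact table to a dimension.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Hypothetical dimension table: one row per US state
cur.execute("CREATE TABLE dim_state (state_code TEXT PRIMARY KEY, state_name TEXT NOT NULL)")
# Hypothetical fact table: one row per immigration record, pointing at a dimension row
cur.execute(
    "CREATE TABLE fact_immigration ("
    " cicid INTEGER PRIMARY KEY,"
    " state_code TEXT NOT NULL REFERENCES dim_state (state_code),"
    " visatype TEXT)"
)
cur.executemany("INSERT INTO dim_state VALUES (?, ?)", [("NY", "NEW YORK"), ("CA", "CALIFORNIA")])
cur.executemany(
    "INSERT INTO fact_immigration VALUES (?, ?, ?)",
    [(1, "NY", "B2"), (2, "NY", "WT"), (3, "CA", "B2")],
)

# A typical ad-hoc analytics query: one join from the fact table to a dimension, then aggregate
rows = cur.execute(
    "SELECT d.state_name, COUNT(*) AS arrivals"
    " FROM fact_immigration f JOIN dim_state d ON f.state_code = d.state_code"
    " GROUP BY d.state_name ORDER BY arrivals DESC"
).fetchall()
print(rows)  # [('NEW YORK', 2), ('CALIFORNIA', 1)]
```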

#### 3.2 Mapping Out Data Pipelines

* The input data from the S3 bucket first goes through ETL processing in Python using Apache Spark, and the data is then stored as `.parquet` files in S3.
* The data in the Parquet files is copied into tables on AWS Redshift so that the analytics team can perform analytics.
* Apache Airflow is used to automatically schedule and monitor the data loading into the Redshift database.
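Loading Parquet files from S3 into Redshift is typically done with Redshift's `COPY` command and its `FORMAT AS PARQUET` option. The sketch below only builds the SQL text; the table name, bucket path and role ARN are placeholders (the notebook's `DWH_ROLE_ARN` holds a real role ARN), and it is an illustration rather than the statement used by the project's Airflow DAG.

```python
def build_copy_parquet_sql(table, s3_path, iam_role_arn):
    """Build a Redshift COPY statement that loads Parquet files from S3 into an existing table."""
    return (
        f"COPY {table}\n"
        f"FROM '{s3_path}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        "FORMAT AS PARQUET;"
    )


print(build_copy_parquet_sql(
    "immigration",                                        # hypothetical table name
    "s3://my-capstone-bucket/immigration_data.parquet/",  # hypothetical bucket and prefix
    "arn:aws:iam::123456789012:role/myRedshiftRole",      # placeholder role ARN
))
```

For columnar formats, Redshift matches file columns to table columns by position, so the target table's column order should follow the order in which Spark wrote the columns.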


## Step 4: Run Pipelines to Model the Data
#### 4.1 Create the data model

After the cell below has been executed, Apache Airflow connects to the Redshift cluster and loads data from the `.parquet` files in the S3 bucket into the tables of the database within the cluster. To start the load, launch the Airflow scheduler and the Airflow webserver from the terminal and run the DAG `capstone_project`.

In [None]:
#Set up and Configure redshift cluster
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))

KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")


pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
             })

ec2 = boto3.resource('ec2',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )

s3 = boto3.resource('s3',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                   )

iam = boto3.client('iam',aws_access_key_id=KEY,
                     aws_secret_access_key=SECRET,
                     region_name='us-west-2'
                  )

redshift = boto3.client('redshift',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                       )

roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

try:
    response = redshift.create_cluster(        
        #HW
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        #Identifiers & Credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        
        #Roles (for s3 access)
        IamRoles=[roleArn]  
    )
except Exception as e:
    print(e)
    
def prettyRedshiftProps(props):
    # Show full cell contents; None replaces the deprecated -1 (requires pandas >= 1.0)
    pd.set_option('display.max_colwidth', None)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

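# Cluster creation is asynchronous: wait until the cluster is available so that its Endpoint is populated
redshift.get_waiter('cluster_available').wait(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]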
prettyRedshiftProps(myClusterProps)

DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']

try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    print(e)

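# Delete the cluster only after the Airflow DAG has finished loading and the data is no longer needed;
# running this line in the same cell as the setup would remove the cluster before Airflow can use it
# redshift.delete_cluster(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER, SkipFinalClusterSnapshot=True)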


#### 4.2 Data Quality Checks

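* All tables have a primary key constraint, and all foreign keys have a NOT NULL constraint. Note that Redshift treats primary key and foreign key constraints as informational only and does not enforce them, whereas NOT NULL constraints are enforced.
* A count check is performed to verify that data was actually loaded into each table.
* A count check is also performed to count the number of rows inserted into each table.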
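The count checks can be sketched as a small function over any DB-API 2.0 cursor (for Redshift, a `psycopg2` cursor would be one option). `check_row_count` is a hypothetical helper, not the project's Airflow data-quality task, and the demonstration uses an in-memory `sqlite3` database with made-up tables in place of Redshift.

```python
import sqlite3


def check_row_count(cursor, table, min_rows=1):
    """Raise if `table` holds fewer than `min_rows` rows; otherwise return the row count."""
    # Table names cannot be bound as query parameters, so only pass trusted, hard-coded names here
    cursor.execute(f"SELECT COUNT(*) FROM {table}")
    row = cursor.fetchone()
    count = row[0] if row else 0
    if count < min_rows:
        raise ValueError(f"Data quality check failed: {table} has {count} rows")
    print(f"Data quality check passed: {table} has {count} rows")
    return count


# Demonstration with an in-memory SQLite database standing in for Redshift
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE demographics (city TEXT)")
cur.execute("CREATE TABLE empty_table (city TEXT)")
cur.executemany("INSERT INTO demographics VALUES (?)", [("Boston",), ("Austin",)])
check_row_count(cur, "demographics")  # passes with 2 rows
```

Raising an exception, rather than only logging, matters in an orchestrated pipeline: a failed task is what lets a scheduler such as Airflow retry it or send a failure notification.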



#### 4.3 Data dictionary 

| Column Name | Description | Source |
| :--- | :--- | :--- |
| CICID | ID that uniquely identifies one record in the dataset | Immigration dataset
| I94YR | 4 digit year | Immigration dataset
| I94MON | Numeric month | Immigration dataset
| I94CIT | 3 digit code of source country for immigration (Born country) | Immigration dataset
| I94RES | 3 digit code of source country for immigration (Residence country) | Immigration dataset
| I94PORT | Port admitted through | Immigration dataset
| ARRDATE | Arrival date in the USA | Immigration dataset
| I94MODE | Mode of transportation | Immigration dataset
| I94ADDR | State of arrival | Immigration dataset
| DEPDATE | Departure date | Immigration dataset
| I94BIR | Age of Respondent in Years | Immigration dataset
| I94VISA | Visa codes  | Immigration dataset
| ENTDEPA | Arrival Flag. Whether admitted or paroled into the US | Immigration dataset
| ENTDEPD | Departure Flag. Whether departed, lost visa, or deceased | Immigration dataset
| MATFLAG | Match flag | Immigration dataset
| GENDER | Gender | Immigration dataset
| AIRLINE | Airline used to arrive in U.S. | Immigration dataset
| FLTNO | Flight number of Airline used to arrive in U.S. | Immigration dataset
| VISATYPE | Class of admission legally admitting the non-immigrant to temporarily stay in U.S. | Immigration dataset    
| City | City Name | World temperature dataset
| Country | Country Name | World temperature dataset
| Latitude | Latitude | World temperature dataset
| Longitude | Longitude | World temperature dataset
| State | US state of the city | US Demographic dataset
| Median Age | The median of the age of the population | US Demographic dataset
| Male Population | Number of the male population | US Demographic dataset
| Female Population | Number of the female population | US Demographic dataset
| Total Population | Number of the total population | US Demographic dataset
| Foreign-born | Number of residents of the city who were born outside the United States | US Demographic dataset
| Average Household Size | Average number of people per household in the city | US Demographic dataset
| State Code | Code of the state of the city | US Demographic dataset
| Race | Race class | US Demographic dataset
| Count | Number of individuals of each race | US Demographic dataset

## Step 5: Complete Project Write Up

### Choice of tools
* Spark was chosen for the ETL process because that is the most processing-intensive part of the pipeline, and Spark is built for exactly that. With the basic Python data stack, such as Pandas, it would take about 5 minutes just to read in one of the immigration files, since they are very large, and a similar time to write the data out after cleaning it. With Spark, the read and the write each take only about 30 seconds. In addition, the PySpark library is mature and can do almost everything the usual Python data stack libraries do when it comes to extracting and cleaning data, although it requires more effort from the data engineer. That effort is well worth it, as it gives a faster, more robust and more scalable data pipeline.

* S3 was chosen as the storage area for the files because it is a relatively cheap data storage solution and very easy to use: Python APIs such as the `boto3` library are available, and it can also be accessed easily via the AWS CLI. In addition, S3 is highly available and elastic, so more storage is always available immediately, simply at additional cost. AWS Identity and Access Management (IAM) also makes it very secure.

* Redshift was chosen as the main analytics database because the data is already stored in the AWS cloud. In addition, Redshift is a massively parallel, column-oriented data warehouse, which fits a columnar data format like `.parquet` well, and its `COPY` command can load Parquet files directly from S3. All the mainstream analytical tools, such as Tableau, RapidMiner and Python, can load data from a Redshift database.

* Apache Airflow was chosen as the main tool for scheduling and monitoring the loading of data into Redshift because it has many useful built-in features: task dependencies that keep everything automated, connections to AWS services (S3 and Redshift), notifications when a task fails, and automatic retries of a failed task without any user intervention.

### Updating the data

The data should be updated as often as new data is ingested. Since the immigration dataset is the main dataset in this project, the update frequency of the database should follow how often new immigration data becomes available. In this case that means a monthly update, as a new file is generated every month.
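A monthly run has to pick up that month's file. Only the April file name (`i94_apr16_sub.sas7bdat`) appears in this notebook, so the sketch below assumes every monthly file follows the same `i94_<mon><yy>_sub.sas7bdat` pattern; the helper `i94_file_name` is hypothetical.

```python
from datetime import date

# Lower-case month abbreviations, kept explicit so the result does not depend on the system locale
MONTHS = ("jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec")


def i94_file_name(month_start):
    """Return the assumed I94 file name for a month, e.g. date(2016, 4, 1) -> 'i94_apr16_sub.sas7bdat'."""
    month = MONTHS[month_start.month - 1]
    year = f"{month_start.year % 100:02d}"  # two-digit year
    return f"i94_{month}{year}_sub.sas7bdat"


print([i94_file_name(date(2016, m, 1)) for m in (1, 4, 12)])
# ['i94_jan16_sub.sas7bdat', 'i94_apr16_sub.sas7bdat', 'i94_dec16_sub.sas7bdat']
```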

### Data increased by 100x

If the data were increased by 100x, this would not be a problem. AWS S3 is highly scalable and scales up to match whatever the data storage needs are. Apache Spark can run on Amazon EMR, so the cluster can be scaled as required to handle the ETL pipeline. Amazon Redshift is a petabyte-scale data warehouse whose clusters can be resized by adding nodes, so loading the data will not be a problem either.

### Dashboard update at 7am

The Airflow schedule can be set to run this project's DAG every day, so that all required data is loaded into Redshift the night before and a live dashboard connected to the database via Tableau or AWS QuickSight is updated by 7am every day.

### Access by 100+ people 
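AWS Redshift is a highly scalable and highly available database with a fast query engine designed specifically for Online Analytical Processing (OLAP), so access by 100+ people should not be a problem. If many concurrent queries start to queue, Redshift's workload management (WLM) queues and its Concurrency Scaling feature can be used to add query capacity.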