# Project Title
### Data Engineering Capstone Project

#### Project Summary

The purpose of the project is to extract, clean, transform, and load several datasets into a new database schema using Spark. The schema is optimized for queries by the analytics team, who want to find the reasons behind immigration to the US.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

from pyspark.sql.functions import year, month, dayofmonth, hour, dayofweek, date_format

# from pyspark.sql.functions import avg
# from pyspark.sql import SQLContext
from pyspark.sql.functions import isnan, when, count, col
# from pyspark.sql.functions import monotonically_increasing_id
# from pyspark.sql.types import *


# importlib.reload(utility)
# from utility import visualize_missing_values, clean_immigration, clean_temperature_data
# from utility import clean_demographics_data, print_formatted_float

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project, we use Spark to load the required data sources into DataFrames. Then we clean the data to fit our data model and prepare it for the pipeline. Finally, we use Spark to write the cleaned data to the destination tables. The final database is optimized for queries that explore the main factors contributing to immigration to the US.

#### Describe and Gather Data 
In this project, we use three data sources:

1- I94 Immigration Data
* This data comes from the US National Tourism and Trade Office. It contains information about immigrants, provided by the immigrants themselves at US customs. The dataset is provided in SAS7BDAT format, a binary database storage format. Explanation of some relevant attributes:
 * cicid: record id
 * i94yr: year
 * i94mon: month 
 * i94cit: country of citizenship code
 * i94port: port of entry code (for example, NYC)
 * arrdate: arrival date in the US, stored as a SAS date (days since 1960-01-01)
 * i94mode: mode of transportation code
 * depdate: departure date from the US, stored as a SAS date
 * i94visa: visa category code, used here as the reason for travel
 * occup: occupation that will be performed in the US
 * entdepd: departure flag (departed, lost I-94, or deceased)

2- World Temperature Data
* This dataset came from Kaggle. It provides temperature information corresponding to city and date. Explanation of some relevant attributes:
 * AverageTemperature: average land temperature in Celsius
 * AverageTemperatureUncertainty: 95% confidence interval around the average
 * City: city name
 * Country: country name
 * Latitude: city latitude
 * Longitude: city longitude
 
3- U.S. City Demographic Data
* This data comes from OpenSoft. It contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. Explanation of some relevant attributes:
 * City: city name
 * State: US State where city is located
 * Median Age: median age of the population
 * Male Population: count of Male Population
 * Female Population: count of Female Population
 * Total Population: count of total Population
 * Number of Veterans: count of total Veterans
 * Foreign-born: count of residents who were born outside the US
 * Average Household Size: average city household size
 * State Code: code of the US state
 * Race: respondent race
 * Count: number of the city's residents of that race
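
Several I94 attributes listed above (such as i94mode and i94visa) are numeric codes, which Spark reads as doubles. A minimal sketch of decoding them into readable labels; the lookup tables below are assumptions based on the I94 SAS label descriptions and should be checked against the labels file you use, and the `decode` helper is hypothetical. In Spark, the same mapping could be applied by joining against a small lookup DataFrame.

```python
# Hypothetical lookup tables for two coded I94 fields. The values are taken
# from the I94 SAS label descriptions (ASSUMPTION: verify against your copy).
I94MODE_LABELS = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}
I94VISA_LABELS = {1: "Business", 2: "Pleasure", 3: "Student"}


def decode(code, labels):
    """Map a numeric I94 code (read by Spark as a double) to its label.

    Returns None when the code is missing and "Unknown" when it is not in labels.
    """
    if code is None:
        return None
    return labels.get(int(code), "Unknown")


# Codes as they appear in the sample rows of immigration_df.
print(decode(1.0, I94MODE_LABELS))   # Air
print(decode(2.0, I94VISA_LABELS))   # Pleasure
print(decode(None, I94MODE_LABELS))  # None
```

Returning "Unknown" instead of raising keeps one bad code from failing a whole batch, while still making such rows easy to count.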

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']
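
The cell above expects `dl.cfg` to contain an `[AWS]` section with two keys. A sketch of that assumed layout and of reading it with `configparser`; the file contents and the `load_aws_credentials` helper are hypothetical, and the values are placeholders rather than real credentials.

```python
import configparser

# ASSUMED contents of dl.cfg: an [AWS] section with the two keys read above.
SAMPLE_DL_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = <your-access-key-id>
AWS_SECRET_ACCESS_KEY = <your-secret-access-key>
"""


def load_aws_credentials(cfg_text):
    """Parse dl.cfg-style text and return the AWS keys the notebook exports."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    aws = parser["AWS"]  # raises KeyError if the [AWS] section is missing
    # Option lookups are case-insensitive, so the upper-case names resolve.
    return {key: aws[key] for key in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")}


creds = load_aws_credentials(SAMPLE_DL_CFG)
print(creds["AWS_ACCESS_KEY_ID"])  # <your-access-key-id>
```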

In [3]:
from pyspark.sql import SparkSession

# Add the spark-sas7bdat package so Spark can read SAS7BDAT files
spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()

In [4]:
immigration_df = spark.read.format('com.github.saurfang.sas.spark') \
    .load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [5]:
immigration_df.head()

Row(cicid=6.0, i94yr=2016.0, i94mon=4.0, i94cit=692.0, i94res=692.0, i94port='XXX', arrdate=20573.0, i94mode=None, i94addr=None, depdate=None, i94bir=37.0, i94visa=2.0, count=1.0, dtadfile=None, visapost=None, occup=None, entdepa='T', entdepd=None, entdepu='U', matflag=None, biryear=1979.0, dtaddto='10282016', gender=None, insnum=None, airline=None, admnum=1897628485.0, fltno=None, visatype='B2')

In [6]:
immigration_df.limit(5).toPandas()

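|   | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 |  |  |  | ... | U |  | 1979.0 | 10282016 |  |  |  | 1897628000.0 |  | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL |  | ... | Y |  | 1991.0 | D/S | M |  |  | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... |  | M | 1961.0 | 09302016 | M |  | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 1988.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 2012.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |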


In [7]:
immigration_df.count()

3096313

In [8]:
immigration_df.describe(["biryear"]).show()

+-------+------------------+
|summary|           biryear|
+-------+------------------+
|  count|           3095511|
|   mean|1974.2323855415148|
| stddev|17.420260534588262|
|    min|            1902.0|
|    max|            2019.0|
+-------+------------------+



In [9]:
#### Reading world temperature dataset ####

temperature_df = spark.read.csv('../../data2/GlobalLandTemperaturesByCity.csv', header=True, inferSchema=True)

In [10]:
temperature_df.head()

Row(dt=datetime.datetime(1743, 11, 1, 0, 0), AverageTemperature=6.068, AverageTemperatureUncertainty=1.7369999999999999, City='Århus', Country='Denmark', Latitude='57.05N', Longitude='10.33E')

In [11]:
temperature_df.show(5)

+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01 00:00:00|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows
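
The output above shows that Latitude and Longitude are read as strings with a hemisphere suffix (for example '57.05N' and '10.33E'), which makes numeric filtering on location awkward. A minimal sketch of converting them to signed decimal degrees, assuming every value ends in N, S, E, or W; `parse_coordinate` is a hypothetical helper. In Spark it could be wrapped with `udf` (imported in cell [1]) using a `DoubleType` return type.

```python
def parse_coordinate(value):
    """Convert a coordinate string such as '57.05N' or '10.33E' to signed degrees.

    North and East are positive; South and West are negative.
    Returns None for missing or empty values.
    """
    if not value:
        return None
    number, hemisphere = float(value[:-1]), value[-1].upper()
    if hemisphere in ("S", "W"):
        return -number
    if hemisphere in ("N", "E"):
        return number
    raise ValueError(f"unexpected hemisphere suffix in {value!r}")


print(parse_coordinate("57.05N"))  # 57.05
print(parse_coordinate("10.33E"))  # 10.33
```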



In [12]:
temperature_df.count()

8599212

In [13]:
#### Reading US city demographic dataset ####

demographic_df = spark.read.csv('us-cities-demographics.csv', inferSchema=True, header=True, sep=';')

In [14]:
demographic_df.head()

Row(City='Silver Spring', State='Maryland', Median Age=33.8, Male Population=40601, Female Population=41862, Total Population=82463, Number of Veterans=1562, Foreign-born=30908, Average Household Size=2.6, State Code='MD', Race='Hispanic or Latino', Count=25924)

In [15]:
demographic_df.limit(5).toPandas()

|   | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601 | 41862 | 82463 | 1562 | 30908 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129 | 49500 | 93629 | 4147 | 32935 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040 | 46799 | 84839 | 4819 | 8229 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127 | 87105 | 175232 | 5821 | 33878 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040 | 143873 | 281913 | 5829 | 86253 | 2.73 | NJ | White | 76402 |


In [16]:
demographic_df.count()

2891

### Step 2: Explore and Assess the Data

#### Explore: I94 Immigration Data

* Explore data types
* Count null and missing values

In [17]:
# Explore data types

immigration_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [18]:
immigration_df.count()

3096313

In [19]:
# create a dataframe with missing values count per column
nan_null_count = immigration_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in immigration_df.columns]).toPandas()

In [20]:
nan_null_count.sort_values(by=0, axis=1, ascending=False)

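|   | entdepu | occup | insnum | visapost | gender | i94addr | depdate | matflag | entdepd | airline | ... | cicid | i94yr | count | i94visa | arrdate | i94port | i94res | i94cit | i94mon | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 3095921 | 3088187 | 2982605 | 1881250 | 414269 | 152592 | 142457 | 138429 | 138429 | 83627 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |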


In [21]:
missing_percent = 100 * (nan_null_count / immigration_df.count())
missing_percent.sort_values(by=0, axis=1, ascending=False)

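|   | entdepu | occup | insnum | visapost | gender | i94addr | depdate | matflag | entdepd | airline | ... | cicid | i94yr | count | i94visa | arrdate | i94port | i94res | i94cit | i94mon | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 99.98734 | 99.737559 | 96.327632 | 60.757746 | 13.379429 | 4.928184 | 4.600859 | 4.470769 | 4.470769 | 2.700857 | ... | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |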


#### Clean: I94 Immigration Data
* Drop columns with more than 70% missing values
* Drop duplicate IDs
* Drop rows missing essential information

In [22]:
# Drop columns with more than 70% missing values
to_be_dropped = ['entdepu', 'occup', 'insnum']

cleaned_immig = immigration_df.drop(*to_be_dropped)
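
The list in `to_be_dropped` was picked by reading the percentages in cell [21]. A sketch of deriving it from the 70% threshold instead, so the rule still holds if a new monthly file has a different missing-value profile; `columns_above_threshold` is a hypothetical helper. In the notebook, its input could likely be built with `missing_percent.iloc[0].to_dict()` and its result passed to `immigration_df.drop(*...)`.

```python
# Missing-value percentages copied from the output of cell [21]; the columns
# elided there ("...") are left out, except cicid as a fully populated example.
missing_pct_by_column = {
    "entdepu": 99.98734,
    "occup": 99.737559,
    "insnum": 96.327632,
    "visapost": 60.757746,
    "gender": 13.379429,
    "i94addr": 4.928184,
    "depdate": 4.600859,
    "matflag": 4.470769,
    "entdepd": 4.470769,
    "airline": 2.700857,
    "cicid": 0.0,
}


def columns_above_threshold(missing_pct, threshold=70.0):
    """Return columns whose missing percentage exceeds threshold, sparsest first."""
    sparse = [c for c, pct in missing_pct.items() if pct > threshold]
    return sorted(sparse, key=lambda c: missing_pct[c], reverse=True)


print(columns_above_threshold(missing_pct_by_column))  # ['entdepu', 'occup', 'insnum']
```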

In [23]:
# drop duplicated IDs
cleaned_immig = cleaned_immig.dropDuplicates(['cicid'])

In [24]:
cleaned_immig.show(5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+--------+------+-------+---------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|entdepa|entdepd|matflag|biryear| dtaddto|gender|airline|         admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+--------+------+-------+---------------+-----+--------+
|299.0|2016.0|   4.0| 103.0| 103.0|    NYC|20545.0|    1.0|     NY|20550.0|  54.0|    2.0|  1.0|20160401|    null|      O|      O|      M| 1962.0|06292016|  null|     OS|5.5425872433E10|00087|      WT|
|305.0|2016.0|   4.0| 103.0| 103.0|    NYC|20545.0|    1.0|     NY|20555.0|  63.0|    2.0|  1.0|20160401|    null|      O|      O|      M| 1953.0|06292016|  null|     OS|5.5425817433E10|00087|

In [25]:
# Drop rows missing essential information

cleaned_immig = cleaned_immig.dropna(subset=['cicid', 'i94res', 'biryear'])
display(cleaned_immig)

DataFrame[cicid: double, i94yr: double, i94mon: double, i94cit: double, i94res: double, i94port: string, arrdate: double, i94mode: double, i94addr: string, depdate: double, i94bir: double, i94visa: double, count: double, dtadfile: string, visapost: string, entdepa: string, entdepd: string, matflag: string, biryear: double, dtaddto: string, gender: string, airline: string, admnum: double, fltno: string, visatype: string]

In [26]:
cleaned_immig.count()

3095511

#### Explore: U.S. City Demographic Data

* Explore data types
* Count null and missing values

In [32]:
demographic_df.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



In [33]:
demographic_df.count()

2891

In [34]:
# Percentage of nan and null values in demographic df
nan_null_count = demographic_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in demographic_df.columns]).toPandas()

missing_percent = 100 * (nan_null_count / demographic_df.count())
missing_percent.sort_values(by=0, axis=1, ascending=False)

|   | Average Household Size | Number of Veterans | Foreign-born | Male Population | Female Population | City | State | Median Age | Total Population | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0.553442 | 0.449671 | 0.449671 | 0.10377 | 0.10377 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |


#### Clean: U.S. City Demographic Data
Every column is more than 99% populated, so no column needs to be dropped. The cleaning steps are:
* Drop duplicates
* Drop rows missing essential information

In [35]:
# Drop duplicate rows: keep one record per (City, State, Race)
cleaned_demo = demographic_df.drop_duplicates(subset=['City', 'State', 'Race'])

In [36]:
# Drop rows missing Foreign-born or Average Household Size
cleaned_demo = cleaned_demo.dropna(subset=['Foreign-born', 'Average Household Size'])

In [37]:
cleaned_demo.count()

2875

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

1- **Fact table**: immigration_fact
 * **cicid**: record id
 * **i94yr**: year
 * **i94mon**: month 
 * **i94cit**: country of citizenship code
 * **i94port**: port of entry code
 * **arrdate**: arrival date in the US (SAS date)
 * **i94mode**: mode of transportation code
 * **depdate**: departure date from the US (SAS date)
 * **i94visa**: visa category code (reason for travel)

2- **Dimension table**: temperature

 * **AverageTemperature**: average temperature
 * **City**: name of the city
 * **Country**: name of the country
 * **Latitude**: latitude of the city
 * **Longitude**: longitude of the city
 
3- **Dimension table**: demographic

 * **City**: name of the city
 * **State**: name of the state
 * **median_age**: median age of the population
 * **male_population**: count of male population
 * **female_population**: count of female population
 * **total_population**: count of total population
 * **number_of_veterans**: count of veterans
 * **foreign_born**: count of foreign-born residents of the city
 * **average_household_size**: average city household size
 * **Race**: respondent race
 * **Count**: number of the city's residents of that race
 
#### 3.2 Mapping Out Data Pipelines
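The data is pipelined into the model with the following steps:
* Load the I94 immigration, world temperature, and US city demographic datasets into Spark DataFrames.
* Clean the immigration and demographic data: drop sparse columns, duplicate records, and rows missing essential fields.
* Select the fact columns from the cleaned immigration data and write them to Parquet, partitioned by i94port.
* Select the temperature columns and write them to Parquet.
* Rename the demographic columns to snake_case, drop state_code, and write the table to Parquet, partitioned by City.
* Run data quality checks on the resulting tables.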

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [38]:
# Pipelining table 1- immigration_fact
fact_immig = cleaned_immig.select("cicid", "i94yr", "i94mon", "i94cit", "i94port",
                                  "arrdate", "i94mode", "depdate", "i94visa")

fact_immig.write.parquet("tables/immigration_fact", partitionBy='i94port', mode='overwrite')

In [39]:
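# Pipelining table 2- temperature
# Minimal cleaning step (assumption): keep only rows that carry a temperature reading
cleaned_temp = temperature_df.dropna(subset=['AverageTemperature'])
temp_table = cleaned_temp.select("AverageTemperature", "City", "Country", "Latitude", "Longitude")

temp_table.write.parquet("tables/temperature", mode='overwrite')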

In [40]:
# Pipelining table 3- demographic
demo_table = cleaned_demo.withColumnRenamed('Median Age','median_age') \
            .withColumnRenamed('Male Population', 'male_population') \
            .withColumnRenamed('Female Population', 'female_population') \
            .withColumnRenamed('Total Population', 'total_population') \
            .withColumnRenamed('Number of Veterans', 'number_of_veterans') \
            .withColumnRenamed('Foreign-born', 'foreign_born') \
            .withColumnRenamed('Average Household Size', 'average_household_size') \
            .withColumnRenamed('State Code', 'state_code')

demo_table = demo_table.drop("state_code")
demo_table.write.parquet("tables/demographic", partitionBy = 'City', mode = 'overwrite')
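
The eight chained `withColumnRenamed` calls above could instead be generated from one naming rule. A sketch with a hypothetical `to_snake_case` helper; note that applying it to every column would also lower-case City, State, Race, and Count, which the explicit renames above leave unchanged and the data dictionary lists in their original case.

```python
import re


def to_snake_case(name):
    """Lower-case a column name and replace runs of spaces or hyphens with '_'."""
    return re.sub(r"[\s\-]+", "_", name.strip()).lower()


# Column names as printed by demographic_df.printSchema().
demographic_columns = [
    "City", "State", "Median Age", "Male Population", "Female Population",
    "Total Population", "Number of Veterans", "Foreign-born",
    "Average Household Size", "State Code", "Race", "Count",
]
renames = {c: to_snake_case(c) for c in demographic_columns}
print(renames["Average Household Size"])  # average_household_size

# With Spark, the mapping could then be applied in a loop, for example:
# for old, new in renames.items():
#     demo_table = demo_table.withColumnRenamed(old, new)
```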

#### 4.2 Data Quality Checks
 
 * Count each table's records to make sure it was loaded properly

In [45]:
def quality_check(df, table_name):
    """Count a table's records to check that it was loaded properly.

    df: Spark DataFrame of the table
    table_name: name of the table being checked
    Returns True if the table has at least one record, False otherwise.
    """
    total_count = df.count()

    if total_count == 0:
        print(f"Data quality check failed for {table_name}: zero records!")
        return False
    print(f"Data quality check passed for {table_name} with {total_count:,} records.")
    return True

In [46]:
tables = {
    'immigration_fact': fact_immig,
    'temperature': temp_table,
    'demographic': demo_table,
}
for table_name, table_df in tables.items():
    quality_check(table_df, table_name)

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:43877)
Traceback (most recent call last):
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:43877)
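
Because duplicate cicid values were dropped in cell [23], the fact table can also be checked for a unique, non-null key, in addition to the row count. A plain-Python sketch of that check with a hypothetical `check_unique_key` helper; on the Spark DataFrame, an equivalent would be something like `fact_immig.groupBy('cicid').count().filter('count > 1').count() == 0` together with `fact_immig.filter(col('cicid').isNull()).count() == 0`.

```python
def check_unique_key(rows, key):
    """Return (passed, message) for a primary-key check on an iterable of dicts.

    The check fails if any row lacks the key, has None for it, or repeats a value.
    """
    seen = set()
    for row in rows:
        value = row.get(key)
        if value is None:
            return False, f"null {key} found"
        if value in seen:
            return False, f"duplicate {key} {value!r}"
        seen.add(value)
    return True, f"{key} is unique across {len(seen):,} rows"


# cicid values from the sample rows of immigration_df.
sample = [{"cicid": 6.0}, {"cicid": 7.0}, {"cicid": 15.0}]
print(check_unique_key(sample, "cicid"))
print(check_unique_key(sample + [{"cicid": 7.0}], "cicid"))
```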

#### 4.3 Data dictionary 

1- **Fact table**: immigration_fact
 * **cicid**: record id
 * **i94yr**: year
 * **i94mon**: month 
 * **i94cit**: country of citizenship code
 * **i94port**: port of entry code
 * **arrdate**: arrival date in the US (SAS date)
 * **i94mode**: mode of transportation code
 * **depdate**: departure date from the US (SAS date)
 * **i94visa**: visa category code (reason for travel)

2- **Dimension table**: temperature

 * **AverageTemperature**: average temperature
 * **City**: name of the city
 * **Country**: name of the country
 * **Latitude**: latitude of the city
 * **Longitude**: longitude of the city
 
3- **Dimension table**: demographic

 * **City**: name of the city
 * **State**: name of the state
 * **median_age**: median age of the population
 * **male_population**: count of male population
 * **female_population**: count of female population
 * **total_population**: count of total population
 * **number_of_veterans**: count of veterans
 * **foreign_born**: count of foreign-born residents of the city
 * **average_household_size**: average city household size
 * **Race**: respondent race
 * **Count**: number of the city's residents of that race

4- **Dimension table**: time

 * **arrdate**: arrival date to the US
 * **year**: the year when the respondent arrived
 * **month**: the month when the respondent arrived
 * **day**: the day when the respondent arrived
 * **weekday**: the weekday when the respondent arrived
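
The time dimension above is documented but not built in Step 4. A plain-Python sketch of deriving its columns from arrdate, assuming arrdate is a SAS date (days since 1960-01-01); this is consistent with the sample rows, where arrdate=20545.0 falls on 2016-04-01, the first day of the April 2016 file. In Spark, the same columns could likely be produced with the `year`, `month`, `dayofmonth`, and `dayofweek` functions imported in cell [1] once arrdate is converted to a date column (note that Spark's `dayofweek` returns 1 for Sunday). `time_dimension_row` is a hypothetical helper.

```python
from datetime import date, timedelta

# ASSUMPTION: arrdate is a SAS date, i.e. days since 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)
WEEKDAYS = ("Monday", "Tuesday", "Wednesday", "Thursday",
            "Friday", "Saturday", "Sunday")


def time_dimension_row(arrdate):
    """Build one time-dimension record from a SAS numeric arrival date."""
    arrival = SAS_EPOCH + timedelta(days=int(arrdate))
    return {
        "arrdate": arrdate,
        "year": arrival.year,
        "month": arrival.month,
        "day": arrival.day,
        "weekday": WEEKDAYS[arrival.weekday()],  # date.weekday(): Monday == 0
    }


# arrdate values from the sample rows; a dimension keeps one row per distinct date.
sample_arrdates = [20573.0, 20551.0, 20545.0, 20545.0]
time_dim = [time_dimension_row(a) for a in sorted(set(sample_arrdates))]
for row in time_dim:
    print(row)
```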

### Step 5: Complete Project Write Up
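* Why is Spark used in the project?
 * Spark processes large datasets in parallel and handles a variety of data formats: this project reads SAS7BDAT (through the spark-sas7bdat package) and CSV, and writes Parquet. Its DataFrame API also makes it straightforward to clean and transform big data.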
 
* How often should the data be updated, and why?
 * Monthly, because the I94 immigration dataset is updated monthly.
 
* Write a description of how you would approach the problem differently under the following scenarios:
 * **The data was increased by 100x**:
 
 We would run the Spark job on a larger cloud cluster (for example, Amazon EMR) and add worker nodes as the data volume grows.
  
 * **The data populates a dashboard that must be updated on a daily basis by 7am every day**:
 
 Apache Airflow is a good choice here: the ETL steps can be expressed as a scheduled DAG that runs early enough to finish before 7 am, with retries and monitoring of every run.
  
 * **The database needed to be accessed by 100+ people**:
 
 We would load the tables into a cloud data warehouse such as Amazon Redshift, which can serve many concurrent users running analytical queries.
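
For the monthly refresh described above, each run has to locate the new month's I94 extract. A hypothetical helper, assuming every monthly file follows the naming of `i94_apr16_sub.sas7bdat` loaded in cell [4]; the result could then be joined with the input directory and passed to the same `spark.read` call.

```python
# Lower-case month abbreviations, matching the "apr" in i94_apr16_sub.sas7bdat.
MONTH_ABBR = ("jan", "feb", "mar", "apr", "may", "jun",
              "jul", "aug", "sep", "oct", "nov", "dec")


def i94_monthly_filename(year, month):
    """Return the expected file name of one month's I94 extract.

    ASSUMPTION: every monthly file follows the i94_<mon><yy>_sub.sas7bdat
    pattern of the April 2016 file.
    """
    if not 1 <= month <= 12:
        raise ValueError(f"month must be 1-12, got {month}")
    return f"i94_{MONTH_ABBR[month - 1]}{year % 100:02d}_sub.sas7bdat"


print(i94_monthly_filename(2016, 4))  # i94_apr16_sub.sas7bdat
```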