# Project Title
### Data Engineering Capstone Project

#### Project Summary
The main objective of this project is to explore four datasets (the I94 immigration data, the global land temperatures data, the US city demographics data and the airport codes data) and then build an ETL pipeline with Spark that populates an analytics database. One use case for this analytics database is to find immigration patterns to the US. For example, we could try to answer questions such as: do people from countries with warmer or colder climates immigrate to the US in large numbers?

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [39]:
# Do all imports and installs here
import configparser
import datetime as dt
import os

import matplotlib.pyplot as plt
import seaborn as sns

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import (col, count, dayofmonth, dayofweek, isnan,
                                   monotonically_increasing_id, month, udf,
                                   weekofyear, when, year)
from pyspark.sql.types import (ArrayType as Ar, DateType as Date, DoubleType as Dbl,
                               FloatType as Flt, IntegerType as Int, MapType,
                               StringType, StringType as Str, StructField as Fld,
                               StructType as R)

# project-local helper modules (quality checks and temperature aggregation)
import helpers
import util


Configurations

In [None]:

config = configparser.ConfigParser()
config.read('aws_config.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']
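The cell above expects an INI-style `aws_config.cfg` containing an `[AWS]` section. A minimal sketch of that file's layout and of the same loading logic, assuming only the two keys the cell reads; the function name `load_aws_credentials` is hypothetical and the values are placeholders, not real credentials:

```python
import configparser
import os

# Placeholder contents mirroring the keys read above; never commit real keys.
SAMPLE_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = YOUR_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY = YOUR_SECRET_ACCESS_KEY
"""

def load_aws_credentials(cfg_text):
    """Parse the INI text and export the AWS keys as environment variables."""
    config = configparser.ConfigParser()
    config.read_string(cfg_text)
    for key in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"):
        os.environ[key] = config["AWS"][key]
    return dict(config["AWS"])

creds = load_aws_credentials(SAMPLE_CFG)
```

`configparser` lowercases option names by default, but section lookups apply the same transformation to the requested key, so the uppercase names used in the notebook still resolve.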

Set up Spark

In [2]:
spark = SparkSession.builder.\
    config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11").\
    enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
* Use Spark to load the data into DataFrames

* Perform exploratory data analysis on the I94 immigration dataset (missing values, etc.)

* Perform exploratory data analysis on the global land temperatures dataset (missing values, etc.)

* Perform exploratory data analysis on the airport-codes dataset (missing values, etc.)

* Perform exploratory data analysis on the us-cities-demographics dataset (missing values, etc.)

* Clean all datasets

* Create the fact table from the cleaned immigration dataset

* Create the dimension tables


The technologies used in this project are Amazon S3 and Apache Spark. Data is read and staged from the customer's repository using Spark.

While the whole project has been implemented in this notebook, provisions have been made to run the ETL on a Spark cluster through etl.py. The etl.py script reads data from S3, creates the fact and dimension tables with Spark, and loads them back into S3.

#### Describe and Gather Data 

##### I94 Immigration Data
This data comes from the US National Tourism and Trade Office. In the past, all foreign visitors to the U.S. arriving via air or sea were required to complete paper Customs and Border Protection Form I-94 Arrival/Departure Record or Form I-94W Nonimmigrant Visa Waiver Arrival/Departure Record, and this dataset comes from these forms.

In [3]:
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
immigration_df =spark.read.format('com.github.saurfang.sas.spark').load(fname)

In [4]:
immigration_df.limit(5).toPandas()

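| | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 | | | | ... | U | | 1979.0 | 10282016 | | | | 1897628000.0 | | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL | | ... | Y | | 1991.0 | D/S | M | | | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... | | M | 1961.0 | 09302016 | M | | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 1988.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 2012.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |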


##### Data Dictionary

|Feature| Description|
| --- | --- | 
|cicid  | Unique record ID|
|i94yr|4 digit year|
|i94mon  |Numeric month|
|i94cit  |3 digit code for immigrant country of birth|
|i94res  |3 digit code for immigrant country of residence|
|i94port |Port of admission|
|arrdate |Arrival Date in the USA|
|i94mode |Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)|
|i94addr |USA State of arrival|
|depdate|Departure Date from the USA|
|i94bir|Age of Respondent in Years|
|i94visa|Visa codes collapsed into three categories|
|count|Field used for summary statistics|
|dtadfile|Character Date Field - Date added to I-94 Files|
|visapost|Department of State where Visa was issued|
|occup|Occupation that will be performed in U.S|
|entdepa|Arrival Flag - admitted or paroled into the U.S.|
|entdepd|Departure Flag - Departed, lost I-94 or is deceased|
|entdepu|Update Flag - Either apprehended, overstayed, adjusted to perm residence|
|matflag|Match flag - Match of arrival and departure records|
|biryear|4 digit year of birth|
|dtaddto|Character Date Field - Date to which admitted to U.S. (allowed to stay until)|
|gender|Non-immigrant sex|
|insnum|INS number|
|airline| Airline used to arrive in U.S.|
|admnum|Admission Number|
|fltno |Flight number of Airline used to arrive in U.S.|
|visatype| Class of admission legally admitting the non-immigrant to temporarily stay in U.S.|
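Several coded fields are stored as doubles (for example `i94mode` is `1.0` in the sample rows above). A small sketch of decoding `i94mode` with the mapping from the dictionary; the helper name is hypothetical, and in Spark the same dictionary could back a UDF or a `when` chain:

```python
# i94mode labels taken from the data dictionary above
I94MODE_LABELS = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}

def decode_i94mode(code):
    """Map an i94mode value such as 1.0 to its label; None for missing or unknown codes."""
    if code is None or code != code:  # code != code is True only for NaN
        return None
    return I94MODE_LABELS.get(int(code))

print(decode_i94mode(1.0))  # Air
```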


##### Airport Codes Dataset

Airport codes from around the world, downloaded from the public domain source http://ourairports.com/data/, which compiled this data from multiple sources. This data is updated nightly.

The airport codes may refer to either the IATA airport code, a three-letter code used in passenger reservation, ticketing and baggage-handling systems, or the ICAO airport code, a four-letter code used by ATC systems and for airports that do not have an IATA airport code.
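The two code systems differ in length, which suggests a quick shape check when exploring the code columns. A rough sketch; the helper is hypothetical and only inspects the string's shape, so it cannot confirm that a code actually exists, and local identifiers such as `00A` fall into `other`:

```python
import re

def classify_airport_code(code):
    """Guess the code system from the string's shape alone (hypothetical heuristic)."""
    if code is None:
        return None
    code = code.strip().upper()
    if re.fullmatch(r"[A-Z]{3}", code):
        return "IATA"   # three letters, as used in passenger ticketing
    if re.fullmatch(r"[A-Z]{4}", code):
        return "ICAO"   # four letters, as used by ATC systems
    return "other"      # e.g. local identifiers such as '00A'

print([classify_airport_code(c) for c in ["ATL", "KATL", "00AA"]])  # ['IATA', 'ICAO', 'other']
```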

In [5]:
airport_df = spark.read.csv("airport-codes_csv.csv",header=True)

In [6]:
airport_df.limit(5).toPandas()

| | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 00A | heliport | Total Rf Heliport | 11 | | US | US-PA | Bensalem | 00A | | 00A | -74.93360137939453, 40.07080078125 |
| 1 | 00AA | small_airport | Aero B Ranch Airport | 3435 | | US | US-KS | Leoti | 00AA | | 00AA | -101.473911, 38.704022 |
| 2 | 00AK | small_airport | Lowell Field | 450 | | US | US-AK | Anchor Point | 00AK | | 00AK | -151.695999146, 59.94919968 |
| 3 | 00AL | small_airport | Epps Airpark | 820 | | US | US-AL | Harvest | 00AL | | 00AL | -86.77030181884766, 34.86479949951172 |
| 4 | 00AR | closed | Newport Hospital & Clinic Heliport | 237 | | US | US-AR | Newport | | | | -91.254898, 35.6087 |


##### Data Dictionary

|Feature| Description|
| --- | --- | 
|ident  | Unique airport identifier|
|type  | Type of airport (e.g. heliport, small_airport, closed)|
|name  | Name of airport|
|elevation_ft  | Elevation of airport in feet|
|continent  | Continent of airport|
|iso_country  | ISO country code of airport|
|iso_region  | ISO region code of airport (e.g. US-PA)|
|municipality  | Municipality of airport|
|gps_code  | GPS code of airport (usually the ICAO code)|
|iata_code  | IATA location identifier|
|local_code  | Local airport identifier code|
|coordinates  | Coordinates of airport, as "longitude, latitude"|
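The sample rows show that `coordinates` holds a single string in longitude, latitude order (Bensalem, PA has longitude -74.93 and latitude 40.07). A hypothetical helper that splits it into numbers, assuming that order holds for every row:

```python
def parse_coordinates(coords):
    """Split an airport 'coordinates' string into (latitude, longitude) floats.

    Assumes the 'longitude, latitude' order seen in the sample rows.
    """
    if not coords:
        return None
    lon_str, lat_str = coords.split(",")
    return float(lat_str), float(lon_str)  # float() ignores surrounding whitespace

print(parse_coordinates("-74.93360137939453, 40.07080078125"))
```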

##### World Temperature Data

This dataset came from Kaggle. Early data was collected by technicians using mercury thermometers, where any variation in the visit time impacted measurements. In the 1940s, the construction of airports caused many weather stations to be moved.
The data have been repackaged from a newer compilation put together by Berkeley Earth, which is affiliated with Lawrence Berkeley National Laboratory. The Berkeley Earth Surface Temperature Study combines 1.6 billion temperature reports from 16 pre-existing archives. It is nicely packaged and allows for slicing into interesting subsets (for example, by country).

In [7]:
file_name = '../../data2/GlobalLandTemperaturesByCity.csv'
temperature_df = spark.read.csv(file_name, header=True, inferSchema=True)

In [8]:
temperature_df.limit(5).toPandas()

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 | | | Århus | Denmark | 57.05N | 10.33E |


##### Data Dictionary

|Feature| Description|
| --- | --- | 
|dt  | Date|
|AverageTemperature  | Average land temperature in degrees Celsius|
|AverageTemperatureUncertainty  | 95% confidence interval around the average|
|City  | Name of City|
|Country  | Name of Country|
|Latitude | City Latitude|
|Longitude   | City Longitude|
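Latitude and Longitude are stored as strings with a hemisphere suffix (for example `57.05N` and `10.33E` for Århus). If numeric coordinates are needed, for instance to compare with the airport data, a sketch of the conversion (hypothetical helper, assuming the suffix is always one of N, S, E or W):

```python
def parse_degrees(value):
    """Convert strings like '57.05N' or '10.33W' to signed decimal degrees.

    North and East are positive; South and West are negative.
    """
    if not value:
        return None
    value = value.strip().upper()
    number, hemisphere = float(value[:-1]), value[-1]
    if hemisphere not in ("N", "S", "E", "W"):
        raise ValueError(f"unexpected hemisphere in {value!r}")
    return -number if hemisphere in ("S", "W") else number

print(parse_degrees("57.05N"), parse_degrees("10.33E"))  # 57.05 10.33
```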


##### U.S. City Demographic Data

This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. 

This data comes from the US Census Bureau's 2015 American Community Survey.

In [9]:
file_name = "us-cities-demographics.csv"
demographics_df = spark.read.csv(file_name, inferSchema=True, header=True, sep=';')

In [10]:
demographics_df.limit(5).toPandas()

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601 | 41862 | 82463 | 1562 | 30908 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129 | 49500 | 93629 | 4147 | 32935 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040 | 46799 | 84839 | 4819 | 8229 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127 | 87105 | 175232 | 5821 | 33878 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040 | 143873 | 281913 | 5829 | 86253 | 2.73 | NJ | White | 76402 |


##### Data Dictionary

|Feature| Description|
| --- | --- | 
|City  | Name of city|
|State  | US State where city is located|
|Median Age  | Median age of the population|
|Male Population  | Number of male residents|
|Female Population  | Number of female residents|
|Total Population  | Total number of residents|
|Number of Veterans  | Number of veterans|
|Foreign-born  | Number of residents of the city who were born outside the US|
|Average Household Size  | Average city household size|
|State Code  | Code of the US state|
|Race  | Respondent race|
|Count  | Number of the city's residents of the given race|


### Step 2: Explore and Assess the Data



#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.
#### Cleaning Steps
Document steps necessary to clean the data

##### Explore Immigration dataset

In [11]:
immigration_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

What percentage of each column is null?

In [12]:
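# NOTE: F.count(c) counts only non-null values, so each figure is 100 * nulls / non-nulls
# (it can exceed 100); divide by F.count(F.lit(1)) instead for a true percentage of all rows
immigration_df.select([((F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)) / F.count(c)) * 100).alias(c)  for c in immigration_df.columns]).toPandas().T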

| Column | Percent null |
|---|---|
| cicid | 0.0 |
| i94yr | 0.0 |
| i94mon | 0.0 |
| i94cit | 0.0 |
| i94res | 0.0 |
| i94port | 0.0 |
| arrdate | 0.0 |
| i94mode | 0.007719 |
| i94addr | 5.183643 |
| depdate | 4.822747 |


Delete columns with over 99 percent null values

In [13]:
columns_to_remove = ['insnum', 'entdepu', 'occup']
# drop() is a no-op for names that are not columns, so a misspelled name fails silently
immigration_df = immigration_df.drop(*columns_to_remove)

How many unique values does each column have?

In [14]:
immigration_df.select([F.countDistinct(c).alias(c) for c in immigration_df.columns]).toPandas().T

| Column | Distinct values |
|---|---|
| cicid | 3096313 |
| i94yr | 1 |
| i94mon | 1 |
| i94cit | 243 |
| i94res | 229 |
| i94port | 299 |
| arrdate | 30 |
| i94mode | 4 |
| i94addr | 457 |
| depdate | 235 |


In [15]:
immigration_df.count()

3096313

Drop duplicate entries

In [16]:
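# record id used to identify duplicates (named dedup_cols so pyspark's col() is not shadowed)
dedup_cols = ['cicid']
immigration_df = immigration_df.dropDuplicates(dedup_cols)

Drop rows with a missing record ID (cicid)

In [17]:
new_immig_df = immigration_df.dropna(how='all', subset=dedup_cols)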

##### EDA on Airport Codes Dataset

In [18]:
airport_df.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



Percent Null

In [19]:
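# NOTE: F.count(c) counts only non-null values, so each figure is 100 * nulls / non-nulls
# (it can exceed 100); divide by F.count(F.lit(1)) instead for a true percentage of all rows
airport_df.select([((F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)) / F.count(c)) * 100).alias(c)  for c in airport_df.columns]).toPandas().T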

| Column | Percent null |
|---|---|
| ident | 0.0 |
| type | 0.0 |
| name | 0.0 |
| elevation_ft | 14.574882 |
| continent | 0.0 |
| iso_country | 0.0 |
| iso_region | 0.0 |
| municipality | 11.490111 |
| gps_code | 34.23105 |
| iata_code | 499.357928 |
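In these percent-null cells the denominator `F.count(c)` counts only non-null values, so each figure is really null rows divided by non-null rows (times 100). That is why `iata_code` reports a value above 100. Dividing by the total row count instead (for example with `F.count(F.lit(1))`) gives a true percentage. As a sketch, a reported ratio r can be converted back to a share of all rows with r / (100 + r) × 100; the helper name is hypothetical:

```python
def null_ratio_to_percent(ratio_pct):
    """Convert a reported 100 * nulls / non_nulls value into 100 * nulls / total_rows."""
    return 100.0 * ratio_pct / (100.0 + ratio_pct)

# Worked check with hypothetical counts: 5 nulls out of 6 rows
nulls, total = 5, 6
reported = 100.0 * nulls / (total - nulls)  # the cells above would report 500.0
print(null_ratio_to_percent(reported))       # about 83.33, i.e. 5 / 6 of all rows

# The iata_code figure reported above
print(round(null_ratio_to_percent(499.357928), 2))  # 83.32
```

So roughly 83% of `iata_code` values are null, which still supports dropping the column.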


Delete columns with a high percentage of null values

In [20]:
cols = ['local_code','iata_code']
airport_df = airport_df.drop(*cols)

Number of unique values

In [21]:
airport_df.select([F.countDistinct(c).alias(c) for c in airport_df.columns]).toPandas().T

| Column | Distinct values |
|---|---|
| ident | 55075 |
| type | 7 |
| name | 52144 |
| elevation_ft | 5449 |
| continent | 7 |
| iso_country | 244 |
| iso_region | 2810 |
| municipality | 27133 |
| gps_code | 40850 |
| coordinates | 54874 |


In [22]:
airport_df.count()

55075

##### Explore Temperature Data

In [23]:
temperature_df.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



Percent Missing values

In [24]:
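# NOTE: F.count(c) counts only non-null values, so each figure is 100 * nulls / non-nulls
# (it can exceed 100); divide by F.count(F.lit(1)) instead for a true percentage of all rows
temperature_df.select([((F.count(F.when(F.col(c).isNull(), c)) / F.count(c)) * 100).alias(c)  for c in temperature_df.columns]).toPandas().T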

| Column | Percent null |
|---|---|
| dt | 0.0 |
| AverageTemperature | 4.421692 |
| AverageTemperatureUncertainty | 4.421692 |
| City | 0.0 |
| Country | 0.0 |
| Latitude | 0.0 |
| Longitude | 0.0 |


Drop Duplicate rows

In [25]:
temperature_df.count()

8599212

In [26]:
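temp_dedup_cols = ['AverageTemperature', 'AverageTemperatureUncertainty']
# NOTE: this keeps one row per distinct (temperature, uncertainty) pair, so different cities
# or dates that share a reading are collapsed too; ['dt', 'City', 'Country'] would target
# exact duplicate observations instead
temperature_df = temperature_df.dropDuplicates(temp_dedup_cols)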

In [27]:
temperature_df.count()

3085608
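The drop from 8,599,212 to 3,085,608 rows depends on which columns are passed to `dropDuplicates`. A pure-Python sketch of the same subset-based deduplication, using hypothetical toy rows, shows how deduplicating on value columns removes rows that a key-based subset keeps. This sketch keeps the first row per key; Spark does not guarantee which duplicate survives.

```python
def drop_duplicates(rows, subset):
    """Keep the first row for each distinct combination of the `subset` columns."""
    seen = set()
    kept = []
    for row in rows:
        key = tuple(row[c] for c in subset)
        if key not in seen:
            seen.add(key)
            kept.append(row)
    return kept

# Hypothetical toy rows: two different cities that happen to share a reading,
# plus one exact duplicate of the first row
rows = [
    {"dt": "2000-01-01", "City": "A", "AverageTemperature": 5.0, "AverageTemperatureUncertainty": 0.3},
    {"dt": "2000-01-01", "City": "B", "AverageTemperature": 5.0, "AverageTemperatureUncertainty": 0.3},
    {"dt": "2000-01-01", "City": "A", "AverageTemperature": 5.0, "AverageTemperatureUncertainty": 0.3},
]
by_value = drop_duplicates(rows, ["AverageTemperature", "AverageTemperatureUncertainty"])
by_key = drop_duplicates(rows, ["dt", "City"])
print(len(by_value), len(by_key))  # 1 2
```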

##### EDA on U.S. City Demographic Data

In [28]:
demographics_df.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



Percent Missing

In [29]:
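# NOTE: F.count(c) counts only non-null values, so each figure is 100 * nulls / non-nulls
# (it can exceed 100); divide by F.count(F.lit(1)) instead for a true percentage of all rows
demographics_df.select([((F.count(F.when(F.col(c).isNull(), c)) / F.count(c)) * 100).alias(c)  for c in demographics_df.columns]).toPandas().T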

| Column | Percent null |
|---|---|
| City | 0.0 |
| State | 0.0 |
| Median Age | 0.0 |
| Male Population | 0.103878 |
| Female Population | 0.103878 |
| Total Population | 0.0 |
| Number of Veterans | 0.451703 |
| Foreign-born | 0.451703 |
| Average Household Size | 0.556522 |
| State Code | 0.0 |


Number of unique values

In [30]:
demographics_df.select([F.countDistinct(c).alias(c) for c in demographics_df.columns]).toPandas().T

| Column | Distinct values |
|---|---|
| City | 567 |
| State | 49 |
| Median Age | 180 |
| Male Population | 593 |
| Female Population | 594 |
| Total Population | 594 |
| Number of Veterans | 577 |
| Foreign-born | 587 |
| Average Household Size | 161 |
| State Code | 49 |


Remove Duplicates

In [31]:
demographics_df.count()

2891

In [32]:
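demo_dedup_cols = ['Average Household Size', 'Foreign-born']
# the source has one row per city and race, so this keeps one arbitrary row per distinct combination
demographics_df = demographics_df.dropDuplicates(demo_dedup_cols)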

In [33]:
demographics_df.count()

590

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
We use three datasets to define the conceptual model:
* Immigration dataset
* US cities demographics dataset
* World Temperature dataset


![alt text](udacity_capstone_project_data_model.png "Data Model")


**Fact Table**

- immigration_table 


**Dimension Tables**

- visa_type_table 
- country_table 
- usa_demographics_table
- immigration_calendar_table



The US demographics dimension table comes from the demographics dataset and links to the immigration fact table at the US state level (via state_code). This dimension allows analysts to get insights into migration patterns into the US based on demographics as well as the overall population of states.


The visa type dimension table comes from the immigration dataset and links to the immigration fact table via visa_type_key.


The country dimension table is made up of data from the global land temperatures by city and the immigration datasets. The combination of these two datasets allows analysts to study correlations between global land temperatures and immigration patterns to the US.
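To make the analytics use case concrete, a sketch of the kind of query the fact and country tables are meant to support: arrivals per country of residence alongside that country's average temperature. It uses Python's built-in `sqlite3` with a few hypothetical toy rows so it runs anywhere; with the real tables, the same SQL could be run through `spark.sql` after registering the parquet outputs as temporary views. `create_country_dim` writes `average_temperature` as a string, so a cast may be needed there.

```python
import sqlite3

# Hypothetical toy rows shaped like the fact and country tables (not real data)
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE country (country_code REAL, country_name TEXT, average_temperature REAL);
CREATE TABLE immigration_fact (record_id REAL, country_residence_code REAL);
INSERT INTO country VALUES (1, 'Warmland', 25.0), (2, 'Coldland', -2.0);
INSERT INTO immigration_fact VALUES (10, 1), (11, 1), (12, 2);
""")

# Arrivals per country of residence, next to that country's average temperature
query = """
SELECT c.country_name, c.average_temperature, COUNT(*) AS arrivals
FROM immigration_fact f
JOIN country c ON f.country_residence_code = c.country_code
GROUP BY c.country_name, c.average_temperature
ORDER BY arrivals DESC
"""
results = conn.execute(query).fetchall()
print(results)  # [('Warmland', 25.0, 2), ('Coldland', -2.0, 1)]
```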


#### 3.2 Mapping Out Data Pipelines

- Load the datasets into Spark DataFrames
- Clean the I94 immigration dataset
- Create the immigration fact table
- Create the calendar dimension table from the I94 immigration dataset
- Create the visa type dimension table from the I94 immigration dataset
- Clean the temperature dataset
- Create the country dimension table
- Clean the demographics dataset
- Create the demographics dimension table



### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

Immigration Calendar Dimension Table

In [40]:
def create_immigration_calendar_dim(spark_df, output_data):
    """This function creates an immigration calendar based on arrival date
    
    Args:
        spark_df: spark dataframe of immigration events
        output_data: path to write dimension dataframe 
        
    Return: 
         spark dataframe representing calendar dimension
    """
    # create a udf to convert the SAS arrival date (days since 1960-01-01) to an ISO date string
    get_datetime = F.udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(x)).isoformat() if x is not None else None)
    
    # create initial calendar df from arrdate column
    calendar_df = spark_df.select(['arrdate']).withColumn("arrdate", get_datetime(spark_df.arrdate)).distinct()
    
    # expand df by adding other calendar columns
    calendar_df = calendar_df.withColumn('arrival_day', dayofmonth('arrdate'))
    calendar_df = calendar_df.withColumn('arrival_week', weekofyear('arrdate'))
    calendar_df = calendar_df.withColumn('arrival_month', month('arrdate'))
    calendar_df = calendar_df.withColumn('arrival_year', year('arrdate'))
    calendar_df = calendar_df.withColumn('arrival_weekday', dayofweek('arrdate'))

    # create an id field in calendar df
    calendar_df = calendar_df.withColumn('id', monotonically_increasing_id())
    
    # write the calendar dimension to parquet file
    partition_columns = ['arrival_year', 'arrival_month', 'arrival_week']
    calendar_df.write.parquet(output_data + "immigration_calendar", partitionBy=partition_columns, mode= "overwrite")
    
    return calendar_df

In [41]:
output_data = "tables/"
calendar_df = create_immigration_calendar_dim(new_immig_df, output_data)

In [42]:
calendar_df.limit(5).toPandas()

| | arrdate | arrival_day | arrival_week | arrival_month | arrival_year | arrival_weekday | id |
|---|---|---|---|---|---|---|---|
| 0 | 2016-04-22 | 22 | 16 | 4 | 2016 | 6 | 8589934592 |
| 1 | 2016-04-15 | 15 | 15 | 4 | 2016 | 6 | 25769803776 |
| 2 | 2016-04-18 | 18 | 16 | 4 | 2016 | 2 | 42949672960 |
| 3 | 2016-04-09 | 9 | 14 | 4 | 2016 | 7 | 68719476736 |
| 4 | 2016-04-11 | 11 | 15 | 4 | 2016 | 2 | 85899345920 |
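The calendar and fact table UDFs rely on SAS storing dates as a number of days since 1960-01-01. The same conversion in plain Python, as a testable sketch (the helper name is hypothetical); for example, arrdate 20545.0 in the sample immigration rows corresponds to 2016-04-01:

```python
import datetime as dt

# SAS stores dates as the number of days since 1960-01-01
SAS_EPOCH = dt.date(1960, 1, 1)

def sas_to_iso(days):
    """Convert a SAS numeric date (possibly a float such as 20545.0) to 'YYYY-MM-DD'."""
    if days is None:
        return None
    return (SAS_EPOCH + dt.timedelta(days=int(days))).isoformat()

print(sas_to_iso(20545.0))  # 2016-04-01
print(sas_to_iso(20691.0))  # 2016-08-25
```

In Spark, a built-in expression such as `F.expr("date_add('1960-01-01', cast(arrdate as int))")` should produce the same dates without the overhead of a Python UDF; treat it as a suggestion to verify against this dataset.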


Country Table

In [50]:
import pandas as pd
def create_country_dim(spark_df, temp_df, output_data):
    """This function creates a country dimension from the immigration and global land temperatures data.
    
    Args:
        spark_df: spark dataframe of immigration events
        temp_df: spark dataframe of global land temperatures data.
        output_data: path to write dimension dataframe to
    Return: 
        spark dataframe representing country dimension
    """
    # get the aggregated temperature data
    agg_temp = util.aggregate_temperature_data(temp_df).toPandas()
    # load the i94res to country mapping data
    mapping_codes = pd.read_csv('i94res.csv')
    
    @udf('string')
    def get_country_average_temperature(name):

        avg_temp = agg_temp[agg_temp['Country']==name]['average_temperature']
        
        if not avg_temp.empty:
            return str(avg_temp.iloc[0])
        
        return None
    
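    @udf('string')
    def get_country_name(code):
        # look up the country name for an i94res code; unmapped codes return None
        names = mapping_codes[mapping_codes['code']==code]['Name']
        
        if names.empty or not isinstance(names.iloc[0], str):
            return None
        return names.iloc[0].title()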
        
    # select and rename i94res column
    dim_df = spark_df.select(['i94res']).distinct() \
                .withColumnRenamed('i94res', 'country_code')
    
    # create country_name column
    dim_df = dim_df.withColumn('country_name', get_country_name(dim_df.country_code))
    
    # create average_temperature column
    dim_df = dim_df.withColumn('average_temperature', get_country_average_temperature(dim_df.country_name))
    
    # write the dimension to a parquet file
    dim_df.write.parquet(output_data + "country", mode="overwrite")
    
    return dim_df

In [51]:
country_table = create_country_dim(new_immig_df, temperature_df, output_data)

In [52]:
country_table.limit(5).toPandas()

| | country_code | country_name | average_temperature |
|---|---|---|---|
| 0 | 692.0 | Ecuador | 20.2044865586 |
| 1 | 299.0 | Mongolia | -3.51144333043 |
| 2 | 576.0 | El Salvador | 25.0509344303 |
| 3 | 735.0 | Montenegro | 10.1025362602 |
| 4 | 206.0 | Hong Kong | |
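The UDFs in `create_country_dim` filter a pandas DataFrame once per row, and a lookup that ends in `.iloc[0]` raises an `IndexError` when no row matches. A sketch of an alternative: build a plain dictionary once and use `.get`, so unmapped codes return `None` (rows such as Hong Kong above show that some lookups legitimately come back empty). The mapping rows below are hypothetical stand-ins shaped like `i94res.csv` (`code`, `Name`); in Spark the dictionary could be wrapped in a UDF or shipped with `spark.sparkContext.broadcast`, or the mapping could be joined as a small DataFrame using `F.broadcast`.

```python
import math

def build_code_lookup(rows):
    """Build {int code: Title-Cased name} once from (code, name) pairs, skipping blank names."""
    return {int(code): name.title() for code, name in rows if isinstance(name, str) and name}

def lookup_country(lookup, code):
    """Return the name for an i94res code (stored as a double), or None if missing/unmapped."""
    if code is None or (isinstance(code, float) and math.isnan(code)):
        return None
    return lookup.get(int(code))

# Hypothetical stand-in rows shaped like i94res.csv (code, Name)
lookup = build_code_lookup([(692, "ECUADOR"), (299, "MONGOLIA"), (576, "EL SALVADOR")])
print(lookup_country(lookup, 692.0))  # Ecuador
print(lookup_country(lookup, 999.0))  # None
```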


Visa Table

In [53]:
def create_visa_type_dim(spark_df, output_data):
    """This function creates a visa type dimension from the immigration data.
    
    Args:
        spark_df: spark dataframe of immigration events
        output_data: path to write dimension dataframe to
    Return: 
        spark dataframe representing visa type dimension
    """
    # create visatype df from visatype column
    visatype_df = spark_df.select(['visatype']).distinct()
    
    # add an id column
    visatype_df = visatype_df.withColumn('visa_type_key', monotonically_increasing_id())
    
    # write dimension to parquet file
    visatype_df.write.parquet(output_data + "visatype", mode="overwrite")
    
    return visatype_df


In [54]:
def get_visa_type_dimension(output_data):
    return spark.read.parquet(output_data + "visatype")

In [55]:
visatype_df = create_visa_type_dim(new_immig_df, output_data)

In [56]:
visatype_df.limit(5).toPandas()

| | visatype | visa_type_key |
|---|---|---|
| 0 | F2 | 103079215104 |
| 1 | GMB | 352187318272 |
| 2 | B2 | 369367187456 |
| 3 | F1 | 498216206336 |
| 4 | CPL | 601295421440 |


Demographics table

In [57]:

def create_demographics_dimen(spark_df, output_data):
    """This function creates a us demographics dimension table from the us cities demographics data.
    
    Args:
        spark_df: spark dataframe of us demographics survey data
        output_data: path to write dimension dataframe to
    Return: 
        spark dataframe representing demographics dimension
    """
    dim_df = spark_df.withColumnRenamed('Median Age','median_age') \
            .withColumnRenamed('Male Population', 'male_population') \
            .withColumnRenamed('Female Population', 'female_population') \
            .withColumnRenamed('Total Population', 'total_population') \
            .withColumnRenamed('Number of Veterans', 'number_of_veterans') \
            .withColumnRenamed('Foreign-born', 'foreign_born') \
            .withColumnRenamed('Average Household Size', 'average_household_size') \
            .withColumnRenamed('State Code', 'state_code')
    # add a surrogate id column
    dim_df = dim_df.withColumn('id', monotonically_increasing_id())
    
    # write dimension to parquet file
    dim_df.write.parquet(output_data + "demographics", mode="overwrite")
    
    return dim_df

In [59]:
demographics_dim_df = create_demographics_dimen(demographics_df, output_data)

In [60]:
demographics_dim_df.limit(5).toPandas()

| | City | State | median_age | male_population | female_population | total_population | number_of_veterans | foreign_born | average_household_size | state_code | Race | Count | id |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Bay | Texas | 32.9 | 37977 | 37508 | 75485 | 3478 | 13192 | 2.59 | TX | Hispanic or Latino | 31672 | 0 |
| 1 | Centennial | Colorado | 42.4 | 53222 | 56504 | 109726 | 7226 | 9501 | 2.66 | CO | Black or African-American | 4005 | 1 |
| 2 | Clovis | California | 37.8 | 52392 | 51780 | 104172 | 6173 | 13409 | 2.76 | CA | White | 78029 | 2 |
| 3 | Evanston | Illinois | 36.8 | 34146 | 41377 | 75523 | 2058 | 15003 | 2.29 | IL | White | 54496 | 8589934592 |
| 4 | Allen | Texas | 37.2 | 51324 | 46814 | 98138 | 3505 | 19649 | 3.04 | TX | American Indian and Alaska Native | 227 | 8589934593 |
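The `id` values above (0, 1, 2, then 8589934592) jump because Spark's `monotonically_increasing_id` is only guaranteed to be unique and increasing, not consecutive. Spark's documentation describes the current layout as the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. A small pure-Python sketch (hypothetical helper name) that splits an id along that layout:

```python
def split_monotonic_id(value):
    """Split a monotonically_increasing_id value into (partition_index, row_in_partition).

    Follows the layout Spark documents: partition ID in the upper 31 bits,
    record number within the partition in the lower 33 bits.
    """
    return value >> 33, value & ((1 << 33) - 1)

for generated_id in (0, 2, 8589934592, 8589934593):
    print(generated_id, split_monotonic_id(generated_id))
```

For example, 8589934592 is 2^33, the first row of partition 1. Because the ids depend on partitioning, they can change if a dimension is regenerated, which is one reason to read keys back from the written parquet (as `get_visa_type_dimension` does) rather than recompute them.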


Immigration table(fact)

In [61]:
def create_immigration_fact_table(spark_df, output_data):
    """This function creates the immigration fact table from the immigration data,
    linking each record to the visa type dimension through visa_type_key.
    
    Args:
        spark_df: spark dataframe of immigration events
        output_data: path to read the visa type dimension from and to write the fact table to
        
    Return:
         spark dataframe representing the immigration fact table
    """
    # get visa_type dimension
    dim_df = get_visa_type_dimension(output_data).toPandas()
    
    @udf('string')
    def get_visa_key(visa_type):
        """user defined function to get visa key
        
        :param visa_type: US non-immigrant visa type
        :return: corresponding visa key
        """
        key_series = dim_df[dim_df['visatype']==visa_type]['visa_type_key']
        
        if not key_series.empty:
            return str(key_series.iloc[0])
        
        return None
    
    # create a udf to convert the SAS arrival date (days since 1960-01-01) to an ISO date string
    get_datetime = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(x)).isoformat() if x is not None else None)
    
    # rename columns to align with data model
    df = spark_df.withColumnRenamed('cicid','record_id') \
            .withColumnRenamed('i94res', 'country_residence_code') \
            .withColumnRenamed('i94addr', 'state_code') 
    
    # create visa_type key
    df = df.withColumn('visa_type_key', get_visa_key('visatype'))
    
    # convert arrival date into datetime object
    df = df.withColumn("arrdate", get_datetime(df.arrdate))
    
    # write fact table to parquet file
    df.write.parquet(output_data + "immigration_fact", mode="overwrite")
    
    return df

In [None]:
immigration_fact_df = create_immigration_fact_table(new_immig_df, output_data)

In [None]:
immigration_fact_df.limit(5).toPandas()


#### 4.2 Data Quality Checks

The data quality checks ensure that the ETL has created fact and dimension tables with adequate records.

In [None]:
# Perform quality checks here
table_dfs = {
    'immigration_fact': immigration_fact_df,
    'visa_type_dim': visatype_df,
    'calendar_dim': calendar_df,
    'usa_demographics_dim': demographics_dim_df,
    'country_dim': country_table
}
for table_name, table_df in table_dfs.items():
    # quality check for table
    helpers.quality_checks(table_df, table_name)
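`helpers.quality_checks` is not shown in this notebook. As an assumption about what a minimal check might look like, here is a hypothetical row-count check that works with any object exposing `.count()`, including a Spark DataFrame:

```python
def check_row_count(table_df, table_name, min_rows=1):
    """Hypothetical quality check: raise if a table has fewer than `min_rows` rows."""
    n_rows = table_df.count()
    if n_rows < min_rows:
        raise ValueError(f"Data quality check failed for {table_name}: {n_rows} rows")
    print(f"Data quality check passed for {table_name}: {n_rows} rows")
    return n_rows
```

Raising instead of only printing lets a scheduler (for example the Airflow setup mentioned in Step 5) mark the run as failed.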

#### 4.3 Data dictionary 

**Fact-Table**

##### Immigration table

| Feature| Descriptions|
|--------|-------------|
|record_id| Unique record ID|
|country_residence_code| 3 digit code for immigrant country of residence|
|visa_type_key| A key (stored as a string) that links to the visa_type dimension table|
|state_code| US state of arrival|
|i94yr| 4 digit year|
|i94mon| Numeric month|
|i94cit| 3 digit code for immigrant country of birth|
|i94port| Port of admission|
|arrdate| Arrival date in the USA (ISO date string converted from the SAS date)|
|i94mode|Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)|
|depdate| Departure date from the USA (SAS numeric date)|
|i94bir|Age of Respondent in Years|
|i94visa| Visa codes collapsed into three categories|
|count| Field used for summary statistics|
|dtadfile|Character Date Field - Date added to I-94 Files|
|visapost| Department of State where Visa was issued|
|entdepa| Arrival Flag - admitted or paroled into the U.S.|
|entdepd| Departure Flag - Departed, lost I-94 or is deceased|
|matflag| Match flag - Match of arrival and departure records|
|biryear| 4 digit year of birth|
|dtaddto|Character Date Field - Date to which admitted to U.S. (allowed to stay until)|
|gender| Non-immigrant sex|
|airline| Airline used to arrive in U.S.|
|admnum| Admission Number|
|fltno| Flight number of Airline used to arrive in U.S.|
|visatype| Class of admission legally admitting the non-immigrant to temporarily stay in U.S.|





**Dimension Tables**

##### Country table

| Feature| Descriptions|
|--------|-------------|
|country_code|Unique country of residence code (from i94res)|
|country_name| Name of country|
|average_temperature| Average temperature of country|


##### Visa Type table


| Feature| Descriptions|
|--------|-------------|
|visa_type_key| Unique key generated with monotonically_increasing_id|
|visatype| Visa type (class of admission)|


##### Calendar table

| Feature| Descriptions|
|--------|-------------|
|id| Unique ID|
|arrdate| Arrival date into US|
|arrival_year| Arrival year into US|
|arrival_month| Arrival month|
|arrival_day| Arrival day of month|
|arrival_week| Arrival week of year|
|arrival_weekday| Arrival day of week (1 = Sunday through 7 = Saturday)|

##### US Demographics table


| Feature| Descriptions|
|--------|-------------|
|id| Unique ID|
|state_code| US state code|
|City| City name|
|State| US state where city is located|
|median_age| Median age of the population|
|male_population| Count of male population|
|female_population| Count of female population|
|total_population| Count of total population|
|number_of_veterans| Count of total veterans|
|foreign_born| Count of residents of the city who were born outside the US|
|average_household_size| Average city household size|
|Race| Respondent race|
|Count| Count of the city's residents of the given race|




### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project:

  - Apache Spark is used in this project because of its ability to handle multiple file formats and large amounts of data in a distributed fashion. It offers a fast analytics engine for big data, and its DataFrame API is easy to use because it is similar to pandas DataFrames.

* Propose how often the data should be updated and why:

  - The immigration dataset is updated monthly, so I propose updating the data monthly.

* Write a description of how you would approach the problem differently under the following scenarios:

  * The data was increased by 100x:

    - Spark can handle the increase, but we would have to add nodes to the EMR cluster.

  * The data populates a dashboard that must be updated on a daily basis by 7am every day:

    - In this case, we can leverage Apache Airflow to schedule and run the data pipelines.

  * The database needed to be accessed by 100+ people:

    - We would want to move our analytics database to a columnar data warehouse such as Amazon Redshift.