### Data Engineering Capstone Project

#### Project Summary
This project builds a data warehouse by integrating several data sources to gain insights into immigration patterns.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### Scope: 
The project aims to integrate four data sources into dimension and fact tables by:
- Loading the data into Spark DataFrames.
- Cleaning each DataFrame.
- Creating the dimension tables and the fact table.


- ##### Tools: Python (pandas, PySpark), AWS S3


- #### Describe and Gather Data:
     - ##### I94 Immigration Data https://travel.trade.gov/research/reports/i94/historical/2016.html
     This data comes from the US National Tourism and Trade Office.
     - ##### World Temperature Data https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data
     This dataset comes from Kaggle in CSV format. It contains monthly average land temperatures by city.
     - ##### U.S. City Demographic Data https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/
     This data comes from OpenDataSoft in CSV format. It contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. 
    

In [1]:
# !pip install --upgrade pyspark

In [1]:
# import libraries
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf
from pyspark.sql.functions import isnan, when, count, col,concat, upper, udf, dayofmonth, dayofweek, month, year, weekofyear
from pyspark.sql.functions import avg, mean, round, monotonically_increasing_id
from datetime import datetime, timedelta
from pyspark.sql import types as T
from pyspark.sql.types import IntegerType ,FloatType

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

In [None]:
#AWS configs (currently disabled; enabling them also requires `import configparser, os`)
# config = configparser.ConfigParser()
# config.read('config.cfg')

# os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
# os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

### Step 2: Explore and Assess the Data
- Identify data quality issues, such as missing values and duplicate data.

In [2]:

def identify_quality(df, columns):
    '''
    Description: report data quality issues (missing values, duplicate data).

    inputs:
         df: spark dataframe
         columns: columns used to detect duplicated rows

    outputs:
            Number of null and missing values in every column
            Duplicated rows in the data
    '''
    # Count null and NaN values per column; "dt" (a timestamp column) is skipped because isnan() does not accept timestamps.
    # ("dt",) is a one-element tuple; ("dt") would be a plain string and turn this into a substring test.
    print('Number of null and missing data in every column')
    display(df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns if c not in ("dt",)]).toPandas())

    # Duplicates check: value combinations of `columns` that occur more than once
    print("Duplicated rows in data")
    df \
        .groupby(columns) \
        .count() \
        .where('count > 1') \
        .sort('count', ascending=False) \
        .show()

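A note on the column filter in `identify_quality`: in Python, `("dt")` is just the string `"dt"` (parentheses alone do not make a tuple), so `c not in ("dt")` is a substring test rather than a membership test. The Spark-free sketch below uses made-up column names to show the difference; a one-element tuple needs the trailing comma, `("dt",)`.

```python
# ("dt") is a parenthesized string; ("dt",) is a one-element tuple.
example_columns = ["dt", "d", "t", "City"]  # hypothetical column names

# Substring test: drops "dt", but also "d" and "t", because both are substrings of "dt".
substring_filter = [c for c in example_columns if c not in ("dt")]

# Membership test against a tuple: drops only the exact name "dt".
tuple_filter = [c for c in example_columns if c not in ("dt",)]

print(substring_filter)  # ['City']
print(tuple_filter)      # ['d', 't', 'City']
```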
##### Explore the US-City Data

In [4]:
#read and display data
df_cities = spark.read.csv("us-cities-demographics.csv",sep =";", header=True, inferSchema=True)
print("Total count of data: ", str(df_cities.count()))
# samples of data
df_cities.limit(5).toPandas()

Total count of data:  2891


| |City|State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|Race|Count|
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|0|Silver Spring|Maryland|33.8|40601|41862|82463|1562|30908|2.6|MD|Hispanic or Latino|25924|
|1|Quincy|Massachusetts|41.0|44129|49500|93629|4147|32935|2.39|MA|White|58723|
|2|Hoover|Alabama|38.5|38040|46799|84839|4819|8229|2.58|AL|Asian|4759|
|3|Rancho Cucamonga|California|34.5|88127|87105|175232|5821|33878|3.18|CA|Black or African-American|24437|
|4|Newark|New Jersey|34.6|138040|143873|281913|5829|86253|2.73|NJ|White|76402|


In [5]:
df_cities.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



##### Description for each column

|Column|Description|
|---|---|
|City|Name of the city|
|State|State of the city|
|Foreign-born|Number of foreign-born residents|
|Average Household Size|Average size of a household|
|Median Age|Median age of the population|
|Male Population|Male population|
|State Code|US state code|
|Race|Race or ethnicity group that `Count` refers to|
|Count|Number of people of that race in the city|
|Female Population|Female population|
|Total Population|Total population|
|Number of Veterans|Number of veterans|

In [6]:
# identifying data quality issues
df_cities=df_cities.withColumnRenamed('count','count_of_race')#to avoid conflict between count of duplicates and count of race
identify_quality(df_cities,['City','State'])

Number of null and missing data in every column


| |City|State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|Race|count_of_race|
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|0|0|0|0|3|3|0|13|13|16|0|0|0|


Duplicated rows in data
+--------------------+--------------+-----+
|                City|         State|count|
+--------------------+--------------+-----+
|    North Charleston|South Carolina|    5|
|           Lynchburg|      Virginia|    5|
|            Lakeland|       Florida|    5|
|              Dayton|          Ohio|    5|
|          Cincinnati|          Ohio|    5|
|         Kansas City|        Kansas|    5|
|          Pittsburgh|  Pennsylvania|    5|
|              Austin|         Texas|    5|
|           Camarillo|    California|    5|
|              Upland|    California|    5|
|              Auburn|    Washington|    5|
|          High Point|North Carolina|    5|
|Louisville/Jeffer...|      Kentucky|    5|
|             Shawnee|        Kansas|    5|
|         New Bedford| Massachusetts|    5|
|            Columbus|          Ohio|    5|
|              Newton| Massachusetts|    5|
|             Livonia|      Michigan|    5|
|            Appleton|     Wisconsin|    5|
|       

##### Explore the TemperaturesByCity Data

In [26]:
#read and display data
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp = spark.read.csv(fname, header=True, inferSchema=True)
print("Total count of data: ", str(df_temp.count()))
# samples of data
df_temp.limit(5).toPandas()

Total count of data:  8599212


| |dt|AverageTemperature|AverageTemperatureUncertainty|City|Country|Latitude|Longitude|
|---|---|---|---|---|---|---|---|
|0|1743-11-01|6.068|1.737|Århus|Denmark|57.05N|10.33E|
|1|1743-12-01| | |Århus|Denmark|57.05N|10.33E|
|2|1744-01-01| | |Århus|Denmark|57.05N|10.33E|
|3|1744-02-01| | |Århus|Denmark|57.05N|10.33E|
|4|1744-03-01| | |Århus|Denmark|57.05N|10.33E|


In [8]:
df_temp.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



##### Description for each column

| Column | Description |
| :- | :- |
|dt | Date |
|AverageTemperature |Average land temperature in degrees Celsius|
|AverageTemperatureUncertainty |95% confidence interval around the average|
|Country |Country|
|City | Name of the city|
|Latitude | Latitude|
|Longitude | Longitude|

In [9]:
# identifying data quality issues
identify_quality(df_temp,df_temp.columns)

Number of null and missing data in every column


| |AverageTemperature|AverageTemperatureUncertainty|City|Country|Latitude|Longitude|
|---|---|---|---|---|---|---|
|0|364130|364130|0|0|0|0|


Duplicated rows in data
+---+------------------+-----------------------------+----+-------+--------+---------+-----+
| dt|AverageTemperature|AverageTemperatureUncertainty|City|Country|Latitude|Longitude|count|
+---+------------------+-----------------------------+----+-------+--------+---------+-----+
+---+------------------+-----------------------------+----+-------+--------+---------+-----+



##### Explore the I94 Immigration Data

In [10]:
# import os
# os.listdir("../../data/18-83510-I94-Data-2016/")

In [27]:
#read and display data
df_im = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
# Total count of data
print(df_im.count())
# samples of data
df_im.limit(5).toPandas()

3096313


| |cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|...|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|admnum|fltno|visatype|
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|0|6.0|2016.0|4.0|692.0|692.0|XXX|20573.0| | | |...|U| |1979.0|10282016| | | |1897628000.0| |B2|
|1|7.0|2016.0|4.0|254.0|276.0|ATL|20551.0|1.0|AL| |...|Y| |1991.0|D/S|M| | |3736796000.0|296.0|F1|
|2|15.0|2016.0|4.0|101.0|101.0|WAS|20545.0|1.0|MI|20691.0|...| |M|1961.0|09302016|M| |OS|666643200.0|93.0|B2|
|3|16.0|2016.0|4.0|101.0|101.0|NYC|20545.0|1.0|MA|20567.0|...| |M|1988.0|09302016| | |AA|92468460000.0|199.0|B2|
|4|17.0|2016.0|4.0|101.0|101.0|NYC|20545.0|1.0|MA|20567.0|...| |M|2012.0|09302016| | |AA|92468460000.0|199.0|B2|


In [4]:
df_im.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

##### Description for each column
|Column|Description|
|---|---|
|cicid|Unique ID|
|i94cit|3 digit code for immigrant country of birth|
|i94res|3 digit code for immigrant country of residence|
|i94port|Port of admission|
|admnum|Admission Number|
|fltno|Flight number of Airline used to arrive in U.S.|
|visatype|Class of admission legally admitting the non-immigrant to temporarily stay in U.S.|
|arrdate|Arrival Date in the USA|
|i94mode|Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)|
|i94addr|USA State of arrival|
|depdate|Departure Date from the USA|
|entdepa|Arrival Flag - admitted or paroled into the U.S.|
|entdepd|Departure Flag - Departed, lost I-94 or is deceased|
|entdepu|Update Flag - Either apprehended, overstayed, adjusted to perm residence|
|matflag|Match flag - Match of arrival and departure records|
|biryear|4 digit year of birth|
|dtaddto|Character Date Field - Date to which admitted to U.S. (allowed to stay until)|
|gender|Non-immigrant sex|
|insnum|INS number|
|i94bir|Age of Respondent in Years|
|i94visa|Visa codes collapsed into three categories|
|count|Field used for summary statistics|
|dtadfile|Character Date Field - Date added to I-94 Files|
|visapost|Department of State where the visa was issued|
|occup|Occupation that will be performed in U.S.|
|airline|Airline used to arrive in U.S.|
|i94yr|year|
|i94mon|month|

In [28]:
# identifying data quality issues
df_im=df_im.withColumnRenamed('count','count_of_arrivals')#to avoid conflict between count of duplicates and count of arrivals
identify_quality(df_im,['cicid'])

Number of null and missing data in every column


| |cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|...|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|admnum|fltno|visatype|
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|0|0|0|0|0|0|0|0|239|152592|142457|...|3095921|138429|802|477|414269|2982605|83627|0|19549|0|


Duplicated rows in data
+-----+-----+
|cicid|count|
+-----+-----+
+-----+-----+



#### Cleaning Steps
Based on the exploration above, we drop columns with significant missing data, rows that are entirely empty, and duplicate rows.

##### Performing cleaning for Immigration Data

- The immigration data has no duplicate `cicid` values, so we only drop the columns that contain significant missing data, rename the remaining columns, and convert the SAS dates.


In [29]:
# Drop columns that contain significant missing data (e.g. entdepu: 3,095,921 and insnum: 2,982,605 nulls out of 3,096,313 rows)
columns = ['occup', 'entdepu','insnum']
df_im = df_im.drop(*columns)

In [30]:
# remove the i94 prefix from column names
replacements = {c:c.replace('i94','') for c in df_im.columns if 'i94' in c}
df_im= df_im.select([col(c).alias(replacements.get(c, c)) for c in df_im.columns])

#rename multiple columns
mapping_im = dict(zip(['cicid','res','yr','mon', 'port', 'addr','arrdate','depdate','visa' ],
                      ['id','country_code', 'year','month','city_code','State_Code','arrival_date','departure_date','visa_code']))
df_im=df_im.select([col(c).alias(mapping_im.get(c, c)) for c in df_im.columns])


# Convert arrival_date and departure_date from SAS dates (days since 1960-01-01) to DateType;
# `is not None` keeps day 0 (1960-01-01) from being treated as missing
udf_datetime_from_sas = udf(lambda x: (datetime(1960, 1, 1) + timedelta(days=int(x))).date() if x is not None else None, T.DateType())
df_im = df_im.withColumn("arrival_date", udf_datetime_from_sas("arrival_date"))
df_im = df_im.withColumn("departure_date", udf_datetime_from_sas("departure_date"))
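SAS stores dates as the number of days since 1960-01-01, which is what the UDF above relies on. The plain-Python sketch below (the name `sas_to_date` is ours, not part of any library) checks the conversion against values visible in this notebook's outputs, for example `arrdate` 20573.0 becoming 2016-04-29, and shows why testing `x is not None` is safer than a truthiness test such as `if x`, which would treat day 0 as missing.

```python
from datetime import date, timedelta
from typing import Optional

SAS_EPOCH = date(1960, 1, 1)  # day 0 of SAS date values


def sas_to_date(x: Optional[float]) -> Optional[date]:
    """Hypothetical helper: convert a SAS date (days since 1960-01-01) to a datetime.date."""
    if x is None:
        return None
    return SAS_EPOCH + timedelta(days=int(x))


# arrdate/depdate values taken from the raw and cleaned samples in this notebook
for raw in (20573.0, 20551.0, 20545.0, 20691.0):
    print(raw, "->", sas_to_date(raw))
# 20573.0 -> 2016-04-29
# 20551.0 -> 2016-04-07
# 20545.0 -> 2016-04-01
# 20691.0 -> 2016-08-25

# A truthiness check (`if x`) would wrongly map day 0 to None.
print(sas_to_date(0.0))  # 1960-01-01
```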

In [16]:
df_im.limit(5).toPandas()

| |id|year|month|cit|country_code|city_code|arrival_date|mode|State_Code|departure_date|...|entdepa|entdepd|matflag|biryear|dtaddto|gender|airline|admnum|fltno|visatype|
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|0|6.0|2016.0|4.0|692.0|692.0|XXX|2016-04-29| | | |...|T| | |1979.0|10282016| | |1897628000.0| |B2|
|1|7.0|2016.0|4.0|254.0|276.0|ATL|2016-04-07|1.0|AL| |...|G| | |1991.0|D/S|M| |3736796000.0|296.0|F1|
|2|15.0|2016.0|4.0|101.0|101.0|WAS|2016-04-01|1.0|MI|2016-08-25|...|T|O|M|1961.0|09302016|M|OS|666643200.0|93.0|B2|
|3|16.0|2016.0|4.0|101.0|101.0|NYC|2016-04-01|1.0|MA|2016-04-23|...|O|O|M|1988.0|09302016| |AA|92468460000.0|199.0|B2|
|4|17.0|2016.0|4.0|101.0|101.0|NYC|2016-04-01|1.0|MA|2016-04-23|...|O|O|M|2012.0|09302016| |AA|92468460000.0|199.0|B2|


##### Performing cleaning for US-City Data

In [17]:
# Drop rows in which every column is null (rows with only some nulls are kept)
df_cities=df_cities.dropna(how='all')

# replace spaces in column names with _
replacements = {c:c.replace(' ','_') for c in df_cities.columns if ' ' in c}
df_cities= df_cities.select([col(c).alias(replacements.get(c, c)) for c in df_cities.columns])

#combine city name with city state
df_cities=df_cities.withColumn("City_State",sf.concat(df_cities.City,sf.lit(', '),df_cities.State_Code))
df_cities=df_cities.withColumn("City_State",upper(col('City_State')))
df_cities=df_cities.withColumn("City",upper(col('City')))
df_cities.limit(5).toPandas()

#drop duplicates
df_cities=df_cities.drop_duplicates(subset=['City_State'])
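The duplicate check showed each (City, State) pair five times because the demographics file has one row per city and `Race`. The city-level columns repeat across those rows, so collapsing on `City_State` keeps one copy of them, while the race-specific columns (`Race`, `count_of_race`) come from whichever row survives; as far as we know, Spark's `drop_duplicates` does not specify which row that is. A plain-Python sketch of the same idea, on hypothetical rows (the helper names are ours):

```python
# Hypothetical rows in the shape of us-cities-demographics.csv:
# one row per (city, race); city-level figures repeat on every row.
rows = [
    {"City": "Quincy", "State_Code": "MA", "Total_Population": 93629, "Race": "White"},
    {"City": "Quincy", "State_Code": "MA", "Total_Population": 93629, "Race": "Asian"},
    {"City": "Hoover", "State_Code": "AL", "Total_Population": 84839, "Race": "Asian"},
]


def city_state_key(row):
    # Same "CITY, ST" format as City_State above and the SAS port labels (e.g. "ANCHORAGE, AK").
    return f"{row['City']}, {row['State_Code']}".upper()


def drop_duplicates_by_key(rows, key):
    """Keep the first row seen for each key (Spark does not promise which row it keeps)."""
    kept = {}
    for row in rows:
        kept.setdefault(key(row), row)
    return list(kept.values())


deduped = drop_duplicates_by_key(rows, city_state_key)
print([city_state_key(r) for r in deduped])  # ['QUINCY, MA', 'HOOVER, AL']
```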

In [18]:
#ss=cities_desc.toPandas()['City_State'].unique().tolist()
#df_cities[~df_cities.City_State.isin(ss)].show()

##### Performing cleaning for TemperaturesByCity Data

In [31]:
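# Performing cleaning for TemperaturesByCity
# Drop rows in which every column is null; rows that only lack AverageTemperature are kept
# (mean() ignores nulls when the data is aggregated by country later)
df_temp = df_temp.dropna(how='all')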
df_temp=df_temp.withColumn("City",upper(col('City')))
df_temp=df_temp.withColumn("Country",upper(col('Country')))

#replacements = {c:c.replace('Å','A').replace('Ñ','N').replace('Ó','O') for c in df_temp.columns}
replacements = {c:c.replace(' ','_') for c in df_temp.columns if ' ' in c}
df_temp= df_temp.select([col(c).alias(replacements.get(c, c)) for c in df_temp.columns])

df_temp.show(5)

+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01 00:00:00|             6.068|           1.7369999999999999|ÅRHUS|DENMARK|  57.05N|   10.33E|
|1743-12-01 00:00:00|              null|                         null|ÅRHUS|DENMARK|  57.05N|   10.33E|
|1744-01-01 00:00:00|              null|                         null|ÅRHUS|DENMARK|  57.05N|   10.33E|
|1744-02-01 00:00:00|              null|                         null|ÅRHUS|DENMARK|  57.05N|   10.33E|
|1744-03-01 00:00:00|              null|                         null|ÅRHUS|DENMARK|  57.05N|   10.33E|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
We implement a star schema because the data warehouse is intended for OLAP queries and BI applications.

#### 3.2 Mapping Out Data Pipelines
- Use I94_SAS_Labels_Descriptions to look up country, city and visa names, so that the codes in the immigration data can be mapped to them.
- Create the time dimension table from the arrdate field of the I94 immigration dataset.
- Create the visa dimension table from the I94 immigration dataset and I94_SAS_Labels_Descriptions.
- Create the country dimension table from the I94 immigration and global temperatures datasets. The global land temperature data is aggregated at country level. The table links to the fact table through the country of residence code, allowing analysts to study the correlation between the climate of the country of residence and immigration to US states.
- Create the US demographics (state) dimension table from the US cities demographics data. This table joins to the fact table through the state code field.
- Create fact table from I94 immigration dataset.
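A toy, in-memory sketch of the star layout described above: one fact table whose columns are codes into small dimension tables. It is plain Python with hypothetical fact rows; the country codes, names and temperatures are copied from the country dimension output later in this notebook. It illustrates how the closing SQL queries resolve fact codes through dimensions, not how Spark executes the joins.

```python
# Dimension tables keyed by the code stored in the fact table.
country_dim = {
    "209": {"Country": "JAPAN", "AverageTemperature": 13.4},
    "274": {"Country": "BANGLADESH", "AverageTemperature": 25.05},
}
state_dim = {"MA": {"State": "Massachusetts"}, "NY": {"State": "New York"}}

# Hypothetical fact rows: one per arrival, holding only keys into the dimensions.
immigration_fact = [
    {"id": 1, "country_code": "209", "State_Code": "NY"},
    {"id": 2, "country_code": "274", "State_Code": "MA"},
    {"id": 3, "country_code": "209", "State_Code": "MA"},
]

# Like: SELECT COUNT(*) FROM country c JOIN fact f ON ... WHERE c.AverageTemperature >= 20
warm = len([
    f for f in immigration_fact
    if f["country_code"] in country_dim
    and country_dim[f["country_code"]]["AverageTemperature"] >= 20
])

# Arrivals per destination state name, resolved through the state dimension.
per_state = {}
for f in immigration_fact:
    name = state_dim.get(f["State_Code"], {}).get("State", "unknown")
    per_state[name] = per_state.get(name, 0) + 1

print(warm)       # 1
print(per_state)  # {'New York': 1, 'Massachusetts': 2}
```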

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

##### Open I94_SAS_Labels_Descriptions to look up country, city and visa names and map the codes in the immigration data to them

In [25]:
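# Read the SAS label file; the hard-coded line ranges below match this copy of the file
with open("I94_SAS_Labels_Descriptions.SAS") as f:
    contents = f.readlines()

# countries: lines of the form  code =  'COUNTRY NAME'
countries = []
for country in contents[9:298]: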
    pair = country.split('=')
    code, country_name = pair[0].strip(), pair[1].strip().strip("'")
    countries.append((code,country_name))
countries_desc= spark.createDataFrame(countries, ['country_code', 'Country'])
countries_desc.show(3)

#-------------------------------------------cities-------------------------------------------------------------
cities = []
for city in contents[302:962]:
    pair = city.split('=')
    code, City_State = pair[0].strip("\t").strip().strip("'"), pair[1].strip().strip('\t').strip("''").strip()
    cities.append((code,City_State))
cities_desc = spark.createDataFrame(cities, ['city_code', 'City_State'])
cities_desc.show(3)

#-------------------------------------------visas-------------------------------------------------------------

visas = []
for visa in contents[1046:1049]:
    pair = visa.split('=')
    code, visa_category = pair[0].strip().strip('\t').strip("'"), pair[1].strip().strip("'")
    visas.append((code,visa_category))

visas_desc = spark.createDataFrame(visas, ['visa_code', 'visa_category'])
visas_desc.show()

+------------+--------------------+
|country_code|             Country|
+------------+--------------------+
|         582|MEXICO Air Sea, a...|
|         236|         AFGHANISTAN|
|         101|             ALBANIA|
+------------+--------------------+
only showing top 3 rows

+---------+--------------------+
|city_code|          City_State|
+---------+--------------------+
|      ALC|           ALCAN, AK|
|      ANC|       ANCHORAGE, AK|
|      BAR|BAKER AAF - BAKER...|
+---------+--------------------+
only showing top 3 rows

+---------+-------------+
|visa_code|visa_category|
+---------+-------------+
|        1|     Business|
|        2|     Pleasure|
|        3|      Student|
+---------+-------------+
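The cell above relies on hard-coded line ranges (`9:298`, `302:962`, `1046:1049`), which would break silently if the labels file changed. One hedged alternative is to match `code = value` lines with a regular expression. The sketch below is plain Python; the sample lines are illustrative of the layout the parsing code above expects (an `=` between an optionally quoted code and an optionally quoted label), not copied from the real file.

```python
import re

# code = value, where either side may be wrapped in single quotes and padded with spaces/tabs
LABEL_LINE = re.compile(r"^\s*'?(?P<code>[^'=]+?)'?\s*=\s*'?(?P<label>[^']*?)'?\s*;?\s*$")


def parse_labels(lines):
    """Hypothetical helper: return (code, label) pairs for lines that look like SAS value labels."""
    pairs = []
    for line in lines:
        m = LABEL_LINE.match(line)
        if m:
            pairs.append((m.group("code").strip(), m.group("label").strip()))
    return pairs


sample = [
    "value i94cntyl",                              # header-like line without '=': skipped
    "   236 =  'AFGHANISTAN'",
    "\t'ANC'\t=\t'ANCHORAGE, AK             '",
    "   1 = Business",
    ";",                                           # terminator line: skipped
]
print(parse_labels(sample))
# [('236', 'AFGHANISTAN'), ('ANC', 'ANCHORAGE, AK'), ('1', 'Business')]
```

Labels that themselves contain a single quote would need a looser pattern; the notebook itself still uses the line-range approach.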



In [9]:
output_path="output/"

##### Calendar dimension

In [23]:
def create_time_dimension(df, output_path):
    """
    Description: This function creates time_dimension based on arrival date
    inputs:
        df: spark dataframe of immigration
        output_path: path to save dimension table 
    outputs:
         spark dataframe representing calendar dimension
    """
    
    # create initial calendar df from the distinct arrival_date values
    time_df = df.select(['arrival_date']).distinct()
    
    #create columns from date
    time_df = time_df.withColumn('day', dayofmonth('arrival_date'))
    time_df = time_df.withColumn('week', weekofyear('arrival_date'))
    time_df = time_df.withColumn('month', month('arrival_date'))
    time_df = time_df.withColumn('year', year('arrival_date'))
    time_df = time_df.withColumn('weekday', dayofweek('arrival_date'))
    
    # create an id field (unique, but not consecutive)
    time_df = time_df.withColumn('time_id', monotonically_increasing_id())
    # write the calendar dimension to parquet, replacing any previous output
    partition_columns = ['year', 'month', 'week']
    time_df.write.parquet(output_path + "calendar", partitionBy=partition_columns, mode="overwrite")

    return time_df

In [24]:
time_dim= create_time_dimension(df_im, output_path)
time_dim.limit(5).toPandas()

| |arrival_date|day|week|month|year|weekday|time_id|
|---|---|---|---|---|---|---|---|
|0|2016-04-25|25|17|4|2016|2|8589934592|
|1|2016-04-22|22|16|4|2016|6|85899345920|
|2|2016-04-30|30|17|4|2016|7|188978561024|
|3|2016-04-26|26|17|4|2016|3|266287972352|
|4|2016-04-04|4|14|4|2016|2|377957122048|

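Spark's `dayofweek` counts from 1 for Sunday to 7 for Saturday, and `weekofyear` returns the ISO 8601 week number. The plain-Python sketch below reproduces those conventions with the standard library (the helper `calendar_row` is ours) and checks them against rows of the `time_dim` output above.

```python
from datetime import date


def calendar_row(d: date) -> dict:
    """Hypothetical helper: derive the time-dimension attributes the way Spark's date functions do."""
    return {
        "day": d.day,                       # dayofmonth
        "week": d.isocalendar()[1],         # weekofyear: ISO 8601 week number
        "month": d.month,                   # month
        "year": d.year,                     # year
        "weekday": d.isoweekday() % 7 + 1,  # dayofweek: 1 = Sunday ... 7 = Saturday
    }


for d in (date(2016, 4, 25), date(2016, 4, 22), date(2016, 4, 30), date(2016, 4, 4)):
    print(d, calendar_row(d))
```

`isoweekday()` numbers Monday as 1 and Sunday as 7, so `% 7 + 1` shifts it to Spark's Sunday-first numbering.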

##### Visa dimension

In [59]:
def create_visa_dimension(df, visas_desc,output_path):
    """
    Description: This function creates visa_dimension by selecting 'visapost', 'visatype', 'visa_code' and 'city_code' from immigration and joining with visas_desc
    inputs:
        df: spark dataframe of immigration
        visas_desc: spark dataframe mapping visa_code to visa_category
        output_path: path to save dimension table 
    outputs:
         spark dataframe representing visa dimension
    """
    visa_df=df.select('visapost','visatype','visa_code','city_code').distinct().join(visas_desc, 'visa_code', "inner")
    visa_df=visa_df.withColumnRenamed('city_code','admission_port')

    # create an id field 
    visa_df = visa_df.withColumn('visa_id', monotonically_increasing_id())
    
    # write the visa dimension to parquet, replacing any previous output
    partition_columns = [ 'visatype', 'admission_port','visa_category']
    visa_df.write.parquet(output_path + "visa", partitionBy=partition_columns, mode="overwrite")

    return visa_df

In [60]:
visa_dim= create_visa_dimension(df_im, visas_desc,output_path)
visa_dim.limit(10).toPandas()

| |visa_code|visapost|visatype|admission_port|visa_category|visa_id|
|---|---|---|---|---|---|---|
|0|1.0|WRW|B1|NEW|Business|498216206336|
|1|1.0|RME|B1|SPM|Business|498216206337|
|2|1.0|BCH|B1|ORL|Business|498216206338|
|3|1.0|BEN|B1|NYC|Business|498216206339|
|4|1.0|MEX|E2|DET|Business|498216206340|
|5|1.0|NWD|B1|WAS|Business|498216206341|
|6|1.0|TLV|B1|LVG|Business|498216206342|
|7|1.0|TLV|B1|SFR|Business|498216206343|
|8|1.0|KWT|B1|LVG|Business|498216206344|
|9|1.0|GYQ|B1|WAS|Business|498216206345|

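The `time_id` and `visa_id` values above are unique but far from consecutive. Spark's documentation for `monotonically_increasing_id` states that the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The small helper below (our own, not a Spark API) decodes a few IDs from the outputs above under that assumption. If consecutive surrogate keys were needed, `row_number()` over a window is the usual alternative, at the cost of moving all data to one partition when no partition key is given.

```python
LOWER_BITS = 33  # record number occupies the lower 33 bits (per the Spark docs)


def decode_monotonic_id(value: int) -> tuple:
    """Hypothetical helper: split a monotonically_increasing_id value into (partition_id, record_number)."""
    partition_id = value >> LOWER_BITS
    record_number = value & ((1 << LOWER_BITS) - 1)
    return partition_id, record_number


for value in (0, 8589934592, 85899345920, 498216206336, 498216206337):
    print(value, "->", decode_monotonic_id(value))
# 0 -> (0, 0)
# 8589934592 -> (1, 0)
# 85899345920 -> (10, 0)
# 498216206336 -> (58, 0)
# 498216206337 -> (58, 1)
```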

##### State dimension

In [35]:
df_cities=df_cities.withColumn('Total_Population',df_cities['Total_Population'].cast(FloatType()))
df_cities=df_cities.withColumn('Male_Population',df_cities['Male_Population'].cast(FloatType()))
df_cities=df_cities.withColumn('Female_Population',df_cities['Female_Population'].cast(FloatType()))
df_cities=df_cities.withColumn('Foreign-born',df_cities['Foreign-born'].cast(FloatType()))

def create_state_dimension(df, im_df, output_path):
    """
        Aggregate city demographics to state level, attach them to every state code found in
        the immigration data, and write the result to parquet files.

        :param df: dataframe of US city demographics (one row per city).
        :param im_df: dataframe of immigration data; supplies the State_Code values.
        :param output_path: path to write data to.
        :return: dataframe representing state dimension
    """
    # sf.sum is used instead of importing pyspark's sum, which would shadow Python's built-in sum()
    state_df = df.groupBy(col("State_Code"), col("State")).agg(
                round(mean('Median_Age'), 2).alias("median_age"),\
                sf.sum("Total_Population").alias("total_population"),\
                sf.sum("Male_Population").alias("male_population"), \
                sf.sum("Female_Population").alias("female_population"),\
                sf.sum("Foreign-born").alias("foreign_born"), \
                round(mean("Average_Household_Size"),2).alias("average_household_size")
                ).dropna()
    
    # keep every state code seen in the immigration data, even those without demographics
    state_dim= im_df.select('State_Code').distinct().join(state_df, 'State_Code', "left")

    # create an id field; numeric columns of codes without demographics are filled with 0
    state_dim = state_dim.withColumn('state_id', monotonically_increasing_id())
    state_dim=state_dim.fillna(0)

    state_dim.write.parquet(output_path + "state", mode="overwrite")

    return state_dim

In [36]:
state_dim= create_state_dimension(df_cities, df_im, output_path)
state_dim.limit(10).toPandas()

| |State_Code|State|median_age|total_population|male_population|female_population|foreign_born|average_household_size|state_id|
|---|---|---|---|---|---|---|---|---|---|
|0|.N| |0.0|0.0|0.0|0.0|0.0|0.0|0|
|1|RG| |0.0|0.0|0.0|0.0|0.0|0.0|8589934592|
|2|YH| |0.0|0.0|0.0|0.0|0.0|0.0|8589934593|
|3|RF| |0.0|0.0|0.0|0.0|0.0|0.0|17179869184|
|4|FT| |0.0|0.0|0.0|0.0|0.0|0.0|25769803776|
|5|CI| |0.0|0.0|0.0|0.0|0.0|0.0|25769803777|
|6|TC| |0.0|0.0|0.0|0.0|0.0|0.0|25769803778|
|7|SC|South Carolina|34.18|533657.0|260944.0|272713.0|27744.0|2.47|34359738368|
|8|AZ|Arizona|35.04|4499542.0|2227455.0|2272087.0|682313.0|2.77|34359738369|
|9|FI| |0.0|0.0|0.0|0.0|0.0|0.0|34359738370|


##### Country dimension

In [38]:
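def create_country_dimension(countries_desc, temp_df, output_path):
    """
    Description: aggregate temperatures by country and attach them to the I94 country codes.
    inputs:
        countries_desc: spark dataframe of (country_code, Country) from the SAS labels
        temp_df: spark dataframe of city temperatures
        output_path: path to save dimension table
    outputs:
         spark dataframe representing country dimension
    """
    agg_temp = temp_df.groupBy(col("Country")).agg(
                round(mean('AverageTemperature'), 2).alias("AverageTemperature"),\
                round(mean("AverageTemperatureUncertainty"),2).alias("AverageTemperatureUncertainty")).dropna()
    
    # left join keeps every I94 country code; countries without temperature data are filled with 0
    country_dim= countries_desc.join(agg_temp, 'Country', "left")
    # create an id field 
    country_dim = country_dim.withColumn('country_id', monotonically_increasing_id())
    country_dim=country_dim.fillna(0)
    # write the dimension to a parquet file, replacing any previous output
    country_dim.write.parquet(output_path + "country", mode="overwrite")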

    return country_dim

In [39]:
country_dim= create_country_dimension(countries_desc,df_temp,output_path)
country_dim.limit(10).toPandas()


| |Country|country_code|AverageTemperature|AverageTemperatureUncertainty|country_id|
|---|---|---|---|---|---|
|0|ARMENIA|151|8.38|1.16|0|
|1|BAHAMAS|512|24.79|1.09|1|
|2|INVALID: DRONNING MAUD LAND (ANTARCTICA-NORWAY)|739|0.0|0.0|2|
|3|SOUTH AFRICA|373|16.36|0.72|3|
|4|No Country Code (914)|914|0.0|0.0|8589934592|
|5|BURMA|243|26.02|0.88|17179869184|
|6|No Country Code (54)|54|0.0|0.0|25769803776|
|7|HEARD AND MCDONALD IS.|726|0.0|0.0|51539607552|
|8|BANGLADESH|274|25.05|0.88|60129542144|
|9|JAPAN|209|13.4|0.62|60129542145|

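The country dimension averages the city-level rows per country. Like SQL `AVG`, Spark's `mean` skips nulls, so the 364,130 rows with a missing `AverageTemperature` do not drag the averages down. After the left join, countries with no temperature data are filled with 0, which is why entries such as `No Country Code (914)` show 0.0 above; since 0.0 is also a valid Celsius reading, a null would distinguish "no data" more safely. A plain-Python sketch of the aggregate, left-join and fill steps, on hypothetical readings (built-ins are called through `builtins` because this notebook imports same-named pyspark functions such as `round`):

```python
import builtins  # avoid the pyspark round/sum imported into the notebook namespace

# Hypothetical monthly readings; None stands for a missing AverageTemperature.
readings = [
    ("JAPAN", 13.25), ("JAPAN", None), ("JAPAN", 13.75),
    ("ARMENIA", 8.5),
]
# (country_code, Country) pairs as in countries_desc; codes taken from the output above.
country_codes = [("209", "JAPAN"), ("151", "ARMENIA"), ("914", "No Country Code (914)")]


def mean_skip_nulls(values):
    """Average the non-null values, or return None if there are none (like SQL AVG)."""
    present = [v for v in values if v is not None]
    return builtins.sum(present) / len(present) if present else None


# groupBy("Country") + round(mean(...), 2), followed by .dropna()
by_country = {}
for name, temp in readings:
    by_country.setdefault(name, []).append(temp)
agg_temp = {}
for name, values in by_country.items():
    m = mean_skip_nulls(values)
    if m is not None:
        agg_temp[name] = builtins.round(m, 2)

# Left join onto every country code, then fillna(0) for countries without readings.
country_rows = [(code, name, agg_temp.get(name, 0.0)) for code, name in country_codes]
print(country_rows)
# [('209', 'JAPAN', 13.5), ('151', 'ARMENIA', 8.5), ('914', 'No Country Code (914)', 0.0)]
```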

##### Fact table

In [7]:
def create_fact_dimension(df_im, output_path):
    # select the fact columns: keys into the dimensions plus per-arrival attributes
    df=df_im.select(['id','country_code','city_code','arrival_date','State_Code','visa_code','count_of_arrivals',\
                     'dtadfile','matflag','dtaddto','gender','airline','admnum','fltno'])
    
    # write the fact table to parquet, replacing any previous output
    df.write.parquet(output_path + "immigration_fact", mode="overwrite")
    return df

In [12]:
fact= create_fact_dimension(df_im,output_path)
fact.limit(10).toPandas()

| |id|country_code|city_code|arrival_date|State_Code|visa_code|count_of_arrivals|dtadfile|matflag|dtaddto|gender|airline|admnum|fltno|
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|0|6.0|692.0|XXX|2016-04-29| |2.0|1.0| | |10282016| | |1897628000.0| |
|1|7.0|276.0|ATL|2016-04-07|AL|3.0|1.0|20130811.0| |D/S|M| |3736796000.0|296.0|
|2|15.0|101.0|WAS|2016-04-01|MI|2.0|1.0|20160401.0|M|09302016|M|OS|666643200.0|93.0|
|3|16.0|101.0|NYC|2016-04-01|MA|2.0|1.0|20160401.0|M|09302016| |AA|92468460000.0|199.0|
|4|17.0|101.0|NYC|2016-04-01|MA|2.0|1.0|20160401.0|M|09302016| |AA|92468460000.0|199.0|
|5|18.0|101.0|NYC|2016-04-01|MI|1.0|1.0|20160401.0|M|09302016| |AZ|92471040000.0|602.0|
|6|19.0|101.0|NYC|2016-04-01|NJ|2.0|1.0|20160401.0|M|09302016| |AZ|92471400000.0|602.0|
|7|20.0|101.0|NYC|2016-04-01|NJ|2.0|1.0|20160401.0|M|09302016| |AZ|92471610000.0|602.0|
|8|21.0|101.0|NYC|2016-04-01|NY|2.0|1.0|20160401.0|M|09302016| |AZ|92470800000.0|602.0|
|9|22.0|101.0|NYC|2016-04-01|NY|1.0|1.0|20160401.0|M|09302016| |AZ|92478490000.0|608.0|


#### 4.2 Data Quality Checks
- Count check: ensures that the ETL produced non-empty fact and dimension tables.
- Unique key check: ensures that the key column of each table contains no duplicated values.

In [49]:
def count_check(table, table_name):
    count = table.count()
    if count == 0: print(f"Failed for {table_name}, it contains {count} records.")
    else: print(f"Check passed for {table_name}, it contains {count} records.")


def unique_key_check(table, column, table_name):
    
    if table.count() > table.dropDuplicates([column]).count():
        print(f"Failed, column {column} in table {table_name} contains duplicated values.")
    else:
        print(f"Check passed for column {column} in table {table_name}.")


In [61]:
# read the tables back from the parquet files
country = spark.read.parquet(output_path + "country")
visa = spark.read.parquet(output_path + "visa")
time = spark.read.parquet(output_path + "calendar")
state = spark.read.parquet(output_path + "state")
fact = spark.read.parquet(output_path + "immigration_fact")

In [66]:
# (table name, dataframe, key column) for each table
tables = [
    ('country_dim', country, 'country_id'),
    ('visa_dim', visa, 'visa_id'),
    ('time_dim', time, 'time_id'),
    ('states_dim', state, 'state_id'),
    ('immigration_fact', fact, 'id')]

# count_check
print('Count check')
for table_name, table, column in tables:
    count_check(table, table_name)
    
print("--------------------------------------------------------------------------------------------")    
# unique_key_check
print('Unique key check')
for table_name, table, column in tables:
    unique_key_check(table, column, table_name)

Count check
Check passed for country_dim, it contains 289 records.
Check passed for visa_dim, it contains 29372 records.
Check passed for time_dim, it contains 30 records.
Check passed for states_dim, it contains 458 records.
Check passed for immigration_fact, it contains 3096313 records.
--------------------------------------------------------------------------------------------
Unique key check
Check passed for column country_id in table country_dim.
Check passed for column visa_id in table visa_dim.
Check passed for column time_id in table time_dim.
Check passed for column state_id in table states_dim.
Check passed for column id in table immigration_fact.


#### 4.3 Data dictionary 

* States Dimension Table

|Feature|Description|
|---|---|
|state_id|Unique record ID|
|State_Code|Code of the US state|
|State|Name of the US state|
|median_age|Mean of the median ages of the state's cities|
|male_population|Male population, summed over the state's cities|
|female_population|Female population, summed over the state's cities|
|total_population|Total population, summed over the state's cities|
|average_household_size|Mean of the cities' average household sizes|
|foreign_born|Number of foreign-born residents, summed over the state's cities|



* Country Dimension Table

|Feature|Description|
|---|---|
|country_id|Unique record ID (primary key)|
|country_code|3 digit code for immigrant country of residence|
|Country|Name of Country|
|AverageTemperature|Average land temperature in degrees Celsius over the country's records (0 if no data)|
|AverageTemperatureUncertainty|95% confidence interval around the average, averaged the same way (0 if no data)|

* Visa Dimension Table

|Feature|Description|
|---|---|
|visa_id|Unique record ID|
|visatype|Class of admission legally admitting the non-immigrant to temporarily stay in U.S.|
|visapost|Department of State where the visa was issued|
|visa_category|Visa category (Business, Pleasure or Student)|
|visa_code|Visa codes collapsed into three categories|
|admission_port|Port of admission|



* Time Dimension Table

|Feature|Description|
|---|---|
|time_id|Unique record ID|
|arrival_date|Arrival Date in the USA|
|year|4 digit year|
|month|Numeric month|
|day|Day of the month|
|week|Week of the year (ISO 8601 week number)|
|weekday|Day of the week (1 = Sunday, ..., 7 = Saturday)|


* Immigration Fact Table

|Feature|Description|
|---|---|
|id|Unique record ID|
|State_Code|Foreign key for state|
|country_code|Foreign key for country|
|city_code|Port of admission|
|visa_code|Foreign key for visa|
|arrival_date|Foreign key for time|
|count_of_arrivals|Field used for summary statistics|
|matflag|Match flag - Match of arrival and departure records|
|dtaddto|Character Date Field - Date to which admitted to U.S. (allowed to stay until)|
|admnum|Admission Number|
|gender|Gender|
|dtadfile|Character Date Field - Date added to I-94 Files|
|airline|Airline used to arrive in U.S.|
|fltno|Flight number of Airline used to arrive in U.S.|

### Step 5: Complete Project Write Up
- The rationale for the choice of tools and technologies for the project:
  - Spark was chosen for data processing because it has well-developed APIs for reading and processing different formats (CSV, SAS, Parquet) and scales beyond a single machine. Pandas was used in some places (e.g. to display samples) since it provides flexible data structures for data manipulation.


- How often the data should be updated and why:
  - This depends on the I94 dataset itself; according to its website, it is updated monthly, so the pipeline would run monthly as well.


- Description of how you would approach the problem differently under the following scenarios:
  - The data was increased by 100x: Spark would still be used, but on a multi-node cluster rather than a single machine; any modern cloud platform can provide one.
  - The data populates a dashboard that must be updated on a daily basis by 7am every day: a pipeline orchestration tool such as Airflow can schedule the job to finish before 7am each day.
  - The database needed to be accessed by 100+ people: a data warehouse such as Redshift or Azure SQL Data Warehouse can serve this scenario.

In [71]:
country.createOrReplaceTempView("country")
state.createOrReplaceTempView("state")
fact.createOrReplaceTempView("fact")

In [86]:
# How many people entered the US from countries with an average temperature of at least 20 °C?
spark.sql("""
SELECT COUNT(*) 
FROM country c
join fact f
on f.country_code=c.country_code 
where c.AverageTemperature >= 20 """).show()

+--------+
|count(1)|
+--------+
|  700563|
+--------+



In [87]:
# How many people entered the US from Japan?
spark.sql("""
SELECT COUNT(*) 
FROM country c
join fact f
on f.country_code=c.country_code 
where c.Country= 'JAPAN' """).show()

+--------+
|count(1)|
+--------+
|  249167|
+--------+



In [89]:
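# How many people entered the US into states with at most 600,000 foreign-born residents?
# Note: state codes without demographic data were filled with 0 in create_state_dimension,
# so their arrivals also satisfy this filter.
spark.sql("""
SELECT COUNT(*) 
FROM state s
join fact f
on f.state_code = s.state_code
where s.foreign_born <= 600000""").show()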

+--------+
|count(1)|
+--------+
|  928166|
+--------+



In [90]:
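# What is the average median age in states with at most 600,000 foreign-born residents?
# Each state is counted once per matching arrival, and state codes without demographic data
# (filled with 0 in create_state_dimension) contribute a median_age of 0.
spark.sql("""
SELECT avg(median_age) 
FROM state s
join fact f
on f.state_code = s.state_code
where s.foreign_born <= 600000""").show()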

+------------------+
|   avg(median_age)|
+------------------+
|36.238288883443786|
+------------------+
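Because the join repeats a state's row once for every matching arrival, `avg(median_age)` in the query above is an arrival-weighted average rather than a plain average over states. The plain-Python sketch below uses hypothetical numbers to show the difference; averaging over the distinct state rows that pass the filter, without the join, would give the unweighted figure.

```python
import builtins  # avoid any pyspark sum imported into the notebook namespace

# Hypothetical state medians and the number of fact rows (arrivals) joined to each state.
median_age = {"AZ": 36.0, "SC": 30.0}
arrivals = {"AZ": 3, "SC": 1}

# What JOIN + avg() computes: each state's median appears once per arrival.
joined = [median_age[s] for s, n in arrivals.items() for _ in range(n)]
weighted = builtins.sum(joined) / len(joined)

# Plain average over the distinct states.
unweighted = builtins.sum(median_age.values()) / len(median_age)

print(weighted, unweighted)  # 34.5 33.0
```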



In [91]:
spark.catalog.dropTempView("country")
spark.catalog.dropTempView("fact")
spark.catalog.dropTempView("state")