# US Immigration Data Model
### Data Engineering Capstone Project

#### Project Summary

The goal of this project is to extract, clean, and transform the given raw data into a readable format and to build a data model, preferably a star schema, that data analysts, business analysts, data scientists, and other interested users can use for analytical use cases.

The purpose of the data model is to make life easier for data scientists, data analysts, and anyone else who does analytics and looks for patterns and trends in the data. One use case would be to find out how many immigrants come to the US to study or just for pleasure, and which US cities attract them the most. Based on this, the tourism department, for example, could take action to increase revenue. The star schema makes querying the database easier and faster.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up



In [43]:
# Do all imports and installs here
import pandas as pd
import os
import configparser
import re
import calendar
# Import the module (not the class): later cells call datetime.datetime and datetime.timedelta
import datetime
import time
from itertools import chain
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import udf, col, count, when, isnan, create_map, lit
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
import pyspark.sql.functions as F

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project I use the datasets provided by Udacity to build a data model with a star schema that is easy to understand and offers fast analytical query performance. I use Spark (through its Python API, PySpark) to build the ETL pipeline and the Python pandas library to explore the data.
 
 

#### Describe and Gather Data 

The datasets that I use to build the data model are listed below:
 1. I94 Immigration Data - The data comes from the US National Tourism and Trade Office and contains information about immigrants of different ages who arrive in the US for various purposes, holding different types of visas.
 2. World Temperature Data - The data comes from Kaggle and contains recorded temperatures and their trends in different countries of the world.
 3. U.S. City Demographic Data - The data comes from OpenSoft and contains demographic data for different US cities and states, including their populations.
 4. Airport Code Table - The data is taken from https://datahub.io/core/airport-codes#data and contains airport codes and airport types for many countries, including the US.

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
# Check the number of files and the name of the data files
filenames = [os.path.join('../../data/18-83510-I94-Data-2016', file) for file in os.listdir('../../data/18-83510-I94-Data-2016')]
filenames

['../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_sep16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_nov16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_mar16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_aug16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_may16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_oct16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_jul16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_feb16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_dec16_sub.sas7bdat']

In [4]:
# Read in the data here
start = time.time()
fname = filenames[0]
df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")
end = time.time()
print(str((end-start)/60)+ ' minutes')

3.5111743172009784 minutes


In [5]:
df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


#### Create a Spark session

In [4]:
def create_spark_sess():
    # Both packages go into a single comma-separated "spark.jars.packages" value;
    # a second .config() call with the same key would overwrite the first one
    spark = SparkSession\
    .builder\
    .config("spark.jars.packages",
            "saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.0")\
    .config("spark.sql.broadcastTimeout", "36000")\
    .enableHiveSupport().getOrCreate()
    return spark
    
spark = create_spark_sess()

### Step 2: Explore and Assess the Data

#### US Immigration Dataset

Read all the data files and write them as *parquet* files partitioned by *month* of the year

In [8]:
data_dir = "../../data/18-83510-I94-Data-2016/"
files = os.listdir(data_dir)
for i in range(len(files)):
    print('-'*100)
    df_spark =spark.read.format('com.github.saurfang.sas.spark').load(data_dir + files[i])
    print('Month: ' + calendar.month_name[int(df_spark.select('i94mon').take(1)[0][0])]+ ' | ' + 'Count: ' + str(df_spark.count()))
    df_spark.write.partitionBy('i94mon').parquet("data/sas_data/", 'append')
    print(df_spark.columns)

----------------------------------------------------------------------------------------------------
Month: April | Count: 3096313
['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline', 'admnum', 'fltno', 'visatype']
----------------------------------------------------------------------------------------------------
Month: September | Count: 3733786
['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline', 'admnum', 'fltno', 'visatype']
----------------------------------------------------------------------------------------------------
Month: November | Count: 2914926
['cicid', 

##### The above process took a long time. I ran it on purpose to check whether all the data files have the same columns, and it turned out that the file for July has some extra columns
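The column check can also be done without reading through the printed lists by eye. The sketch below is plain Python and only illustrates the idea: `extra_columns` is a hypothetical helper, and the column lists (including the name `extra_col`) are made up for the example, not taken from the real files.

```python
def extra_columns(columns_by_file):
    """Return, per file, the columns that are not shared by every file."""
    column_sets = {name: set(cols) for name, cols in columns_by_file.items()}
    common = set.intersection(*column_sets.values())
    return {name: sorted(cols - common)
            for name, cols in column_sets.items() if cols - common}

# Hypothetical, shortened column lists; "extra_col" is an invented name
sample = {
    "i94_apr16_sub.sas7bdat": ["cicid", "i94yr", "i94mon"],
    "i94_jul16_sub.sas7bdat": ["cicid", "i94yr", "i94mon", "extra_col"],
}
print(extra_columns(sample))
```

In the notebook, the input dictionary could be filled with `df_spark.columns` for each file inside the loop above.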

In [9]:
# Read all the Parquet files
dfImmig = spark.read.parquet("data/sas_data/")

In [10]:
dfImmig.limit(6).toPandas()

Unnamed: 0,cicid,i94yr,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,...,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype,i94mon
0,5680949.0,2016.0,117.0,117.0,NYC,20659.0,1.0,NY,,30.0,...,,1986.0,D/S,F,,IG,2947450000.0,3940,F1,7.0
1,5680950.0,2016.0,245.0,245.0,DET,20659.0,1.0,IL,20679.0,46.0,...,M,1970.0,01232017,M,78652.0,DL,2947451000.0,188,B2,7.0
2,5680953.0,2016.0,245.0,245.0,SEA,20659.0,1.0,WA,20670.0,36.0,...,M,1980.0,01232017,F,130660.0,OZ,2947455000.0,272,B2,7.0
3,5680954.0,2016.0,135.0,135.0,ORL,20659.0,1.0,FL,20673.0,17.0,...,M,1999.0,10212016,F,294090.0,MT,2947456000.0,176,WT,7.0
4,5680956.0,2016.0,213.0,213.0,MIA,20659.0,1.0,FL,20728.0,23.0,...,M,1993.0,01232017,M,21180.0,QR,2947457000.0,777,B2,7.0
5,5680958.0,2016.0,689.0,689.0,ORL,20659.0,1.0,FL,20663.0,56.0,...,M,1960.0,01232017,F,154782.0,JJ,2947459000.0,8086,B2,7.0


In [11]:
# Check for the column names in the Dataframe
print(dfImmig.columns)

['cicid', 'i94yr', 'i94cit', 'i94res', 'i94port', 'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline', 'admnum', 'fltno', 'visatype', 'i94mon']


In [12]:
# Print the Schema
dfImmig.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = t

In [9]:
# Count the number of rows
dfImmig.count()

43886842

In [16]:
# Check for duplicate rows
dfImmig.dropDuplicates().count()

40790529
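The two counts above differ, so the dataframe contains duplicate rows. A quick calculation with the numbers printed in this notebook shows how many. The result equals the April row count printed earlier; one possible explanation, which I have not verified, is that the April file was appended to the parquet output twice.

```python
total_rows = 43886842      # dfImmig.count()
distinct_rows = 40790529   # dfImmig.dropDuplicates().count()
april_rows = 3096313       # "Month: April" count printed while writing the parquet files

duplicate_rows = total_rows - distinct_rows
print(duplicate_rows)
print(duplicate_rows == april_rows)
```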

In [18]:
# Check for null values in the columns of the dataframe -> very long process as the dataframe contains about 44 million rows
dfImmig.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in dfImmig.columns]).toPandas()

Unnamed: 0,cicid,i94yr,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,...,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype,i94mon
0,0,0,28575,0,0,0,74188,2180518,3450469,10319,...,3358010,10319,102028,4494252,38660700,1391693,0,353471,0,0


### Let's look at the data dictionary provided with the US Immigration data and extract some mappings from it where needed

In [13]:
with open('I94_SAS_Labels_Descriptions.SAS') as f:
    labels = f.readlines()
    
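# Keep only the single-line /* CODE - description */ comments
lines = [label for label in labels if '/*' in label and '*/\n' in label]
# Raw string, so that \* and \s are passed to the regex engine unchanged
PATTERN = r'^/\*\s+(?P<code>.+?)\s+-\s+(?P<description>.+)\s+\*/$'
match = [re.search(PATTERN, l) for l in lines]

for lab in match:
    # Skip comment lines that do not follow the "CODE - description" layout
    if lab is not None:
        print(lab.group("code") +" : "+ lab.group("description"))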

I94YR : 4 digit year
I94MON : Numeric month
I94CIT & I94RES : This format shows all the valid and invalid codes for processing
I94PORT : This format shows all the valid and invalid codes for processing
I94MODE : There are missing values as well as not reported (9)
I94BIR : Age of Respondent in Years
COUNT : Used for summary statistics
DTADFILE : Character Date Field - Date added to I-94 Files - CIC does not use
VISAPOST : Department of State where where Visa was issued - CIC does not use
OCCUP : Occupation that will be performed in U.S. - CIC does not use
ENTDEPA : Arrival Flag - admitted or paroled into the U.S. - CIC does not use
ENTDEPD : Departure Flag - Departed, lost I-94 or is deceased - CIC does not use
ENTDEPU : Update Flag - Either apprehended, overstayed, adjusted to perm residence - CIC does not use
MATFLAG : Match flag - Match of arrival and departure records
BIRYEAR : 4 digit year of birth
DTADDTO : Character Date Field - Date to which admitted to U.S. (allowed to stay un
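To make the regular expression easier to follow, here is a small self-contained sketch that applies the same pattern to a few sample lines. The sample lines are illustrative: I wrote them in the layout that the printed output suggests, they are not copied from the SAS file.

```python
import re

PATTERN = r'^/\*\s+(?P<code>.+?)\s+-\s+(?P<description>.+)\s+\*/$'

# Illustrative lines, not copied from I94_SAS_Labels_Descriptions.SAS
sample_lines = [
    "/* I94YR - 4 digit year */\n",
    "/* DTADFILE - Character Date Field - Date added to I-94 Files - CIC does not use */\n",
    "value i94prtl\n",  # not a comment line, so the pattern does not match
]

parsed = []
for line in sample_lines:
    m = re.search(PATTERN, line)
    if m is not None:
        parsed.append((m.group("code"), m.group("description")))

for code, description in parsed:
    print(code + " : " + description)
```

The lazy `.+?` in the `code` group stops at the first ` - `, so hyphens that appear later in the description (as in `I-94`) stay inside the description.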

In [14]:
# Get the valid ports
PORT_PATTERN = re.compile(r"^\s*'(?P<code>...?)'\s*=\s*'(?P<name>.+)'.*$")
valid_ports = {}

for lab in labels[302:961]:
    matches = PORT_PATTERN.match(lab) 
    valid_ports.update({matches.group('code'): matches.group('name').strip().split(",")[0]})

In [15]:
# Get the valid countries
COUNTRY_PATT = re.compile(r"^\s*(\d+)\s*=\s*'(.*?)'")
valid_country = {}

for lab in labels[9:298]:
    country = COUNTRY_PATT.search(lab) 
    valid_country.update({int(country.group(1)): country.group(2)})

In [16]:
# Get the valid addresses (US state codes and their names)
ADDRESS_PATT = re.compile(r"^\s*'(.*?)'\s*=\s*'(.*?)'")
valid_address = {}
for lab in labels[981:1036]:
    address = ADDRESS_PATT.search(lab) 
    valid_address.update({address.group(1): address.group(2)})

In [17]:
# Get the US state code of each valid port (the part after the comma in the port name)
STATE_PATTERN = re.compile(r"^\s*'(?P<code>...?)'\s*=\s*'(?P<name>.+)'.*$")
valid_states = {}

for lab in labels[302:961]:
    matches = STATE_PATTERN.match(lab) 
    parts = matches.group('name').strip().split(",")
    # Ports without a comma in their name carry no state information
    if len(parts) < 2:
        continue
    valid_states.update({matches.group('code'): parts[1].strip()})
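The port and state extraction above can be tried out on a few lines without the SAS file. The sketch below is a minimal version under my own assumptions: `parse_ports` is a hypothetical helper, the sample lines only imitate the `'CODE' = 'CITY, STATE'` layout, and unlike the cells above it skips lines that do not match instead of failing on them.

```python
import re

PORT_PATTERN = re.compile(r"^\s*'(?P<code>...?)'\s*=\s*'(?P<name>.+)'.*$")

def parse_ports(lines):
    """Return (code -> city, code -> state code) for lines like 'NYC' = 'NEW YORK, NY'."""
    ports, states = {}, {}
    for line in lines:
        m = PORT_PATTERN.match(line)
        if m is None:
            continue  # assumption: silently skip lines in another layout
        parts = [p.strip() for p in m.group("name").split(",")]
        ports[m.group("code")] = parts[0]
        if len(parts) > 1:
            states[m.group("code")] = parts[1]
    return ports, states

# Illustrative lines in the assumed layout of the data dictionary
sample = [
    "   'ALC'\t=\t'ALCAN, AK             '\n",
    "   'NYC'\t=\t'NEW YORK, NY          '\n",
    "   'XXX'\t=\t'NOT REPORTED/UNKNOWN  '\n",
]
ports, states = parse_ports(sample)
print(ports)
print(states)
```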

In [18]:
# UDF to convert date from SAS format to datetime
epoch = datetime.datetime(1960, 1, 1)
sas_to_dt = udf(lambda s: (epoch + datetime.timedelta(days=int(s))).isoformat() if s is not None else None)
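SAS stores dates as the number of days since 1960-01-01, which is what the UDF above relies on. The same conversion in plain Python, as a sketch that can be checked without Spark (`sas_days_to_iso` is a name I made up for this example, and the NaN handling is an addition for pandas input):

```python
import datetime
import math

SAS_EPOCH = datetime.datetime(1960, 1, 1)

def sas_days_to_iso(days):
    """Convert a SAS date (days since 1960-01-01) to an ISO 8601 string."""
    if days is None or (isinstance(days, float) and math.isnan(days)):
        return None
    return (SAS_EPOCH + datetime.timedelta(days=int(days))).isoformat()

print(sas_days_to_iso(20659.0))  # arrdate value from the sample rows above
print(sas_days_to_iso(None))
```

For `20659.0` this gives `2016-07-24T00:00:00`, the same arrival date that shows up later in the enriched dataframe.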

In [19]:
mapping_expr = create_map([lit(x) for x in chain(*valid_country.items())])
                          
mapping_expr1 = create_map([lit(x) for x in chain(*valid_ports.items())])

mapping_expr2 = create_map([lit(x) for x in chain(*valid_address.items())])

mapping_expr3 = create_map([lit(x) for x in chain(*valid_states.items())])
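`create_map` expects its arguments as alternating keys and values (key1, value1, key2, value2, ...), which is why the dictionaries are flattened with `chain(*d.items())` first. A small sketch of just that flattening step, with a made-up two-entry dictionary:

```python
from itertools import chain

# Made-up excerpt of a code -> name dictionary
codes = {"NY": "NEW YORK", "FL": "FLORIDA"}

# items() yields (key, value) pairs; chain(*pairs) concatenates them into one flat sequence
flat = list(chain(*codes.items()))
print(flat)
```

Each element of the flat list is then wrapped in `lit(...)` so that Spark treats it as a literal column.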

##### I enrich the US Immigration dataframe by adding a few more columns with data extracted from the provided data dictionary, and I convert the dates from SAS format to datetime format

In [20]:
dfImmig = dfImmig.withColumn('arrival_date', sas_to_dt('arrdate'))\
                .withColumn('departure_date', sas_to_dt('depdate'))\
                .withColumn("origin_country", mapping_expr.getItem(col("i94res")))\
                .withColumn("port", mapping_expr1.getItem(col("i94port")))\
                .withColumn("address", mapping_expr2.getItem(col("i94addr")))\
                .withColumn('duration_of_stay', F.datediff('departure_date', 'arrival_date'))\
                .withColumn('us_state', mapping_expr3.getItem(col("i94port")))\
                .withColumn('us_state', mapping_expr2.getItem(col("us_state")))

In [21]:
dfImmig = dfImmig.withColumn('visa_purpose', F.when(col('i94visa')==1.0, 'business')\
            .when(col('i94visa') ==2.0, 'pleasure')\
            .when(col('i94visa') ==3.0, 'student').otherwise(None))

In [22]:
dfImmig = dfImmig.withColumn('arrival_mode', F.when(col('i94mode')==1.0, 'air')\
            .when(col('i94mode') ==2.0, 'sea')\
            .when(col('i94mode') ==3.0, 'land')\
            .when(col('i94mode') ==9.0, 'not reported')\
            .otherwise(None))

In [23]:
dfImmig = dfImmig.withColumn('port_type', F.when(col('i94mode')==1.0, 'airport')\
            .when(col('i94mode') ==2.0, 'seaport')\
            .when(col('i94mode') ==3.0, 'landport')\
            .when(col('i94mode') ==9.0, 'not reported')\
            .otherwise(None))

In [20]:
# Let's look at the US Immigration Dataframe now
dfImmig.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,...,arrival_date,departure_date,origin_country,port,address,duration_of_stay,us_state,visa_purpose,arrival_mode,port_type
0,5680949.0,2016.0,117.0,117.0,NYC,20659.0,1.0,NY,,30.0,...,2016-07-24T00:00:00,,ITALY,NEW YORK,NEW YORK,,NEW YORK,student,air,airport
1,5680950.0,2016.0,245.0,245.0,DET,20659.0,1.0,IL,20679.0,46.0,...,2016-07-24T00:00:00,2016-08-13T00:00:00,"CHINA, PRC",DETROIT,ILLINOIS,20.0,MICHIGAN,pleasure,air,airport
2,5680953.0,2016.0,245.0,245.0,SEA,20659.0,1.0,WA,20670.0,36.0,...,2016-07-24T00:00:00,2016-08-04T00:00:00,"CHINA, PRC",SEATTLE,WASHINGTON,11.0,WASHINGTON,pleasure,air,airport
3,5680954.0,2016.0,135.0,135.0,ORL,20659.0,1.0,FL,20673.0,17.0,...,2016-07-24T00:00:00,2016-08-07T00:00:00,UNITED KINGDOM,ORLANDO,FLORIDA,14.0,FLORIDA,pleasure,air,airport
4,5680956.0,2016.0,213.0,213.0,MIA,20659.0,1.0,FL,20728.0,23.0,...,2016-07-24T00:00:00,2016-10-01T00:00:00,INDIA,MIAMI,FLORIDA,69.0,FLORIDA,pleasure,air,airport


In [22]:
# Let's inspect the most common mode of arrival by the immigrants
dfImmig.groupBy('arrival_mode').count().show()

+------------+--------+
|arrival_mode|   count|
+------------+--------+
|         sea|  413533|
|        null|   74192|
|        land| 1161661|
|not reported|   76863|
|         air|42160593|
+------------+--------+



##### It shows that most immigrants arrived by air, followed by land
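To put the counts in proportion, the shares can be computed from the table printed above. This is only arithmetic on the numbers shown in the output, done in plain Python:

```python
# Counts copied from the groupBy('arrival_mode') output above
arrival_counts = {
    "sea": 413533,
    "null": 74192,
    "land": 1161661,
    "not reported": 76863,
    "air": 42160593,
}

total = sum(arrival_counts.values())
shares = {mode: round(100 * n / total, 1) for mode, n in arrival_counts.items()}

print(total)
for mode, pct in sorted(shares.items(), key=lambda kv: kv[1], reverse=True):
    print(mode, pct)
```

The total matches the row count of `dfImmig`, and air arrivals account for roughly 96 percent of all records.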

In [23]:
# Let's inspect for what purpose immigrants visit the US
dfImmig.groupBy('visa_purpose').count().show()

+------------+--------+
|visa_purpose|   count|
+------------+--------+
|    pleasure|36172847|
|     student| 1616637|
|    business| 6097358|
+------------+--------+



##### It shows that most people visited the US for the purpose of *pleasure*

####  US Demography Dataset

In [24]:
# Read the dataset and visualize
df_city_demograph = spark.read.option("sep", ";").csv("/home/workspace/us-cities-demographics.csv", header=True)
df_city_demograph.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


In [25]:
# Count the no. of rows
print(df_city_demograph.count())

# Check for duplicates
print(df_city_demograph.dropDuplicates().count())

2891
2891


In [26]:
# Check for null values in each column of the US Demography Dataframe
df_city_demograph.select([count(when(isnan(cc) | col(cc).isNull(), cc)).alias(cc) for cc in df_city_demograph.columns]).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,0,0,0,3,3,0,13,13,16,0,0,0


In [27]:
# Check the rows of the Dataframe where the Male Population is null
df_city_demograph.filter(col('Male Population').isNull()).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,The Villages,Florida,70.5,,,72590,15231,4034,,FL,Hispanic or Latino,1066
1,The Villages,Florida,70.5,,,72590,15231,4034,,FL,Black or African-American,331
2,The Villages,Florida,70.5,,,72590,15231,4034,,FL,White,72211


In [28]:
# Check the rows of the Dataframe where the Female Population is null
df_city_demograph.filter(col('Female Population').isNull()).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,The Villages,Florida,70.5,,,72590,15231,4034,,FL,Hispanic or Latino,1066
1,The Villages,Florida,70.5,,,72590,15231,4034,,FL,Black or African-American,331
2,The Villages,Florida,70.5,,,72590,15231,4034,,FL,White,72211


##### It looks like the rows with null values are the same in both cases
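Instead of comparing the two outputs by eye, the same observation can be checked programmatically. The sketch below uses plain Python on a handful of rows copied from the outputs above (reduced to three columns); on the full dataframe one would collect the affected rows first.

```python
# A few rows from the outputs above, reduced to the relevant columns
rows = [
    {"City": "Silver Spring", "Male Population": "40601", "Female Population": "41862"},
    {"City": "The Villages", "Male Population": None, "Female Population": None},
    {"City": "The Villages", "Male Population": None, "Female Population": None},
    {"City": "Quincy", "Male Population": "44129", "Female Population": "49500"},
]

male_null = {i for i, r in enumerate(rows) if r["Male Population"] is None}
female_null = {i for i, r in enumerate(rows) if r["Female Population"] is None}

# True when exactly the same rows are missing both values
print(male_null == female_null)
```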

#### Airport Codes Dataset

In [5]:
# Read and visualize the Airport codes dataset
df_airport = spark.read.csv("/home/workspace/airport-codes_csv.csv", header=True)
df_airport.limit(10).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"
5,00AS,small_airport,Fulton Airport,1100,,US,US-OK,Alex,00AS,,00AS,"-97.8180194, 34.9428028"
6,00AZ,small_airport,Cordes Airport,3810,,US,US-AZ,Cordes,00AZ,,00AZ,"-112.16500091552734, 34.305599212646484"
7,00CA,small_airport,Goldstone /Gts/ Airport,3038,,US,US-CA,Barstow,00CA,,00CA,"-116.888000488, 35.350498199499995"
8,00CL,small_airport,Williams Ag Airport,87,,US,US-CA,Biggs,00CL,,00CL,"-121.763427, 39.427188"
9,00CN,heliport,Kitchen Creek Helibase Heliport,3350,,US,US-CA,Pine Valley,00CN,,00CN,"-116.4597417, 32.7273736"


In [30]:
# Count the number of rows in the dataset
print(df_airport.count())

# Check for duplicates
df_airport.dropDuplicates().count()

55075


55075

In [31]:
# Check for null values in each column of the Airport codes dataset
df_airport.select([count(when(isnan(cc) | col(cc).isNull(), cc)).alias(cc) for cc in df_airport.columns]).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,0,0,0,7006,0,0,0,5676,14045,45886,26389,0


In [29]:
# Check how many countries and its airports are included in the dataset
df_airport.select('iso_country').distinct().show()

+-----------+
|iso_country|
+-----------+
|         DZ|
|         LT|
|         MM|
|         CI|
|         TC|
|         AZ|
|         FI|
|         SC|
|         PM|
|         UA|
|         ZM|
|         KI|
|         RO|
|         SL|
|         SB|
|         NL|
|         LA|
|         BS|
|         BW|
|         MN|
+-----------+
only showing top 20 rows



In [6]:
df_airport.select('type').distinct().show()

+--------------+
|          type|
+--------------+
| large_airport|
|   balloonport|
| seaplane_base|
|      heliport|
|        closed|
|medium_airport|
| small_airport|
+--------------+



##### The dataframe contains details about airports from many countries. In my case I am interested in the airports located in the US, which could be the point of entry for immigrants into the US

In [7]:
# Filter the Airport codes dataframes to get the airports only from the US
df_airport_us = df_airport.filter(col('iso_country') == 'US')
df_airport_us.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [24]:
# Split the string in the column iso_region to get the US States
df_airport_us = df_airport_us.withColumn('state', F.split(col('iso_region'), '-').getItem(1))

# Get the name of the states from the dictionary valid address and map the values into the new column
df_airport_us = df_airport_us.withColumn("port_state", mapping_expr2.getItem(col("state")))
df_airport_us.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates,state,port_state
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125",PA,PENNSYLVANIA
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022",KS,KANSAS
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968",AK,ALASKA
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172",AL,ALABAMA
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087",AR,ARKANSAS


In [25]:
df_airport_us.select('port_state').distinct().show()
print(df_airport_us.select('port_state').distinct().count())

+-----------------+
|       port_state|
+-----------------+
|       NEW JERSEY|
|     PENNSYLVANIA|
|         ILLINOIS|
|         MARYLAND|
|DIST. OF COLUMBIA|
|            IDAHO|
|         MISSOURI|
|          MONTANA|
|         MICHIGAN|
|          FLORIDA|
|           OREGON|
|             null|
|           ALASKA|
|        LOUISIANA|
|            MAINE|
|         OKLAHOMA|
|    NEW HAMPSHIRE|
|         VIRGINIA|
|       WASHINGTON|
|      S. CAROLINA|
+-----------------+
only showing top 20 rows

52


In [34]:
# Check for null values before filtering them out
print(df_airport_us.filter(col('port_state').isNull()).count())

df_airport_us.filter(col('port_state').isNull()).limit(5).toPandas()

10


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates,state,port_state
0,US-0303,large_airport,atl,,,US,US-U-A,,,,,"-84.375, 33.137551",U,
1,US-0319,small_airport,Ronnie Cole,900.0,,US,US-U-A,Greenfield,,,,"-85.72268, 39.831008",U,
2,US-0384,closed,Beacon Station Air Strip,,,US,US-U-A,Baker,,,,"-116.20977, 35.13047",U,
3,US-0610,closed,Erase Me 13,,,US,US-U-A,,,,,"0, 0.4",U,
4,US-0742,medium_airport,34S Airport,,,US,US-U-A,,,,,"-16.875, 19.145168",U,
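The rows above show why `port_state` is null for these airports: their `iso_region` is `US-U-A`, so splitting on `-` and taking the second item gives `U`, which is not a state code in the data dictionary. A plain Python sketch of the split (`state_from_region` is a hypothetical helper that mirrors `F.split(...).getItem(1)`):

```python
def state_from_region(iso_region):
    """Return the part after the first '-' of an iso_region such as 'US-PA'."""
    parts = iso_region.split("-")
    return parts[1] if len(parts) > 1 else None

print(state_from_region("US-PA"))   # regular region code
print(state_from_region("US-U-A"))  # region code of the rows with a null port_state
```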


In [26]:
df_airport_us = df_airport_us.filter(col('port_state').isNotNull())

In [27]:
df_airport.select('type').distinct().show()

+--------------+
|          type|
+--------------+
| large_airport|
|   balloonport|
| seaplane_base|
|      heliport|
|        closed|
|medium_airport|
| small_airport|
+--------------+



#### World Temperature Data

In [36]:
# Read the dataset and visualize it
df_spark_temp =spark.read.csv('../../data2/GlobalLandTemperaturesByCity.csv', header=True)
df_spark_temp.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


##### I can see that the column *AverageTemperatureUncertainty* has a lot of digits after the decimal point. I take care of that by rounding to 3 digits after the decimal point

In [37]:
df_spark_temp = df_spark_temp.withColumn('AverageTemperature', F.round(col('AverageTemperature'), 3))\
                                .withColumn('AverageTemperatureUncertainty', F.round(col('AverageTemperatureUncertainty'), 3))
df_spark_temp.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


##### Now better :)

In [54]:
print(df_spark_temp.count())

df_spark_temp.dropDuplicates().count()

8599212


8599212

In [39]:
# Check for null values in each column of the World temperature dataframe
df_spark_temp.select([count(when(isnan(cc) | col(cc).isNull(), cc)).alias(cc) for cc in df_spark_temp.columns]).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,0,364130,364130,0,0,0,0


##### The city names contain both capital and small letters. To keep them consistent, I convert all letters to capitals.

In [38]:
df_spark_temp = df_spark_temp.withColumn('city', F.upper(col('City')))
df_spark_temp.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,city,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,ÅRHUS,Denmark,57.05N,10.33E
1,1743-12-01,,,ÅRHUS,Denmark,57.05N,10.33E
2,1744-01-01,,,ÅRHUS,Denmark,57.05N,10.33E
3,1744-02-01,,,ÅRHUS,Denmark,57.05N,10.33E
4,1744-03-01,,,ÅRHUS,Denmark,57.05N,10.33E


##### We don't need historical data from the 1700s, 1800s, and 1900s, as we have US Immigration data only from 2016. Therefore, I keep only the records from 2000 onwards

In [39]:
df_spark_temp = df_spark_temp.filter(col('dt') >= '2000-01-01')
df_spark_temp.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,city,Country,Latitude,Longitude
0,2000-02-01,3.724,0.241,ÅRHUS,Denmark,57.05N,10.33E
1,2000-03-01,3.976,0.296,ÅRHUS,Denmark,57.05N,10.33E
2,2000-04-01,8.321,0.221,ÅRHUS,Denmark,57.05N,10.33E
3,2000-05-01,13.567,0.253,ÅRHUS,Denmark,57.05N,10.33E
4,2000-06-01,14.702,0.24,ÅRHUS,Denmark,57.05N,10.33E
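The CSV was read without schema inference, so as far as I can tell `dt` is a string column and the filter compares strings. That still works here because dates written as `yyyy-MM-dd` sort the same way alphabetically and chronologically. A small plain Python sketch with a few sample dates:

```python
# Sample dates in the same yyyy-MM-dd format as the dt column
dates = ["1743-11-01", "1999-12-01", "2000-01-01", "2013-09-01"]

# String comparison is enough because the most significant part (the year) comes first
recent = [d for d in dates if d >= "2000-01-01"]
print(recent)
```

The same trick would not work for formats such as `dd/MM/yyyy`, where the day comes first.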


In [40]:
# Drop all the NaN values
df_spark_temp = df_spark_temp.dropna()

In [55]:
# Check the columns for NaN values once again
df_spark_temp.select([count(when(isnan(cc) | col(cc).isNull(), cc)).alias(cc) for cc in df_spark_temp.columns]).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,city,Country,Latitude,Longitude
0,0,0,0,0,0,0,0


In [31]:
df_spark_temp.count()

572570

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

I have chosen the star schema because it is easy to query and fast for aggregations.

The ER diagram below shows the conceptual fact and dimension tables that I want to include in my data model

Fact Table -
 * fact_immigration table
 
Dimension Tables -
 * dim_immigrant table
 * dim_port table
 * dim_date table
 * dim_usdemograph table
 * dim_usairport table
 * dim_ustemperature table


<img src="image/ERD_DEND.jpeg" width="50%"/>
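To illustrate why a star schema is convenient to query, here is a minimal sketch that uses Python's built-in `sqlite3` module with two tiny tables. The `dim_port` columns follow the port dimension built later in this notebook; the `fact_immigration` columns and all rows are assumptions made up for the example, not the final model.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dim_port (i94port TEXT PRIMARY KEY, port_city TEXT, us_state TEXT);
    -- Hypothetical, reduced fact table for illustration only
    CREATE TABLE fact_immigration (cicid INTEGER, i94port TEXT, visa_purpose TEXT);
    INSERT INTO dim_port VALUES
        ('NYC', 'NEW YORK', 'NEW YORK'), ('MIA', 'MIAMI', 'FLORIDA');
    INSERT INTO fact_immigration VALUES
        (1, 'NYC', 'student'), (2, 'MIA', 'pleasure'), (3, 'MIA', 'pleasure');
""")

# One join from the fact table to a dimension answers "which city attracts which visitors"
rows = conn.execute("""
    SELECT p.port_city, f.visa_purpose, COUNT(*) AS arrivals
    FROM fact_immigration AS f
    JOIN dim_port AS p ON p.i94port = f.i94port
    GROUP BY p.port_city, f.visa_purpose
    ORDER BY arrivals DESC, p.port_city
""").fetchall()
print(rows)
conn.close()
```

Every analytical question follows the same shape: start from the fact table, join the dimensions that are needed, then group and aggregate.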

#### 3.2 Mapping Out Data Pipelines
The steps required to channel the data into the data model are as follows -
 1. Since the i94 dataset contains more information than necessary, use it to create three dimension tables
     - dim_immigrant table, which contains the necessary information about the immigrant, such as age, gender, and country of origin
     - dim_port table, which contains information about the ports in the US
     - dim_date table, which contains the timestamp, date, and all other datetime-related information 
     
 2. The US demography dataset contains demographic data about the US; use it to create the dimension table -
      - dim_usdemograph table

 3. The df_airport dataframe contains the necessary information about the airports in the US; clean it and use it to create the dimension table -
       - dim_usairport table
       
 4. df_spark_temp contains the world's temperatures from the year 2000 until 2013; filter this dataframe to contain only temperatures from the US and create the dimension table -
     - dim_ustemperature table
     
 5. To create the fact table, join the dfImmig dataframe with the df_spark_temp dataframe and select all the necessary columns -
     - fact_immigration table
 
#### PS: Write all the fact and dimension tables after creation to parquet format


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [21]:
from workspace_utils import active_session

In [38]:
def process_immigration_data(input_data, output_path):
    """
    Create the following dimension tables
    :param input_data: Input dataframe
    :param output_path: Path to which parquet file to be written
    :return: Immigrant dimension table, Port dimension table and Date dimension table
    """
    with active_session():
        dim_immigrant_table = input_data.select(col('i94bir').alias('age').cast(IntegerType()), 
                                         col('gender'), col('arrival_date'), 
                                         col('departure_date'), col('visatype'), col('visa_purpose'), 
                                         col('origin_country'), col('arrival_mode'), col('duration_of_stay'), 
                                         col('port').alias('port_of_entry'), col('address'), 
                                         col('i94port').alias('port'), 'occup', 'i94res')
    
        # Filter out rows with a negative duration of stay.
        # Rows where the duration of stay is None are kept, because we do not know
        # whether the immigrant left the US for another destination or stayed in the US
        dim_immigrant_table = dim_immigrant_table.filter((col('duration_of_stay') >= 0) | (col('duration_of_stay').isNull()))
        
        # Write the immigrant dimension table as a parquet file partitioned by the port of entry
        dim_immigrant_table.write.partitionBy('port_of_entry').parquet(output_path +
                                                                       "output_data/dim_immigrant_table/", 'overwrite')
    
        # Create Port Dimension Table
        dim_port_table = input_data.select('i94port', col('port').alias('port_city'), 'us_state').dropDuplicates()
        # Write the port dimension table as a parquet file partitioned by the name of the port cities
        dim_port_table.write.partitionBy('port_city').parquet(output_path + "output_data/dim_port_table/", 'overwrite')
    
        # Create Date Dimension Table
        dim_date_table = input_data.select(
                    'arrival_date',
                    'departure_date',
                    dayofmonth("arrival_date").alias('arrival_day'),
                    weekofyear("arrival_date").alias('arrival_week'),
                    month("arrival_date").alias('arrival_month'),
                    year("arrival_date").alias('arrival_year'),
                    date_format("arrival_date", "E").alias("arrival_weekday"),
                    dayofmonth("departure_date").alias('departure_day'),
                    weekofyear("departure_date").alias('departure_week'),
                    month("departure_date").alias('departure_month'),
                    year("departure_date").alias('departure_year'),
                    date_format("departure_date", "E").alias("departure_weekday")
                )
        # Write the date dimension table as a parquet file
        dim_date_table.write.parquet(output_path + "output_data/dim_date_table/", 'overwrite')
    
        return dim_immigrant_table, dim_port_table, dim_date_table
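
The date parts selected for dim_date_table can be checked by hand on a single date. The following is a minimal plain-Python sketch and is not part of the pipeline. `date_parts` and `WEEKDAY_ABBR` are hypothetical names, and the sketch assumes that Spark's `weekofyear` follows ISO 8601 week numbering and that `date_format(..., "E")` yields an abbreviated English day name.

```python
from datetime import date

# Abbreviated English day names, indexed by date.weekday() (Monday is 0)
WEEKDAY_ABBR = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]


def date_parts(d, prefix):
    """Return the date parts stored in dim_date_table for one date (hypothetical helper)."""
    return {
        prefix + "_day": d.day,
        prefix + "_week": d.isocalendar()[1],  # ISO 8601 week number
        prefix + "_month": d.month,
        prefix + "_year": d.year,
        prefix + "_weekday": WEEKDAY_ABBR[d.weekday()],
    }


# An arrival date that also appears in the fact table sample of this notebook
parts = date_parts(date(2016, 7, 24), "arrival")
print(parts)
```

Comparing a few such hand-computed rows with the Spark output is a cheap way to confirm that the week and weekday columns mean what the data dictionary says.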

In [39]:
def process_demograph_data(input_data, output_path):
    """
    Create the US Demograph dimension table
    :param input_data: Input dataframe
    :param output_path: Path to which the parquet file is written
    :return: US Demograph dimension table
    """
    with active_session():
        # Create US Demograph Dimension Table
        dim_usdemograph_table = input_data.select(F.upper(col('City')).alias('city'), 
                                              F.upper(col('State')).alias('state'), 
                                              col('Male Population').alias('male_poulation'), 
                                              col('Female Population').alias('female_population'), 
                                              col('Total Population').alias('total_population'), 
                                              col('Median Age').alias('median_age').cast(IntegerType()), 
                                              col('Foreign-born').alias('foreign_born'), 
                                              col('Number of Veterans').alias('no_of_veterans'), 
                                              col('Race').alias('ethnicity'), 
                                              col('Count').alias('ethinicity_count'))
        # Write the US demograph dimension table as a parquet file
        dim_usdemograph_table.write.partitionBy('state', 'city').parquet(output_path +
                                                                         "output_data/dim_usdemograph_table/", 'overwrite')
    
        return dim_usdemograph_table

In [40]:
def process_aiport_data(input_data, output_path):
    """
    Create the US Airport dimension table
    :param input_data: Input dataframe
    :param output_path: Path to which the parquet file is written
    :return: US Airport dimension table 
    """
    with active_session():
        # Create US Airport Dimension Table
        dim_usairport_table = input_data.select(col('ident'),
                              col('type'),
                              col('name'),
                              col('elevation_ft'),
                              col('iata_code'),
                              col('local_code'),
                              col('coordinates'),
                              col('iso_region'),
                              col('port_state'))
    
        # Add a column 'port_type' which contains a string 'airport'. This could be handy during joins
        dim_usairport_table = dim_usairport_table.withColumn('port_type', F.lit('airport'))
        # Write the US airport dimension table as a parquet file partitioned by the state in which the port is located
        dim_usairport_table.write.partitionBy('port_state').parquet(output_path +
                                                                    "output_data/dim_usairport_table/", 'overwrite')
    
        return dim_usairport_table

In [41]:
def process_temperature_data(input_data, output_path):
    """
    Create the US Temperature dimension table
    :param input_data: Input dataframe
    :param output_path: Path to which the parquet file is written
    :return: US Temperature dimension table 
    """
    with active_session():
        # Since I am interested only in the US, I would filter the dataframe and get the rows where *Country = United States*
        dim_ustemperature_table = input_data.filter(input_data.Country.contains('United States'))
        # Rename the column city to us_city to give a clear idea
        dim_ustemperature_table = dim_ustemperature_table.withColumnRenamed('city', 'us_city')
        # Drop any duplicate rows
        dim_ustemperature_table = dim_ustemperature_table.dropDuplicates()
        # Write the US temperature dimension table as a parquet file partitioned by the us city 
        #print("Writing the file now to AWS S3")
        
        start = time.time()
        dim_ustemperature_table.write.partitionBy('us_city').parquet(output_path +
                                                                     "output_data/dim_ustemperature_table/", 
                                                                     'overwrite')
        
        end = time.time()
        print(str((end-start)/60)+ ' minutes')
    
        return dim_ustemperature_table
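
The start/end timing around the parquet write is repeated in more than one function. One possible way to factor it out is a small context manager. This is only a sketch: `timed` is a hypothetical helper that does not exist in the notebook, and the `time.sleep` call stands in for a slow write.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label):
    """Print how long the wrapped block took, in minutes (hypothetical helper)."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Runs even if the wrapped block raises, so a failed write is still timed
        elapsed_min = (time.perf_counter() - start) / 60
        print("{}: {:.2f} minutes".format(label, elapsed_min))


# Usage: wrap any slow step, for example a parquet write
with timed("example step"):
    time.sleep(0.01)
```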

In [42]:
def create_fact_table(df1, df2, output_path):
    """
    Create the US Immigration fact table
    :param df1: First dataframe
    :param df2: Second dataframe
    :param output_path: Path to which the parquet file is written
    :return: Immigration fact table 
    """
    with active_session():
        req_col = ['city', 'Country', 'Latitude', 'Longitude']
        # Average the US temperatures recorded since 2009 for each city.
        # Filter on df2 (the function argument) rather than on the global df_spark_temp
        df_temp = df2.filter((col('dt') >= '2009-01-01') & col('Country').contains('United States'))\
                .groupBy(req_col).agg(F.avg('AverageTemperature').alias('avg_temp'))
        
        # Note: immigration_id is computed below but is not part of the final select,
        # so it is not written to the fact table
        fact_immigration_table = df1.join(df_temp, (df1.port == df_temp.city))\
                .filter(col('port').isNotNull())\
                .withColumn('immigration_id', F.row_number().over(Window.orderBy(F.monotonically_increasing_id())))\
                .select(col('port'), col('i94port'), col('arrival_date'), col('port_type'), 
                        col('us_state'), col('avg_temp').alias('avg_temperature')).dropDuplicates()
        
        print(fact_immigration_table.limit(5).toPandas())
        
        start = time.time()
        print(start)
        # Write the US immigration fact table as a parquet file partitioned by port 
        fact_immigration_table.write.partitionBy('port').parquet(output_path +
                                                                 "output_data/fact_immigration_table/", 'overwrite')
        
        end = time.time()
        print(str((end-start)/60)+ ' minutes')
    
        return fact_immigration_table
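
The temperature aggregation that feeds the fact table (filter by date and country, group by city, average) can be traced on a few rows in plain Python. This is a sketch of the logic only, not the Spark code: `average_temperature_by_city` is a hypothetical helper, the rows are invented for illustration, and `dt` is assumed to be an ISO-formatted date string.

```python
from collections import defaultdict

# Illustrative rows only (not taken from the dataset):
# (dt, city, Country, AverageTemperature)
rows = [
    ("2008-12-01", "ORLANDO", "United States", 16.0),  # before the cut-off, dropped
    ("2009-01-01", "ORLANDO", "United States", 15.0),
    ("2009-07-01", "ORLANDO", "United States", 28.0),
    ("2009-07-01", "TORONTO", "Canada", 21.0),         # not in the US, dropped
]


def average_temperature_by_city(rows, since="2009-01-01", country="United States"):
    """Average temperature per city for rows on or after `since` (hypothetical helper)."""
    totals = defaultdict(lambda: [0.0, 0])
    for dt, city, row_country, temp in rows:
        # ISO-formatted date strings sort in chronological order, so a plain
        # string comparison is enough; missing temperatures are skipped
        if dt >= since and country in row_country and temp is not None:
            totals[city][0] += temp
            totals[city][1] += 1
    return {city: total / n for city, (total, n) in totals.items()}


avg_temp = average_temperature_by_city(rows)
print(avg_temp)
```

Because the average is taken over all retained months, every arrival date at a given port gets the same `avg_temperature`, which matches the repeated value in the fact table sample.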

In [47]:
output_path = "s3a://udacity-dataengg-capstone/"

In [65]:
dim_immigrant_table, dim_port_table, dim_date_table = process_immigration_data(dfImmig, output_path)

In [66]:
dim_usdemograph_table = process_demograph_data(df_city_demograph, output_path)

In [67]:
dim_usdemograph_table.limit(5).toPandas()

Unnamed: 0,city,state,male_poulation,female_population,total_population,median_age,foreign_born,no_of_veterans,ethnicity,ethinicity_count
0,SILVER SPRING,MARYLAND,40601,41862,82463,33,30908,1562,Hispanic or Latino,25924
1,QUINCY,MASSACHUSETTS,44129,49500,93629,41,32935,4147,White,58723
2,HOOVER,ALABAMA,38040,46799,84839,38,8229,4819,Asian,4759
3,RANCHO CUCAMONGA,CALIFORNIA,88127,87105,175232,34,33878,5821,Black or African-American,24437
4,NEWARK,NEW JERSEY,138040,143873,281913,34,86253,5829,White,76402


In [68]:
dim_usairport_table = process_aiport_data(df_airport_us, output_path)

In [69]:
dim_usairport_table.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,iata_code,local_code,coordinates,iso_region,port_state,port_type
0,00A,heliport,Total Rf Heliport,11,,00A,"-74.93360137939453, 40.07080078125",US-PA,PENNSYLVANIA,airport
1,00AA,small_airport,Aero B Ranch Airport,3435,,00AA,"-101.473911, 38.704022",US-KS,KANSAS,airport
2,00AK,small_airport,Lowell Field,450,,00AK,"-151.695999146, 59.94919968",US-AK,ALASKA,airport
3,00AL,small_airport,Epps Airpark,820,,00AL,"-86.77030181884766, 34.86479949951172",US-AL,ALABAMA,airport
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,,"-91.254898, 35.6087",US-AR,ARKANSAS,airport


In [38]:
dim_ustemperature_table = process_temperature_data(df_spark_temp, output_path)

4.068682173887889 minutes


In [45]:
fact_immigration_table = create_fact_table(dfImmig, df_spark_temp, output_path)

      port i94port         arrival_date port_type us_state  avg_temperature
0  ORLANDO     ORL  2016-07-24T00:00:00   airport  FLORIDA        23.268339
1  ORLANDO     ORL  2016-07-25T00:00:00   airport  FLORIDA        23.268339
2  ORLANDO     ORL  2016-07-26T00:00:00   airport  FLORIDA        23.268339
3  ORLANDO     ORL  2016-07-27T00:00:00   airport  FLORIDA        23.268339
4  ORLANDO     ORL  2016-07-29T00:00:00   airport  FLORIDA        23.268339
1597093760.249533
10.520065184434255 minutes


In [28]:
filepath = "s3a://udacity-dataengg-capstone/output_data/"
dim_immigrant_table = spark.read.parquet(filepath + "dim_immigrant_table/")
dim_port_table = spark.read.parquet(filepath + "dim_port_table/")
dim_date_table = spark.read.parquet(filepath + "dim_date_table/")
dim_usdemograph_table = spark.read.parquet(filepath + "dim_usdemograph_table/")
dim_usairport_table = spark.read.parquet(filepath + "dim_usairport_table/")
dim_ustemperature_table = spark.read.parquet(filepath + "dim_ustemperature_table/")
fact_immigration_table = spark.read.parquet(filepath + "fact_immigration_table/")

In [5]:
dim_immigrant_table.limit(5).toPandas()

Unnamed: 0,age,gender,arrival_date,departure_date,visatype,visa_purpose,origin_country,arrival_mode,duration_of_stay,address,port,occup,i94res,port_of_entry
0,30,,2016-10-26T00:00:00,2016-11-01T00:00:00,B2,pleasure,BRAZIL,air,6,NEBRASKA,NYC,,689.0,NEW YORK
1,53,F,2016-10-29T00:00:00,2016-11-05T00:00:00,WT,pleasure,ITALY,air,7,NEW YORK,NYC,,117.0,NEW YORK
2,48,F,2016-10-26T00:00:00,2016-11-07T00:00:00,B2,pleasure,"CHINA, PRC",air,12,NEW YORK,NYC,,245.0,NEW YORK
3,16,M,2016-10-29T00:00:00,2016-11-07T00:00:00,WT,pleasure,IRELAND,air,9,NEW YORK,NYC,,116.0,NEW YORK
4,34,M,2016-10-26T00:00:00,2016-11-04T00:00:00,B1,business,"CHINA, PRC",air,9,NEW JERSEY,NYC,,245.0,NEW YORK


In [7]:
dim_port_table.limit(5).toPandas()

Unnamed: 0,i94port,us_state,port_city
0,NYC,NEW YORK,NEW YORK
1,NYC,NEW YORK,NEW YORK
2,NYC,NEW YORK,NEW YORK
3,NYC,NEW YORK,NEW YORK
4,NYC,NEW YORK,NEW YORK


In [9]:
dim_usdemograph_table.limit(5).toPandas()

Unnamed: 0,male_poulation,female_population,total_population,median_age,foreign_born,no_of_veterans,ethnicity,ethinicity_count,state,city
0,4081698,4468707,8550405,36,3212500,156961,White,3835726,NEW YORK,NEW YORK
1,4081698,4468707,8550405,36,3212500,156961,Asian,1304564,NEW YORK,NEW YORK
2,4081698,4468707,8550405,36,3212500,156961,American Indian and Alaska Native,90923,NEW YORK,NEW YORK
3,4081698,4468707,8550405,36,3212500,156961,Hispanic or Latino,2485125,NEW YORK,NEW YORK
4,4081698,4468707,8550405,36,3212500,156961,Black or African-American,2192248,NEW YORK,NEW YORK


In [29]:
dim_usairport_table.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,iata_code,local_code,coordinates,iso_region,port_type,port_state
0,00TA,heliport,Sw Region Faa Heliport,598,,00TA,"-97.30580139160156, 32.826900482177734",US-TX,airport,TEXAS
1,00TE,heliport,Tcjc-Northeast Campus Heliport,600,,00TE,"-97.18949890136719, 32.847599029541016",US-TX,airport,TEXAS
2,00TS,small_airport,Alpine Range Airport,670,,00TS,"-97.24199676513672, 32.607601165771484",US-TX,airport,TEXAS
3,00TX,closed,San Jacinto Methodist Hospital Heliport,19,,,"-94.980201, 29.7377",US-TX,airport,TEXAS
4,00XS,small_airport,L P Askew Farms Airport,3110,,00XS,"-101.93399810791016, 33.03340148925781",US-TX,airport,TEXAS


In [6]:
dim_ustemperature_table.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,Country,Latitude,Longitude,us_city
0,2002-04-01,18.366,0.366,United States,36.17N,115.36W,SPRING VALLEY
1,2002-10-01,17.17,0.322,United States,36.17N,115.36W,SPRING VALLEY
2,2002-12-01,6.258,0.235,United States,36.17N,115.36W,SPRING VALLEY
3,2005-03-01,12.268,0.189,United States,36.17N,115.36W,SPRING VALLEY
4,2006-10-01,16.934,0.245,United States,36.17N,115.36W,SPRING VALLEY


In [37]:
print('*'*100)
print(dim_immigrant_table.limit(5).toPandas())
print('*'*100)
print(dim_port_table.limit(5).toPandas())
print('*'*100)
print(dim_date_table.limit(5).toPandas())
print('*'*100)
print(dim_usdemograph_table.limit(5).toPandas())
print('*'*100)
print(dim_usairport_table.limit(5).toPandas())
print('*'*100)
print(dim_ustemperature_table.limit(5).toPandas())
print('*'*100)

****************************************************************************************************
   age gender         arrival_date       departure_date visatype visa_purpose  \
0   30   None  2016-10-26T00:00:00  2016-11-01T00:00:00       B2     pleasure   
1   53      F  2016-10-29T00:00:00  2016-11-05T00:00:00       WT     pleasure   
2   48      F  2016-10-26T00:00:00  2016-11-07T00:00:00       B2     pleasure   
3   16      M  2016-10-29T00:00:00  2016-11-07T00:00:00       WT     pleasure   
4   34      M  2016-10-26T00:00:00  2016-11-04T00:00:00       B1     business   

  origin_country arrival_mode  duration_of_stay     address port occup  \
0         BRAZIL          air                 6    NEBRASKA  NYC  None   
1          ITALY          air                 7    NEW YORK  NYC  None   
2     CHINA, PRC          air                12    NEW YORK  NYC  None   
3        IRELAND          air                 9    NEW YORK  NYC  None   
4     CHINA, PRC          air             

#### 4.2 Data Quality Checks

In [95]:
def check_rows(input_data):
    """
    Perform data count check
    :param input_data: dataframe to check
    :return: True or False
    """
    check = input_data.count() > 0
    return check

def integrity_check(fact, dimen, column_fact, column_dim):
    """
    Check the referential integrity between the fact table and a dimension table
    :param fact: immigration fact table
    :param dimen: dimension table
    :param column_fact: column of the fact table for join operation
    :param column_dim: column of the dimension table for join operation
    :return: True or False (if False, the reason is printed)
    """
    # Distinct fact keys that have no matching key in the dimension table
    dftmp_ = fact.select(column_fact).distinct() \
            .join(dimen, fact[column_fact] == dimen[column_dim], "left_anti")
    check = dftmp_.count() == 0
    if not check:
        # A left anti join returns only the fact column, and null keys never match
        if dftmp_.filter(col(column_fact).isNull()).count() > 0:
            print("The {} column has null values".format(column_fact))
        else:
            print("The integrity test fail due to the following:")
            print('__' * 50)
            print(dftmp_.limit(5).toPandas())
    return check
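
The left anti join used by the integrity check behaves like a set difference in which null keys never match. A minimal plain-Python sketch of that behaviour follows. `missing_keys` is a hypothetical helper, and the key lists are illustrative values that borrow city and state names from this notebook's outputs.

```python
def missing_keys(fact_keys, dim_keys):
    """Return distinct fact keys with no match in the dimension keys (hypothetical helper).

    None is treated like a SQL null: it never equals anything, so a None
    fact key is always reported as missing.
    """
    dim = {k for k in dim_keys if k is not None}
    missing = []
    for key in dict.fromkeys(fact_keys):  # distinct keys, first-seen order kept
        if key is None or key not in dim:
            missing.append(key)
    return missing


# Illustrative key lists
fact_ports = ["ORLANDO", "LOUISVILLE", "NOGALES", "ORLANDO"]
demograph_cities = ["ORLANDO", "NEW YORK"]

print(missing_keys(fact_ports, demograph_cities))
print(missing_keys(["FLORIDA", None], ["FLORIDA"]))
```

The second call shows why a single null in the fact column is enough to make the check return False even when every non-null key has a match.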

In [37]:
integrity_check(fact_immigration_table, dim_immigrant_table, "port", "port_of_entry")

True

In [38]:
integrity_check(fact_immigration_table, dim_date_table, "arrival_date", "arrival_date")

True

In [96]:
integrity_check(fact_immigration_table, dim_usdemograph_table, "port", "city")

The integrity test fail due to the following:
____________________________________________________________________________________________________
         port
0  LOUISVILLE
1     NOGALES


False

In [97]:
integrity_check(fact_immigration_table, dim_usairport_table, "us_state", "port_state")

The us_state column has null values


False

In [43]:
integrity_check(fact_immigration_table, dim_port_table, "i94port", "i94port")

True

In [44]:
integrity_check(fact_immigration_table, dim_ustemperature_table, "port", "us_city")

True

In [69]:
check_rows(dim_port_table)

True

In [70]:
check_rows(dim_usairport_table)

True

In [71]:
check_rows(dim_usdemograph_table)

True

In [72]:
check_rows(dim_date_table)

True

In [73]:
check_rows(dim_immigrant_table)

True

In [75]:
check_rows(dim_ustemperature_table)

True

In [76]:
check_rows(fact_immigration_table)

True

#### 4.3 Data dictionary 

##### Fact Table

 * fact_immigration table
     * immigration_id : Unique ID for each immigration (computed in create_fact_table but not selected into the written table)
     * i94port : Three letter code for the port
     * port : The city in which port is located
     * arrival_date : Immigrant's arrival date to the US
     * port_type : The type of port (airport, seaport, landport)
     * us_state : The state of the US
     * avg_temperature : The average temperature of the port city, computed over records dated 2009-01-01 or later
     
##### Dimension Tables

 * dim_immigrant table
     * port_of_entry : The port of entry for the immigrant
     * age : The age of the immigrant
     * gender : The gender of the immigrant
     * arrival_date : The date on which the immigrant entered the US
     * departure_date : The date on which the immigrant departed from the US
     * visatype : The type of Visa issued to the immigrant
     * visa_purpose : The purpose due to which the Visa was issued to the immigrant
     * origin_country : Origin country of the immigrant
     * arrival_mode : Arrival mode (air, sea, land)
     * duration_of_stay : The duration of the stay in the US
     * address : The US state of the immigrant's address
     * port : Three letter code for the port
     * occup : Occupation of the immigrant
     * i94res : Three digit numeric code for the origin country
     
 * dim_port table
     * i94port : Three letter code for the port
     * us_state : The state of the US
     * port_city :  The city in which the port is located
     
 * dim_date table
     * arrival_date : Timestamp for arrival
     * departure_date : Timestamp for departure
     * arrival_day : The day of the month of arrival
     * arrival_week : The week of the year of arrival
     * arrival_month : The month of arrival
     * arrival_year : The year of arrival
     * arrival_weekday : The weekday of arrival
     * departure_day : The day of the month of departure
     * departure_week : The week of the year of departure
     * departure_month : The month of departure
     * departure_year : The year of departure
     * departure_weekday : The weekday of departure
     
 * dim_usdemograph table
     * city : City in US
     * state : State in the US
     * male_population : Male population in a city (the column is currently written as male_poulation)
     * female_population : Female population in a city
     * total_population : Total population in a city
     * median_age : The median age of the population
     * foreign_born : Number of foreign born in the population
     * no_of_veterans : Number of veterans in the population
     * ethnicity : Ethnicity
     * ethnicity_count : Count of the respective ethnicity (the column is currently written as ethinicity_count)
     
 * dim_usairport table
     * port_state : State in the US where port is located
     * ident : Unique ID of the port
     * type : Type of the port
     * name : Name of the port
     * elevation_ft : The elevation of the port in feet
     * iata_code : IATA code for the port
     * local_code : Local code for the port
     * coordinates : Coordinates of the port
     * iso_region : Region code of the port
     * port_type : The type of the port
     
 * dim_ustemperature table
     * us_city : The city in the US
     * dt : Date
     * AverageTemperature : The Average temperature
     * AverageTemperatureUncertainty : The uncertainty of the temperature
     * Country : The country (in this case only United States)
     * Latitude : Latitude
     * Longitude : Longitude
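
As a small illustration of how these columns serve the use case from the project summary (which port attracts visitors, and for what purpose), the sketch below counts arrivals per port of entry and visa purpose. It uses only the five dim_immigrant sample rows displayed in section 4.1, so the counts say nothing about the full dataset. The variable names are assumptions.

```python
from collections import Counter

# (port_of_entry, visa_purpose) pairs from the five dim_immigrant sample rows
sample_rows = [
    ("NEW YORK", "pleasure"),
    ("NEW YORK", "pleasure"),
    ("NEW YORK", "pleasure"),
    ("NEW YORK", "pleasure"),
    ("NEW YORK", "business"),
]

# Count how often each (port, purpose) combination occurs
arrivals = Counter(sample_rows)
for (port, purpose), n in arrivals.most_common():
    print(port, purpose, n)
```

On the full table the same question could be asked in PySpark with `dim_immigrant_table.groupBy('port_of_entry', 'visa_purpose').count()`.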


### Step 5: Complete Project Write Up

* Clearly state the rationale for the choice of tools and technologies for the project.
 - I chose the Python library pandas to explore the datasets because it is user-friendly, handles reasonably large datasets efficiently and has an extensive set of features that make it easy to explore and visualize the data. I used PySpark, the Python API for Spark, because distributed computing makes it more efficient for Big Data, and its capacity can be increased by scaling the hardware out horizontally. I used the AWS cloud storage service S3 because it is easy to use, cheap and safe.
 
* Propose how often the data should be updated and why.
 - The US immigration data was provided on a monthly basis, so I think it is best to continue updating the dataset monthly. If a use case arises that requires daily or weekly updates, the tools used here can be adjusted accordingly.
 
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
     - In this case I would use an AWS EMR cluster for processing and S3 for storage, and let autoscaling add or remove Spark workers as needed.
     
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
     - In this case I would schedule the job with cron, or build the data pipeline with Airflow and schedule its runs by specifying a suitable start date and schedule interval, so that each run finishes before 7am.
     
 * The database needed to be accessed by 100+ people.
     - Either Amazon S3 or Redshift could satisfy this need, and the choice between them depends on the use case.
     - The more people access the database, the more CPU resources are needed to keep queries fast. A distributed database can use replication and partitioning to return query results faster for each user.
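
For the 7am dashboard scenario, the scheduled start time has to leave room for the run itself. A minimal sketch of that arithmetic follows. `latest_start` is a hypothetical helper, and the 30 minute runtime and 15 minute margin are assumed numbers, not measurements of this pipeline.

```python
from datetime import datetime, timedelta


def latest_start(deadline, expected_runtime, safety_margin):
    """Latest time a run can start and still finish before the deadline (hypothetical helper)."""
    return deadline - expected_runtime - safety_margin


# Assumed numbers for illustration: a 30 minute run with a 15 minute margin
deadline = datetime(2016, 7, 24, 7, 0)
start = latest_start(deadline, timedelta(minutes=30), timedelta(minutes=15))
print(start.strftime("%H:%M"))
```

With these assumed numbers a daily cron entry would use the schedule `15 6 * * *`. The real runtime should be measured on the full pipeline before a start time is fixed.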