# Immigration Data in the United States
### Data Engineering Capstone Project

#### Project Summary
In this project, I am looking at immigration data for the United States. Specifically, I'm interested in the following relationships:
1. Correlation between temperature and tourist volume
2. Seasonality of tourism
3. Correlation between tourist arrivals and foreign arrivals
4. The correlation between tourists and demographics in different cities
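
As a rough illustration of what "correlation" means for the questions above, here is a minimal sketch of the Pearson coefficient in plain Python. The function name `pearson` and the sample numbers are assumptions made for this example; they are not taken from the datasets.

```python
from math import sqrt

def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length numeric sequences."""
    n = len(xs)
    if n != len(ys) or n < 2:
        raise ValueError("need two sequences of equal length >= 2")
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sum of co-deviations and sums of squared deviations
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        raise ValueError("correlation is undefined for a constant sequence")
    return cov / sqrt(var_x * var_y)

# Hypothetical monthly values, for illustration only
avg_temperature = [5.0, 10.0, 15.0, 20.0]
arrivals = [100, 200, 300, 400]
r = pearson(avg_temperature, arrivals)
```

A value near 1 or -1 suggests a strong linear relationship, while a value near 0 suggests none.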

##### DataSets
1. **I94 Immigration Data:** This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace.
2. **World Temperature Data:** This dataset came from Kaggle.
3. **U.S. City Demographic Data:** This data comes from OpenSoft.
4. **Airport Code Table:** This is a simple table of airport codes and corresponding cities.
#### Problem-solving
To answer these questions, I aggregate the data along the following dimensions:
1. By time
2. By city and airport
3. By temperature, to measure its impact on tourist volume
4. By demographics, to relate them to tourist volume
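
The first two aggregations can be sketched in plain Python as simple counts per group. The rows below are hypothetical and only borrow the column names `i94yr`, `i94mon`, and `i94port` from the immigration data; the real aggregation runs in Spark.

```python
from collections import Counter

# Hypothetical rows: (i94yr, i94mon, i94port); values are illustrative only
rows = [
    (2016, 4, "ATL"),
    (2016, 4, "ATL"),
    (2016, 4, "NYC"),
    (2016, 5, "NYC"),
]

# Aggregate by time: number of arrivals per (year, month)
arrivals_by_month = Counter((year, month) for year, month, _ in rows)

# Aggregate by port of entry: number of arrivals per port code
arrivals_by_port = Counter(port for _, _, port in rows)
```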

#### Steps
The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [None]:
# Import all required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, date_add, length, col
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.functions import desc, asc
from pyspark.sql.functions import sum as Fsum
import pyspark.sql.functions as F

import datetime

import numpy as np
import pandas as pd

import sql_immigration, sql_dim_fact, sql_data_quality
from checknull import Check

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
In this step, I examine the composition and structure of each dataset and decide which parts best serve the questions posed above. I explore the datasets introduced earlier with pandas, NumPy, and Spark, and clean them where necessary.

#### Describe and Gather Data 
##### DataSets
1. **I94 Immigration Data:** This data comes from the US National Tourism and Trade Office. 
2. **World Temperature Data:** This dataset came from Kaggle.
3. **U.S. City Demographic Data:** This data comes from OpenSoft.
4. **Airport Code Table:** This is a simple table of airport codes and corresponding cities.

First, I read the CSV files with pandas and the SAS immigration file with Spark.

In [2]:
# Read in the data here
# df_immigration_sample = pd.read_csv('data/immigration_data_sample.csv')
df_countries = pd.read_csv('data/countries.csv')
df_i94portcodes = pd.read_csv('data/i94portCodes.csv')
df_demographics = pd.read_csv('data/us-cities-demographics.csv', sep=';')
df_airports = pd.read_csv('data/airport-codes_csv.csv')
df_temperature = pd.read_csv('data/GlobalLandTemperaturesByCity.csv')
df_immigration = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


Next, I take an overview of each table: its columns, data types, and a few sample rows.

In [3]:
df_immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [4]:
df_immigration.show(5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE

Table df_immigration includes columns: cicid, i94yr, i94mon, i94cit, i94res, i94port, arrdate, i94mode, i94addr, depdate, i94bir, i94visa, count, dtadfile,
visapost, occup, entdepa, entdepd, entdepu, matflag, biryear, dtaddto, gender, insnum, airline, admnum, fltno, visatype

In [5]:
df_countries.head(2)

Unnamed: 0,code,country
0,582,MEXICO
1,236,AFGHANISTAN


In [6]:
df_countries.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 289 entries, 0 to 288
Data columns (total 2 columns):
code       289 non-null int64
country    289 non-null object
dtypes: int64(1), object(1)
memory usage: 4.6+ KB


Table df_countries includes columns: code, country

In [7]:
df_i94portcodes.head(2)

Unnamed: 0,code,location,state
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK


In [8]:
df_i94portcodes.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 660 entries, 0 to 659
Data columns (total 3 columns):
code        660 non-null object
location    660 non-null object
state       585 non-null object
dtypes: object(3)
memory usage: 15.5+ KB


Table df_i94portcodes includes columns: code, location, state

In [9]:
df_demographics.head(2)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723


In [10]:
df_demographics.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
City                      2891 non-null object
State                     2891 non-null object
Median Age                2891 non-null float64
Male Population           2888 non-null float64
Female Population         2888 non-null float64
Total Population          2891 non-null int64
Number of Veterans        2878 non-null float64
Foreign-born              2878 non-null float64
Average Household Size    2875 non-null float64
State Code                2891 non-null object
Race                      2891 non-null object
Count                     2891 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 271.1+ KB


Table df_demographics includes columns: City, State, Median Age, Male Population, Female Population, Total Population, Number of Veterans, Foreign-born, Average Household Size, State Code, Race, Count

In [11]:
df_airports.head(2)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"


In [12]:
df_airports.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 12 columns):
ident           55075 non-null object
type            55075 non-null object
name            55075 non-null object
elevation_ft    48069 non-null float64
continent       27356 non-null object
iso_country     54828 non-null object
iso_region      55075 non-null object
municipality    49399 non-null object
gps_code        41030 non-null object
iata_code       9189 non-null object
local_code      28686 non-null object
coordinates     55075 non-null object
dtypes: float64(1), object(11)
memory usage: 5.0+ MB


Table df_airports includes columns: ident, type, name, elevation_ft, continent, iso_country, iso_region, municipality, gps_code, iata_code, local_code, coordinates

In [13]:
df_temperature.head(2)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E


In [14]:
df_temperature.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8599212 entries, 0 to 8599211
Data columns (total 7 columns):
dt                               object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                             object
Country                          object
Latitude                         object
Longitude                        object
dtypes: float64(2), object(5)
memory usage: 459.2+ MB


Table df_temperature includes columns: dt, AverageTemperature, AverageTemperatureUncertainty, City, Country, Latitude, Longitude

In [15]:
# #write to parquet
# df_immigration.write.parquet("sas_data_test")
# df_immigration=spark.read.parquet("sas_data_test")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

df_immigration

In [16]:
df_immigration.createOrReplaceTempView("immig_table")

In [17]:
df_immigration.count()

3096313

Check if column cicid can be used as the primary key for the table

In [18]:
spark.sql(sql_immigration.count_dist_cicid).show()

+-------+
|  count|
+-------+
|3096313|
+-------+



In [19]:
# Or use the command below
df_immigration.select('cicid').distinct().count()

3096313
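
The logic behind this check is that a column can serve as a primary key when its distinct count equals the row count and it has no missing values. A minimal plain-Python sketch of that rule follows; `is_unique_key` and the sample values are hypothetical, and the actual check above runs in Spark.

```python
def is_unique_key(values):
    """Return True when no value repeats and none is missing (None)."""
    values = list(values)
    return None not in values and len(values) == len(set(values))

# Hypothetical cicid samples, for illustration only
cicid_ok = [6.0, 7.0, 15.0]
cicid_dup = [6.0, 7.0, 7.0]
```

Here the total row count (3096313) matches the distinct count of cicid, which is what makes it a candidate key.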

Check if i94port is 3 characters long

In [20]:
spark.sql(sql_immigration.length_character).show()

+---+
|len|
+---+
|  3|
+---+



In [21]:
# Equivalent check with the DataFrame API: list the distinct lengths of i94port
df_immigration.select(F.length('i94port').alias('len')).distinct().show()

+---+
|len|
+---+
|  3|
+---+



Next, I convert the arrdate column, which holds SAS numeric dates, into a calendar date that I can use in the analysis.

In [22]:
# All dates in SAS correspond to the number of days since 1960-01-01.
# Therefore, we compute the arrival dates by adding arrdate to 1960-01-01
spark.sql(sql_immigration.compute_arrival_date).createOrReplaceTempView("immig_table")
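
The SQL in `sql_immigration.compute_arrival_date` is not shown here, so the following is only a plain-Python sketch of the same arithmetic. The helper name `sas_days_to_date` is an assumption made for this example.

```python
from datetime import date, timedelta

# SAS stores dates as the number of days since 1960-01-01
SAS_EPOCH = date(1960, 1, 1)

def sas_days_to_date(days):
    """Convert a SAS day count to a calendar date; None stays None."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))

# arrdate of the first row shown in the sample output above
arrival = sas_days_to_date(20573.0)  # date(2016, 4, 29)
```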

In [23]:
spark.sql(sql_immigration.check_all).printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

Next, I add a visa_type column that maps the numeric codes in the i94visa column to descriptive labels. There are three codes:
*   1 = Business
*   2 = Pleasure
*   3 = Student
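
The mapping can be sketched in plain Python as a dictionary lookup. The names `VISA_TYPES` and `visa_type` are hypothetical; the notebook applies the mapping with the SQL in `sql_immigration.replace_data_visa`, which is not shown here.

```python
# Labels for the three i94visa codes listed above
VISA_TYPES = {1: "Business", 2: "Pleasure", 3: "Student"}

def visa_type(i94visa):
    """Map a numeric i94visa code (stored as a double) to its label.

    Missing or unknown codes map to None.
    """
    if i94visa is None:
        return None
    return VISA_TYPES.get(int(i94visa))
```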

In [24]:
spark.sql(sql_immigration.replace_data_visa).createOrReplaceTempView("immig_table")

In [25]:
spark.sql(sql_immigration.check_all).show(5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+------------+---------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|arrival_date|visa_type|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+------------+---------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|  2016-04-29| Pleasure|
|  7.0|2016.0|   4.0

In [26]:
spark.sql(sql_immigration.replace_data_depdate).createOrReplaceTempView("immig_table")

In [27]:
spark.sql(sql_immigration.check_all).show(5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+------------+---------+--------------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|arrival_date|visa_type|departure_date|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+------------+---------+--------------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2

In [28]:
# Check whether the returned result contains an NA value or not
spark.sql(sql_immigration.check_NA).show()

+-----+
|count|
+-----+
|    0|
+-----+



In [29]:
# Count records where the departure date is earlier than the arrival date
spark.sql(sql_immigration.check_DD_AD).show()

+-----+
|count|
+-----+
|  375|
+-----+



In [30]:
spark.sql(sql_immigration.check_arr_dep).show(5)

+------------+--------------+
|arrival_date|departure_date|
+------------+--------------+
|  2016-04-01|    2016-03-31|
|  2016-04-02|    2016-03-19|
|  2016-04-02|    2016-01-26|
|  2016-04-02|    2016-04-01|
|  2016-04-02|    2016-01-31|
+------------+--------------+
only showing top 5 rows
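
The rows above are invalid because the visitor appears to leave before arriving. A minimal sketch of that rule in plain Python is shown below; `departs_before_arrival` is a hypothetical helper, and a missing departure date is treated as "has not left yet" rather than as an error, which is an assumption.

```python
from datetime import date

def departs_before_arrival(arrival, departure):
    """True when a known departure date is earlier than the arrival date."""
    return departure is not None and departure < arrival

records = [
    (date(2016, 4, 1), date(2016, 3, 31)),   # first pair from the output above
    (date(2016, 4, 2), date(2016, 4, 10)),   # hypothetical valid stay
    (date(2016, 4, 3), None),                # hypothetical open-ended stay
]
invalid = [r for r in records if departs_before_arrival(*r)]
```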



In [31]:
spark.sql(sql_immigration.check_AD_DD).createOrReplaceTempView("immig_table")

In [32]:
# check distinct departure_date
spark.sql(sql_immigration.check_dep).show()

+-----+
|count|
+-----+
|  174|
+-----+



In [33]:
# check distinct arrival_dates
spark.sql(sql_immigration.check_arr).show()

+-----+
|count|
+-----+
|   30|
+-----+



In [34]:
# check the common values between the two sets
spark.sql(sql_immigration.check_common).show()

+-----+
|count|
+-----+
|   29|
+-----+
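
Comparing the distinct arrival and departure dates is a set operation. The sketch below shows the idea with Python sets; the dates are hypothetical, and the real comparison is done by the SQL in `sql_immigration.check_common`, which is not shown here.

```python
from datetime import date

# Hypothetical distinct date sets, for illustration only
arrival_dates = {date(2016, 4, 1), date(2016, 4, 2), date(2016, 4, 3)}
departure_dates = {date(2016, 4, 2), date(2016, 4, 3), date(2016, 5, 1)}

# Dates that appear in both sets
common = arrival_dates & departure_dates

# Arrival dates with no matching departure date
only_arrival = arrival_dates - departure_dates
```

In the results above, 29 of the 30 distinct arrival dates also appear among the 174 distinct departure dates.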



Check the data for the arrival modes

In [35]:
spark.sql(sql_immigration.check_mode).show()

+-------+--------+
|i94mode|count(1)|
+-------+--------+
|   null|     238|
|    1.0| 2871184|
|    3.0|   61572|
|    2.0|   17970|
|    9.0|    2517|
+-------+--------+



In [36]:
# Check if there are any missing values in the age column
spark.sql(sql_immigration.check_age).show()

+-----+
|count|
+-----+
|   46|
+-----+



In [37]:
# Check whether the biryear column has any missing values and can be used
spark.sql(sql_immigration.check_bir).show()

+-----+
|count|
+-----+
|    0|
+-----+



In [38]:
spark.sql(sql_immigration.check_biryear).show()

+------------+------------+
|max(biryear)|min(biryear)|
+------------+------------+
|      2016.0|      1916.0|
+------------+------------+



In [39]:
# Number of visitors over 80 years old as of 2016
spark.sql(sql_immigration.check_biryear_age).show()

+-----+
|count|
+-----+
|24694|
+-----+



In [40]:
# frequency of visitors by year of birth
spark.sql(sql_immigration.check_fre_biryear).show()

+-------+-----+
|biryear|count|
+-------+-----+
| 1916.0|    8|
| 1917.0|   16|
| 1918.0|   21|
| 1919.0|   36|
| 1920.0|   34|
| 1921.0|   69|
| 1922.0|   89|
| 1923.0|  155|
| 1924.0|  209|
| 1925.0|  274|
| 1926.0|  414|
| 1927.0|  569|
| 1928.0|  792|
| 1929.0| 1073|
| 1930.0| 1442|
| 1931.0| 1794|
| 1932.0| 2239|
| 1933.0| 2688|
| 1934.0| 3442|
| 1935.0| 4194|
+-------+-----+
only showing top 20 rows



In [41]:
spark.sql(sql_immigration.check_age_bir).show()

+----------+-------+
|difference|  count|
+----------+-------+
|       0.0|2953435|
+----------+-------+
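
The single difference of 0.0 suggests that the reported age (i94bir) always equals the arrival year minus the birth year. The SQL behind `check_age_bir` is not shown, so the formula below is an assumption, sketched in plain Python with a hypothetical helper name.

```python
def age_difference(i94yr, biryear, i94bir):
    """Difference between the derived age (i94yr - biryear) and the reported age."""
    return (i94yr - biryear) - i94bir

# Values from the first row of the sample output shown earlier
diff = age_difference(2016.0, 1979.0, 37.0)
```

If the difference is zero for every row, one of the two columns is redundant and either can be used for age.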



Check the gender to see if the data is usable

In [42]:
spark.sql(sql_immigration.check_gender).show()

+------+-------+
|gender|  count|
+------+-------+
|     F|1228646|
|  null| 407456|
|     M|1316305|
|     U|    238|
|     X|    836|
+------+-------+



In [43]:
spark.sql(sql_immigration.select_gender).createOrReplaceTempView("immig_table")

In [44]:
# Count rows where the citizenship country is null
spark.sql(sql_immigration.check_cit_null).show()

+-----+
|count|
+-----+
|    0|
+-----+



In [45]:
# Count rows where the residence country is null
spark.sql(sql_immigration.check_res_null).show()

+-----+
|count|
+-----+
|    0|
+-----+



In [46]:
# Count rows where the reported address is null
spark.sql(sql_immigration.check_addr_null).show()

+------+
| count|
+------+
|114019|
+------+



In [47]:
# Count rows where visatype is null
spark.sql(sql_immigration.check_visa_null).show()

+-----+
|count|
+-----+
|    0|
+-----+



In [48]:
spark.sql(sql_immigration.check_all_visa).show()

+---------+--------+-------+
|visa_type|visatype|  count|
+---------+--------+-------+
| Business|      B1| 186610|
| Business|      E1|   3182|
| Business|      E2|  16227|
| Business|     GMB|    132|
| Business|       I|   2962|
| Business|      I1|    214|
| Business|      WB| 185857|
| Pleasure|      B2| 967988|
| Pleasure|      CP|  11785|
| Pleasure|     CPL|      8|
| Pleasure|     GMT|  79454|
| Pleasure|     SBP|      2|
| Pleasure|      WT|1060229|
|  Student|      F1|  27789|
|  Student|      F2|   1774|
|  Student|      M1|    708|
|  Student|      M2|     30|
+---------+--------+-------+



The definitions for various detailed visa types are listed below.
* B1 visas are for business visits and are valid for up to a year.
* B2 visas are for pleasure visits and are valid for up to a year.
* CP: I could not find a definition.
* E2 investor visas allow foreign investors to enter and work in the United States based on a substantial investment.
* F1 visas are used by non-immigrant students for academic and language training courses.
* F2 visas are used by the dependents of F1 visa holders.
* GMT: I could not find a definition.
* M1 visas are for students enrolled in non-academic or vocational study, such as mechanical, language, or cooking classes.
* WB: Visa Waiver Program (business), for travel to the United States on business for stays of 90 days or less without obtaining a visa.
* WT: Visa Waiver Program (tourism), for travel to the United States for tourism for stays of 90 days or less without obtaining a visa.

Check the data in the occup column

In [49]:
spark.sql(sql_immigration.check_occup).show()

+-----+-------+
|occup|      n|
+-----+-------+
| null|2538838|
|  STU|   3275|
|  OTH|    508|
|  NRR|    299|
|  MKT|    262|
|  EXA|    175|
|  ULS|    142|
|  ADM|    119|
|  GLS|    119|
|  TIE|    108|
|  MVC|     58|
|  ENO|     55|
|  CEO|     53|
|  TIP|     49|
|  LLJ|     45|
|  RET|     44|
|  CMP|     43|
|  PHS|     42|
|  UNP|     33|
|  HMK|     30|
+-----+-------+
only showing top 20 rows



This completes the assessment of the df_immigration table.

In [50]:
df_immigration = spark.sql(sql_immigration.check_all) 

In [51]:
df_immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

df_countries

In [52]:
df_countries.head(5)

Unnamed: 0,code,country
0,582,MEXICO
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


In [53]:
df_countries.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 289 entries, 0 to 288
Data columns (total 2 columns):
code       289 non-null int64
country    289 non-null object
dtypes: int64(1), object(1)
memory usage: 4.6+ KB


Since the data is complete, I don't need to clean or delete the data

df_i94portcodes

In [54]:
df_i94portcodes.head(5)

Unnamed: 0,code,location,state
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK
2,BAR,BAKER AAF - BAKER ISLAND,AK
3,DAC,DALTONS CACHE,AK
4,PIZ,DEW STATION PT LAY DEW,AK


In [55]:
df_i94portcodes.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 660 entries, 0 to 659
Data columns (total 3 columns):
code        660 non-null object
location    660 non-null object
state       585 non-null object
dtypes: object(3)
memory usage: 15.5+ KB


The code and location columns are complete. The state column is missing for 75 of the 660 rows, which I leave as they are, so no cleaning is needed here.

df_demographics

In [56]:
df_demographics.head(5)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [57]:
df_demographics.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
City                      2891 non-null object
State                     2891 non-null object
Median Age                2891 non-null float64
Male Population           2888 non-null float64
Female Population         2888 non-null float64
Total Population          2891 non-null int64
Number of Veterans        2878 non-null float64
Foreign-born              2878 non-null float64
Average Household Size    2875 non-null float64
State Code                2891 non-null object
Race                      2891 non-null object
Count                     2891 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 271.1+ KB


In [58]:
# Convert the city columns to uppercase and remove any leading and trailing spaces
df_demographics['City'] = df_demographics['City'].str.upper().str.strip()

In [59]:
df_demographics.isna().sum()

City                       0
State                      0
Median Age                 0
Male Population            3
Female Population          3
Total Population           0
Number of Veterans        13
Foreign-born              13
Average Household Size    16
State Code                 0
Race                       0
Count                      0
dtype: int64

In [60]:
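# The primary key is the combination of city, state, and race, so drop any rows that repeat it
columns = ['City','State','Race']
df_demographics = df_demographics.drop_duplicates(subset=columns)
df_demographics.head(5)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,SILVER SPRING,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,QUINCY,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,HOOVER,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,RANCHO CUCAMONGA,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,NEWARK,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [61]:
df_demographics.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 2891 entries, 0 to 2890
Data columns (total 12 columns):
City                      2891 non-null object
State                     2891 non-null object
Median Age                2891 non-null float64
Male Population           2888 non-null float64
Female Population         2888 non-null float64
Total Population          2891 non-null int64
Number of Veterans        2878 non-null float64
Foreign-born              2878 non-null float64
Average Household Size    2875 non-null float64
State Code                2891 non-null object
Race                      2891 non-null object
Count                     2891 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 293.6+ KB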
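
A note on the pandas semantics involved: `duplicated()` returns a mask that is True only for repeated rows, so indexing with the mask directly keeps the repeats, while negating it (or calling `drop_duplicates`) removes them. The small frame below is hypothetical and exists only to show the difference.

```python
import pandas as pd

# Hypothetical rows; the second "QUINCY" row deliberately repeats the key
df = pd.DataFrame({
    "City": ["QUINCY", "QUINCY", "HOOVER"],
    "State": ["Massachusetts", "Massachusetts", "Alabama"],
    "Race": ["White", "White", "Asian"],
    "Count": [58723, 58723, 4759],
})
key = ["City", "State", "Race"]

mask = df.duplicated(subset=key)   # True only for the repeated rows
only_repeats = df[mask]            # keeps the repeats, drops everything else
deduplicated = df[~mask]           # keeps the first occurrence of each key
same_result = df.drop_duplicates(subset=key)  # equivalent to df[~mask]
```

When a table has no repeated keys, `df[mask]` is empty and `df[~mask]` returns every row.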


df_airports

In [62]:
df_airports.head(5)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [63]:
df_airports.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 12 columns):
ident           55075 non-null object
type            55075 non-null object
name            55075 non-null object
elevation_ft    48069 non-null float64
continent       27356 non-null object
iso_country     54828 non-null object
iso_region      55075 non-null object
municipality    49399 non-null object
gps_code        41030 non-null object
iata_code       9189 non-null object
local_code      28686 non-null object
coordinates     55075 non-null object
dtypes: float64(1), object(11)
memory usage: 5.0+ MB


In [64]:
df_airports.isna().sum()

ident               0
type                0
name                0
elevation_ft     7006
continent       27719
iso_country       247
iso_region          0
municipality     5676
gps_code        14045
iata_code       45886
local_code      26389
coordinates         0
dtype: int64

Check countries that contain airports

In [65]:
df_airports.iso_country.value_counts()

US    22757
BR     4334
CA     2784
AU     1963
KR     1376
MX     1181
RU     1040
DE      947
GB      911
FR      850
AR      848
CO      706
IT      671
PG      593
VE      592
ZA      489
CL      474
ID      470
ES      416
CN      404
KE      372
IN      341
CD      285
PH      282
PL      278
CZ      269
JP      234
NO      228
SE      224
NZ      212
      ...  
MF        2
RE        2
SM        2
MC        2
PM        2
DM        2
AD        2
SH        2
BN        2
LC        2
KN        2
VA        1
GI        1
CW        1
YT        1
IO        1
NR        1
MQ        1
CC        1
AI        1
MO        1
CX        1
LI        1
GM        1
BL        1
NU        1
SX        1
AW        1
NF        1
JE        1
Name: iso_country, Length: 243, dtype: int64

In [66]:
df_airports['iso_country'].isna().sum()

247

In [67]:
iso_country_null = df_airports['iso_country'].isna()

In [68]:
df_airports[iso_country_null].continent.value_counts()

AF    247
Name: continent, dtype: int64

In [69]:
df_airports.type.value_counts()

small_airport     33965
heliport          11287
medium_airport     4550
closed             3606
seaplane_base      1016
large_airport       627
balloonport          24
Name: type, dtype: int64

In [70]:
values = ['closed', 'heliport', 'seaplane_base', 'balloonport']
excluded = df_airports['type'].str.strip().isin(values)
df_airports = df_airports[~excluded].copy()

In [71]:
df_airports.isnull().sum()

ident               0
type                0
name                0
elevation_ft     4080
continent       17637
iso_country       243
iso_region          0
municipality     4453
gps_code         8293
iata_code       30443
local_code      19503
coordinates         0
dtype: int64

In [72]:
df_airports[df_airports['municipality'].isna()].head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
7653,6XA4,small_airport,Zadow Airstrip,,,US,US-TX,,6XA4,,,"-95.954353809, 29.991738550900003"
7887,74xa,small_airport,Gun Barrel City Airpark,385.0,,US,US-TX,,74XA,,,"-96.1456650496, 32.3551499558"
8082,79ID,small_airport,Kooskia (Clear Creek Int) Airport,1800.0,,US,US-ID,,79ID,,,"-115.869691372, 46.0488642914"
8114,79WT,small_airport,Ellensburg (Rotor Ranch) Airport,1962.0,,US,US-WA,,79WT,,,"-120.589778423, 47.091426059499994"
9055,8FA4,small_airport,Samsula / Coe Field,40.0,,US,US-FL,,8FA4,,,"-81.1328315735, 29.0102045831"


In [73]:
df_airports[df_airports.municipality.isna()].head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
7653,6XA4,small_airport,Zadow Airstrip,,,US,US-TX,,6XA4,,,"-95.954353809, 29.991738550900003"
7887,74xa,small_airport,Gun Barrel City Airpark,385.0,,US,US-TX,,74XA,,,"-96.1456650496, 32.3551499558"
8082,79ID,small_airport,Kooskia (Clear Creek Int) Airport,1800.0,,US,US-ID,,79ID,,,"-115.869691372, 46.0488642914"
8114,79WT,small_airport,Ellensburg (Rotor Ranch) Airport,1962.0,,US,US-WA,,79WT,,,"-120.589778423, 47.091426059499994"
9055,8FA4,small_airport,Samsula / Coe Field,40.0,,US,US-FL,,8FA4,,,"-81.1328315735, 29.0102045831"


In [74]:
check_na = df_airports['municipality'].isna()
df_airports = df_airports[~check_na].copy()

In [75]:
df_airports['municipality'] = df_airports['municipality'].str.upper()

In [76]:
df_airports.iso_region.value_counts()

US-TX     1546
BR-MT      615
US-AK      586
US-IL      579
US-CA      551
US-FL      522
BR-MS      507
US-OH      492
US-IN      486
US-PA      486
US-WI      457
US-MO      411
US-NY      402
US-WA      379
US-MI      379
US-OK      372
US-KS      372
US-GA      365
US-MN      361
US-OR      357
US-NC      349
BR-SP      332
GB-ENG     331
US-VA      311
US-ND      297
US-AR      291
US-CO      288
US-LA      281
CA-ON      280
US-NE      259
          ... 
MA-ESI       1
TD-OD        1
MW-CT        1
GR-13        1
GR-92        1
JE-U-A       1
SI-142       1
LV-LM        1
CY-03        1
SV-MO        1
DZ-47        1
VN-26        1
TR-60        1
SI-126       1
OM-BU        1
JP-31        1
GR-94        1
PH-APA       1
NL-UT        1
MZ-MPM       1
AR-C         1
GR-56        1
SD-24        1
MM-14        1
PH-WSA       1
SD-08        1
CI-14        1
MN-067       1
IR-11        1
BT-31        1
Name: iso_region, Length: 2561, dtype: int64

In [77]:
df_airports['len'] = df_airports.iso_region.apply(len)

In [78]:
check_len = df_airports['len']==5
df_airports = df_airports[check_len].copy()

In [79]:
df_airports['state'] = df_airports.iso_region.str.strip().str.split("-", n = 1, expand = True)[1]
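
The length filter above keeps every five-character region code, so non-US regions such as `BR-MT` or `CA-ON` (both visible in the `value_counts()` output) still pass at this stage. The following is a minimal sketch of a stricter extraction, assuming region codes of the form `COUNTRY-SUBDIVISION`; the helper name `us_state_from_iso_region` is hypothetical and not part of the project code.

```python
def us_state_from_iso_region(iso_region):
    """Return the two-letter state code for a US region such as 'US-TX', else None.

    Hypothetical helper for illustration only.
    """
    # partition splits on the first '-' only, like str.split("-", n=1)
    country, _, subdivision = iso_region.strip().partition("-")
    if country == "US" and len(subdivision) == 2:
        return subdivision
    return None


examples = ["US-TX", "BR-MT", "GB-ENG", "US-AK"]
states = [us_state_from_iso_region(code) for code in examples]
```

With pandas, the same helper could be applied through `df_airports['iso_region'].map(us_state_from_iso_region)`, followed by dropping the rows where the result is missing.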

In [80]:
df_airports.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates,len,state
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,LEOTI,00AA,,00AA,"-101.473911, 38.704022",5,KS
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,ANCHOR POINT,00AK,,00AK,"-151.695999146, 59.94919968",5,AK
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,HARVEST,00AL,,00AL,"-86.77030181884766, 34.86479949951172",5,AL
5,00AS,small_airport,Fulton Airport,1100.0,,US,US-OK,ALEX,00AS,,00AS,"-97.8180194, 34.9428028",5,OK
6,00AZ,small_airport,Cordes Airport,3810.0,,US,US-AZ,CORDES,00AZ,,00AZ,"-112.16500091552734, 34.305599212646484",5,AZ


In [81]:
df_airports.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 27327 entries, 1 to 55074
Data columns (total 14 columns):
ident           27327 non-null object
type            27327 non-null object
name            27327 non-null object
elevation_ft    26115 non-null float64
continent       11099 non-null object
iso_country     27184 non-null object
iso_region      27327 non-null object
municipality    27327 non-null object
gps_code        24007 non-null object
iata_code       5661 non-null object
local_code      16708 non-null object
coordinates     27327 non-null object
len             27327 non-null int64
state           27327 non-null object
dtypes: float64(1), int64(1), object(12)
memory usage: 3.1+ MB


df_temperature

In [82]:
df_temperature.head(5)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [83]:
df_temperature.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8599212 entries, 0 to 8599211
Data columns (total 7 columns):
dt                               object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                             object
Country                          object
Latitude                         object
Longitude                        object
dtypes: float64(2), object(5)
memory usage: 459.2+ MB


In [84]:
# Keep only rows where Country is 'United States'
us_country = df_temperature['Country']=='United States'
df_temperature = df_temperature[us_country].copy()

In [85]:
# Convert the dt column to datetime
df_temperature['Date'] = pd.to_datetime(df_temperature['dt'])

In [86]:
df_temperature.head(5)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,Date
47555,1820-01-01,2.101,3.217,Abilene,United States,32.95N,100.53W,1820-01-01
47556,1820-02-01,6.926,2.853,Abilene,United States,32.95N,100.53W,1820-02-01
47557,1820-03-01,10.767,2.395,Abilene,United States,32.95N,100.53W,1820-03-01
47558,1820-04-01,17.989,2.202,Abilene,United States,32.95N,100.53W,1820-04-01
47559,1820-05-01,21.809,2.036,Abilene,United States,32.95N,100.53W,1820-05-01


In [87]:
# Keep only rows dated after 1950-01-01 (strict comparison, so the first month kept is 1950-02)
date = df_temperature['Date']>"1950-01-01"
df_temperature=df_temperature[date].copy()

In [88]:
df_temperature['Date'].max()

Timestamp('2013-09-01 00:00:00')

In [89]:
df_temperature['Date'].min()

Timestamp('1950-02-01 00:00:00')

In [90]:
# Let's check for null values.
df_temperature.isnull().sum()

dt                               0
AverageTemperature               1
AverageTemperatureUncertainty    1
City                             0
Country                          0
Latitude                         0
Longitude                        0
Date                             0
dtype: int64

In [91]:
df_temperature[df_temperature.AverageTemperature.isnull()]

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,Date
287781,2013-09-01,,,Anchorage,United States,61.88N,151.13W,2013-09-01


In [92]:
df_temperature[['City','Date']].drop_duplicates()

Unnamed: 0,City,Date
49116,Abilene,1950-02-01
49117,Abilene,1950-03-01
49118,Abilene,1950-04-01
49119,Abilene,1950-05-01
49120,Abilene,1950-06-01
49121,Abilene,1950-07-01
49122,Abilene,1950-08-01
49123,Abilene,1950-09-01
49124,Abilene,1950-10-01
49125,Abilene,1950-11-01


In [93]:
columns = ['City','Date']
dupl = df_temperature[columns].duplicated()
df_temperature[dupl].head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,Date
405836,1950-02-01,1.655,0.057,Arlington,United States,39.38N,76.99W,1950-02-01
405837,1950-03-01,3.871,0.232,Arlington,United States,39.38N,76.99W,1950-03-01
405838,1950-04-01,9.678,0.191,Arlington,United States,39.38N,76.99W,1950-04-01
405839,1950-05-01,16.786,0.234,Arlington,United States,39.38N,76.99W,1950-05-01
405840,1950-06-01,21.548,0.222,Arlington,United States,39.38N,76.99W,1950-06-01


In [94]:
arl = df_temperature['City'] == 'Arlington'
temp = df_temperature['Date'] == '1950-02-01'
df_temperature[arl & temp]

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,Date
402597,1950-02-01,11.144,0.199,Arlington,United States,32.95N,96.70W,1950-02-01
405836,1950-02-01,1.655,0.057,Arlington,United States,39.38N,76.99W,1950-02-01
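
The two rows above share a city name and a date but have different coordinates, so they most likely describe two different cities named Arlington. Below is a short sketch of how such ambiguous names could be listed, using the two rows shown above plus one Abilene row from the earlier output; the variable names are illustrative.

```python
import pandas as pd

# Small sample copied from the outputs shown in this notebook
sample = pd.DataFrame({
    "City": ["Arlington", "Arlington", "Abilene"],
    "Date": pd.to_datetime(["1950-02-01", "1950-02-01", "1950-02-01"]),
    "AverageTemperature": [11.144, 1.655, 10.067],
    "Latitude": ["32.95N", "39.38N", "32.95N"],
    "Longitude": ["96.70W", "76.99W", "100.53W"],
})

# keep=False flags every member of a duplicated (City, Date) group,
# not only the second and later occurrences
dup_mask = sample.duplicated(subset=["City", "Date"], keep=False)
ambiguous_rows = sample[dup_mask]

# A city name is ambiguous when it maps to more than one coordinate pair
locations_per_city = (
    sample[["City", "Latitude", "Longitude"]]
    .drop_duplicates()
    .groupby("City")
    .size()
)
ambiguous_cities = sorted(locations_per_city[locations_per_city > 1].index)
```

Keying the temperature data on city name alone merges such cities; adding the coordinates (or a state, if one can be derived) to the key would keep them apart.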


In [95]:
df_temperature.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,Date
49116,1950-02-01,10.067,0.332,Abilene,United States,32.95N,100.53W,1950-02-01
49117,1950-03-01,11.824,0.343,Abilene,United States,32.95N,100.53W,1950-03-01
49118,1950-04-01,16.963,0.315,Abilene,United States,32.95N,100.53W,1950-04-01
49119,1950-05-01,21.444,0.25,Abilene,United States,32.95N,100.53W,1950-05-01
49120,1950-06-01,25.832,0.202,Abilene,United States,32.95N,100.53W,1950-06-01


In [96]:
df_temperature.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 196348 entries, 49116 to 8439246
Data columns (total 8 columns):
dt                               196348 non-null object
AverageTemperature               196347 non-null float64
AverageTemperatureUncertainty    196347 non-null float64
City                             196348 non-null object
Country                          196348 non-null object
Latitude                         196348 non-null object
Longitude                        196348 non-null object
Date                             196348 non-null datetime64[ns]
dtypes: datetime64[ns](1), float64(2), object(5)
memory usage: 13.5+ MB


## Step 3: Define the Data Model
### 3.1 Conceptual Data Model
**fact_immigration** has columns:
* cicid
* citizenship_country
* residence_country
* city
* state
* arrival_date
* departure_date
* age
* visa_type
* detailed_visa_type

**dim_time** has columns:
* date
* year
* month
* day
* week
* weekday
* dayofyear

**dim_airports** has columns:
* ident
* type
* name
* elevation_ft
* state
* municipality
* iata_code

**dim_city_demographics** has columns:
* City
* state
* median_age
* male_population
* female_population
* total_population
* foreign_born
* average_household_size
* Race
* Count

**dim_temperature** has columns:
* date
* City
* average temperature
* average temperature uncertainty

### 3.2 Mapping Out Data Pipelines
#### Data Extract:
* Load all datasets from the CSV and SAS data files
* Check all data

#### Data Transform:

##### fact_immigration:
* Drop rows where the mode of arrival is not air travel
* Drop rows with invalid gender values
* Convert the arrival and departure dates to date values
* Replace each country code with its equivalent character string
* Replace visa_type with a character string
* Replace the port of entry with its city and state
* Filter out any rows whose port of entry is not in the US
* Calculate age in a new column from the year of birth and the current year
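
As an illustration of the date conversion step, here is a minimal sketch that assumes the raw arrival and departure dates are SAS numeric dates, that is, day counts from 1960-01-01. That storage format is an assumption about the I94 files; the project's own conversion lives in `sql_immigration.compute_arrival_date`, which is not reproduced here. The helper name `sas_to_date` is hypothetical.

```python
import datetime
import math

# SAS numeric dates count days from this epoch (assumed format of the raw columns)
SAS_EPOCH = datetime.date(1960, 1, 1)


def sas_to_date(sas_days):
    """Convert a SAS day count to a date; missing values stay missing."""
    if sas_days is None:
        return None
    if isinstance(sas_days, float) and math.isnan(sas_days):
        return None
    return SAS_EPOCH + datetime.timedelta(days=int(sas_days))


# 20566.0 is an illustrative value, not one read from the dataset
arrival = sas_to_date(20566.0)
```

Keeping missing departure dates as `None` matches the `null` departure dates that appear in the fact table.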

##### dim_temperature:
* Remove all data for cities outside the US
* Drop all data for dates prior to 1950
* Convert city to uppercase
* Calculate average temperature and uncertainty by date+city partition
* ...
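
The averaging step can be sketched in pandas as follows. This is only an illustration of aggregating by the date and city partition; the project performs the step in Spark SQL (`sql_dim_fact.temp_dim`), which is not shown, and the output column names here are assumptions.

```python
import pandas as pd

# Sample rows taken from the exploration outputs, with city names uppercased
sample = pd.DataFrame({
    "Date": pd.to_datetime(["1950-02-01", "1950-02-01", "1950-02-01"]),
    "City": ["ARLINGTON", "ARLINGTON", "ABILENE"],
    "AverageTemperature": [11.144, 1.655, 10.067],
    "AverageTemperatureUncertainty": [0.199, 0.057, 0.332],
})

# One row per (Date, City); duplicated keys are collapsed by taking the mean
dim_temperature_sketch = (
    sample.groupby(["Date", "City"], as_index=False)
    .agg(
        average_temperature=("AverageTemperature", "mean"),
        average_temperature_uncertainty=("AverageTemperatureUncertainty", "mean"),
    )
)
```

Averaging makes the (date, city) key unique, but it also blends readings from different cities that share a name, such as the two Arlington rows above.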


##### dim_time:
* Get all dates that appear in the immigration dataset
* Extract the year, month, day, and week from each date and insert the values into the dim_time table
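
The attribute extraction can be sketched in plain Python as shown below. The sketch assumes ISO week numbering and a weekday that counts from Sunday = 1; these conventions were chosen because they reproduce the sample rows of `dim_time` shown later in this notebook, not because the project's SQL (`sql_dim_fact.time_extract_time_dim`) is known to use them. The function name `time_attributes` is hypothetical.

```python
import datetime


def time_attributes(d):
    """Build one dim_time-style row for a date (illustrative conventions)."""
    _, iso_week, iso_weekday = d.isocalendar()
    return {
        "date": d.isoformat(),
        "year": d.year,
        "month": d.month,
        "day": d.day,
        "week": iso_week,                   # ISO week of the year, 1 to 53
        "weekday": iso_weekday % 7 + 1,     # Sunday = 1 ... Saturday = 7
        "year_day": d.timetuple().tm_yday,  # day of the year, 1 to 366
    }


row = time_attributes(datetime.date(2012, 4, 12))
```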

##### dim_airports:
* Remove all non-US airports
* Remove all airport types that are not valid ports of entry, i.e. ['closed', 'heliport', 'seaplane_base', 'balloonport']
* Delete all rows where the municipality is missing
* Convert municipality to uppercase

##### dim_city_demographics:
* Convert city names to uppercase

#### Data Loading:
* Write the tables to Parquet

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

Extracting data

In [97]:
df_immigration = spark.read.parquet("sas_data")
df_countries = pd.read_csv('data/countries.csv')
df_i94portcodes = pd.read_csv('data/i94portCodes.csv')
df_demographics = pd.read_csv('data/us-cities-demographics.csv', sep=';')
df_temperature = pd.read_csv('../../data2/GlobalLandTemperaturesByCity.csv')
df_airports = pd.read_csv('data/airport-codes_csv.csv')

Transforming data

In [98]:
df_i94portcodes = df_i94portcodes[df_i94portcodes['state'].isna() == False].copy()

In [99]:
# We need to exclude values for airports outside of the US. 
nonUSstates = ['CANADA', 'Canada','BRAZIL', 'URUGUAY', 'IRELAND', 'GABON', 'BAHAMAS', 'MX', 'CAYMAN ISLAND', 'SEOUL KOREA', 'JAPAN', 'ROMANIA', 'INDONESIA',
               'NETHERLANDS', 'NETH ANTILLES', 'THAILAND', 'ETHIOPIA', 'PRC', 'BERMUDA', 'COLOMBIA', 'ARGENTINA', 'MEXICO', 
               'SOUTH AFRICA', 'ENGLAND', 'KENYA', 'TURK & CAIMAN', 'PANAMA', 'NEW GUINEA', 'ECUADOR', 'ITALY', 'EL SALVADOR']

In [100]:
df_i94portcodes = df_i94portcodes[df_i94portcodes['state'].isin(nonUSstates) == False].copy()

In [101]:
# Keep only data for the United States
us_country = df_temperature['Country']=='United States'
df_temperature = df_temperature[us_country].copy()

# Convert the date to datetime objects
df_temperature['Date'] = pd.to_datetime(df_temperature['dt'])

# Remove all dates prior to 1950
date = df_temperature['Date']>"1950-01-01"
df_temperature=df_temperature[date].copy()

In [102]:
# convert the city names to upper case
df_temperature['City'] = df_temperature['City'].str.strip().str.upper()

In [103]:
df_temperature.head(5)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,Date
49116,1950-02-01,10.067,0.332,ABILENE,United States,32.95N,100.53W,1950-02-01
49117,1950-03-01,11.824,0.343,ABILENE,United States,32.95N,100.53W,1950-03-01
49118,1950-04-01,16.963,0.315,ABILENE,United States,32.95N,100.53W,1950-04-01
49119,1950-05-01,21.444,0.25,ABILENE,United States,32.95N,100.53W,1950-05-01
49120,1950-06-01,25.832,0.202,ABILENE,United States,32.95N,100.53W,1950-06-01


In [104]:
df_demographics['City'] = df_demographics['City'].str.strip().str.upper()
df_demographics['State Code'] = df_demographics['State Code'].str.strip().str.upper()
df_demographics['Race'] = df_demographics['Race'].str.strip().str.upper()

In [105]:
df_immigration.createOrReplaceTempView("immig_table")
spark_df_countries = spark.createDataFrame(df_countries)
spark_df_countries.createOrReplaceTempView("countryCodes")
spark_df_i94portcodes = spark.createDataFrame(df_i94portcodes)
spark_df_i94portcodes.createOrReplaceTempView("i94portcodes")
spark_df_temperature = spark.createDataFrame(df_temperature)
spark_df_temperature.createOrReplaceTempView("temperature")
spark_df_airports = spark.read.format("csv").option("header", "true").load('data/airport-codes_csv.csv')
spark_df_airports.createOrReplaceTempView("airports")
spark_df_demographics = spark.createDataFrame(df_demographics)
spark_df_demographics.createOrReplaceTempView("demographics")

df_immigration

In [106]:
# Drop all rows where the arrival was not by air travel
spark.sql(sql_dim_fact.immig_i94mode).createOrReplaceTempView("immig_table")

In [107]:
# Keep only rows whose gender is F or M
spark.sql(sql_immigration.select_gender).createOrReplaceTempView("immig_table")

In [108]:
# Convert the arrival date column to a date
spark.sql(sql_immigration.compute_arrival_date).createOrReplaceTempView("immig_table")

In [109]:
# Convert the departure date column to a date
spark.sql(sql_immigration.replace_data_depdate).createOrReplaceTempView("immig_table")

In [110]:
# citizenship_country
spark.sql(sql_dim_fact.immig_city_code).createOrReplaceTempView("immig_table")

In [111]:
# residence_country
spark.sql(sql_dim_fact.immig_res_code).createOrReplaceTempView("immig_table")

In [112]:
# visa_type
spark.sql(sql_immigration.replace_data_visa).createOrReplaceTempView("immig_table")

In [113]:
# entry_port and entry_port_state
spark.sql(sql_dim_fact.immig_port_code).createOrReplaceTempView("immig_table")

In [114]:
# age
spark.sql(sql_dim_fact.immig_age).createOrReplaceTempView("immig_table")

In [115]:
spark.sql(sql_dim_fact.airport_US).createOrReplaceTempView("airports")

In [116]:
spark.sql(sql_dim_fact.airport_len).createOrReplaceTempView("airports")

fact_immigration

In [117]:
spark.sql(sql_dim_fact.immig_fact).printSchema()

root
 |-- cicid: double (nullable = true)
 |-- citizenship_country: string (nullable = true)
 |-- residence_country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- arrival_date: date (nullable = true)
 |-- departure_date: string (nullable = true)
 |-- age: double (nullable = true)
 |-- visa_type: string (nullable = false)
 |-- detailed_visa_type: string (nullable = true)



In [118]:
# Insert the immigration fact data into a spark dataframe
fact_immigration = spark.sql(sql_dim_fact.immig_fact)

dim_time

In [119]:
# extract all distinct dates from arrival and departure dates
spark.sql(sql_dim_fact.time_dim).createOrReplaceTempView("dim_time_table")

In [120]:
# extract year, month, day, weekofyear, dayofweek and dayofyear
dim_time = spark.sql(sql_dim_fact.time_extract_time_dim)

In [121]:
# average_temperature and average_termperature_uncertainty
dim_temperature = spark.sql(sql_dim_fact.temp_dim)

In [122]:
# insert data into the demographics dim table
dim_demographics = spark.sql(sql_dim_fact.demographic_dim)

In [123]:
# insert data into the airports dim table
dim_airports = spark.sql(sql_dim_fact.airport_dim)

In [124]:
dim_demographics.show(5)

+----------------+-------------+----------+---------------+-----------------+----------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|median_age|male_population|female_population|total_population|foreign_born|average_household_size|state_code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------+----------------------+----------+--------------------+-----+
|   SILVER SPRING|     Maryland|      33.8|        40601.0|          41862.0|           82463|     30908.0|                   2.6|        MD|  HISPANIC OR LATINO|25924|
|          QUINCY|Massachusetts|      41.0|        44129.0|          49500.0|           93629|     32935.0|                  2.39|        MA|               WHITE|58723|
|          HOOVER|      Alabama|      38.5|        38040.0|          46799.0|           84839|      8229.0|                  2.58|        AL|              

In [125]:
dim_time.show(5)

+----------+----+-----+---+----+-------+--------+
|      date|year|month|day|week|weekday|year_day|
+----------+----+-----+---+----+-------+--------+
|2012-04-12|2012|    4| 12|  15|      5|     103|
|2015-05-18|2015|    5| 18|  21|      2|     138|
|2015-10-22|2015|   10| 22|  43|      5|     295|
|2016-01-26|2016|    1| 26|   4|      3|      26|
|2016-01-31|2016|    1| 31|   4|      1|      31|
+----------+----+-----+---+----+-------+--------+
only showing top 5 rows



In [126]:
dim_airports.show(5)

+-----+-------------+--------------------+------------+-----+------------+---------+
|ident|         type|                name|elevation_ft|state|municipality|iata_code|
+-----+-------------+--------------------+------------+-----+------------+---------+
| 00AA|small_airport|Aero B Ranch Airport|        3435|   KS|       LEOTI|     null|
| 00AK|small_airport|        Lowell Field|         450|   AK|ANCHOR POINT|     null|
| 00AL|small_airport|        Epps Airpark|         820|   AL|     HARVEST|     null|
| 00AS|small_airport|      Fulton Airport|        1100|   OK|        ALEX|     null|
| 00AZ|small_airport|      Cordes Airport|        3810|   AZ|      CORDES|     null|
+-----+-------------+--------------------+------------+-----+------------+---------+
only showing top 5 rows



In [127]:
dim_temperature.show(5)

+-------------------+----------+-------------------+--------------------------------+
|               date|      city|average_temperature|average_termperature_uncertainty|
+-------------------+----------+-------------------+--------------------------------+
|1950-03-01 00:00:00|  MESQUITE|             12.498|                            0.27|
|1950-04-01 00:00:00|   MODESTO|             11.735|                           0.435|
|1950-05-01 00:00:00| CLEVELAND| 15.440999999999999|                           0.278|
|1950-06-01 00:00:00|PROVIDENCE|             18.451|                           0.246|
|1950-07-01 00:00:00|   FONTANA|              27.94|                           0.348|
+-------------------+----------+-------------------+--------------------------------+
only showing top 5 rows



In [128]:
fact_immigration.show(5)

+---------+-------------------+-----------------+------+-----+------------+--------------+----+---------+------------------+
|    cicid|citizenship_country|residence_country|  city|state|arrival_date|departure_date| age|visa_type|detailed_visa_type|
+---------+-------------------+-----------------+------+-----+------------+--------------+----+---------+------------------+
|4041803.0|            GERMANY|          GERMANY|BANGOR|   ME|  2016-04-22|    2016-05-07|49.0| Business|                B1|
|4041804.0|            GERMANY|          GERMANY|BANGOR|   ME|  2016-04-22|          null|38.0| Business|                B1|
|4041805.0|            GERMANY|          GERMANY|BANGOR|   ME|  2016-04-22|    2016-05-07|45.0| Business|                B1|
|4041806.0|            GERMANY|          GERMANY|BANGOR|   ME|  2016-04-22|    2016-05-07|25.0| Business|                B1|
| 452706.0|             NORWAY|           NORWAY|BANGOR|   ME|  2016-04-03|    2016-04-05|38.0| Business|                B1|


In [129]:
# Saving the data in parquet format
dim_demographics.write.parquet("output_data/dim_demographics")
dim_time.write.parquet("output_data/dim_time")
dim_airports.write.parquet("output_data/dim_airports")
dim_temperature.write.parquet("output_data/dim_temperature")
fact_immigration.write.parquet("output_data/fact_immigration")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
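
As an illustration of the first and third kinds of check, here is a minimal sketch that uses an in-memory SQLite database from the standard library as a stand-in for Spark SQL. The function `check_key` is hypothetical; it is not the `Check` class imported from `checknull`, whose implementation is not shown in this notebook.

```python
import sqlite3


def check_key(conn, table, key_columns):
    """Return (row_count, distinct_key_count, null_key_count) for a table.

    Table and column names are interpolated directly, so they must come
    from trusted code, never from user input.
    """
    cols = ", ".join(key_columns)
    null_pred = " OR ".join(f"{c} IS NULL" for c in key_columns)
    rows = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    distinct = conn.execute(
        f"SELECT COUNT(*) FROM (SELECT DISTINCT {cols} FROM {table})"
    ).fetchone()[0]
    nulls = conn.execute(
        f"SELECT COUNT(*) FROM {table} WHERE {null_pred}"
    ).fetchone()[0]
    return rows, distinct, nulls


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dim_time (date TEXT, year INTEGER)")
conn.executemany(
    "INSERT INTO dim_time VALUES (?, ?)",
    # The repeated date is deliberate, to show a failing key check
    [("2016-04-22", 2016), ("2016-05-07", 2016), ("2016-05-07", 2016)],
)

rows, distinct, nulls = check_key(conn, "dim_time", ["date"])
key_is_valid = rows > 0 and rows == distinct and nulls == 0
```

A key passes when the table is non-empty, the row count equals the distinct key count, and no key column is null. The same three counts can be expressed as Spark SQL queries against the temporary views registered below.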
 
Run Quality Checks

In [130]:
dim_demographics.createOrReplaceTempView("dim_demographics")
dim_time.createOrReplaceTempView("dim_time")
dim_airports.createOrReplaceTempView("dim_airports")
dim_temperature.createOrReplaceTempView("dim_temperature")
fact_immigration.createOrReplaceTempView("fact_immigration")

In [131]:
# dictionary of tables and columns to be checked
tables_to_check = { 'fact_immigration' : ['cicid'],
                    'dim_time':['date'],
                    'dim_demographics': ['City','state_code'],
                    'dim_airports':['ident'], 
                    'dim_temperature':['date','City']}

Check(spark, tables_to_check)

Data quality check on table fact_immigration starting...
Table fact_immigration passed.
Data quality check on table dim_time starting...
Table dim_time passed.
Data quality check on table dim_demographics starting...
Table dim_demographics passed.
Data quality check on table dim_airports starting...
Table dim_airports passed.
Data quality check on table dim_temperature starting...
Table dim_temperature passed.


In [132]:
fact_immigration.show(2)

+---------+-------------------+-----------------+------+-----+------------+--------------+----+---------+------------------+
|    cicid|citizenship_country|residence_country|  city|state|arrival_date|departure_date| age|visa_type|detailed_visa_type|
+---------+-------------------+-----------------+------+-----+------------+--------------+----+---------+------------------+
|4041803.0|            GERMANY|          GERMANY|BANGOR|   ME|  2016-04-22|    2016-05-07|49.0| Business|                B1|
|4041804.0|            GERMANY|          GERMANY|BANGOR|   ME|  2016-04-22|          null|38.0| Business|                B1|
+---------+-------------------+-----------------+------+-----+------------+--------------+----+---------+------------------+
only showing top 2 rows



In [133]:
dim_airports.show(2)

+-----+-------------+--------------------+------------+-----+------------+---------+
|ident|         type|                name|elevation_ft|state|municipality|iata_code|
+-----+-------------+--------------------+------------+-----+------------+---------+
| 00AA|small_airport|Aero B Ranch Airport|        3435|   KS|       LEOTI|     null|
| 00AK|small_airport|        Lowell Field|         450|   AK|ANCHOR POINT|     null|
+-----+-------------+--------------------+------------+-----+------------+---------+
only showing top 2 rows



In [134]:
dim_demographics.show(2)

+-------------+-------------+----------+---------------+-----------------+----------------+------------+----------------------+----------+------------------+-----+
|         City|        State|median_age|male_population|female_population|total_population|foreign_born|average_household_size|state_code|              Race|Count|
+-------------+-------------+----------+---------------+-----------------+----------------+------------+----------------------+----------+------------------+-----+
|SILVER SPRING|     Maryland|      33.8|        40601.0|          41862.0|           82463|     30908.0|                   2.6|        MD|HISPANIC OR LATINO|25924|
|       QUINCY|Massachusetts|      41.0|        44129.0|          49500.0|           93629|     32935.0|                  2.39|        MA|             WHITE|58723|
+-------------+-------------+----------+---------------+-----------------+----------------+------------+----------------------+----------+------------------+-----+
only showing top

In [135]:
# Check the number of rows in time table : 192 expected
spark.sql(sql_data_quality.check_time_dim).show()

+-----+
|count|
+-----+
|  192|
+-----+



In [136]:
# Make sure each row has a distinct date key : 192 expected
spark.sql(sql_data_quality.check_time_dim_distinct_date).show()

+-----+
|count|
+-----+
|  192|
+-----+



In [137]:
# Make sure all dates from the fact table are included in the time table (an empty result is expected)
spark.sql(sql_data_quality.check_time_dim_all_data).show()

+----+
|date|
+----+
+----+
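
A completeness check of this kind is commonly written as an anti-join: take every date in the fact table, left-join it to the time table, and keep only the rows that found no match. The sketch below shows the pattern on an in-memory SQLite database; the query is an assumption about the general approach and is not the text of `sql_data_quality.check_time_dim_all_data`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE fact_immigration (cicid REAL, arrival_date TEXT, departure_date TEXT);
CREATE TABLE dim_time (date TEXT);
INSERT INTO fact_immigration VALUES
    (4041803.0, '2016-04-22', '2016-05-07'),
    (4041804.0, '2016-04-22', NULL);
INSERT INTO dim_time VALUES ('2016-04-22'), ('2016-05-07');
""")

# Dates used by the fact table that have no matching row in dim_time
MISSING_DATES_SQL = """
SELECT f.d
FROM (
    SELECT arrival_date AS d FROM fact_immigration
    UNION
    SELECT departure_date FROM fact_immigration WHERE departure_date IS NOT NULL
) AS f
LEFT JOIN dim_time AS t ON t.date = f.d
WHERE t.date IS NULL
"""

missing = conn.execute(MISSING_DATES_SQL).fetchall()

# Removing a date from dim_time makes the check report it
conn.execute("DELETE FROM dim_time WHERE date = '2016-05-07'")
missing_after_delete = conn.execute(MISSING_DATES_SQL).fetchall()
```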



In [138]:
# Should match the primary key count from the fact table (2165257 expected)
spark.sql(sql_data_quality.check_immig_fact_distinct_cicid).show()

+-------+
|  count|
+-------+
|2165257|
+-------+



In [139]:
# Row count of the fact table; if cicid is a valid primary key it should equal the distinct count above (2165257 expected), but this run returned 2069908
spark.sql(sql_data_quality.check_immig_fact).show()

+-------+
|  count|
+-------+
|2069908|
+-------+



In [140]:
# Check the demographics table: 2891 expected
spark.sql(sql_data_quality.check_demographics_dim).show()

+-----+
|count|
+-----+
| 2891|
+-----+



In [141]:
spark.sql(sql_data_quality.check_demographics_dim_distinct).show()

+-----+
|count|
+-----+
| 2891|
+-----+



In [142]:
spark.sql(sql_data_quality.check_airports_dim).show()

+-----+
|count|
+-----+
|14529|
+-----+



In [143]:
# Check the primary key for airports table: expected 14529
spark.sql(sql_data_quality.check_airports_dim_distinct_ident).show()

+-----+
|count|
+-----+
|14529|
+-----+



In [144]:
# Check primary key for the temperature table: 189472 expected
spark.sql(sql_data_quality.check_temperature_dim).show()

+------+
| count|
+------+
|189472|
+------+



In [145]:
spark.sql(sql_data_quality.check_temperature_dim_distinct).show()

+------+
| count|
+------+
|189472|
+------+



In [146]:
# Distinct combinations of city and state in our fact table
spark.sql(sql_data_quality.check_immig_fact_distinct).show()

+-----+
|count|
+-----+
|  151|
+-----+



In [147]:
# And the combinations of city and state that are common to both
spark.sql(sql_data_quality.check_immig_fact_all_data).show()

+-----+
|count|
+-----+
|  102|
+-----+



In [148]:
# And the combinations of city and state that are common to both the fact table and the demographics table
spark.sql(sql_data_quality.check_immig_fact_all_data2).show()

+-----+
|count|
+-----+
|   69|
+-----+



In [149]:
# A count to see how many rows we would keep using this strategy
spark.sql(sql_data_quality.check_immig_fact_all_data3).show()

+-------+
|  count|
+-------+
|1983869|
+-------+
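
The last four checks measure how much of the fact table can be joined to the demographics data on the city and state pair. The idea can be sketched with plain Python sets; the rows below are toy data (the `EXAMPLEVILLE` pair and the contents of `demographic_keys` are invented for illustration), and the variable names are not taken from the project.

```python
from collections import Counter

# (city, state) of each fact row; toy data for illustration
fact_rows = [
    ("BANGOR", "ME"), ("BANGOR", "ME"), ("BANGOR", "ME"),
    ("EXAMPLEVILLE", "ZZ"),  # hypothetical pair with no demographics entry
]
# (city, state_code) pairs assumed to exist in the demographics table
demographic_keys = {("BANGOR", "ME"), ("QUINCY", "MA")}

pair_counts = Counter(fact_rows)           # rows per distinct pair
common_pairs = set(pair_counts) & demographic_keys
rows_kept = sum(pair_counts[p] for p in common_pairs)
share_kept = rows_kept / len(fact_rows)
```

Because arrivals are concentrated in a few large ports, the share of rows kept can be much higher than the share of distinct pairs kept, which is the pattern in the counts above.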



#### 4.3 Data dictionary 
##### dim_demographics
Primary key: City, state_code, Race
 * City (String): Name of the city
 * State (String): Name of the state
 * median_age (double): Median age for the city
 * male_population (double): Size of the male population
 * female_population (double): Size of the female population
 * total_population (long) : Total population
 * foreign_born (double): Number of foreign born residents
 * average_household_size (double): Average size of a household
 * state_code (String): Two letter state code
 * Race (String): Racial category selected by respondents. Possible values are:
- Hispanic or Latino
- White
- Asian
- Black or African-American
- American Indian and Alaska Native
 * Count (long): Number of respondents (i.e. size) for the combination City, State, Race

##### dim_time
Primary key: date
 * date (string): date in the format yyyy-mm-dd
 * year (integer): year for the given date, available for aggregation
 * month (integer): month for the given date, available for aggregation
 * day (integer): day for the given date, available for aggregation
 * week (integer): week of the year for the given date, available for aggregation (values between 1 and 53)
 * weekday (integer): day of the week for the given date, available for aggregation (values between 1 and 7; in the sample output, 1 falls on a Sunday)
 * year_day (integer): day of the year for the given date, available for aggregation

##### dim_airports
Primary key: ident
 * ident (string): Airport identifier
 * type (string): Type of airport. Possible values are:
- large_airport
- medium_airport
- small_airport
 * name (string): Name of the airport
 * elevation_ft (string): Elevation of the airport in feet
 * state (string): State where the airport is located
 * municipality (string): Name of the city closest to the airport
 * iata_code (string): IATA code that appears on airplane tickets and baggage tags

##### dim_temperature
Primary key: date, city
 * date (timestamp): date when the temperature was registered
 * city (string): city where the temperature was registered
 * average_temperature (double): average temperature recorded for the given date in the specified city
 * average_termperature_uncertainty (double): average uncertainty recorded

##### fact_immigration
Primary key: cicid
 * cicid (double): Unique identifier for each traveller
 * citizenship_country (string): Traveller's country of citizenship
 * residence_country (string): Traveller's country of residence
 * city (string): City where the entry port of the traveller is located
 * state (string): State where the entry port of the traveller is located
 * arrival_date (date): Traveller's arrival date
 * departure_date (string): Traveller's departure date, if known
 * age (double): Traveller's age
 * visa_type (string): aggregate visa type. Possible values are:
- Business
- Pleasure
- Student
 * detailed_visa_type (string): Detailed visa types. Numerous values are available. Not all could be identified:
- B1: B1 visa is for business visits valid for up to a year
- B2: B2 visa is for pleasure visits valid for up to a year
- CP: could not find a definition
- E2: E2 investor visas allows foreign investors to enter and work inside of the United States based on a substantial investment
- F1: F1 visas are used by non-immigrant students for Academic and Language training Courses. 
- F2: F2 visas are used by the dependents of F1 visa holders
- GMT: could not find a definition
- M1: for students enrolled in non-academic or “vocational study”. Mechanical, language, cooking classes, etc...
- WB: Waiver Program (WT/WB Status) travel to the United States for tourism or business for stays of 90 days or less without obtaining a visa.
- WT: Waiver Program (WT/WB Status) travel to the United States for tourism or business for stays of 90 days or less without obtaining a visa.
 

### Step 5: Complete Project Write Up
The immigration dataset is fairly large (about 3 million rows for a single month), and it is combined with the temperature, airport, and demographic datasets, so Spark is the most reasonable choice of technology. Its advantage grows if the processing has to cover a longer period of time.

In this project, I am looking at US immigration data. Specifically, I am interested in the following relationships:
1. Correlation between temperature and number of tourists
2. Seasonality of tourism
3. Correlation between the number of tourists and the number of foreign visitors
4. Correlations between tourists and demographics in different cities

None of these questions requires rapid updates of the data. A monthly or quarterly update should be sufficient for the needs of this study.

##### *Alternative scenarios*:
How would our approach change if the problem had the following requirements?
* The data is increased 100x: The data would be stored in an Amazon S3 bucket. Spark would remain the data processing platform, as it is well suited to very large datasets.
* The data that populates a dashboard must be updated daily by 7 am: Apache Airflow would schedule the ETL and the data quality validation.
* The database needs to be accessed by more than 100 people: Once the data is ready to be used, it would be stored in Amazon Redshift, a PostgreSQL-compatible data warehouse that easily supports multi-user access.