# Data Engineering Capstone Project

#### Project Summary
- The objective of this project is to work with four datasets: immigration data for the United States as the main dataset, supplemented by data on airport codes, U.S. city demographics, and temperature.
- We explore and assess the data, build a data model, run an ETL process, and then analyze the result to look for trends and patterns.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

# Import Libraries

In [1]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col, substring
from pyspark.sql.types import *

### Step 1: Scope the Project and Gather Data

#### Scope 
- Gather data from different sources in different formats and load it into staging dataframes
- Explore the data and clean it
- Write the data to parquet files in AWS S3
- Define a data model consisting of fact and dimension tables arranged in a star schema
- Run the ETL process and load the data into the data model
- Analyze the data to discover trends and patterns


#### Datasets

- [**I94 Immigration Data:**](https://www.trade.gov/national-travel-and-tourism-office)  This data comes from the US National Tourism and Trade Office. A sample file in csv format lets us take a look at the data before reading it all in.

- [**World Temperature Data:**](https://www.kaggle.com/datasets/berkeleyearth/climate-change-earth-surface-temperature-data) This dataset comes from Kaggle. It includes the temperatures of various cities in the world from 1743 to 2013.

- [**U.S. City Demographic Data:**](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)  This data comes from OpenDataSoft and is based on the US Census Bureau's 2015 American Community Survey. It contains demographic information for all US cities and census-designated places with a population greater than or equal to 65,000.

- [**Airport Code Table:**](https://datahub.io/core/airport-codes#data) This is a simple table of airport codes and corresponding cities.

### I94 Immigration Data (sample csv file)

In [2]:
# Read immigration sample file in csv format
imm_sample_df  = pd.read_csv('immigration_data_sample.csv')
imm_sample_df.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


#### Data Dictionary

|Column|Description|
|------|------|
|cicid     |unique ID     |
|i94yr     |Year     |
|i94mon     |Month    |
|i94cit     |3 digit code for immigrant country of birth     |
|i94res     |3 digit code for immigrant country of residence     |
|i94port     |Admission port|
|arrdate     |Arrival date in USA|
|i94mode     |Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)|
|i94addr     |State of arrival in USA     |
|depdate     |Departure date from USA     |
|i94bir     |Age of Respondent in Years     |
|i94visa     |Visa codes collapsed into three categories|
|count     |Used for summary statistics     |
|dtadfile     |Character Date Field - Date added to I-94 Files|
|visapost     |Department of State post where visa was issued|
|occup     |Occupation that will be performed in USA|
|entdepa     |Arrival Flag - admitted or paroled into the USA|
|entdepd     |Departure Flag - Departed, lost I-94 or is deceased|
|entdepu     |Update Flag - Either apprehended, overstayed, adjusted to perm residence     |
|matflag     |Match flag - Match of arrival and departure records|
|biryear     |4 digit year of birth|
|dtaddto     |Character Date Field - Date to which admitted to USA|
|gender     |Non-immigrant sex|
|insnum     |INS number   |
|airline     |Airline used to arrive in USA   |
|admnum     |Admission Number     |
|fltno     |Flight number of Airline used to arrive in USA   |
|visatype     |Class of admission legally admitting the non-immigrant to temporarily stay in USA   |

### World Temperature Data

In [3]:
# Read Temperature data
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
temperature_df = pd.read_csv(fname)

In [4]:
temperature_df.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


#### Data Dictionary

|Column|Description|
|------|------|
|dt     |Date     |
|AverageTemperature     |Average Temperature in Celsius     |
|AverageTemperatureUncertainty     |95% confidence interval around the average temperature   |
|City     |city |
|Country     |country|
|Latitude     |city latitude|
|Longitude     |city longitude|



### U.S. City Demographic Data

In [5]:
# Read United States cities demographic data
city_demographic_df  = pd.read_csv('us-cities-demographics.csv', sep = ';')
city_demographic_df.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


#### Data Dictionary

|Column|Description|
|------|------|
|City     |City name     |
|State     | state of city      |
|Median Age     |The median population age |
|Male Population     |total male population |
|Female Population     |total female population|
|Total Population     |total population|
|Number of Veterans     |Number of veterans still living in the city|
|Foreign-born    |Number of residents born outside the United States |
|Average Household Size  |Average number of people per household|
|State Code    |State Code|
|Race     |Race or ethnicity group that the Count column refers to|
|Count     |Number of the city's residents in the given race group|


### Airport Code Table

In [6]:
# Read airport data
airport_df  = pd.read_csv('airport-codes_csv.csv')
airport_df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


#### Data Dictionary

|Column|Description|
|------|------|
|ident     |unique identifier     |
|type     |Airport type     |
|name     |Airport Name  |
|elevation_ft     |Airport altitude |
|continent     |Continent|
|iso_country     |ISO Code of the airport's country|
|iso_region     |ISO Code of the airport's region|
|municipality     |municipality where the airport is located    |
|gps_code     |GPS code of the airport   |
|iata_code     |IATA code of the airport |
|local_code     |Local code of the airport|
|coordinates     |coordinates of the airport |


### Full immigration dataset using Spark

In [7]:
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

immigration_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [8]:
immigration_df.count()

3096313

In [9]:
immigration_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [None]:
#write to parquet
#immigration_df.write.parquet("sas_data")
#immigration_df = spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

### Immigration data

In [11]:
immigration_df.show(5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE

In [12]:
# check for null values
immigration_df.select([count(when(col(c).isNull(), c)).alias(c) for c in immigration_df.columns]).show()

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|  occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender| insnum|airline|admnum|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|    0|    0|     0|     0|     0|      0|      0|    239| 152592| 142457|   802|      0|    0|       1| 1881250|3088187|    238| 138429|3095921| 138429|    802|    477|414269|2982605|  83627|     0|19549|       0|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+---

In [13]:
# creating a view of the immigration dataset
immigration_df.createOrReplaceTempView("immigration_table")

In [14]:
# check whether cicid is unique
spark.sql("SELECT COUNT (DISTINCT cicid) FROM immigration_table").show()

+---------------------+
|count(DISTINCT cicid)|
+---------------------+
|              3096313|
+---------------------+



Since the **cicid** column contains only unique values (its distinct count equals the row count of 3,096,313), we can use it as a primary key.

**arrdate**
- The arrdate column needs to be converted into a readable date.
- SAS stores dates as the number of days since 1960-01-01.
- Therefore, we compute the arrival date by adding arrdate days to 1960-01-01.
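
As a quick sanity check of the rule above, the conversion can be reproduced in plain Python. This is a minimal sketch outside Spark; the helper name `sas_days_to_date` is hypothetical and not part of the notebook.

```python
from datetime import date, timedelta

# SAS stores dates as the number of days elapsed since 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)


def sas_days_to_date(days):
    """Convert a SAS day count to a date; None stays None (hypothetical helper)."""
    if days is None:
        return None
    # The I94 columns hold doubles such as 20573.0, so truncate to an integer first.
    return SAS_EPOCH + timedelta(days=int(days))


# Values taken from the immigration rows shown in this notebook.
print(sas_days_to_date(20573.0))  # 2016-04-29
print(sas_days_to_date(20545.0))  # 2016-04-01
```

Both results agree with the `arrival_date` values that Spark's `date_add` produces for the same rows.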

In [15]:
immigration_df = spark.sql("SELECT *, date_add(to_date('1960-01-01'), arrdate) AS arrival_date FROM immigration_table")
# creating a view of the immigration dataset
immigration_df.createOrReplaceTempView("immigration_table")
immigration_df.show(1)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+------------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|       admnum|fltno|visatype|arrival_date|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+------------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null|1.897628485E9| null|      B2|  2016-04-29|
+-----+------+------+------+------+-------+-------+-------+-----

Here we can see the new column **arrival_date**: the **arrdate** value 20573 is converted to 2016-04-29.

Next, we want to make sure that departure_date is later than arrival_date.

**depdate**

In [16]:
immigration_df = spark.sql("""SELECT *, CASE 
                        WHEN depdate >= 1.0 THEN date_add(to_date('1960-01-01'), depdate)
                        WHEN depdate IS NULL THEN NULL
                        ELSE 'N/A' END AS departure_date 
                        FROM immigration_table""")

immigration_df.createOrReplaceTempView("immigration_table")


In [17]:
spark.sql("SELECT COUNT(*) FROM immigration_table WHERE departure_date <= arrival_date").show()

+--------+
|count(1)|
+--------+
|     375|
+--------+



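 Since the number of affected rows is relatively small, we can simply drop them. Note that the filter below keeps only rows where the comparison is true, so rows with a NULL departure_date (no departure recorded) are dropped as well.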

In [18]:
immigration_df = spark.sql("""SELECT *
                            FROM immigration_table
                            WHERE departure_date >= arrival_date
""")

immigration_df.createOrReplaceTempView("immigration_table")
immigration_df.show(1)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+------------+-----+--------+------------+--------------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|      admnum|fltno|visatype|arrival_date|departure_date|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+------------+-----+--------+------------+--------------+
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|20545.0|    1.0|     MI|20691.0|  55.0|    2.0|  1.0|20160401|    null| null|      T|      O|   null|      M| 1961.0|09302016|     M|  null|     OS|6.66643185E8|   93|      B2|  2016-04-01|    2016-08-25|
+-----+-

In [19]:
#check for result 
spark.sql("SELECT COUNT(*) FROM immigration_table WHERE departure_date <= arrival_date").show()

+--------+
|count(1)|
+--------+
|       0|
+--------+
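
One detail worth keeping in mind: in SQL, a comparison involving NULL is neither true nor false, so a filter such as `departure_date >= arrival_date` also discards rows whose departure date is missing. The sketch below illustrates this with Python's built-in `sqlite3` module instead of Spark (an assumption made so that the example runs without a cluster); the table `trips` and its rows are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE trips (id INTEGER, arrival_date TEXT, departure_date TEXT)")
conn.executemany(
    "INSERT INTO trips VALUES (?, ?, ?)",
    [
        (1, "2016-04-01", "2016-08-25"),  # valid: departs after arriving
        (2, "2016-04-10", "2016-04-02"),  # invalid: departs before arriving
        (3, "2016-04-29", None),          # no departure recorded
    ],
)

# ISO-formatted date strings compare correctly as text.
kept = conn.execute(
    "SELECT id FROM trips WHERE departure_date >= arrival_date ORDER BY id"
).fetchall()

# Keeping rows without a departure date requires an explicit IS NULL branch.
kept_with_nulls = conn.execute(
    "SELECT id FROM trips "
    "WHERE departure_date IS NULL OR departure_date >= arrival_date ORDER BY id"
).fetchall()

print(kept)             # [(1,)]
print(kept_with_nulls)  # [(1,), (3,)]
conn.close()
```

Whether rows without a departure date should be kept depends on the analysis; the second query shows one way to retain them if they are needed.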



The **i94mode** column takes four values:
- 1 = "Air"
- 2 = "Sea"
- 3 = "Land"
- 9 = "Not reported"
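
The same code-to-label mapping can be expressed as a small lookup table. This is a sketch in plain Python rather than the Spark SQL used in the notebook; `I94_MODE_LABELS` and `arrival_mode_label` are hypothetical names, and treating missing or unknown codes as `Not_reported` is an assumption that mirrors the `ELSE` branch of the notebook's query.

```python
# Labels for the i94mode codes listed above.
I94_MODE_LABELS = {1: "Air", 2: "Sea", 3: "Land", 9: "Not_reported"}


def arrival_mode_label(code):
    """Map an i94mode code (stored as a double, possibly None) to a label."""
    if code is None:
        return "Not_reported"
    return I94_MODE_LABELS.get(int(code), "Not_reported")


print(arrival_mode_label(1.0))   # Air
print(arrival_mode_label(None))  # Not_reported
```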

In [20]:
spark.sql("SELECT i94mode, count(*) as count FROM immigration_table GROUP BY i94mode").show()

+-------+-------+
|i94mode|  count|
+-------+-------+
|   null|    238|
|    1.0|2871184|
|    3.0|  61572|
|    2.0|  17970|
|    9.0|   2517|
+-------+-------+



In [21]:
immigration_df = spark.sql("""SELECT *, CASE 
                        WHEN i94mode = 1.0 THEN 'Air' 
                        WHEN i94mode = 2.0 THEN 'Sea'
                        WHEN i94mode = 3.0 THEN 'Land'
                        WHEN i94mode = 9.0 THEN 'Not_reported'
                        ELSE 'Not_reported'  END AS arrival_mode   FROM immigration_table""")

immigration_df.createOrReplaceTempView("immigration_table")
immigration_df.show(1)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+------------+-----+--------+------------+--------------+------------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|      admnum|fltno|visatype|arrival_date|departure_date|arrival_mode|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+------------+-----+--------+------------+--------------+------------+
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|20545.0|    1.0|     MI|20691.0|  55.0|    2.0|  1.0|20160401|    null| null|      T|      O|   null|      M| 1961.0|09302016|     M|  null|     OS|6.66643185E8|   93|      B

The **i94visa** column takes three values:
- 1 = "Business"
- 2 = "Pleasure"
- 3 = "Student"


In [22]:
spark.sql("SELECT i94visa, count(*) as count FROM immigration_table GROUP BY i94visa").show()

+-------+-------+
|i94visa|  count|
+-------+-------+
|    1.0| 508764|
|    3.0|  30305|
|    2.0|2414412|
+-------+-------+



In [23]:
immigration_df = spark.sql("""SELECT *, CASE 
                        WHEN i94visa = 1.0 THEN 'Business' 
                        WHEN i94visa  = 2.0 THEN 'Pleasure'
                        WHEN i94visa = 3.0 THEN 'Student'    END AS visa_type   FROM immigration_table""")

immigration_df.createOrReplaceTempView("immigration_table")
immigration_df.show(1)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+------------+-----+--------+------------+--------------+------------+---------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|      admnum|fltno|visatype|arrival_date|departure_date|arrival_mode|visa_type|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+------------+-----+--------+------------+--------------+------------+---------+
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|20545.0|    1.0|     MI|20691.0|  55.0|    2.0|  1.0|20160401|    null| null|      T|      O|   null|      M| 1961.0|09302016|     M|  null|    

In [24]:
# gender column 
spark.sql("SELECT gender, count(*)  FROM immigration_table GROUP BY gender").show()

+------+--------+
|gender|count(1)|
+------+--------+
|     F| 1228646|
|  null|  407456|
|     M| 1316305|
|     U|     238|
|     X|     836|
+------+--------+



In [25]:
immigration_df = spark.sql("Select * From immigration_table where gender IN ('M', 'F')")
immigration_df.createOrReplaceTempView("immigration_table")
immigration_df.show(1)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+------------+-----+--------+------------+--------------+------------+---------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|      admnum|fltno|visatype|arrival_date|departure_date|arrival_mode|visa_type|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+------------+-----+--------+------------+--------------+------------+---------+
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|20545.0|    1.0|     MI|20691.0|  55.0|    2.0|  1.0|20160401|    null| null|      T|      O|   null|      M| 1961.0|09302016|     M|  null|    

In [26]:
immigration_df = spark.sql("""Select cicid as id, i94yr as year, i94mon as month,
                            i94cit as immigrant_birth_country, i94res as immigrant_residence_country,
                            i94port as adm_port, arrival_date, departure_date,  arrival_mode, i94addr as arrival_state,
                            i94bir as age, visa_type, count, biryear as birth_year, gender, airline,
                            admnum as admission_num , fltno as flight_num
                            From immigration_table
""")
immigration_df.show(5)

+----+------+-----+-----------------------+---------------------------+--------+------------+--------------+------------+-------------+----+---------+-----+----------+------+-------+--------------+----------+
|  id|  year|month|immigrant_birth_country|immigrant_residence_country|adm_port|arrival_date|departure_date|arrival_mode|arrival_state| age|visa_type|count|birth_year|gender|airline| admission_num|flight_num|
+----+------+-----+-----------------------+---------------------------+--------+------------+--------------+------------+-------------+----+---------+-----+----------+------+-------+--------------+----------+
|15.0|2016.0|  4.0|                  101.0|                      101.0|     WAS|  2016-04-01|    2016-08-25|         Air|           MI|55.0| Pleasure|  1.0|    1961.0|     M|     OS|  6.66643185E8|        93|
|27.0|2016.0|  4.0|                  101.0|                      101.0|     BOS|  2016-04-01|    2016-04-05|         Air|           MA|58.0| Business|  1.0|    1958

### World Temperature Data

In [27]:
temperature_df.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [28]:
temperature_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8599212 entries, 0 to 8599211
Data columns (total 7 columns):
dt                               object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                             object
Country                          object
Latitude                         object
Longitude                        object
dtypes: float64(2), object(5)
memory usage: 459.2+ MB


In [29]:
temperature_df.shape

(8599212, 7)

In [30]:
#check for nan values
temperature_df.isnull().sum() 

dt                                    0
AverageTemperature               364130
AverageTemperatureUncertainty    364130
City                                  0
Country                               0
Latitude                              0
Longitude                             0
dtype: int64

In [31]:
#check for any duplicates
temperature_df.duplicated().sum()

0

In [32]:
# Keep only data for the United States so that it can be combined with our immigration data
# (.copy() avoids a SettingWithCopyWarning when a column is added later)
temperature_df = temperature_df[temperature_df['Country']=='United States'].copy()

Since we focus on air travel, we keep only records dated after 1950, that is, after the end of the Second World War and the resumption of commercial air routes.

In [33]:
# Convert the date column to datetime objects
temperature_df['date'] = pd.to_datetime(temperature_df['dt'])

# Keep only records dated after 1950-01-01 (the boundary date itself is excluded)
temperature_df = temperature_df[temperature_df['date'] > "1950-01-01"]
temperature_df = temperature_df.drop('dt', axis=1)

In [34]:
temperature_df.head()

Unnamed: 0,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,date
49116,10.067,0.332,Abilene,United States,32.95N,100.53W,1950-02-01
49117,11.824,0.343,Abilene,United States,32.95N,100.53W,1950-03-01
49118,16.963,0.315,Abilene,United States,32.95N,100.53W,1950-04-01
49119,21.444,0.25,Abilene,United States,32.95N,100.53W,1950-05-01
49120,25.832,0.202,Abilene,United States,32.95N,100.53W,1950-06-01


In [35]:
#check for nan values
temperature_df.isnull().sum() 

AverageTemperature               1
AverageTemperatureUncertainty    1
City                             0
Country                          0
Latitude                         0
Longitude                        0
date                             0
dtype: int64

In [36]:
temperature_df[temperature_df.AverageTemperature.isnull()]

Unnamed: 0,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,date
287781,,,Anchorage,United States,61.88N,151.13W,2013-09-01


In [37]:
# Drop the row with a missing temperature
temperature_df.dropna(subset=['AverageTemperature'], how='all', inplace=True)

In [38]:
#check for nan values
temperature_df.isnull().sum()

AverageTemperature               0
AverageTemperatureUncertainty    0
City                             0
Country                          0
Latitude                         0
Longitude                        0
date                             0
dtype: int64

### U.S. City Demographic Data

In [39]:
city_demographic_df.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [40]:
city_demographic_df.shape

(2891, 12)

In [41]:
city_demographic_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
City                      2891 non-null object
State                     2891 non-null object
Median Age                2891 non-null float64
Male Population           2888 non-null float64
Female Population         2888 non-null float64
Total Population          2891 non-null int64
Number of Veterans        2878 non-null float64
Foreign-born              2878 non-null float64
Average Household Size    2875 non-null float64
State Code                2891 non-null object
Race                      2891 non-null object
Count                     2891 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 271.1+ KB


In [42]:
# check for any duplicates
city_demographic_df.duplicated().sum()

0

In [43]:
# check for null values
city_demographic_df.isnull().sum()

City                       0
State                      0
Median Age                 0
Male Population            3
Female Population          3
Total Population           0
Number of Veterans        13
Foreign-born              13
Average Household Size    16
State Code                 0
Race                       0
Count                      0
dtype: int64

In [44]:
# Drop rows with a missing Average Household Size
city_demographic_df.dropna(subset=['Average Household Size'], how='all', inplace=True)

In [45]:
# check for null values
city_demographic_df.isnull().sum()

City                      0
State                     0
Median Age                0
Male Population           0
Female Population         0
Total Population          0
Number of Veterans        0
Foreign-born              0
Average Household Size    0
State Code                0
Race                      0
Count                     0
dtype: int64

### Airport data


In [46]:
airport_df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [47]:
airport_df.shape

(55075, 12)

In [48]:
# check for missing values
airport_df.isnull().sum()

ident               0
type                0
name                0
elevation_ft     7006
continent       27719
iso_country       247
iso_region          0
municipality     5676
gps_code        14045
iata_code       45886
local_code      26389
coordinates         0
dtype: int64

The iso_country column has 247 null entries, so we drop those rows.
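
A possible explanation for these missing values, not verified against this file: by default `pandas.read_csv` treats the string `NA` as missing, and `NA` is also the ISO code for Namibia (and the continent code for North America). The sketch below reproduces the effect on a made-up two-row CSV and shows that `keep_default_na=False` preserves the literal code. Because the airport data is restricted to US rows further on, dropping these entries does not change the final result.

```python
import io

import pandas as pd

# Made-up sample in the same layout as a few airport columns.
csv_text = "ident,iso_country,continent\nFAKE1,NA,AF\nFAKE2,US,NA\n"

default_df = pd.read_csv(io.StringIO(csv_text))
literal_df = pd.read_csv(io.StringIO(csv_text), keep_default_na=False)

print(default_df["iso_country"].isnull().sum())  # 1
print(literal_df["iso_country"].tolist())        # ['NA', 'US']
```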

In [49]:
# Drop rows with a null iso_country
airport_df.dropna(subset=['iso_country'], how='all', inplace=True)

In [50]:
# check for missing values
airport_df.isnull().sum()

ident               0
type                0
name                0
elevation_ft     6990
continent       27719
iso_country         0
iso_region          0
municipality     5574
gps_code        13872
iata_code       45670
local_code      26142
coordinates         0
dtype: int64

In [51]:
#check for duplicates
airport_df.duplicated().sum()

0

In [52]:
airport_df.type.value_counts()

small_airport     33734
heliport          11287
medium_airport     4539
closed             3602
seaplane_base      1016
large_airport       626
balloonport          24
Name: type, dtype: int64

We keep only the airport types that are relevant to our immigration data:
- closed airports are no longer in operation
- balloonport, heliport and seaplane_base entries do not help us with our immigration data

In [53]:
airport_df.query(" type in ('small_airport','large_airport','medium_airport')", inplace = True)

In [54]:
airport_df.type.value_counts()

small_airport     33734
medium_airport     4539
large_airport       626
Name: type, dtype: int64

In [55]:
# check which countries the airports are located in
airport_df.groupby('iso_country')['iso_country'].count()

iso_country
AE       29
AF       59
AG        3
AI        1
AL        6
AM        9
AO       94
AQ       25
AR      691
AS        4
AT       55
AU     1730
AW        1
AZ       33
BA       13
BB        3
BD       15
BE       52
BF       51
BG      104
BH        3
BI        6
BJ       10
BL        1
BM        1
BN        2
BO      196
BQ        3
BR     3204
BS       60
      ...  
TJ       15
TL       11
TM       20
TN       14
TO        6
TR      114
TT        3
TV        1
TW       31
TZ      204
UA      135
UG       36
UM        3
US    14582
UY       52
UZ      173
VC        5
VE      505
VG        3
VI        2
VN       37
VU       32
WF        2
WS        4
XK        2
YE       25
YT        1
ZA      473
ZM      102
ZW      136
Name: iso_country, Length: 238, dtype: int64

**This dataset contains airport data for many countries. Our immigration dataset only contains entries into the US, so for the purpose of data modeling we will keep only the rows whose iso_country is US.**

In [56]:
airport_df = airport_df[airport_df['iso_country'] == 'US']

In [57]:
airport_df.iso_country.value_counts()

US    14582
Name: iso_country, dtype: int64

In [58]:
# check the regions (states) where the airports are located
airport_df.groupby('iso_region')['iso_region'].count()

iso_region
US-AK      589
US-AL      198
US-AR      292
US-AZ      215
US-CA      554
US-CO      289
US-CT       56
US-DC        2
US-DE       36
US-FL      523
US-GA      365
US-HI       36
US-IA      232
US-ID      240
US-IL      581
US-IN      487
US-KS      373
US-KY      165
US-LA      283
US-MA       79
US-MD      157
US-ME      122
US-MI      380
US-MN      362
US-MO      411
US-MS      212
US-MT      255
US-NC      349
US-ND      297
US-NE      259
US-NH       54
US-NJ      116
US-NM      150
US-NV      115
US-NY      404
US-OH      493
US-OK      372
US-OR      357
US-PA      486
US-RI       10
US-SC      173
US-SD      162
US-TN      228
US-TX     1556
US-U-A       7
US-UT      105
US-VA      311
US-VT       66
US-WA      382
US-WI      458
US-WV       83
US-WY       95
Name: iso_region, dtype: int64

`US-U-A` does not look like a valid state code, so we will drop those rows and then extract the state code from the iso_region column.

In [59]:
airport_df = airport_df[airport_df['iso_region'] != 'US-U-A']

# check for cleaned iso_region column 
airport_df.groupby('iso_region')['iso_region'].count()

iso_region
US-AK     589
US-AL     198
US-AR     292
US-AZ     215
US-CA     554
US-CO     289
US-CT      56
US-DC       2
US-DE      36
US-FL     523
US-GA     365
US-HI      36
US-IA     232
US-ID     240
US-IL     581
US-IN     487
US-KS     373
US-KY     165
US-LA     283
US-MA      79
US-MD     157
US-ME     122
US-MI     380
US-MN     362
US-MO     411
US-MS     212
US-MT     255
US-NC     349
US-ND     297
US-NE     259
US-NH      54
US-NJ     116
US-NM     150
US-NV     115
US-NY     404
US-OH     493
US-OK     372
US-OR     357
US-PA     486
US-RI      10
US-SC     173
US-SD     162
US-TN     228
US-TX    1556
US-UT     105
US-VA     311
US-VT      66
US-WA     382
US-WI     458
US-WV      83
US-WY      95
Name: iso_region, dtype: int64

In [60]:
airport_df['state'] = airport_df['iso_region'].str[3:]
airport_df.state

1        KS
2        AK
3        AL
5        OK
6        AZ
7        CA
8        CA
11       FL
13       FL
14       GA
17       ID
18       KS
20       IL
22       IL
23       KS
24       KY
27       LA
28       MD
30       MN
31       MO
33       NJ
34       NC
37       NY
38       OH
40       OK
43       PA
45       OR
46       SC
47       SD
50       TN
         ..
52740    MN
52741    MN
52742    ND
52743    MI
52744    IA
52745    WI
52746    MI
52747    MI
52748    MI
52749    MI
52750    MI
52751    ND
54550    AK
54551    AK
54552    AK
54557    AK
54559    AK
54560    AK
54562    AK
54563    AK
54564    AK
54565    AK
54570    AK
54571    AK
54573    AK
54574    AK
54575    MI
54576    AK
54577    AZ
54896    AK
Name: state, Length: 14575, dtype: object
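
Rather than spotting malformed regions by eye, the expected `US-XX` shape can be checked with a pattern before slicing. This is a minimal sketch on a hand-written list; the helper `extract_state` and the sample values are illustrative assumptions, and the notebook itself uses `.str[3:]`.

```python
import re

# A US region code is expected to look like "US-" followed by two capital letters
REGION_PATTERN = re.compile(r"US-([A-Z]{2})")

def extract_state(iso_region):
    """Return the two-letter state code, or None if the region is malformed."""
    match = REGION_PATTERN.fullmatch(iso_region)
    return match.group(1) if match else None

sample_regions = ["US-KS", "US-AK", "US-U-A", "US-TX"]
states = [extract_state(region) for region in sample_regions]
print(states)
```

Values that come back as `None` are the ones that would need to be dropped or inspected.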

In [61]:
airport_df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates,state
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022",KS
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968",AK
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172",AL
5,00AS,small_airport,Fulton Airport,1100.0,,US,US-OK,Alex,00AS,,00AS,"-97.8180194, 34.9428028",OK
6,00AZ,small_airport,Cordes Airport,3810.0,,US,US-AZ,Cordes,00AZ,,00AZ,"-112.16500091552734, 34.305599212646484",AZ


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

### Fact table
The immigration dataset is our main data, so we will build the **fact_immigration** table from it with the following fields:

- id
- citizenship_country
- residence_country
- city
- state
- arrival_date
- departure_date
- age
- visa_type
- gender

### Dimension tables

**airports_dim** 
We will include only the fields within our scope of interest:
- ident
- type
- name
- elevation_ft
- state
- municipality
- iata_code

**demographic_dim**
We will include only the fields within our scope of interest:

- City
- state
- median_age
- male_population
- female_population
- total_population
- foreign_born
- average_household_size
- Race
- Count


**temperature_dim**
We will include only the fields within our scope of interest:

- date
- City
- average_temperature
- average_termperature_uncertainty



**time_dim**
We will include only the fields within our scope of interest:

- date
- year
- month
- day
- week
- weekday
- year_day
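
To illustrate how this star schema might be queried, the sketch below joins a tiny fact table to the demographics dimension in pandas. It assumes `city` and `state` are the join keys, which the model above suggests but does not state, and all rows are made up for illustration. Text keys need the same normalization on both sides, so the dimension's city names are trimmed and upper-cased before merging.

```python
import pandas as pd

# Hypothetical fact rows with upper-cased city names
fact = pd.DataFrame({"id": [1, 2], "city": ["BANGOR", "QUINCY"], "state": ["ME", "MA"]})

# Hypothetical dimension rows with mixed-case (and untrimmed) city names
demo = pd.DataFrame({"City": ["Quincy ", "Hoover"], "state": ["MA", "AL"], "median_age": [41.0, 38.5]})

# Normalize the dimension key so it matches the fact table's convention
demo_keys = demo.assign(city=demo["City"].str.strip().str.upper()).drop(columns="City")

# Left join keeps every fact row, even when no demographics row matches
joined = fact.merge(demo_keys, on=["city", "state"], how="left")
print(joined)
```

Fact rows without a matching city keep null demographic fields instead of disappearing, which makes coverage gaps visible.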

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Building the data pipelines to create the data model.

### Immigration data

#### Staging the data 

In [62]:
countries_codes_df = pd.read_csv('countries_codes.csv')
i94airports_codes_df = pd.read_csv('i94port_codes.csv')
countries_codes_df.head()

Unnamed: 0,code,country
0,582,MEXICO
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


In [63]:
# remove all entries with a null state
i94airports_codes_df = i94airports_codes_df[~i94airports_codes_df.state.isna()].copy()
i94airports_codes_df.head()

Unnamed: 0,code,location,state
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK
2,BAR,BAKER AAF - BAKER ISLAND,AK
3,DAC,DALTONS CACHE,AK
4,PIZ,DEW STATION PT LAY DEW,AK


In [65]:
# convert the lookup tables (data dictionaries) to temp views in our Spark session so we can query them with SQL
spark_df_country_codes = spark.createDataFrame(countries_codes_df)
spark_df_country_codes.createOrReplaceTempView("country_codes")

spark_df_i94airports_codes = spark.createDataFrame(i94airports_codes_df)
spark_df_i94airports_codes.createOrReplaceTempView("i94port_codes")

immigration_df.createOrReplaceTempView("immigration_table")

In [66]:
#country of citizenship
spark.sql("""
SELECT im.*, cc.country AS citizenship_country
FROM immigration_table im
JOIN country_codes cc
ON im.immigrant_birth_country = cc.code
""").createOrReplaceTempView("immigration_table")


#country of residence
spark.sql("""
SELECT im.*, cc.country AS residence_country
FROM immigration_table im
JOIN country_codes cc
ON im.immigrant_residence_country = cc.code
""").createOrReplaceTempView("immigration_table")

In [67]:
# Add entry_port names and entry port states to the view
spark.sql("""
SELECT im.*, pc.location AS entry_port, pc.state AS entry_port_state
FROM immigration_table im 
JOIN i94port_codes pc
ON im.adm_port = pc.code
""").createOrReplaceTempView("immigration_table")
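
The three joins above are inner joins, so any immigration row whose code has no match in a lookup table is silently dropped. The pandas sketch below uses made-up rows and a hypothetical unmatched code `999` to show how a left join with `indicator=True` could count such rows before deciding whether to drop them.

```python
import pandas as pd

# Hypothetical immigration rows; code 999 has no entry in the lookup table
immigration = pd.DataFrame({"id": [1, 2, 3], "immigrant_birth_country": [582, 101, 999]})
country_codes = pd.DataFrame({"code": [582, 101], "country": ["MEXICO", "ALBANIA"]})

# A left join keeps all immigration rows; "_merge" records whether a match was found
checked = immigration.merge(
    country_codes,
    left_on="immigrant_birth_country",
    right_on="code",
    how="left",
    indicator=True,
)
unmatched = checked[checked["_merge"] == "left_only"]
print(len(unmatched))
```

In Spark SQL the same idea could be expressed with a `LEFT JOIN` and a filter on `cc.code IS NULL`.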

In [68]:
# Insert the data into immigration fact table
immigration_fact = spark.sql("""
                        SELECT 
                            id, 
                            citizenship_country,
                            residence_country,
                            TRIM(UPPER(entry_port)) AS city,
                            TRIM(UPPER(entry_port_state)) AS state,
                            arrival_date,
                            departure_date,
                            CAST(age AS INT) AS age,
                            visa_type,
                            gender

                        FROM immigration_table
""")

In [69]:
immigration_fact.show(5)

+---------+-------------------+-----------------+------+-----+------------+--------------+---+---------+------+
|       id|citizenship_country|residence_country|  city|state|arrival_date|departure_date|age|visa_type|gender|
+---------+-------------------+-----------------+------+-----+------------+--------------+---+---------+------+
|4041803.0|            GERMANY|          GERMANY|BANGOR|   ME|  2016-04-22|    2016-05-07| 49| Business|     F|
|4041805.0|            GERMANY|          GERMANY|BANGOR|   ME|  2016-04-22|    2016-05-07| 45| Business|     M|
|4041806.0|            GERMANY|          GERMANY|BANGOR|   ME|  2016-04-22|    2016-05-07| 25| Business|     M|
| 452706.0|             NORWAY|           NORWAY|BANGOR|   ME|  2016-04-03|    2016-04-05| 38| Business|     M|
|4670699.0|            DENMARK|          DENMARK|BANGOR|   ME|  2016-04-25|    2016-04-27| 43| Business|     M|
+---------+-------------------+-----------------+------+-----+------------+--------------+---+---------+------+

### Airports data

In [70]:
airport_df.dtypes

ident            object
type             object
name             object
elevation_ft    float64
continent        object
iso_country      object
iso_region       object
municipality     object
gps_code         object
iata_code        object
local_code       object
coordinates      object
state            object
dtype: object

In [71]:
schema = StructType([
    StructField("ident", StringType(), True),
    StructField("type", StringType(), True),
    StructField("name", StringType(), True),
    StructField("elevation_ft", FloatType(), True),
    StructField("continent", StringType(), True),
    StructField("iso_country", StringType(), True),
    StructField("iso_region", StringType(), True),
    StructField("municipality", StringType(), True),
    StructField("gps_code", StringType(), True), 
    StructField("iata_code", StringType(), True),
    StructField("local_code", StringType(), True),
    StructField("coordinates", StringType(), True),
    StructField("state", StringType(), True)
    ])


spark_df_airports = spark.createDataFrame(airport_df, schema)
spark_df_airports.createOrReplaceTempView("airports")


In [72]:
# Insert the data into airports dimension table 
airports_dim = spark.sql("""
                        SELECT 
                            ident, 
                            type,
                            name,
                            elevation_ft,
                            state,
                            TRIM(UPPER(municipality)) AS municipality,
                            iata_code

                        FROM airports
""")

In [73]:
airports_dim.show(5)

+-----+-------------+--------------------+------------+-----+------------+---------+
|ident|         type|                name|elevation_ft|state|municipality|iata_code|
+-----+-------------+--------------------+------------+-----+------------+---------+
| 00AA|small_airport|Aero B Ranch Airport|      3435.0|   KS|       LEOTI|      NaN|
| 00AK|small_airport|        Lowell Field|       450.0|   AK|ANCHOR POINT|      NaN|
| 00AL|small_airport|        Epps Airpark|       820.0|   AL|     HARVEST|      NaN|
| 00AS|small_airport|      Fulton Airport|      1100.0|   OK|        ALEX|      NaN|
| 00AZ|small_airport|      Cordes Airport|      3810.0|   AZ|      CORDES|      NaN|
+-----+-------------+--------------------+------------+-----+------------+---------+
only showing top 5 rows



### City Demographic data

In [74]:
spark_df_demographics = spark.createDataFrame(city_demographic_df)
spark_df_demographics.createOrReplaceTempView("demographics")


In [75]:
spark_df_demographics.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: double (nullable = true)
 |-- Female Population: double (nullable = true)
 |-- Total Population: long (nullable = true)
 |-- Number of Veterans: double (nullable = true)
 |-- Foreign-born: double (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: long (nullable = true)



In [76]:
# insert data into the demographics dimension table
demographics_dim = spark.sql("""
                                SELECT  City, 
                                        State, 
                                        `Median Age` AS median_age, 
                                        `Male Population` AS male_population, 
                                        `Female Population` AS female_population, 
                                        `Total Population` AS total_population, 
                                        `Foreign-born` AS foreign_born, 
                                        `Average Household Size` AS average_household_size, 
                                        `State Code` AS state, 
                                        Race, 
                                        Count
                                FROM demographics
""")

In [77]:
demographics_dim.show(5)

+----------------+-------------+----------+---------------+-----------------+----------------+------------+----------------------+-----+--------------------+-----+
|            City|        State|median_age|male_population|female_population|total_population|foreign_born|average_household_size|state|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------+----------------------+-----+--------------------+-----+
|   Silver Spring|     Maryland|      33.8|        40601.0|          41862.0|           82463|     30908.0|                   2.6|   MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0|        44129.0|          49500.0|           93629|     32935.0|                  2.39|   MA|               White|58723|
|          Hoover|      Alabama|      38.5|        38040.0|          46799.0|           84839|      8229.0|                  2.58|   AL|               Asian| 4759|
|Rancho Cucamong

### Temperature data

In [78]:
temperature_df.head()

Unnamed: 0,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,date
49116,10.067,0.332,Abilene,United States,32.95N,100.53W,1950-02-01
49117,11.824,0.343,Abilene,United States,32.95N,100.53W,1950-03-01
49118,16.963,0.315,Abilene,United States,32.95N,100.53W,1950-04-01
49119,21.444,0.25,Abilene,United States,32.95N,100.53W,1950-05-01
49120,25.832,0.202,Abilene,United States,32.95N,100.53W,1950-06-01


In [79]:
spark_df_temperature = spark.createDataFrame(temperature_df)
spark_df_temperature.createOrReplaceTempView("temperature")

In [80]:
spark_df_temperature.printSchema()

root
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- date: timestamp (nullable = true)



In [81]:
# insert data into the temperature dimension table
temperature_dim = spark.sql("""
                                SELECT  DISTINCT CAST(date AS DATE) AS date, 
                                        City,                   
                                        AVG(AverageTemperature) OVER (PARTITION BY date, City) AS average_temperature, 
                                        AVG(AverageTemperatureUncertainty)  OVER (PARTITION BY date, City) AS average_termperature_uncertainty
                                FROM temperature
""")

In [82]:
temperature_dim.show()

+----------+----------------+-------------------+--------------------------------+
|      date|            City|average_temperature|average_termperature_uncertainty|
+----------+----------------+-------------------+--------------------------------+
|1957-04-01|            Reno|              9.415|                           0.477|
|1977-04-01|          Irvine|             14.277|                           0.259|
|1977-06-01|          Tacoma|             13.556|                           0.205|
|1994-07-01|     Springfield| 24.105333333333334|             0.27466666666666667|
|2006-12-01|          Tucson|              8.923|                           0.246|
|1956-01-01|      Long Beach|             12.837|                           0.478|
|1956-01-01|       Roseville|              3.732|                           0.267|
|1973-10-01|     New Orleans|             22.781|                           0.259|
|1992-08-01|       Lancaster|  25.76400000000001|              0.6659999999999999|
|201
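
The query above combines `SELECT DISTINCT` with window averages, which should give one row per date and city, the same shape a `GROUP BY` would produce. Rows such as Springfield, whose averages have many decimals, suggest that several readings share a city name and date, although the notebook does not confirm this. A minimal pandas sketch of the same aggregation on made-up readings (all values are illustrative):

```python
import pandas as pd

# Made-up readings: three rows sharing one city name and date, plus one unique city
readings = pd.DataFrame({
    "date": ["1994-07-01", "1994-07-01", "1994-07-01", "1957-04-01"],
    "City": ["Springfield", "Springfield", "Springfield", "Reno"],
    "AverageTemperature": [23.0, 24.0, 25.0, 9.0],
    "AverageTemperatureUncertainty": [0.2, 0.3, 0.4, 0.5],
})

# One row per (date, City) with the mean of each measure
temperature_dim = (
    readings.groupby(["date", "City"], as_index=False)
    .agg(
        average_temperature=("AverageTemperature", "mean"),
        average_temperature_uncertainty=("AverageTemperatureUncertainty", "mean"),
    )
)
print(temperature_dim)
```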

### Time data

In [83]:
# extract all distinct dates from arrival and departure dates to create dimension table
time_dim = spark.sql("""
                SELECT DISTINCT arrival_date AS date
                FROM immigration_table
                UNION
                SELECT DISTINCT departure_date AS date
                FROM immigration_table
                WHERE departure_date IS NOT NULL
""")

time_dim.createOrReplaceTempView("time")

In [84]:
# insert data into the time dimension table
time_dim = spark.sql("""
                                SELECT  date, 
                                        YEAR(date) AS year, 
                                        MONTH(date) AS month, 
                                        DAY(date) AS day, 
                                        WEEKOFYEAR(date) AS week, 
                                        DAYOFWEEK(date) AS weekday,
                                        DAYOFYEAR(date) AS year_day

                                FROM time
""")

In [85]:
time_dim.show()

+----------+----+-----+---+----+-------+--------+
|      date|year|month|day|week|weekday|year_day|
+----------+----+-----+---+----+-------+--------+
|2016-08-17|2016|    8| 17|  33|      4|     230|
|2016-04-22|2016|    4| 22|  16|      6|     113|
|2016-08-08|2016|    8|  8|  32|      2|     221|
|2016-08-20|2016|    8| 20|  33|      7|     233|
|2016-09-11|2016|    9| 11|  36|      1|     255|
|2016-07-06|2016|    7|  6|  27|      4|     188|
|2016-05-21|2016|    5| 21|  20|      7|     142|
|2016-04-15|2016|    4| 15|  15|      6|     106|
|2016-07-23|2016|    7| 23|  29|      7|     205|
|2016-04-18|2016|    4| 18|  16|      2|     109|
|2069-05-03|2069|    5|  3|  18|      6|     123|
|2016-06-08|2016|    6|  8|  23|      4|     160|
|2016-04-09|2016|    4|  9|  14|      7|     100|
|2016-07-30|2016|    7| 30|  30|      7|     212|
|2016-05-16|2016|    5| 16|  20|      2|     137|
|2016-08-07|2016|    8|  7|  31|      1|     220|
|2016-09-14|2016|    9| 14|  37|      4|     258|


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [86]:
# check record counts
def perform_quality_check1(input_df, table_name):
    """
    Check data completeness by ensuring that there are records in each table.
    
    Parameters:
     input_df: spark dataframe to check counts on.
     table_name: name of table
    """
    
    record_count = input_df.count()

    if record_count == 0:
        print("Data quality check failed for {} table with zero records!".format(table_name))
    else:
        print("Data quality check is done for {} table with {} records.".format(table_name, record_count))
        
    return 0

In [87]:
# check duplicated records
def perform_quality_check2(input_df, table_name):
    """
    Check for data duplication in each table.
    
    Parameters:
     input_df: spark dataframe to check for duplicated rows.
     table_name: name of table
    """
    
    # count duplicates in Spark instead of collecting the whole table to the driver with toPandas()
    duplicated_count = input_df.count() - input_df.dropDuplicates().count()

    if duplicated_count == 0:
        print("Data quality check is done for {} table with No duplicated records".format(table_name))
    else:
        print("Data quality check failed for {} table with {} duplicated records!".format(table_name, duplicated_count))
        
    return 0

In [88]:
tables = {
    "immigration": immigration_fact,
    "airport": airports_dim,
    "Demographics": demographics_dim,
    "temperature": temperature_dim,
    "time": time_dim      
}
print("***********Perform data quality check for data completeness*********")
for table_name, table in tables.items():
    perform_quality_check1(table, table_name)

***********Perform data quality check for data completeness*********
Data quality check is done for immigration table with 2213029 records.
Data quality check is done for airport table with 14575 records.
Data quality check is done for Demographics table with 2875 records.
Data quality check is done for temperature table with 189471 records.
Data quality check is done for time table with 173 records.


In [89]:
print("***********Perform data quality check for data Duplication*********")
for table_name, table in tables.items():
    perform_quality_check2(table, table_name)

***********Perform data quality check for data Duplication*********
Data quality check is done for immigration table with No duplicated records
Data quality check is done for airport table with No duplicated records
Data quality check is done for Demographics table with No duplicated records
Data quality check is done for temperature table with No duplicated records
Data quality check is done for time table with No duplicated records
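
Beyond row counts and duplicates, the integrity constraints listed above could be checked as well. The sketch below is an assumption about what such a check might look like, written in pandas on made-up rows for brevity; `check_key` is a hypothetical helper, and on the Spark tables the same idea would be expressed with Spark aggregations instead.

```python
import pandas as pd

def check_key(df, key):
    """Return (null_count, duplicate_count) for a column expected to be a unique, non-null key."""
    null_count = int(df[key].isnull().sum())
    # Ignore nulls here so they are not also reported as duplicates of each other
    duplicate_count = int(df[key].dropna().duplicated().sum())
    return null_count, duplicate_count

# Made-up sample: one repeated id and one missing id
sample = pd.DataFrame({"id": [1.0, 2.0, 2.0, None], "gender": ["F", "M", "M", "F"]})
result = check_key(sample, "id")
print(result)
```

A table passes only when both counts are zero.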


### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
 * **Because the immigration dataset is large (around 3M rows), we use Apache Spark to process it.**
 * **We use pandas for the other, relatively small datasets.**


* Propose how often the data should be updated and why.
 * **It should be updated monthly, because the main dataset (the I94 immigration dataset) is updated monthly.**

* Write a description of how you would approach the problem differently under the following scenarios:
1. The data was increased by 100x.
 * **We would still use Spark, as it is a good choice for handling big data, but we would increase the number of nodes in our cluster and store the data on AWS, in S3 and Redshift.**

2. The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * **We would manage the ETL pipeline as a DAG in Apache Airflow, scheduled so that the pipeline and its data quality checks finish before 7am each day.**

3. The database needed to be accessed by 100+ people.
 * **We would move our analytics database to Amazon Redshift so that it can be accessed easily by all users.**