# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project aims to answer questions about US immigration, such as: which cities are the most popular destinations, what the gender and age distribution of immigrants is, what the visa type distribution is, what the average age of immigrants is, and what the average temperature per month per city is. We extract data from four sources: the I94 immigration dataset (April 2016), temperature data from Kaggle, US city demographic data from OpenDataSoft, and an airport code dataset from DataHub. We design four dimension tables (immigration, airport, temperature, and demographic) and one fact table, Immigration_US. We use Spark for the ETL jobs and store the results in parquet for downstream analysis.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [2]:
# Do all imports and installs here
import pandas as pd
import os
import glob
from pyspark.sql import SparkSession, SQLContext, GroupedData
from pyspark.sql.functions import *

In [3]:
#Build spark session
spark = SparkSession.builder.getOrCreate()

In [4]:
#Build SQL context object
sqlContext = SQLContext(spark)

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

# U.S. City Demographic Data
* This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000.

* This data comes from the US Census Bureau's 2015 American Community Survey.
* Check data from here https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/

In [5]:
# Read in the data here
df_demographics=spark.read.format("csv").option("header", "true").option("delimiter", ";").load("us-cities-demographics.csv")

In [6]:
df_demographics.show(3)

+-------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+
|         City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|              Race|Count|
+-------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+
|Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|Hispanic or Latino|25924|
|       Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|             White|58723|
|       Hoover|      Alabama|      38.5|          38040|            46799|           

In [7]:
df_demographics.count()

2891

# Airport Code Data
* An airport code may be either an IATA airport code, a three-letter code used in passenger reservation, ticketing and baggage-handling systems, or an ICAO airport code, a four-letter code used by ATC systems and for airports that do not have an IATA airport code (from Wikipedia).
* Check from here https://datahub.io/core/airport-codes#readme

In [8]:
df_airports=spark.read.format("csv").option("header", "true").load("airport-codes_csv.csv")

In [9]:
df_airports.show(3)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
+-----+-------------+--------------------+------------+---------+-----------+-----

In [10]:
df_airports.count()

55075

# World Temperature Data
* Some say climate change is the biggest threat of our age, while others say it’s a myth based on dodgy science. We are turning some of the data over to you so you can form your own view.
* This dataset came from Kaggle. 
* Check from here https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data

In [11]:
#temp_path = '../../data2/GlobalLandTemperaturesByCity.csv'
#df_temp = pd.read_csv(temp_path)

In [12]:
df_temperature=spark.read.format("csv").option("header", "true").load("df_temp.csv")

In [13]:
df_temperature.show(3)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 3 rows



In [14]:
df_temperature.count()

8599212

# I94 Immigration Data

* This data comes from the US National Travel and Tourism Office. A sample file is available so you can take a look at the data in CSV format before reading it all in.

* The National Travel and Tourism Office (NTTO) functions as the U.S. federal tourism office. A core responsibility is to collect, analyze, and disseminate international travel and tourism statistics. As a result, NTTO is charged with managing, improving, and expanding the system to fully account and report the impact of travel and tourism in the United States.

### I-94 Visitor Arrivals Program
* The National Travel and Tourism Office (NTTO) works cooperatively with the U.S. Department of Homeland Security (DHS)/U.S. Customs and Border Protection (CBP) to release I-94 Visitor Arrivals Program data, providing a comprehensive count of all visitors (overseas all travel modes plus Mexico air and sea) entering the United States. 

In [15]:
# Read in the data here
# This will take a few minutes
#i94_path = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
#df_i94 = pd.read_sas(i94_path, 'sas7bdat', encoding="ISO-8859-1")

In [16]:
df_immegration_i94=spark.read.format("csv").option("header", "true").load("df_i94.csv")

In [17]:
df_immegration_i94.show(3)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+----------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|  dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|      admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+----------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|      null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null|1897628485.0| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811.0|     

In [18]:
df_immegration_i94.count()

3096313

# Step 2: Explore and Assess the Data
## Explore the Data
### Identify data quality issues, like missing values, duplicate data, etc.

## Cleaning Steps
* Filter average temperature data only for the United States and only year == 2010 and create new fields with year, month, average temperature.
* Remove nulls then convert i94res codes to country of origin then select important columns from the immigration data and drop duplicates.
* Sort city demographic data then calculate percentages and select percentages fields and drop duplicates.
* Filter airport data for "small_airport" and use substring to return the state code.

In [19]:
from us_state_abbrev import state_udf, abbrev_state, abbrev_state_udf,city_code_udf,city_codes
from immigration_codes import country_udf
import re

In [20]:
df_temp_us = df_temperature.filter(df_temperature["country"]=="United States")\
.filter(year(df_temperature["dt"])==2010)\
.withColumn("year",year(df_temperature["dt"]))\
.withColumn("month",month(df_temperature["dt"]))\
.withColumn("avg_temp",df_temperature["AverageTemperature"])

In [21]:
df_temp_us.show(5)

+----------+------------------+-----------------------------+-------+-------------+--------+---------+----+-----+------------------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|   City|      Country|Latitude|Longitude|year|month|          avg_temp|
+----------+------------------+-----------------------------+-------+-------------+--------+---------+----+-----+------------------+
|2010-01-01|             4.811|                        0.257|Abilene|United States|  32.95N|  100.53W|2010|    1|             4.811|
|2010-02-01|              4.51|                        0.309|Abilene|United States|  32.95N|  100.53W|2010|    2|              4.51|
|2010-03-01|            11.398|          0.33799999999999997|Abilene|United States|  32.95N|  100.53W|2010|    3|            11.398|
|2010-04-01|17.145999999999994|                         0.18|Abilene|United States|  32.95N|  100.53W|2010|    4|17.145999999999994|
|2010-05-01|            21.534|                        0.358|Abilene|

In [22]:
df_temp_us.count()

3084

In [23]:
# Build a dictionary of valid i94port codes: {code: [port name]}
re_obj = re.compile(r'\'(.*)\'.*\'(.*)\'')
i94portvalid = {}
with open('i94port.txt') as f:
    for data in f:
        match = re_obj.search(data)
        # Skip blank or malformed lines instead of failing on a None match
        if match:
            i94portvalid[match[1]] = [match[2]]
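To see what the regular expression in the cell above extracts, here is a minimal standalone sketch. The sample lines are an assumption about the layout of `i94port.txt` (a quoted code, an equals sign, and a quoted, space-padded name), and the helper name `parse_port_line` is hypothetical.

```python
import re

# Same pattern as in the notebook: two single-quoted fields on one line.
PORT_LINE = re.compile(r"'(.*)'.*'(.*)'")

def parse_port_line(line):
    """Return (code, name) for a line shaped like 'XXX' = 'NAME', else None."""
    match = PORT_LINE.search(line)
    if match is None:
        return None  # blank or malformed line: nothing to extract
    return match.group(1), match.group(2).strip()

# Assumed sample lines; the real file may be laid out differently.
sample_lines = [
    "   'ALC'\t=\t'ALCAN, AK             '",
    "   'BAL'\t=\t'BALTIMORE, MD         '",
    "",
]

ports = {}
for line in sample_lines:
    parsed = parse_port_line(line)
    if parsed is not None:
        code, name = parsed
        ports[code] = name

print(ports)
```

Unlike the notebook cell, this sketch strips the padding from the port name and stores it as a plain string rather than a one-element list.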

In [24]:
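@udf()
def get_city(city):
    # Return the first port code whose name contains the city name, else None
    if city is None:
        return None
    for key in i94portvalid:
        if city.lower() in i94portvalid[key][0].lower():
            return key
    return None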
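Because `get_city` uses substring matching, a short city name can match a longer, unrelated port name. The sketch below reproduces the lookup in plain Python and contrasts it with a stricter comparison. The two dictionary entries are illustrative only, and `strict_match` is a hypothetical alternative, not part of the notebook.

```python
# Illustrative entries only; the real mapping is loaded from i94port.txt.
sample_ports = {
    "BAL": "BALTIMORE, MD",
    "OTM": "OTAY MESA, CA",
}

def loose_match(city, ports):
    """Substring match, as in get_city: first port whose name contains the city."""
    if city is None:
        return None
    for code, name in ports.items():
        if city.lower() in name.lower():
            return code
    return None

def strict_match(city, ports):
    """Hypothetical stricter variant: compare with the text before the comma."""
    if city is None:
        return None
    wanted = city.strip().lower()
    for code, name in ports.items():
        if wanted == name.split(",")[0].strip().lower():
            return code
    return None

print(loose_match("Mesa", sample_ports))        # OTM
print(strict_match("Mesa", sample_ports))       # None
print(strict_match("Baltimore", sample_ports))  # BAL
```

The stricter variant trades recall for precision: it drops ambiguous matches such as "Mesa", but it also misses ports whose listed name differs from the city name in the temperature data.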

In [25]:
df_temp_us_2 = df_temp_us.withColumn("i94port_code", get_city(df_temp_us.City))

In [26]:
df_temp_us_2 = df_temp_us_2.filter(df_temp_us_2.i94port_code.isNotNull())

In [27]:
final_temp_data=df_temp_us_2.select("year","month",round(col("avg_temp"),1).alias("avg_temp"),"i94port_code","City","Country").dropDuplicates()

In [28]:
final_temp_data.count()

1403

In [29]:
final_temp_data.show(5)

+----+-----+--------+------------+---------+-------------+
|year|month|avg_temp|i94port_code|     City|      Country|
+----+-----+--------+------------+---------+-------------+
|2010|    7|    26.5|         BAL|Baltimore|United States|
|2010|   11|     6.5|         COL| Columbus|United States|
|2010|    9|    30.5|         OTM|     Mesa|United States|
|2010|    3|    12.3|         OAK|  Oakland|United States|
|2010|    2|    10.4|         ACY|   Pomona|United States|
+----+-----+--------+------------+---------+-------------+
only showing top 5 rows



## Temperature Table 

* year : year
* month : month
* avg_temp : average temperature
* i94port_code : code of the city visited
* City : name of the city visited
* Country : United States for all data 

# Immigration Data

In [30]:
df_immigration = df_immegration_i94.filter(df_immegration_i94.i94port.isin(list(i94portvalid.keys())))

In [31]:
df_immigration.count()

3088544

In [32]:
df_immigration_1 = df_immigration.filter(df_immigration.i94addr.isNotNull())\
.filter(df_immigration.i94res.isNotNull())\
.withColumn("city_port_name",city_code_udf(df_immigration["i94port"]))

In [33]:
final_immigration_data=df_immigration_1.select("cicid",col("i94yr").alias("year"),col("i94mon").alias("month"),\
"city_port_name",col("i94res").alias("origin_country"),"i94port",col("i94addr").alias("state_code"),"arrdate", "i94mode", "depdate", "i94visa")

In [34]:
final_immigration_data.count()

2938179

In [35]:
final_immigration_data.show(5)

+-----+------+-----+--------------------+--------------+-------+----------+-------+-------+-------+-------+
|cicid|  year|month|      city_port_name|origin_country|i94port|state_code|arrdate|i94mode|depdate|i94visa|
+-----+------+-----+--------------------+--------------+-------+----------+-------+-------+-------+-------+
|  7.0|2016.0|  4.0|  ATLANTA           |         276.0|    ATL|        AL|20551.0|    1.0|   null|    3.0|
| 15.0|2016.0|  4.0|WASHINGTON DC    ...|         101.0|    WAS|        MI|20545.0|    1.0|20691.0|    2.0|
| 16.0|2016.0|  4.0|  NEW YORK          |         101.0|    NYC|        MA|20545.0|    1.0|20567.0|    2.0|
| 17.0|2016.0|  4.0|  NEW YORK          |         101.0|    NYC|        MA|20545.0|    1.0|20567.0|    2.0|
| 18.0|2016.0|  4.0|  NEW YORK          |         101.0|    NYC|        MI|20545.0|    1.0|20555.0|    1.0|
+-----+------+-----+--------------------+--------------+-------+----------+-------+-------+-------+-------+
only showing top 5 rows



## Immigration Table

* cicid
* year : year
* month : month
* city_port_name : name of the arrival city
* origin_country : country of residence
* i94port : arrival airport (city visited)
* state_code : code of arrival state
* arrdate : arrival date
* i94mode : 1 digit travel code
* depdate : departure date from the USA
* i94visa : reason for immigration (visa category code)
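The `arrdate` and `depdate` values shown above (for example `20545.0`) are numeric strings rather than calendar dates. Assuming they are SAS date values, i.e. days elapsed since 1960-01-01, which is plausible because the source file is a `sas7bdat` file, a conversion could be sketched as follows. The helper name `sas_to_date` is hypothetical.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # assumption: values count days from this date

def sas_to_date(value):
    """Convert a SAS day count such as '20545.0' to a date; None stays None."""
    if value is None:
        return None
    return SAS_EPOCH + timedelta(days=int(float(value)))

print(sas_to_date("20545.0"))  # 2016-04-01
print(sas_to_date("20551.0"))  # 2016-04-07
print(sas_to_date(None))       # None
```

Under that assumption the sample rows fall in April 2016, which agrees with the `i94yr` and `i94mon` columns.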

# U.S. Demographic Data by State

In [36]:
df_demographics.show(5)

+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|      Alabama|      38.5|          38040| 

In [37]:
#Calculate percentages of each numeric column and create new columns.
demographics_data=df_demographics\
.withColumn("Median Age",col("Median Age").cast("float"))\
.withColumn("male_pop_prc",df_demographics["Male Population"]/df_demographics["Total Population"]*100)\
.withColumn("female_pop_prc",df_demographics["Female Population"]/df_demographics["Total Population"]*100)\
.withColumn("veterans_prc",df_demographics["Number of Veterans"]/df_demographics["Total Population"]*100)\
.withColumn("foreign_born_prc",df_demographics["Foreign-born"]/df_demographics["Total Population"]*100)\
.withColumn("race_prc",df_demographics["Count"]/df_demographics["Total Population"]*100)\
.orderBy("State")

In [38]:
demographics_data.show(2)

+----------+-------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+-----------------+-----------------+-----------------+------------------+------------------+
|      City|  State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|              Race|Count|     male_pop_prc|   female_pop_prc|     veterans_prc|  foreign_born_prc|          race_prc|
+----------+-------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+-----------------+-----------------+-----------------+------------------+------------------+
|Montgomery|Alabama|      35.4|          94582|           106004|          200586|             14955|        9337|                  2.41|        AL|             White|73545|47.15284217243477|52.84715782

In [39]:
#Select columns with new calculated percentages and state names.
demographics_data_1=demographics_data.select("State",col("State Code").alias("state_code"),\
                                                         col("Median Age").alias("median_age"),\
                                                         "male_pop_prc",\
                                                         "female_pop_prc",\
                                                         "veterans_prc",\
                                                         "foreign_born_prc",\
                                                         "race_prc",\
                                                         "Race")

In [40]:
demographics_data_1.show(5)

+-------+----------+----------+------------------+------------------+-----------------+------------------+------------------+--------------------+
|  State|state_code|median_age|      male_pop_prc|    female_pop_prc|     veterans_prc|  foreign_born_prc|          race_prc|                Race|
+-------+----------+----------+------------------+------------------+-----------------+------------------+------------------+--------------------+
|Alabama|        AL|      38.1| 48.52311304292649| 51.47688695707351|8.797339171081992| 6.710767050562094|32.552322937487446|Black or African-...|
|Alabama|        AL|      29.1| 48.09229392503407| 51.90770607496593|3.708637556183774| 4.785535601700258| 2.516829709776485|  Hispanic or Latino|
|Alabama|        AL|      38.9|47.636815920398014|52.363184079601986|9.378701729447998| 2.515695332859512|1.7398128405591091|               Asian|
|Alabama|        AL|      35.4| 47.15284217243477| 52.84715782756524|7.455654931052018|4.6548612565184015|36.665071340

In [41]:
# pivot the Race column
demographics_data_2=demographics_data_1.groupBy("State","state_code",\
                                             "median_age","male_pop_prc",\
                                             "female_pop_prc","veterans_prc",\
                                              "foreign_born_prc").pivot("Race").avg("race_prc")

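As a rough illustration of what `groupBy(...).pivot("Race").avg("race_prc")` does, the plain-Python sketch below turns long-format rows into one record per group with one column per race. The rows and values are made up, the grouping key is reduced to the state for brevity, and `pivot_avg` is a hypothetical helper.

```python
from collections import defaultdict

# Illustrative long-format rows: (state, race, race_prc). Values are made up.
rows = [
    ("Alabama", "White", 50.0),
    ("Alabama", "Asian", 2.0),
    ("Alabama", "White", 40.0),
    ("Alaska", "White", 70.0),
]

def pivot_avg(rows):
    """Group by state, turn each race into a column, and average race_prc."""
    sums = defaultdict(lambda: defaultdict(lambda: [0.0, 0]))
    for state, race, prc in rows:
        cell = sums[state][race]
        cell[0] += prc  # running total for this (state, race) cell
        cell[1] += 1    # number of rows seen for this cell
    return {
        state: {race: total / n for race, (total, n) in races.items()}
        for state, races in sums.items()
    }

wide = pivot_avg(rows)
print(wide)
```

In this sketch a missing combination (Alaska has no "Asian" row) simply has no key; in Spark the corresponding pivoted column would hold null for that group.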
In [None]:
#change the header name of the race fields for spark compatibility.
demographics_data_3=demographics_data_2.select("State","state_code","median_age","male_pop_prc",\
                                         "female_pop_prc","veterans_prc","foreign_born_prc",\
                                         col("American Indian and Alaska Native").alias("native_american"),\
                                         col("Asian"),col("Black or African-American").alias("Black"),\
                                         col("Hispanic or Latino").alias("hispanic_or_latino"),"White")

In [None]:
#Find the average of each column per state.

In [42]:
final_demographics_data=demographics_data_3.groupBy("State","state_code").avg("median_age","male_pop_prc","female_pop_prc",\
                                                       "veterans_prc","foreign_born_prc","native_american",\
                                                       "Asian","Black","hispanic_or_latino","White").orderBy("State")

In [43]:
#Round the percentages and fix column names
final_demographics_data=final_demographics_data.select("State","state_code",round(col("avg(median_age)"),1).alias("median_age"),\
                  round(col("avg(male_pop_prc)"),1).alias("male_pop_prc"),\
                   round(col("avg(female_pop_prc)"),1).alias("female_pop_prc"),\
                   round(col("avg(veterans_prc)"),1).alias("veterans_prc"),\
                   round(col("avg(foreign_born_prc)"),1).alias("foreign_born_prc"),\
                   round(col("avg(native_american)"),1).alias("native_american"),\
                   round(col("avg(Asian)"),1).alias("Asian"),\
                   round(col("avg(hispanic_or_latino)"),1).alias("hispanic_or_latino"),\
                   round(col("avg(Black)"),1).alias("Black"),\
                   round(col('avg(White)'),1).alias('White')
                  )

In [44]:
final_demographics_data.show(5)

+----------+----------+----------+------------+--------------+------------+----------------+---------------+-----+------------------+-----+-----+
|     State|state_code|median_age|male_pop_prc|female_pop_prc|veterans_prc|foreign_born_prc|native_american|Asian|hispanic_or_latino|Black|White|
+----------+----------+----------+------------+--------------+------------+----------------+---------------+-----+------------------+-----+-----+
|   Alabama|        AL|      36.2|        47.2|          52.8|         6.8|             5.1|            0.8|  2.9|               3.6| 45.0| 52.0|
|    Alaska|        AK|      32.2|        51.2|          48.8|         9.2|            11.1|           12.2| 12.3|               9.1|  7.7| 71.2|
|   Arizona|        AZ|      35.0|        48.8|          51.2|         6.6|            12.6|            2.8|  5.1|              28.8|  6.0| 82.7|
|  Arkansas|        AR|      32.8|        48.4|          51.6|         5.2|            10.7|            1.8|  4.1|          

## Demographics Table 
    
* State : state name
* state_code : two-letter state code
* median_age : median age of the population, averaged over the state's cities
* male_pop_prc : male population percentage
* female_pop_prc : female population percentage
* veterans_prc : veterans population percentage
* foreign_born_prc : foreign-born population percentage
* native_american : American Indian and Alaska Native population percentage
* Asian : Asian population percentage
* hispanic_or_latino : Hispanic or Latino population percentage
* Black : Black or African-American population percentage
* White : White population percentage

# U.S. Airport Data by State

In [45]:
df_airports.show(10)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

In [46]:
#Filter airport data for 'small_airport' in the U.S. and use substring to show state
airport_data=df_airports.filter(df_airports["type"]=="small_airport")\
.filter(df_airports["iso_country"]=="US")\
.withColumn("iso_region",substring(df_airports["iso_region"],4,2))\
.withColumn("elevation_ft",col("elevation_ft").cast("float"))
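Spark's `substring(col, 4, 2)` uses 1-based positions, so it takes two characters starting at the fourth one. The plain-Python sketch below shows the equivalent 0-based slice, plus a hypothetical alternative that splits on the hyphen instead of relying on fixed offsets. Both helper names are illustrative.

```python
def state_from_region(iso_region):
    """Return the two-letter state part of a code like 'US-KS'."""
    if iso_region is None:
        return None
    # Spark's substring(col, 4, 2) is 1-based; the Python slice [3:5] is 0-based.
    return iso_region[3:5]

def state_from_region_split(iso_region):
    """Hypothetical alternative: split on the hyphen instead of fixed offsets."""
    if iso_region is None or "-" not in iso_region:
        return None
    return iso_region.split("-", 1)[1]

print(state_from_region("US-KS"))        # KS
print(state_from_region_split("US-AK"))  # AK
```

The fixed-offset version is safe here because the rows are filtered to `iso_country == "US"`, so every region code starts with the three characters `US-`.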

In [47]:
airport_data.show()

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
| 00AA|small_airport|Aero B Ranch Airport|      3435.0|       NA|         US|        KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|       450.0|       NA|         US|        AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|       820.0|       NA|         US|        AL|     Harvest|    00AL|     null|      00AL|-86.7703018188476...|
| 00AS|small_airport|      Fulton Airport|      1100.0|       NA|         US|     

In [48]:
airport_data.count()

13720

In [49]:
#Find average elevation per state
airport_elevation_ft=airport_data.groupBy("iso_country","iso_region").avg("elevation_ft")

In [50]:
airport_elevation_ft.count()

51

In [51]:
#Select relevant columns, round the average elevation, and sort by state
final_airport_data=airport_elevation_ft.select(col("iso_country").alias("country"),\
                                               col("iso_region").alias("state"),\
                                               round(col("avg(elevation_ft)"),1).alias("avg_elevation_ft")).orderBy("iso_region")

In [52]:
final_airport_data.show(5)

+-------+-----+----------------+
|country|state|avg_elevation_ft|
+-------+-----+----------------+
|     US|   AK|           545.1|
|     US|   AL|           414.6|
|     US|   AR|           488.4|
|     US|   AZ|          3098.0|
|     US|   CA|          1261.4|
+-------+-----+----------------+
only showing top 5 rows



## Airport Table 

* country : country code (US)
* state : two-letter state code
* avg_elevation_ft : average elevation (in feet) of small airports in the state

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

## Temperature Table 

* year : year
* month : month
* avg_temp : average temperature
* i94port_code : code of the city visited
* City : name of the city visited
* Country : United States for all data 

## Immigration Table

* cicid
* year : year
* month : month
* city_port_name : name of the arrival city
* origin_country : country of residence
* i94port : arrival airport (city visited)
* state_code : code of arrival state
* arrdate : arrival date
* i94mode : 1 digit travel code
* depdate : departure date from the USA
* i94visa : reason for immigration (visa category code)

## Demographics Table 
    
* State : state name
* state_code : two-letter state code
* median_age : median age of the population, averaged over the state's cities
* male_pop_prc : male population percentage
* female_pop_prc : female population percentage
* veterans_prc : veterans population percentage
* foreign_born_prc : foreign-born population percentage
* native_american : American Indian and Alaska Native population percentage
* Asian : Asian population percentage
* hispanic_or_latino : Hispanic or Latino population percentage
* Black : Black or African-American population percentage
* White : White population percentage

## Airport Table 

* country : country code (US)
* state : two-letter state code
* avg_elevation_ft : average elevation (in feet) of small airports in the state

# Immigration_US Fact Table
* avg_temp
* female_pop_prc
* male_pop_prc
* month
* year
* foreign_born_prc
* native_american
* avg_elevation_ft
* Asian
* hispanic_or_latino
* White
* Black
* origin_country
* city_port_name

## 3.2 Mapping Out Data Pipelines
<br>1- Dimension tables will be created from cleansed data.
<br>2- Fact table is created as a SQL query with joins to dimension tables.
<br>3- Fact table is converted back to a spark dataframe.
<br>4- Fact table is written as final parquet file.

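The last pipeline step, writing the fact table to parquet, could be sketched as below. This is only an outline under assumptions: the function name `write_fact_table`, the output path, and the choice to partition by `year` and `month` are hypothetical, while `write.mode`, `partitionBy`, and `parquet` are the standard Spark `DataFrameWriter` calls.

```python
def write_fact_table(fact_df, output_path, partition_cols=("year", "month")):
    """Write a Spark DataFrame to parquet, partitioned by the given columns."""
    (
        fact_df.write
        .mode("overwrite")             # replace the output of any previous run
        .partitionBy(*partition_cols)  # one directory per year/month combination
        .parquet(output_path)
    )

# Usage, assuming `immigration_us` is the fact DataFrame built with spark.sql:
# write_fact_table(immigration_us, "output/immigration_us")  # hypothetical path
```

Partitioning by `year` and `month` keeps downstream reads that filter on those columns from scanning the whole dataset; any other low-cardinality column of the fact table could be used instead.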
## Step 4: Run Pipelines to Model the Data
### 4.1 Create the data model
* Build the data pipelines to create the data model.

In [53]:
# Create Dimension tables
final_immigration_data.createOrReplaceTempView("immigration")
final_demographics_data.createOrReplaceTempView("demographics")
final_airport_data.createOrReplaceTempView("airport")
final_temp_data.createOrReplaceTempView("temperature")

In [54]:
#Set the broadcast-join size threshold (in bytes) to 0 so that Spark does not broadcast tables automatically in joins.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "0")

In [52]:
final_temp_data.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- avg_temp: double (nullable = true)
 |-- i94port_code: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)



In [53]:
final_immigration_data.printSchema()

root
 |-- cicid: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- city_port_name: string (nullable = true)
 |-- origin_country: string (nullable = true)
 |-- i94port: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94mode: string (nullable = true)
 |-- depdate: string (nullable = true)
 |-- i94visa: string (nullable = true)



In [55]:
final_demographics_data.printSchema()

root
 |-- State: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- male_pop_prc: double (nullable = true)
 |-- female_pop_prc: double (nullable = true)
 |-- veterans_prc: double (nullable = true)
 |-- foreign_born_prc: double (nullable = true)
 |-- native_american: double (nullable = true)
 |-- Asian: double (nullable = true)
 |-- hispanic_or_latino: double (nullable = true)
 |-- Black: double (nullable = true)
 |-- White: double (nullable = true)



In [55]:
final_airport_data.printSchema()

root
 |-- country: string (nullable = true)
 |-- state: string (nullable = true)
 |-- avg_elevation_ft: double (nullable = true)



In [56]:
#spark.sql("""SELECT i94port_code,avg_temp,Country,t.month From temperature t JOIN immigration m ON t.i94port_code = m.i94port WHERE t.month = 1""").show(5)

In [57]:
#spark.sql("""SELECT * FROM demographics""").show(5)

In [58]:
#spark.sql("""SELECT * FROM airport""").show(5)

In [59]:
#spark.sql("""SELECT avg_elevation_ft,male_pop_prc,median_age FROM airport a JOIN demographics d ON a.state=d.state_code""").show(5)

In [60]:
#spark.sql("""SELECT avg_temp,t.month,a.avg_elevation_ft,d.male_pop_prc,m.origin_country FROM temperature t JOIN demographics d ON d.State = t.City JOIN immigration m ON t.i94port_code = m.i94port JOIN airport a ON a.state=d.state_code""").show(5)

spark.sql("""SELECT avg_temp,t.month,a.avg_elevation_ft,d.male_pop_prc,m.origin_country\
                                    FROM temperature t\
                                    JOIN demographics d\
                                    ON d.State = t.City\
                                    JOIN immigration m\
                                    ON t.i94port_code = m.i94port\
                                    JOIN airport a\
                                    ON a.state=d.state_code\
                                    GROUP BY m.year,t.month, m.origin_country,\
                                    m.state_code,t.avg_temp,a.avg_elevation_ft,\
                                    d.female_pop_prc,d.male_pop_prc,d.foreign_born_prc,d.native_american,\
                                    d.Asian,d.hispanic_or_latino,d.White,\
                                    d.Black""").show(5)

## Fact Table

spark.sql("""SELECT avg_temp,d.female_pop_prc,d.male_pop_prc,m.month,m.year,d.foreign_born_prc,d.native_american,a.avg_elevation_ft,\
                                    d.Asian,\
                                    d.hispanic_or_latino,\
                                    d.White,\
                                    d.Black,\
                                    m.origin_country\
                                    FROM temperature t\
                                    JOIN demographics d\
                                    ON d.State = t.City\
                                    JOIN immigration m\
                                    ON t.i94port_code = m.i94port\
                                    JOIN airport a\
                                    ON a.state=d.state_code\
                                    GROUP BY m.year,m.month, m.origin_country,\
                                    m.state_code,t.avg_temp,a.avg_elevation_ft,\
                                    d.female_pop_prc,d.male_pop_prc,d.foreign_born_prc,d.native_american,\
                                    d.Asian,d.hispanic_or_latino,d.White,\
                                    d.Black""").show(5)

In [66]:
immigration_us = spark.sql("""SELECT avg_temp,d.female_pop_prc,d.male_pop_prc,m.month,m.year,d.foreign_born_prc,d.native_american,a.avg_elevation_ft,\
                                    d.Asian,\
                                    d.hispanic_or_latino,\
                                    d.White,\
                                    d.Black,\
                                    m.origin_country,\
                                    t.City,\
                                    m.city_port_name,\
                                    COUNT(m.state_code) AS state_counter,\
                                    COUNT(m.city_port_name) AS city_counter\
                                    FROM temperature t\
                                    JOIN demographics d\
                                    ON d.State = t.City\
                                    JOIN immigration m\
                                    ON t.i94port_code = m.i94port\
                                    JOIN airport a\
                                    ON a.state=d.state_code\
                                    GROUP BY m.year,m.month, m.origin_country,\
                                    m.state_code,t.avg_temp,a.avg_elevation_ft,\
                                    d.female_pop_prc,d.male_pop_prc,d.foreign_born_prc,d.native_american,\
                                    d.Asian,d.hispanic_or_latino,d.White,\
                                    d.Black,t.City,m.city_port_name""")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:

* Integrity constraints on the relational database (e.g., unique key, data type, etc.)
* Unit tests for the scripts to ensure they are doing the right thing
* Source/Count checks to ensure completeness

Run Quality Checks

In [70]:
# Write fact table to parquet
#immigration_us.write.parquet("output/immigration_us")

In [72]:
immigration_us.select(isnull('year').alias('year'),\
                             isnull('month').alias('month'),\
                             isnull('origin_country').alias('country')).dropDuplicates().show()

+-----+-----+-------+
| year|month|country|
+-----+-----+-------+
|false|false|  false|
+-----+-----+-------+



In [69]:
# Total number of immigrants to the United States according to the fact table
immigration_us.select(sum('city_counter').alias('fact_table_count')).show()

+----------------+
|fact_table_count|
+----------------+
|         2938179|
+----------------+



In [70]:
# Count the total number of immigrants from the source table 
spark.sql('SELECT COUNT(*) as immigrants FROM immigration').show()

+----------+
|immigrants|
+----------+
|   2938179|
+----------+
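
The two counts above agree (2938179 in both the fact table and the source table), which is the source/count completeness check. A minimal sketch of automating that comparison is shown below. The helper name `check_counts_match` is hypothetical, and the counts are hard-coded from the outputs above; in the notebook they would presumably be collected from Spark first, for example with `immigration_us.agg(sum('city_counter')).first()[0]` and `spark.table('immigration').count()`.

```python
def check_counts_match(source_count, fact_count, label="immigration"):
    """Raise ValueError if the fact table does not account for every source row."""
    if source_count <= 0:
        raise ValueError(f"{label}: source table is empty")
    if source_count != fact_count:
        raise ValueError(
            f"{label}: source has {source_count} rows, "
            f"fact table accounts for {fact_count}"
        )
    return True


# Counts copied from the two cells above (assumption: in a real run these
# would be read from Spark rather than typed in by hand).
source_count = 2938179
fact_count = 2938179

counts_ok = check_counts_match(source_count, fact_count)
print(f"Count check passed: {counts_ok}")
```

Raising an exception instead of printing a table makes the check usable as a pipeline step that stops the run when rows are lost in the joins.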



#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.
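
One way to start the data dictionary is to generate a skeleton from each table's schema and then fill in the descriptions by hand. The sketch below is only an illustration: the function name `data_dictionary_skeleton` is hypothetical, the column list is copied from the `final_airport_data` schema printed earlier, and the descriptions are deliberately left as `TODO`. In the notebook, `final_airport_data.dtypes` should supply the same (name, type) pairs.

```python
def data_dictionary_skeleton(table_name, dtypes, source):
    """Build a Markdown table with one row per column; descriptions stay blank."""
    lines = [
        f"**{table_name}** (source: {source})",
        "",
        "| Field | Type | Description |",
        "| --- | --- | --- |",
    ]
    for column, dtype in dtypes:
        lines.append(f"| {column} | {dtype} | TODO |")
    return "\n".join(lines)


# Column names and types copied from the final_airport_data schema output.
airport_dtypes = [
    ("country", "string"),
    ("state", "string"),
    ("avg_elevation_ft", "double"),
]

skeleton = data_dictionary_skeleton(
    "airport", airport_dtypes, "airport dataset from Kaggle"
)
print(skeleton)
```

The same function can be called once per dimension table and once for the fact table, so the dictionary stays in step with the schemas actually produced by the ETL.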

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
    - **We can use Amazon Redshift, an analytical database optimized for aggregations and read-heavy workloads.**
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
    - **We can schedule the pipeline with Airflow: configure retries on the DAG's tasks and send emails both on failure and on successful completion.**
    - **Run daily quality checks; if any check fails, email the operators and freeze the dashboard.**
  * The database needed to be accessed by 100+ people.
    - **We can store the data in Redshift, which can be accessed efficiently by many people.**
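
The Airflow answer for the 7am dashboard scenario can be sketched as a set of scheduling and retry settings. Everything below is an assumption for illustration: the owner, the email address, the retry values, and the 05:00 start time are placeholders, not settings taken from this project. The dictionary keys are the usual Airflow task arguments (`retries`, `retry_delay`, `email`, `email_on_failure`, `email_on_retry`), but the block is plain Python so it runs without Airflow installed.

```python
from datetime import timedelta

# Hypothetical task defaults for a daily pipeline run.
default_args = {
    "owner": "data-engineering",         # placeholder owner
    "retries": 3,                        # retry each failed task up to 3 times
    "retry_delay": timedelta(minutes=10),
    "email": ["operators@example.com"],  # placeholder address
    "email_on_failure": True,            # notify operators when a task fails
    "email_on_retry": False,
}

# Cron expression for 05:00 every day (assumed start time, chosen to leave
# two hours before the 7am deadline).
schedule = "0 5 * * *"

# Worst-case time a single task spends waiting between retries.
max_retry_wait = default_args["retries"] * default_args["retry_delay"]
print(f"Schedule: {schedule}, max retry wait per task: {max_retry_wait}")

# With Airflow installed, these values would typically be passed to the DAG,
# e.g. DAG(dag_id=..., default_args=default_args, ...) together with the
# schedule argument (schedule_interval in older Airflow 2 releases,
# schedule in newer ones).
```

Starting well before the deadline matters because retries add delay: with these assumed values, each task can wait up to 30 minutes between attempts before it is marked as failed.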