# Project Title
### Data Engineering Capstone Project

#### Project Summary
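This project builds an ETL pipeline in Apache Spark. The pipeline combines I94 immigration records, U.S. city demographics, airport codes, and world temperature data into a fact table and four dimension tables for analyzing immigration to the United States.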

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [127]:
# Do all imports and installs here
import pandas as pd
import os
import glob
from pyspark.sql import SparkSession, SQLContext, GroupedData
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [74]:
# build Spark session
spark = SparkSession.builder.getOrCreate()

In [75]:
# build SQL context object (used below for setConf; SparkSession supersedes SQLContext in Spark 2.0+)
sql_context = SQLContext(spark.sparkContext)

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

### U.S. City Demographic Data

This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. 

This data comes from the US Census Bureau's 2015 American Community Survey.
Source: https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/

In [76]:
# Read in the data here
df_demographics=spark.read.format("csv").option("header", "true").option("delimiter", ";").load("us-cities-demographics.csv")
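The demographics file separates fields with semicolons rather than commas, which is why the reader above sets the `delimiter` option. The plain-Python sketch below parses a two-line excerpt (the header and the Quincy row from the `show` output that follows) with the standard `csv` module. Without `delimiter=";"`, each line would come back as a single field. As with Spark's CSV reader when schema inference is off, every value is read as a string.

```python
import csv
import io

# Header and one data row in the layout of us-cities-demographics.csv
sample = (
    "City;State;Median Age;Male Population;Female Population;Total Population;"
    "Number of Veterans;Foreign-born;Average Household Size;State Code;Race;Count\n"
    "Quincy;Massachusetts;41.0;44129;49500;93629;4147;32935;2.39;MA;White;58723\n"
)

# delimiter=";" splits on semicolons; every value is returned as a string
rows = list(csv.DictReader(io.StringIO(sample), delimiter=";"))
print(rows[0]["City"], rows[0]["Total Population"])
```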

In [77]:
df_demographics.show(10)

+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|         State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|      Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy| Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|       Alabama|      38.5|          3

In [78]:
df_demographics.count()

2891

### Airport Code Data

The airport codes may refer either to the IATA airport code or to the ICAO airport code. The IATA code is a three-letter code used in passenger reservation, ticketing, and baggage-handling systems. The ICAO code is a four-letter code used by air traffic control (ATC) systems and by airports that do not have an IATA code (from Wikipedia).
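Because the two systems differ in length, a code's format alone hints at which system it follows: three letters for IATA, four for ICAO. The sketch below is a format check built on that assumption, and `code_format` is a hypothetical helper. It cannot confirm that a code is actually assigned. Identifiers such as `00AA` in the output below are local codes that fit neither pattern.

```python
def code_format(code):
    """Classify an airport code by its format only (hypothetical helper)."""
    if code is None:
        return "missing"
    if code.isalpha() and code.isupper():
        if len(code) == 3:
            return "IATA-style"  # three letters
        if len(code) == 4:
            return "ICAO-style"  # four letters
    return "other"

for code in ["JFK", "KJFK", "00AA", None]:
    print(code, code_format(code))
```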

In [79]:
df_airports=spark.read.format("csv").option("header", "true").load("airport-codes_csv.csv")

In [80]:
df_airports.show(10)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

In [81]:
df_airports.count()

55075

### World Temperature Data

Source: https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data

In [82]:
df_temperature=spark.read.format("csv").option("header", "true").load("GlobalLandTemperaturesByCity.csv")

In [83]:
df_temperature.show(10)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-04-01|5.7879999999999985|           3.6239999999999997|Århus|Denmark|  57.05N|   10.33E|
|1744-05-01|            10.644|           1.2830000000000001|Århus|Denmark|  57.05N|   10.33E|
|1744-06-01|14.050999999999998|                   

In [84]:
df_temperature.count()

8599212

### I94 Immigration Data

This data comes from the US National Travel and Tourism Office (NTTO). 
In addition to providing official travel and tourism statistics, the NTTO creates a positive climate for growth in travel and tourism by reducing institutional barriers to tourism. It also administers joint marketing efforts and coordinates efforts across federal agencies through the Tourism Policy Council. 
Source: https://www.trade.gov/national-travel-and-tourism-office

In [85]:
df_immegration_i94=spark.read.format("csv").option("header", "true").load("immigration_data_sample.csv")

In [86]:
df_immegration_i94.show(10)

+-------+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|    _c0|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|       admnum|fltno|visatype|
+-------+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|2027561|4084316.0|2016.0|   4.0| 209.0| 209.0|    HHW|20566.0|    1.0|     HI|20573.0|  61.0|    2.0|  1.0|20160422|    null| null|      G|      O|   null|      M| 1955.0|07202016|     F|  null|     JL|56582674633.0|00782|      WT|
|2171295|4422636.0|2016.0|   4.0| 582.0| 582.0|    MCA|20567.0|    1

In [87]:
df_immegration_i94.count()

1000

In [88]:
# from pyspark.sql import SparkSession

# spark = SparkSession.builder.\
# config("spark.jars.repositories", "https://repos.spark-packages.org/").\
# config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
# enableHiveSupport().getOrCreate()

# df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [89]:
#write to parquet
# df_spark.write.parquet("sas_data")
# df_spark=spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

The following cleaning tasks are performed:
- Temperature data: filter to the United States and the year 2011, and add year, month, and avg_temp columns. Then map each city to an I94 port code and drop duplicates.
- Immigration data: filter to valid I94 ports and remove rows with a null i94addr or i94res. Then map i94port codes to arrival city names and select the relevant columns.
- City demographic data: calculate percentage columns, pivot the Race column, and average the values per state.
- Airport data: filter to small airports in the US and use substring to extract the state code from iso_region. Then average the elevation per state.




In [90]:
import re
from us_state_abbrev import state_udf, abbrev_state, abbrev_state_udf,city_code_udf,city_codes
from immigration_codes import country_udf

In [91]:
df_temp_us = df_temperature.filter(df_temperature["country"]=="United States")\
.filter(year(df_temperature["dt"])==2011)\
.withColumn("year",year(df_temperature["dt"]))\
.withColumn("month",month(df_temperature["dt"]))\
.withColumn("avg_temp",df_temperature["AverageTemperature"])

In [92]:
df_temp_us.show(10)

+----------+------------------+-----------------------------+-------+-------------+--------+---------+----+-----+------------------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|   City|      Country|Latitude|Longitude|year|month|          avg_temp|
+----------+------------------+-----------------------------+-------+-------------+--------+---------+----+-----+------------------+
|2011-01-01| 5.002000000000002|                        0.213|Abilene|United States|  32.95N|  100.53W|2011|    1| 5.002000000000002|
|2011-02-01|             6.968|                        0.262|Abilene|United States|  32.95N|  100.53W|2011|    2|             6.968|
|2011-03-01|            14.561|          0.37200000000000005|Abilene|United States|  32.95N|  100.53W|2011|    3|            14.561|
|2011-04-01|            20.344|          0.23199999999999998|Abilene|United States|  32.95N|  100.53W|2011|    4|            20.344|
|2011-05-01|            23.225|                        0.278|Abilene|

In [93]:
df_temp_us.count()

3084

In [94]:
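# Build a lookup of valid I94 port codes: {code: [port description]}
valid_port = {}
re_obj = re.compile(r'\'(.*)\'.*\'(.*)\'')
with open('i94_port.txt') as f:
    for data in f:
        port = re_obj.search(data)
        if port:  # skip lines without a quoted code and description
            valid_port[port[1]] = [port[2]]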
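The pattern treats the first quoted token on a line as the port code and the last quoted token as the port description. The sample lines below assume a layout like `'ALC' = 'ALCAN, AK'`; this notebook does not show the actual contents of i94_port.txt. A line without two quoted tokens produces no match and is skipped.

```python
import re

# Same pattern as the notebook cell: first quoted token = code, last quoted token = description
re_obj = re.compile(r'\'(.*)\'.*\'(.*)\'')

# Hypothetical lines in the assumed i94_port.txt layout
sample_lines = [
    "'ALC'\t=\t'ALCAN, AK'",
    "'ANC'\t=\t'ANCHORAGE, AK'",
    "no quoted values on this line",
]

sample_ports = {}
for data in sample_lines:
    port = re_obj.search(data)
    if port:  # lines without two quoted tokens are skipped
        sample_ports[port[1]] = [port[2]]

print(sample_ports)
```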

In [95]:
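@udf()
def get_city(city):
    # Substring match: returns the first port whose description contains the city name,
    # which can pick the wrong port when a short name appears inside a longer one.
    if city is None:
        return None
    for key in valid_port:
        if city.lower() in valid_port[key][0].lower():
            return key
    return None  # no matching port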
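`get_city` tests whether the city name occurs anywhere in a port description, so a short name can match inside a longer one. The sketch below reproduces the lookup in plain Python using a hypothetical two-entry port table. "Denton" matches "SARASOTA-BRADENTON", which is consistent with the row mapping Denton to SRQ in the output below. A stricter alternative (an assumption, not the notebook's method) compares the city with the part of the description before the comma. However, this stricter rule would miss combined port names such as SARASOTA-BRADENTON.

```python
# Hypothetical excerpt of the port lookup (code -> [description])
ports = {
    "SRQ": ["SARASOTA-BRADENTON, FL"],
    "DEN": ["DENVER, CO"],
}

def match_substring(city):
    """Same rule as get_city: first port whose description contains the city name."""
    for key, (description,) in ports.items():
        if city.lower() in description.lower():
            return key
    return None

def match_exact(city):
    """Stricter rule: the city must equal the description text before the comma."""
    for key, (description,) in ports.items():
        if description.split(",")[0].strip().lower() == city.lower():
            return key
    return None

print(match_substring("Denton"), match_exact("Denton"))  # SRQ None
print(match_substring("Denver"), match_exact("Denver"))  # DEN DEN
```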

In [96]:
df_temp_us_valid_port = df_temp_us.withColumn("port_code", get_city(df_temp_us.City))

In [97]:
# keep only cities that matched a port (get_city returns null otherwise)
df_temp_us_valid_port_filtered = df_temp_us_valid_port.filter(df_temp_us_valid_port.port_code.isNotNull())

In [98]:
df_temp_us_valid_port_filtered=df_temp_us_valid_port_filtered.select("year","month",round(col("avg_temp"),1).alias("avg_temp"),"port_code","City","Country").dropDuplicates()

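## Temperature Table
- year : year of the measurement
- month : month of the measurement
- avg_temp : monthly average temperature in degrees Celsius, rounded to one decimal
- port_code : I94 port code of the city visited
- City : name of the city visited
- Country : United States for all data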

In [99]:
df_temp_us_valid_port_filtered.show(10)

+----+-----+--------+---------+-----------+-------------+
|year|month|avg_temp|port_code|       City|      Country|
+----+-----+--------+---------+-----------+-------------+
|2011|   10|    19.3|      SRQ|     Denton|United States|
|2011|    4|    10.6|      JER|Jersey City|United States|
|2011|   11|     5.3|      LLB|    Lincoln|United States|
|2011|   12|     6.9|      RCM|   Richmond|United States|
|2011|   12|     0.8|      SPI|Springfield|United States|
|2011|    9|    20.5|      STO|   Stockton|United States|
|2011|    8|    32.2|      AUS|     Austin|United States|
|2011|    3|     5.0|      DEN|     Denver|United States|
|2011|   12|     3.6|      ELP|    El Paso|United States|
|2011|    3|    11.9|      FTF|  Fairfield|United States|
+----+-----+--------+---------+-----------+-------------+
only showing top 10 rows



In [100]:
df_temp_us_valid_port_filtered.count()

1404

### Immigration Data

In [101]:
df_immigration = df_immegration_i94.filter(df_immegration_i94.i94port.isin(list(valid_port.keys())))

In [102]:
df_immigration.count()

999

In [103]:
df_immigration_filtered = df_immigration.filter(df_immigration.i94addr.isNotNull())\
.filter(df_immigration.i94res.isNotNull())\
.withColumn("city_port_name",city_code_udf(df_immigration["i94port"]))

In [104]:
immigration_data_df=df_immigration_filtered.select("cicid",col("i94yr").alias("year"),col("i94mon").alias("month"),\
"city_port_name",col("i94res").alias("origin_country"),"i94port",col("i94addr").alias("state_code"),"arrdate", "i94mode", "depdate", "i94visa")

In [105]:
immigration_data_df.show(10)

+---------+------+-----+------------------+--------------+-------+----------+-------+-------+-------+-------+
|    cicid|  year|month|    city_port_name|origin_country|i94port|state_code|arrdate|i94mode|depdate|i94visa|
+---------+------+-----+------------------+--------------+-------+----------+-------+-------+-------+-------+
|4084316.0|2016.0|  4.0|HONOLULU          |         209.0|    HHW|        HI|20566.0|    1.0|20573.0|    2.0|
|4422636.0|2016.0|  4.0|MCALLEN           |         582.0|    MCA|        TX|20567.0|    1.0|20568.0|    2.0|
|1195600.0|2016.0|  4.0|    KAHULUI - MAUI|         112.0|    OGG|        FL|20551.0|    1.0|20571.0|    2.0|
|5291768.0|2016.0|  4.0|LOS ANGELES       |         297.0|    LOS|        CA|20572.0|    1.0|20581.0|    2.0|
| 985523.0|2016.0|  4.0|CHAMPLAIN         |         111.0|    CHM|        NY|20550.0|    3.0|20553.0|    2.0|
|1481650.0|2016.0|  4.0|ATLANTA           |         577.0|    ATL|        GA|20552.0|    1.0|20606.0|    2.0|
|2197173.0

In [106]:
immigration_data_df.count()

940

## Immigration Table
- cicid
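- year : year of arrival
- month : month of arrival
- city_port_name : name of the arrival city (port of entry)
- origin_country : country of residence (numeric i94res code)
- i94port : code of the port of entry (city visited)
- state_code : code of the U.S. state of address (i94addr)
- arrdate : arrival date, as a SAS date (days since 1960-01-01)
- i94mode : 1-digit mode of travel code (1 = air, 2 = sea, 3 = land, 9 = not reported)
- depdate : departure date from the USA, as a SAS date
- i94visa : visa category code, i.e. reason for the visit (1 = business, 2 = pleasure, 3 = student)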
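The `arrdate` and `depdate` values are SAS dates, which count days from 1960-01-01. The first sample row confirms this: its `arrdate` of 20566.0 falls on 2016-04-22, the same day as its `dtadfile` value 20160422. Below is a plain-Python conversion sketch; `sas_to_date` is a hypothetical helper name.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS dates count days from this day

def sas_to_date(sas_days):
    """Convert a SAS date (number or numeric string such as "20566.0") to a date."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(float(sas_days)))

print(sas_to_date("20566.0"))  # 2016-04-22
print(sas_to_date("20573.0"))  # 2016-04-29
```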


In [109]:
# calculate percentages for numeric columns
demographics_data=df_demographics\
.withColumn("Median Age",col("Median Age").cast("float"))\
.withColumn("male_pop_prc",df_demographics["Male Population"]/df_demographics["Total Population"]*100)\
.withColumn("female_pop_prc",df_demographics["Female Population"]/df_demographics["Total Population"]*100)\
.withColumn("veterans_prc",df_demographics["Number of Veterans"]/df_demographics["Total Population"]*100)\
.withColumn("foreign_born_prc",df_demographics["Foreign-born"]/df_demographics["Total Population"]*100)\
.withColumn("race_prc",df_demographics["Count"]/df_demographics["Total Population"]*100)\
.orderBy("State")

In [110]:
demographics_data.show(10)

+----------+-------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+------------------+------------------+-----------------+------------------+------------------+
|      City|  State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|      male_pop_prc|    female_pop_prc|     veterans_prc|  foreign_born_prc|          race_prc|
+----------+-------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+------------------+------------------+-----------------+------------------+------------------+
|Huntsville|Alabama|      38.1|          91764|            97350|          189114|             16637|       12691|                  2.18|        AL|Black or African-...|61561| 48.52311304292

In [111]:
#Select columns with new calculated percentages and state names.
demographics_data_renamed=demographics_data.select("State",col("State Code").alias("state_code"),\
                                                         col("Median Age").alias("median_age"),\
                                                         "male_pop_prc",\
                                                         "female_pop_prc",\
                                                         "veterans_prc",\
                                                         "foreign_born_prc",\
                                                         "race_prc",\
                                                         "Race")

In [112]:
# pivot the Race column
demographics_data_pivoted=demographics_data_renamed.groupBy("State","state_code",\
                                             "median_age","male_pop_prc",\
                                             "female_pop_prc","veterans_prc",\
                                              "foreign_born_prc").pivot("Race").avg("race_prc")
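`pivot("Race")` turns the one-row-per-race layout into one column per race, and `.avg("race_prc")` fills each cell. The plain-Python sketch below applies the same reshaping to hypothetical rows. It keys by city instead of the full set of grouping columns. A race with no row for a city becomes `None`, matching the nulls in the pivoted output below.

```python
from collections import defaultdict

# Hypothetical long-format rows: (city, race, race_prc)
rows = [
    ("City A", "White", 60.0),
    ("City A", "Asian", 3.0),
    ("City B", "White", 45.0),
]

def pivot_race(rows):
    """Return {city: {race: average race_prc or None}} with one key per race seen."""
    races = sorted({race for _, race, _ in rows})
    cells = defaultdict(list)
    for city, race, prc in rows:
        cells[(city, race)].append(prc)
    result = {}
    for city in sorted({city for city, _, _ in rows}):
        result[city] = {}
        for race in races:
            values = cells.get((city, race))
            # Missing combinations become None, like null in Spark's pivot
            result[city][race] = sum(values) / len(values) if values else None
    return result

print(pivot_race(rows))
```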

In [113]:
demographics_data_pivoted.show(10)

+-------+----------+----------+------------------+------------------+------------------+------------------+---------------------------------+------------------+-------------------------+------------------+------------------+
|  State|state_code|median_age|      male_pop_prc|    female_pop_prc|      veterans_prc|  foreign_born_prc|American Indian and Alaska Native|             Asian|Black or African-American|Hispanic or Latino|             White|
+-------+----------+----------+------------------+------------------+------------------+------------------+---------------------------------+------------------+-------------------------+------------------+------------------+
|Alabama|        AL|      38.5|  44.8378693761124| 55.16213062388759|  5.68017067622202| 9.699548556677943|                             null| 5.609448484777048|       21.441789742924833| 4.042951944270913| 72.92518770848314|
|Alabama|        AL|      35.4| 47.15284217243477| 52.84715782756524| 7.455654931052018|4.6548612565

In [114]:
demographics_data_pivoted_renamed=demographics_data_pivoted.select("State","state_code","median_age","male_pop_prc",\
                                         "female_pop_prc","veterans_prc","foreign_born_prc",\
                                         col("American Indian and Alaska Native").alias("native_american"),\
                                         col("Asian"),col("Black or African-American").alias("Black"),\
                                         col("Hispanic or Latino").alias("hispanic_or_latino"),"White")

In [115]:
final_demographics_data=demographics_data_pivoted_renamed.groupBy("State","state_code").avg("median_age","male_pop_prc","female_pop_prc",\
                                                       "veterans_prc","foreign_born_prc","native_american",\
                                                       "Asian","Black","hispanic_or_latino","White").orderBy("State")

In [116]:
#Round the percentages and fix column names
final_demographics_data=final_demographics_data.select("State","state_code",round(col("avg(median_age)"),1).alias("median_age"),\
                  round(col("avg(male_pop_prc)"),2).alias("male_pop_prc"),\
                   round(col("avg(female_pop_prc)"),2).alias("female_pop_prc"),\
                   round(col("avg(veterans_prc)"),2).alias("veterans_prc"),\
                   round(col("avg(foreign_born_prc)"),2).alias("foreign_born_prc"),\
                   round(col("avg(native_american)"),2).alias("native_american"),\
                   round(col("avg(Asian)"),2).alias("Asian"),\
                   round(col("avg(hispanic_or_latino)"),2).alias("hispanic_or_latino"),\
                   round(col("avg(Black)"),2).alias("Black"),\
                   round(col('avg(White)'),2).alias('White')
                  )

In [117]:
final_demographics_data.show(10)

+--------------------+----------+----------+------------+--------------+------------+----------------+---------------+-----+------------------+-----+-----+
|               State|state_code|median_age|male_pop_prc|female_pop_prc|veterans_prc|foreign_born_prc|native_american|Asian|hispanic_or_latino|Black|White|
+--------------------+----------+----------+------------+--------------+------------+----------------+---------------+-----+------------------+-----+-----+
|             Alabama|        AL|      36.2|       47.25|         52.75|        6.76|            5.13|           0.81| 2.91|              3.57|45.01|52.04|
|              Alaska|        AK|      32.2|        51.2|          48.8|         9.2|           11.13|          12.17|12.33|              9.13| 7.74|71.21|
|             Arizona|        AZ|      35.0|       48.81|         51.19|        6.61|           12.64|           2.82| 5.13|             28.77| 6.01|82.68|
|            Arkansas|        AR|      32.8|       48.41|       

## Demographics Table
Values are averages over the cities of each state in the demographics dataset.

- State : name of the state
- state_code : two-letter state code
- median_age : median age of the residents
- male_pop_prc : male population percentage
- female_pop_prc : female population percentage
- veterans_prc : veteran population percentage
- foreign_born_prc : foreign-born population percentage
- native_american : American Indian and Alaska Native population percentage
- Asian : Asian population percentage
- hispanic_or_latino : Hispanic or Latino population percentage
- Black : Black or African-American population percentage
- White : White population percentage
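Each percentage starts as a city-level ratio multiplied by 100. For Huntsville, AL, 91764 male residents out of 189114 gives about 48.52, as in the `demographics_data` output above. `final_demographics_data` then takes an unweighted mean of these city values per state, so every city counts equally regardless of its size. The plain-Python sketch below shows both steps; the helper names are hypothetical. Like Spark's `avg`, it skips nulls.

```python
def pct(part, total):
    """City-level percentage, as computed for the *_prc columns."""
    return part / total * 100

def state_average(values):
    """Unweighted mean of city-level values, skipping None like Spark's avg."""
    present = [v for v in values if v is not None]
    return sum(present) / len(present) if present else None

# Huntsville, AL: Male Population / Total Population from the output above
print(round(pct(91764, 189114), 2))  # 48.52

# Hypothetical city-level values for one state
print(state_average([40.0, 60.0, None]))  # 50.0
```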

### U.S. Airport Data by State

In [118]:
#Filter airport data for 'small_airport' in the U.S. and use substring to show state
airport_data=df_airports.filter(df_airports["type"]=="small_airport")\
.filter(df_airports["iso_country"]=="US")\
.withColumn("iso_region",substring(df_airports["iso_region"],4,2))\
.withColumn("elevation_ft",col("elevation_ft").cast("float"))

In [119]:
airport_data.show(10)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
| 00AA|small_airport|Aero B Ranch Airport|      3435.0|       NA|         US|        KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|       450.0|       NA|         US|        AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|       820.0|       NA|         US|        AL|     Harvest|    00AL|     null|      00AL|-86.7703018188476...|
| 00AS|small_airport|      Fulton Airport|      1100.0|       NA|         US|     

In [120]:
airport_data.count()

13720

In [121]:
final_airport_data=airport_data.groupBy("iso_country","iso_region").avg("elevation_ft").select(col("iso_country").alias("country"),\
                                               col("iso_region").alias("state"),\
                                               round(col("avg(elevation_ft)"),1).alias("avg_elevation_ft")).orderBy("state")
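`substring(df_airports["iso_region"], 4, 2)` uses 1-based positions, so it keeps characters 4 and 5 of a value such as `US-KS`. The 0-based Python slice `[3:5]` selects the same characters. The sketch below combines that extraction with the per-state average elevation. It uses the first three small airports shown above plus one hypothetical row. Spark would return a null average for a state whose elevations are all null, whereas this sketch omits such a state.

```python
from collections import defaultdict

def state_from_iso_region(iso_region):
    """'US-KS' -> 'KS'; the 0-based slice [3:5] equals Spark's 1-based substring(col, 4, 2)."""
    return iso_region[3:5]

def avg_elevation_by_state(airports):
    """airports: iterable of (iso_region, elevation_ft); returns {state: mean elevation, 1 decimal}."""
    totals = defaultdict(lambda: [0.0, 0])  # state -> [sum of elevations, count]
    for iso_region, elevation in airports:
        if elevation is None:  # Spark's avg ignores nulls
            continue
        state = state_from_iso_region(iso_region)
        totals[state][0] += elevation
        totals[state][1] += 1
    return {state: round(s / n, 1) for state, (s, n) in sorted(totals.items())}

# First three rows from the small-airport output above, plus one hypothetical AK row
sample = [("US-KS", 3435.0), ("US-AK", 450.0), ("US-AL", 820.0), ("US-AK", 100.0)]
print(avg_elevation_by_state(sample))  # {'AK': 275.0, 'AL': 820.0, 'KS': 3435.0}
```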

In [122]:
final_airport_data.show(10)

+-------+-----+----------------+
|country|state|avg_elevation_ft|
+-------+-----+----------------+
|     US|   AK|           545.1|
|     US|   AL|           414.6|
|     US|   AR|           488.4|
|     US|   AZ|          3098.0|
|     US|   CA|          1261.4|
|     US|   CO|          5912.8|
|     US|   CT|           490.3|
|     US|   DE|            47.4|
|     US|   FL|            77.7|
|     US|   GA|           649.5|
+-------+-----+----------------+
only showing top 10 rows



## Airport Table
- country : country visited (US)
- state : two-letter code of the state visited
- avg_elevation_ft : average elevation, in feet, of the state's small airports

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model



## Temperature Table

* year : year of the measurement
* month : month of the measurement
* avg_temp : monthly average temperature in degrees Celsius, rounded to one decimal
* port_code : I94 port code of the city visited
* City : name of the city visited
* Country : United States for all data

## Immigration Table

* cicid
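* year : year of arrival
* month : month of arrival
* city_port_name : name of the arrival city (port of entry)
* origin_country : country of residence (numeric i94res code)
* i94port : code of the port of entry (city visited)
* state_code : code of the U.S. state of address (i94addr)
* arrdate : arrival date, as a SAS date (days since 1960-01-01)
* i94mode : 1-digit mode of travel code (1 = air, 2 = sea, 3 = land, 9 = not reported)
* depdate : departure date from the USA, as a SAS date
* i94visa : visa category code, i.e. reason for the visit (1 = business, 2 = pleasure, 3 = student)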


## Demographics Table

Values are averages over the cities of each state in the demographics dataset.

* State : name of the state
* state_code : two-letter state code
* median_age : median age of the residents
* male_pop_prc : male population percentage
* female_pop_prc : female population percentage
* veterans_prc : veteran population percentage
* foreign_born_prc : foreign-born population percentage
* native_american : American Indian and Alaska Native population percentage
* Asian : Asian population percentage
* hispanic_or_latino : Hispanic or Latino population percentage
* Black : Black or African-American population percentage
* White : White population percentage

## Airport Table 

* country : country visited (US)
* state : two-letter code of the state visited
* avg_elevation_ft : average elevation, in feet, of the state's small airports

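## Immigration_US Fact Table
* avg_temp
* female_pop_prc
* male_pop_prc
* month
* year
* foreign_born_prc
* native_american
* avg_elevation_ft
* Asian
* hispanic_or_latino
* White
* Black
* origin_country
* City
* city_port_name
* state_counter : number of immigration records in the group with a non-null state code
* city_counter : number of immigration records in the group with a non-null arrival city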

This schema was chosen for its simplicity and its suitability for data analytics.

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

The pipeline consists of the following steps:
- create dimension tables from the cleaned data
- create a fact table by joining the dimension tables with a SQL query
- load the query result as a Spark DataFrame (spark.sql returns one directly)
- write the fact table to Parquet

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [125]:
# Register the cleaned dimension tables as temporary views for Spark SQL
immigration_data_df.createOrReplaceTempView("immigration")
final_demographics_data.createOrReplaceTempView("demographics")
final_airport_data.createOrReplaceTempView("airport")
df_temp_us_valid_port_filtered.createOrReplaceTempView("temperature")

In [131]:
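# Effectively disable automatic broadcast joins: only tables at or below this size in bytes
# are broadcast, so 0 rules out non-empty tables (-1 disables broadcasting explicitly).
# This does not change time limits; those are set by spark.sql.broadcastTimeout.
sql_context.setConf("spark.sql.autoBroadcastJoinThreshold", "0")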

## Fact Table

In [133]:
# Note: demographics are joined on d.State = t.City (state name = temperature city name),
# so only cities that share their name with a state (e.g. New York) are matched.
immigration_us = spark.sql("""
    SELECT t.avg_temp, d.female_pop_prc, d.male_pop_prc, m.month, m.year,
           d.foreign_born_prc, d.native_american, a.avg_elevation_ft,
           d.Asian, d.hispanic_or_latino, d.White, d.Black,
           m.origin_country, t.City, m.city_port_name,
           COUNT(m.state_code) AS state_counter,
           COUNT(m.city_port_name) AS city_counter
    FROM temperature t
    JOIN demographics d ON d.State = t.City
    JOIN immigration m ON t.port_code = m.i94port
    JOIN airport a ON a.state = d.state_code
    GROUP BY m.year, m.month, m.origin_country, m.state_code,
             t.avg_temp, a.avg_elevation_ft,
             d.female_pop_prc, d.male_pop_prc, d.foreign_born_prc, d.native_american,
             d.Asian, d.hispanic_or_latino, d.White, d.Black,
             t.City, m.city_port_name
""")


In [134]:
immigration_us.show(10)

+--------+--------------+------------+-----+------+----------------+---------------+----------------+-----+------------------+-----+-----+--------------+----------+--------------------+-------------+------------+
|avg_temp|female_pop_prc|male_pop_prc|month|  year|foreign_born_prc|native_american|avg_elevation_ft|Asian|hispanic_or_latino|White|Black|origin_country|      City|      city_port_name|state_counter|city_counter|
+--------+--------------+------------+-----+------+----------------+---------------+----------------+-----+------------------+-----+-----+--------------+----------+--------------------+-------------+------------+
|    12.0|         51.68|       48.32|  4.0|2016.0|           22.07|           1.94|           860.9| 6.07|             21.09|56.67|30.76|         107.0|  New York|  NEW YORK          |            1|           1|
|    12.0|         51.68|       48.32|  4.0|2016.0|           22.07|           1.94|           860.9| 6.07|             21.09|56.67|30.76|         2

In [135]:
immigration_us.count()

1236

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
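The count and integrity checks listed above reduce to simple size and set comparisons. Below is a plain-Python sketch of a completeness check and a referential-integrity check, using hypothetical port lists and helper names. The set difference plays the role of a left anti join.

```python
def has_rows(rows):
    """Completeness: passes when the table has at least one row."""
    return len(rows) > 0

def orphan_ports(immigration_ports, temperature_ports):
    """Referential integrity: ports used by immigration rows but missing from the
    temperature table (what a left anti join would return). Empty means pass."""
    return set(immigration_ports) - set(temperature_ports)

# Hypothetical port lists
immigration_ports = ["ATL", "LOS", "HHW", "ATL"]
temperature_ports = ["ATL", "LOS", "DEN"]

print(has_rows(immigration_ports))                         # True
print(orphan_ports(immigration_ports, temperature_ports))  # {'HHW'}
```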
 
Run Quality Checks

In [136]:
# Perform quality checks here
immigration_us.select(isnull('year').alias('year'),\
                             isnull('month').alias('month'),\
                             isnull('origin_country').alias('country')).dropDuplicates().show()

+-----+-----+-------+
| year|month|country|
+-----+-----+-------+
|false|false|  false|
+-----+-----+-------+



In [137]:
# Count the total number of immigrants from the source table 
spark.sql('SELECT COUNT(*) as immigrants FROM immigration').show()

+----------+
|immigrants|
+----------+
|       940|
+----------+



In [138]:
def count_check(df):
    # Passes (returns True) when the DataFrame contains at least one row
    return df.count() > 0

In [139]:
def integrity_check(df_immigration, df_temp):
    # Passes (returns True) when every arrival port in the immigration data has a
    # matching port_code in the temperature data; the left anti join keeps only
    # the ports without a match
    ports = df_immigration.select(col("i94port")).distinct()
    return ports.join(df_temp, ports["i94port"] == df_temp["port_code"], "left_anti") \
        .count() == 0

In [140]:
def quality_check(df_immigration, df_temp):
    return count_check(df_immigration) and count_check(df_temp) \
        and integrity_check(df_immigration, df_temp)

In [145]:
quality_check(immigration_data_df, df_temp_us_valid_port_filtered)

In [142]:
count_check(df_demographics)

True

In [143]:
count_check(df_airports)

True

In [144]:
# Total number of immigration records counted in the fact table (sum of city_counter)
immigration_us.select(sum('city_counter').alias('fact_table_count')).show()

+----------------+
|fact_table_count|
+----------------+
|            2112|
+----------------+
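The fact table query joins `temperature` to `immigration` on the port code alone. As a result, one immigration record matches every monthly temperature row for its port. This fan-out is one way the summed `city_counter` (2112) can exceed the 940 records in the `immigration` view. The plain-Python sketch below uses hypothetical rows. Adding a month condition to the join (an assumption about the intended design) would limit each record to the temperature rows for its arrival month. Note, though, that the temperature data covers 2011, while the immigration sample is from 2016.

```python
# Hypothetical rows: one immigration record and three monthly temperature rows for its port
immigration = [{"cicid": 1, "i94port": "NYC", "month": 4}]
temperature = [
    {"port_code": "NYC", "month": 3, "avg_temp": 5.8},
    {"port_code": "NYC", "month": 4, "avg_temp": 12.0},
    {"port_code": "NYC", "month": 5, "avg_temp": 17.5},
]

def join_rows(immigration, temperature, match_month=False):
    """Inner join on port code (like t.port_code = m.i94port), optionally also on month."""
    return [
        {**m, **t}
        for m in immigration
        for t in temperature
        if t["port_code"] == m["i94port"] and (not match_month or t["month"] == m["month"])
    ]

print(len(join_rows(immigration, temperature)))                    # 3
print(len(join_rows(immigration, temperature, match_month=True)))  # 1
```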



#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

Please refer to the data dictionary in section 3.1 above (dimension tables and the fact table).

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * The database needed to be accessed by 100+ people.

1. Apache Spark was chosen to read and transform the data and to create outputs for further analysis. The data volume is modest, so Spark processes it quickly, and the same code can scale out if the data grows.
2. The data should be updated quarterly, which keeps it current for government agencies and organizations.
3. If the data increased by 100x, Amazon Redshift would be used, because it is an analytical database that can be optimized for aggregations and read-heavy workloads.
If the data populated a dashboard that must be updated daily by 7am, Apache Airflow would be used to schedule the pipeline. Its DAG tasks can be configured to retry and to send emails on failure.
If the database needed to be accessed by 100+ people, the data could be stored in Redshift, which is designed to serve many concurrent users. AWS workspaces can give those users efficient access.