# Project Title
### Data Engineering Capstone Project

### IMPORTANT: HOW TO RUN 

* Please download the temperature data from [Kaggle](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data) 
* Decompress it and upload it to the workspace
* Run the Capstone Jupyter Notebook


#### Project Summary
The goal of this project is to combine several data sources into a star-schema-like data model for analytical purposes, such as finding out

* from where
* to where
* at what age
* with which educational background

immigrants came to the United States in April 2016.

Additionally, the data can be used to build correlation models, e.g. to find out whether the temperature in an immigrant's home country influences which state the immigrant migrates to.


The project proceeds in the following steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

Start SparkSession

In [2]:
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. 

The project's overall goal is to create fact and dimension data sets in star schema form for analyzing immigration data from April 2016. The result is a prototype that, if accepted by the use case owners, can later be scaled up.

* **What data do you use?** US Immigration, airport key data, U.S. city demographics data, temperature data
* **What does your end solution look like?** Star schema with immigration fact table and dimension tables regarding airports, U.S. city demographics and average temperatures per city
* **What tools did you use?** Python pandas, Spark Data Frame functions

#### Describe, Gather and Explore Data 
Describe the data sets you're using. 

* **Where did it come from?** See data set descriptions below
* **What type of information is included?** See 4.3 Data Dictionary for details

**I94_Immigration data:** 
Originally from the [US National Travel and Tourism Office](https://www.trade.gov/national-travel-and-tourism-office)


In [3]:
immi_df = pd.read_csv("immigration_data_sample.csv")
print(f"The dataset has {len(immi_df)} entries.")
immi_df.head()


The dataset has 1000 entries.


Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


Have a look at the entire immigration data set from April 2016 using Spark:

In [4]:
immi_big_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
#write to parquet
# df_spark.write.parquet("sas_data")
# df_spark=spark.read.parquet("sas_data")

In [5]:
immi_big_df.show(n=5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE

In [6]:
immi_big_df.cache()
immi_big_df.count()

3096313

**Airport data:** Originally from [Datahub.io](https://datahub.io/core/airport-codes#data)


Have a look at data using pandas

In [7]:
airport_df = pd.read_csv("airport-codes_csv.csv")
print(f"The dataset has {len(airport_df)} entries.")
airport_df.head()

The dataset has 55075 entries.


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


Load dataset in Spark for further work:

In [8]:
airport_df = spark.read.format('csv').option('delimiter',',').option('header','true').load("airport-codes_csv.csv")

Port keys are extracted from the SAS labels file using the following function:

In [9]:
def create_valid_ports():
    """
    Use the i94 labels SAS file to create valid port labels
    """
    # Create list of valid ports
    with open('I94_SAS_Labels_Descriptions.SAS') as f:
        lines = f.readlines()
    ports = []
    for line in lines[302:962]:
        ports.append(line.strip())
    ports_split = []    
    for port in ports:
        ports_split.append(port.split("="))
    port_codes = []
    for code in ports_split:
        port_codes.append(code[0].replace("'","").strip())
    port_locations = []
    for location in ports_split:
        port_locations.append(location[1].replace("'","").strip())
    port_cities = []
    for city in port_locations:
        port_cities.append(city.split(",")[0])
    port_states = []
    for state in port_locations:
        port_states.append(state.split(",")[-1])

    port_location_df = pd.DataFrame({"port_code" : port_codes, "port_city": port_cities, "port_state": port_states})

    return port_location_df
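
To make the parsing logic concrete, the following minimal sketch applies the same split-and-strip steps to a few inline sample lines. The sample lines and the helper name `parse_port_line` are assumptions for illustration; the exact layout of `I94_SAS_Labels_Descriptions.SAS` may differ. Note how a label without a comma ends up with identical city and state values, which is the condition used later to flag suspicious ports.

```python
def parse_port_line(line):
    """Split one label line of the assumed form  'CODE' = 'CITY, STATE'."""
    code_part, location_part = line.strip().split("=")
    code = code_part.replace("'", "").strip()
    location = location_part.replace("'", "").strip()
    city = location.split(",")[0]
    # Like the function above, this keeps the space after the comma;
    # without a comma, the state is identical to the city
    state = location.split(",")[-1]
    return code, city, state

# Hypothetical sample lines (not copied from the real labels file)
sample_lines = [
    "   'ALC'\t=\t'ALCAN, AK             '",
    "   'ANC'\t=\t'ANCHORAGE, AK         '",
    "   'XXX'\t=\t'NOT REPORTED/UNKNOWN  '",
]

parsed = [parse_port_line(line) for line in sample_lines]
for code, city, state in parsed:
    print(code, "|", city, "|", state, "|", "suspicious" if city == state else "ok")
```

Stripping the state value as well would be a small, optional refinement if the state column is later used for joins.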

In [10]:
port_location_df = create_valid_ports()
print(f"The dataset has {len(port_location_df)} entries.")
port_location_df.head(10)

The dataset has 660 entries.


Unnamed: 0,port_code,port_city,port_state
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK
2,BAR,BAKER AAF - BAKER ISLAND,AK
3,DAC,DALTONS CACHE,AK
4,PIZ,DEW STATION PT LAY DEW,AK
5,DTH,DUTCH HARBOR,AK
6,EGL,EAGLE,AK
7,FRB,FAIRBANKS,AK
8,HOM,HOMER,AK
9,HYD,HYDER,AK


**U.S. City demographics data:** Originally from [OpenDataSoft](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)

In [11]:
cities_df = pd.read_csv("us-cities-demographics.csv", delimiter=";")
print(f"The dataset has {len(cities_df)} entries.")
cities_df.head()

The dataset has 2891 entries.


Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


Load data in Spark:

In [12]:
cities_df = spark.read.format('csv').option('delimiter',';').option('header', 'true').load("us-cities-demographics.csv")

**World Temperature Data since 1750:**
Originally from [Kaggle](https://www.kaggle.com/datasets/berkeleyearth/climate-change-earth-surface-temperature-data?resource=download)

In [14]:
temp_df = pd.read_csv("GlobalLandTemperaturesByCity.csv")
print(f"The dataset has {len(temp_df)} entries.")
temp_df.head()

The dataset has 1480204 entries.


Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


Load data in Spark:

In [15]:
temp_df = spark.read.format('csv').option('delimiter',',').option('header','true').load("GlobalLandTemperaturesByCity.csv")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In this fictional case, the departments that will later use the data model require that the data sets contain no missing values. Cleaning the data sets with `.dropna` is therefore mandatory, so a function that drops all rows with NA entries from a given table is defined.

In [16]:

def clean_table(df, subset=None):
    """
    Drop empty rows in dataframe
    """
    print(f"number of rows in table before removing empty rows: {df.count()}")    
    clean_df = df.dropna(subset=subset)
    print(f"number of rows in table after removing empty rows: {clean_df.count()}")
    return clean_df
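
The effect of the optional `subset` argument can be illustrated with a small pandas analogue, since pandas' `DataFrame.dropna` accepts a `subset` keyword just like the Spark method used above. The toy data below is made up for illustration only.

```python
import pandas as pd

# Toy table: the second row lacks a port, the third row lacks a gender
toy = pd.DataFrame({
    "cicid": [1.0, 2.0, 3.0],
    "i94port": ["ATL", None, "NYC"],
    "gender": ["F", "M", None],
})

# Without subset, any missing value removes the row
all_columns = toy.dropna()
# With subset, only missing values in the listed columns remove the row
only_port = toy.dropna(subset=["i94port"])

print(len(toy), len(all_columns), len(only_port))
```

Restricting the check to the columns that are actually needed downstream can preserve considerably more rows than dropping on every column.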

#### 2.1 Explore and clean immigration data 

The following observations can be made from the "immigration_data_sample.csv" file:
* the imported data only contains data points from April 2016, as needed
* the columns "insnum", "entdepu", "occup", "visapost" contain mostly NA entries or are not needed for further analysis


In [17]:
immi_big_df.describe("i94mon").show()

+-------+-------+
|summary| i94mon|
+-------+-------+
|  count|3096313|
|   mean|    4.0|
| stddev|    0.0|
|    min|    4.0|
|    max|    4.0|
+-------+-------+



In [18]:
immi_big_df.describe("i94yr").show()

+-------+--------------------+
|summary|               i94yr|
+-------+--------------------+
|  count|             3096313|
|   mean|              2016.0|
| stddev|4.282829613261096...|
|    min|              2016.0|
|    max|              2016.0|
+-------+--------------------+



In [19]:
immi_big_df.describe(["insnum", "entdepu", "occup", "visapost"]).show()

+-------+-----------------+-------+-----------------+--------+
|summary|           insnum|entdepu|            occup|visapost|
+-------+-----------------+-------+-----------------+--------+
|  count|           113708|    392|             8126| 1215063|
|   mean|4131.050016327899|   null|          885.675|   999.0|
| stddev|8821.743471773656|   null|264.6551105950961|     0.0|
|    min|                0|      U|              049|     999|
|    max|           YM0167|      Y|              WTR|     ZZZ|
+-------+-----------------+-------+-----------------+--------+



Cleaning Immigration data: 

* remove data from ports whose city and state entries are identical, as these entries might be corrupted
* drop columns with mostly NA values, as they are not needed for the analysis
* drop all rows with NA entries

In [20]:
#take out data from ports that have same name in city and state
invalid_ports = list(set(port_location_df[port_location_df["port_city"] == port_location_df["port_state"]]["port_code"].values))

print(f"number of rows in immigration dataset before cleaning invalid ports: {immi_big_df.count()}")
filtered_immi_df = immi_big_df[~immi_big_df["i94port"].isin(invalid_ports)]
print(f"number of rows in immigration dataset after cleaning invalid ports: {filtered_immi_df.count()}")
# "~" is "not"

# Drop columns with mostly NA entries
dropped_columns = ("insnum", "entdepu", "occup", "visapost")
dropped_immi_df = filtered_immi_df.drop(*dropped_columns)

# Drop all NA entries in rows

number of rows in immigration dataset before cleaning invalid ports: 3096313
number of rows in immigration dataset after cleaning invalid ports: 2995590


In [21]:
immi_df_cleaned = clean_table(dropped_immi_df)

number of rows in table before removing empty rows: 2995590
number of rows in table after removing empty rows: 2306750


The immigration data is now cleaned and ready to be transferred into the data model.

#### 2.2 Explore and clean Airport data

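Have a look at the entries whose continent is `NA`. In this data set, `NA` is the continent code for North America; the pandas preview above shows it as missing because pandas parses the string `NA` as a missing value by default. Many of these ports might not be interesting for immigration, as they are too small.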

In [22]:
airport_df.show(n=10)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

In [23]:
airport_df.describe("continent").show()

+-------+---------+
|summary|continent|
+-------+---------+
|  count|    55075|
|   mean|     null|
| stddev|     null|
|    min|       AF|
|    max|       SA|
+-------+---------+



How often does each class occur?

In [24]:
airport_df.select("continent").groupby("continent").count().show()

+---------+-----+
|continent|count|
+---------+-----+
|       NA|27719|
|       SA| 7709|
|       AS| 5350|
|       AN|   28|
|       OC| 3067|
|       EU| 7840|
|       AF| 3362|
+---------+-----+



Slightly more than half of the entries have `NA` as continent. Let's find out which types these entries have:

In [25]:
print(f"There are {airport_df.select(['type', 'name']).where(airport_df.continent == 'NA').count()} airports with NA as continent. Their types are the following:")
airport_df.select(['type', 'name']).where(airport_df.continent == 'NA').groupby('type').count().orderBy('type').show()

There are 27719 airports with NA as continent. Their types are the following:
+--------------+-----+
|          type|count|
+--------------+-----+
|   balloonport|   19|
|        closed| 2072|
|      heliport| 7035|
| large_airport|  209|
|medium_airport| 1247|
| seaplane_base|  956|
| small_airport|16181|
+--------------+-----+



As most airports with 'NA' as continent seem too small to be international airports for immigration, we can drop them.

In [26]:
airport_df_cleaned = airport_df.filter(airport_df.continent != 'NA')
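
The continent code `NA` deserves some care, because pandas' `read_csv` treats the literal string `NA` as a missing value by default, whereas the Spark reader used here keeps it as a regular string. The sketch below uses a few inline rows in the spirit of the airport file (the rows are illustrative, not taken from the real file) to show how `keep_default_na=False` preserves the code in pandas.

```python
import io
import pandas as pd

# Illustrative rows using a few of the airport file's columns
csv_text = (
    "ident,type,continent,iso_country\n"
    "00A,heliport,NA,US\n"
    "00AA,small_airport,NA,US\n"
    "EDDF,large_airport,EU,DE\n"
)

# Default behaviour: the string "NA" is converted to NaN
default_df = pd.read_csv(io.StringIO(csv_text))
# keep_default_na=False keeps "NA" as an ordinary string
literal_df = pd.read_csv(io.StringIO(csv_text), keep_default_na=False)

missing_by_default = int(default_df["continent"].isna().sum())
kept_as_code = int((literal_df["continent"] == "NA").sum())
print(missing_by_default, kept_as_code)
```

This difference matters when the same file is inspected with pandas and processed with Spark, as the two previews can otherwise appear to disagree.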

In [27]:
print(f"There are in total {airport_df_cleaned.count() + airport_df.select(['type', 'name']).where(airport_df.continent == 'NA').count()} \
airports, {airport_df_cleaned.count()} with continents other than NA")

There are in total 55075 airports, 27356 with continents other than NA


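Now, we need to delete all other entries with missing values. Note that the cell below passes the unfiltered `airport_df` to `clean_table()`, not `airport_df_cleaned`, so the row counts refer to the full airport table: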

In [28]:
airport_df_cleaned_final = clean_table(airport_df)

number of rows in table before removing empty rows: 55075
number of rows in table after removing empty rows: 2746


After cleaning the airport table, there are 198 large airports remaining that can be used for analysis.

In [29]:
airport_df_cleaned_final.groupby('type').count().show()

+--------------+-----+
|          type|count|
+--------------+-----+
| large_airport|  198|
| seaplane_base|   54|
|      heliport|   19|
|        closed|   24|
|medium_airport|  876|
| small_airport| 1575|
+--------------+-----+



#### 2.3 Explore and clean U.S. City Demographics data

The cities data has only a small number of missing values. Additionally, the port key data set is used to add the matching port code to every city, where possible, which then serves as the data set's key.

In [30]:
cities_df.summary('count').show()

+-------+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|summary|City|State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|Race|Count|
+-------+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|  count|2891| 2891|      2891|           2888|             2888|            2891|              2878|        2878|                  2875|      2891|2891| 2891|
+-------+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+



The following functions add port codes to cities. They rely on the port data produced by `create_valid_ports()`, defined at the beginning of the document:

In [31]:
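from pyspark.sql.functions import udf

# Build the lookup once instead of re-reading the SAS labels file for every row
PORT_LOOKUP = list(zip(port_location_df["port_city"].str.lower(), port_location_df["port_code"]))

@udf()
def get_port(city):
    """
    Add valid port based on city data
    
    Key Arguments:
    city -- column from a data frame containing names of cities
    """
    if city is None:
        return None
    city_lower = city.lower()
    # Return the code of the first port whose city name contains the given city name
    for port_city, port_code in PORT_LOOKUP:
        if city_lower in port_city:
            return port_code
    return None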

In [32]:
def add_port(df):
    """
    Add valid ports to table to allow joining of tables
    
    Key Arguments:
    df -- dataframe containing at least a column called 'City' containing city names 
    """
    df = df.withColumn("i94port", get_port(df.City))
    # Keep only rows for which a matching port was found
    new_df = df.filter(df.i94port.isNotNull())
    return new_df
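
Because the lookup is a substring match, a city is assigned the first port whose name merely contains the city name. The following pure-Python sketch mirrors that matching rule to show both the intended behaviour and a possible false match. The helper `match_port` and the `EAGLE PASS` entry with code `XEP` are hypothetical and only serve as illustration.

```python
def match_port(city, ports):
    """Return the code of the first port whose city name contains `city`."""
    for port_city, port_code in ports:
        if city.lower() in port_city.lower():
            return port_code
    return None

# Two port cities from the preview above, plus one hypothetical entry
ports = [
    ("ANCHORAGE", "ANC"),
    ("EAGLE", "EGL"),
    ("EAGLE PASS", "XEP"),  # hypothetical code, for illustration only
]

exact = match_port("Anchorage", ports)    # intended match
partial = match_port("Pass", ports)       # matches inside "EAGLE PASS"
missing = match_port("Berlin", ports)     # no port contains this name
print(exact, partial, missing)
```

If false matches turn out to be a problem, comparing on exact equality, or on city and state together, would be a stricter alternative.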

In [33]:
#clean demographics data set and add ports as primary key
cities_df = add_port(cities_df)
cities_df = clean_table(cities_df)

number of rows in table before removing empty rows: 879
number of rows in table after removing empty rows: 875


In [34]:
# change columns names for better usability
cities_df_final = cities_df.withColumnRenamed('Median Age','median_age') \
                                                        .withColumnRenamed('Male Population', 'male_population') \
                                                        .withColumnRenamed('Female Population', 'female_population') \
                                                        .withColumnRenamed('Total Population', 'total_population') \
                                                        .withColumnRenamed('Number of Veterans', 'number_of_veterans') \
                                                        .withColumnRenamed('Foreign-born', 'foreign_born') \
                                                        .withColumnRenamed('Average Household Size', 'average_household_size') \
                                                        .withColumnRenamed('State Code', 'state_code')

In [35]:
cities_df_final.show(n=5)

+------------+------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+-------+
|        City|       State|median_age|male_population|female_population|total_population|number_of_veterans|foreign_born|average_household_size|state_code|                Race| Count|i94port|
+------------+------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+-------+
|      Newark|  New Jersey|      34.6|         138040|           143873|          281913|              5829|       86253|                  2.73|        NJ|               White| 76402|    NEW|
|      Peoria|    Illinois|      33.1|          56229|            62432|          118661|              6634|        7517|                   2.4|        IL|American Indian a...|  1343|    PIA|
|Philadelphia|Pennsylvania|      34.1|  

U.S. City Demographics Data is now cleaned and ready for model building.

#### 2.4 Explore and clean Temperature data

In [36]:
temp_df.orderBy('dt', ascending=False).show(n=5)

+----------+------------------+-----------------------------+---------+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|     City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+---------+-------+--------+---------+
|2013-09-01|              null|                         null| A Coruña|  Spain|  42.59N|    8.73W|
|2013-09-01|              null|                         null|Abakaliki|Nigeria|   5.63N|    8.07E|
|2013-09-01|              null|                         null|   Aachen|Germany|  50.63N|    6.34E|
|2013-09-01|              null|                         null|    Çorlu| Turkey|  40.99N|   27.69E|
|2013-09-01|              null|                         null|  Aalborg|Denmark|  57.05N|   10.33E|
+----------+------------------+-----------------------------+---------+-------+--------+---------+
only showing top 5 rows



In [37]:
temp_df.select(max(col('dt'))).show(n=5)

+----------+
|   max(dt)|
+----------+
|2013-09-01|
+----------+



The data set only contains data up to September 2013, so only the 2013 average temperatures can be used for a regression model that compares city temperatures.

If the prototype is to be scaled up, more recent temperature data should be used. The 2013 data is sufficient for a proof of concept, though.

Cleaning involves:

* filtering for entries from 2013 or newer
* only using entries that are not `null`
* adding ports using the `add_port()` function defined beforehand
* using the `clean_table()` function to get rid of remaining entries that are `null`

In [38]:
temp_df_cleaned = temp_df.filter(temp_df.dt >= '2013-01-01')
temp_df_cleaned = temp_df_cleaned.filter(temp_df_cleaned.AverageTemperature.isNotNull())
temp_df_cleaned = add_port(temp_df_cleaned)
temp_df_cleaned_final = clean_table(temp_df_cleaned)

number of rows in table before removing empty rows: 364
number of rows in table after removing empty rows: 364
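
The date filter above compares the `dt` column with a date string. As the CSV is loaded without schema inference, `dt` is a string column, and the comparison relies on the fact that zero-padded ISO dates (`YYYY-MM-DD`) sort lexicographically in chronological order. A minimal pure-Python sketch of that property, using dates that appear in the previews above plus one made-up value:

```python
# "2012-12-01" is a made-up value; the other dates appear in the previews above
dates = ["1743-11-01", "2012-12-01", "2013-01-01", "2013-09-01"]

# String comparison is sufficient because the format is zero-padded ISO 8601
recent = [d for d in dates if d >= "2013-01-01"]
print(recent)
```

Casting the column to a proper date type would be the more robust choice if other date formats could occur.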


In [39]:
temp_df_cleaned_final.show(n=5)

+----------+------------------+-----------------------------+--------+--------------+--------+---------+-------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|    City|       Country|Latitude|Longitude|i94port|
+----------+------------------+-----------------------------+--------+--------------+--------+---------+-------+
|2013-01-01|5.1930000000000005|           0.7040000000000001|Aberdeen|United Kingdom|  57.05N|    1.48W|    ABE|
|2013-02-01|4.1160000000000005|                        0.614|Aberdeen|United Kingdom|  57.05N|    1.48W|    ABE|
|2013-03-01|             2.786|                         0.43|Aberdeen|United Kingdom|  57.05N|    1.48W|    ABE|
|2013-04-01|             5.374|                        0.718|Aberdeen|United Kingdom|  57.05N|    1.48W|    ABE|
|2013-05-01|             8.238|           0.5870000000000001|Aberdeen|United Kingdom|  57.05N|    1.48W|    ABE|
+----------+------------------+-----------------------------+--------+--------------+--------+--

In [40]:
temp_df_cleaned_final.count()

364

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model:

The data model consists of an `Immigration` fact table and three dimension tables: `Temperature`, `Demographics` and `Airport`. The `Temperature` and `Demographics` tables can be joined to the fact table via the `i94port` key, which allows building regression models on the temperatures or locations that immigrants come from or go to.

| table | columns | description | type |
|---|---|---|---|
| Immigration | cicid; i94yr; i94mon; i94cit; i94res; i94port; arrdate; i94mode; i94addr; depdate; i94bir; i94visa; count; dtadfile; entdepa; entdepd; matflag; biryear; dtaddto; gender; airline; admnum; fltno; visatype | Contains i94 immigration data | Fact Table |
| Temperature | dt; AverageTemperature; AverageTemperatureUncertainty; City; Country; Latitude; Longitude; i94port | Contains temperature data | Dimension Table |
| Demographics | City; State; median_age; male_population; female_population; total_population; number_of_veterans; foreign_born; average_household_size; state_code; Race; Count; i94port | Contains city demographics data | Dimension Table |
| Airport | ident; type; name; elevation_ft; continent; iso_country; iso_region; municipality; gps_code; iata_code; local_code; coordinates | Contains airport data | Dimension Table |

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

1. Clean provided data sets 
2. Create fact table
3. Create dimension tables
4. Run Data Quality Checks

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

To build the data model, several functions have to be defined to create the final tables:

##### 4.1.1 Create fact table

In [41]:
def write_immigration_data(immi_df):
    """
    Write immigration data to parquet
    """
    immigration_table = immi_df.select(["cicid" ,"i94yr" ,"i94mon" ,"i94cit" ,"i94res" ,"i94port" ,"arrdate" \
                                               ,"i94mode" ,"i94addr" ,"depdate" ,"i94bir" ,"i94visa" ,"count" ,"dtadfile" \
                                               ,"entdepa" ,"entdepd" ,"matflag" ,"biryear" ,"dtaddto" ,"gender" ,"airline" \
                                               ,"admnum" ,"fltno" ,"visatype"])
    immigration_table.write.mode("append").partitionBy("i94port").parquet("./output/immigration_table.parquet")

##### 4.1.2 Create the dimension tables

In [42]:
   
def write_temperature_data(temp_df_cleaned_final):
    """
    Write temperature data to parquet
    """
    temperature_table = temp_df_cleaned_final.select(["dt" ,"AverageTemperature" ,"AverageTemperatureUncertainty" ,"City" \
                                                       ,"Country" ,"Latitude" ,"Longitude","i94port"])
    temperature_table.write.mode("append").partitionBy("i94port").parquet("./output/temperature_table.parquet")
    
def write_demographics_data(cities_df_final):
    """
    Write demographics data to parquet
    """
    demographics_table = cities_df_final.select(["City" ,"State" ,"median_age" ,"male_population" \
                                                               ,"female_population" ,"total_population" \
                                                               ,"number_of_veterans" ,"foreign_born" \
                                                               ,"average_household_size" ,"state_code" ,"Race" ,"Count","i94port"])
    demographics_table.write.mode("append").partitionBy("i94port").parquet("./output/demographics_table.parquet")
    
def write_airport_code_data(airport_df_cleaned_final):
    """
    Write airport code data to parquet
    """
    airport_code_table = airport_df_cleaned_final.select(["ident" ,"type" ,"name" ,"elevation_ft" ,"continent" ,"iso_country" ,"iso_region" ,"municipality" ,"gps_code" ,"iata_code" ,"local_code" ,"coordinates"])
    airport_code_table.write.mode("append").partitionBy("iata_code").parquet("./output/airport_code_table.parquet")

Create all tables as `parquet` files to create entire model:

In [43]:
%rm -rf ./output/
write_immigration_data(immi_df_cleaned)
write_temperature_data(temp_df_cleaned_final)
write_demographics_data(cities_df_final)
write_airport_code_data(airport_df_cleaned_final)

Now, all fact and dimension tables are available as parquet files and can be read for further analysis.

#### 4.2 Data Quality Checks
Two quality checks will be included:

* check if table exists
* check if table is empty

Define functions for quality checks and a function to run all checks on a data table:

In [44]:
def table_exists(df):
    """
    Checks if the dataframe that was entered exists
    """
    return df is not None

def table_not_empty(df):
    """
    Checks if the dataframe that was entered contains any rows
    """
    return df.count() != 0

def data_quality_check(df):
    """
    Checks if the dataframe entered exists and contains any data 
    """
    if table_exists(df):
        print("Data quality check #1 passed, table exists\n")
    else:
        print("Data quality check #1 failed, table is missing\n")
        # A missing table cannot be counted, so skip the second check
        return
    if table_not_empty(df):
        print("Data quality check #2 passed, table not empty\n")
    else:
        print("Data quality check #2 failed, table is empty\n")
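
Beyond existence and non-emptiness, checks on the join key are a natural next step. The sketch below shows two such checks in pandas on made-up data; the helper names `key_has_no_nulls` and `key_is_unique` are hypothetical and not part of the notebook.

```python
import pandas as pd

def key_has_no_nulls(df, key):
    """Return True if the key column contains no missing values."""
    return bool(df[key].notna().all())

def key_is_unique(df, key):
    """Return True if every value in the key column occurs only once."""
    return bool(df[key].is_unique)

# Made-up rows: one duplicated cicid and one missing port
toy_bad = pd.DataFrame({"cicid": [6.0, 7.0, 7.0], "i94port": ["XXX", "ATL", None]})
toy_good = pd.DataFrame({"cicid": [6.0, 7.0, 8.0], "i94port": ["XXX", "ATL", "NYC"]})

for name, table in [("bad", toy_bad), ("good", toy_good)]:
    print(name, key_has_no_nulls(table, "i94port"), key_is_unique(table, "cicid"))
```

The same idea carries over to Spark by comparing `df.count()` with the count of distinct or non-null key values.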

Run Data Quality Checks for every table:

In [45]:
print("Immigration table check")
data_quality_check(immi_df_cleaned)

print("Temperature table check")
data_quality_check(temp_df_cleaned_final)

print("U.S. City Demographics table check")
data_quality_check(cities_df_final)

print("Airport table check")
data_quality_check(airport_df_cleaned_final)

Immigration table check
Data quality check #1 passed, table exists

Data quality check #2 passed, table not empty

Temperature table check
Data quality check #1 passed, table exists

Data quality check #2 passed, table not empty

U.S. City Demographics table check
Data quality check #1 passed, table exists

Data quality check #2 passed, table not empty

Airport table check
Data quality check #1 passed, table exists

Data quality check #2 passed, table not empty



Read the data tables from the parquet files to use them in an example query:

In [50]:
# Read parquet files
parI94 = spark.read.parquet("./output/immigration_table.parquet")
parTemp= spark.read.parquet("./output/temperature_table.parquet")
parDem = spark.read.parquet("./output/demographics_table.parquet")
parAir = spark.read.parquet("./output/airport_code_table.parquet")

parI94.createOrReplaceTempView("immigration")
parTemp.createOrReplaceTempView("temperature")
parDem.createOrReplaceTempView("demographics")
parAir.createOrReplaceTempView("airport")

In [49]:
# have a look at tables, if necessary
parI94.show(n=5)
parTemp.show(n=5)
parDem.show(n=5)
parAir.show(n=5)

+---------+------+------+------+------+-------+-------+-------+-------+------+-------+-----+--------+-------+-------+-------+-------+--------+------+-------+---------------+-----+--------+-------+
|    cicid| i94yr|i94mon|i94cit|i94res|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|entdepa|entdepd|matflag|biryear| dtaddto|gender|airline|         admnum|fltno|visatype|i94port|
+---------+------+------+------+------+-------+-------+-------+-------+------+-------+-----+--------+-------+-------+-------+-------+--------+------+-------+---------------+-----+--------+-------+
|5750212.0|2016.0|   4.0| 254.0| 209.0|20574.0|    1.0|     GU|20576.0|  62.0|    2.0|  1.0|20160430|      G|      O|      M| 1954.0|07282016|     M|     UA|5.9504946533E10|00150|      WT|    AGA|
|5750213.0|2016.0|   4.0| 254.0| 209.0|20574.0|    1.0|     GU|20576.0|  44.0|    2.0|  1.0|20160430|      G|      O|      M| 1972.0|07282016|     F|     UA|5.9504949233E10|00150|      WT|    AGA|
|5750214.0|2016

**Example Query**: find out whether the cities with the most immigrants are indeed the cities with the largest foreign-born counts.

Make sure that column `count` in `immigration` only contains `1` as entry so that columns `cicid` can be used to count total immigrants.

In [53]:
parI94.select('count').groupby('count').count().show()

+-----+-------+
|count|  count|
+-----+-------+
|  1.0|2306750|
+-----+-------+



Run Query:

In [82]:
example_query = spark.sql('''
        SELECT city AS City, CAST(foreign_born AS INT) AS No_Foreign_Born, COUNT(cicid) AS No_Immigrants
        FROM immigration JOIN demographics ON (immigration.i94port = demographics.i94port)
        GROUP BY City, No_Foreign_Born
        ORDER BY No_Immigrants DESC, No_Foreign_Born DESC
        ''')

In [83]:
example_query.show(n=20)

+---------------+---------------+-------------+
|           City|No_Foreign_Born|No_Immigrants|
+---------------+---------------+-------------+
|       New York|        3212500|      1871590|
|          Miami|         260789|      1386775|
|    Los Angeles|        1485425|      1191015|
|  San Francisco|         297199|       657900|
|         Newark|          86253|       635170|
|        Orlando|          50558|       582210|
|        Chicago|         573463|       491460|
|        Houston|         696210|       419280|
|Fort Lauderdale|          47582|       382450|
|      Las Vegas|         127609|       381630|
|        Atlanta|          32016|       301930|
|         Dallas|         326825|       242265|
|         Boston|         190123|       203270|
|        Seattle|         119840|       181595|
|        Phoenix|         300702|       154250|
|        Detroit|          39861|       120525|
|   Philadelphia|         205339|       111780|
|          Tampa|          58795|       

The query result is then converted to a `pandas` dataframe and a correlation matrix is computed. It shows a high correlation (about 0.80) between the number of immigrants and the number of foreign-born residents.


This indicates that cities with the most immigrants tend to be the cities with the largest foreign-born counts.

In [89]:
# write to pandas dataframe and create correlation matrix
example_query_pd = example_query.toPandas()
print(example_query_pd.corr())

                 No_Foreign_Born  No_Immigrants
No_Foreign_Born         1.000000       0.795511
No_Immigrants           0.795511       1.000000
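
One caveat about the counts above: every `No_Immigrants` value shown is a multiple of 5, and the New York and Miami figures together exceed the 2,306,750 rows in the whole `immigration` table. That pattern would be expected if `demographics` holds several rows per city, for example one per `Race` value, so that the join repeats each immigration record. This is an assumption about the table's contents, not something verified here. The sketch below reproduces the effect on made-up rows, using the standard library's `sqlite3` module as a stand-in for Spark SQL, and shows one possible fix: joining against the distinct `(i94port, city, foreign_born)` combinations.

```python
import sqlite3

# In-memory stand-in for the Spark tables. All rows are invented for illustration.
con = sqlite3.connect(":memory:")
con.executescript("""
    CREATE TABLE immigration (cicid INTEGER, i94port TEXT);
    CREATE TABLE demographics (i94port TEXT, city TEXT, foreign_born INTEGER, race TEXT);
    INSERT INTO immigration VALUES (1, 'NYC'), (2, 'NYC'), (3, 'MIA');
    -- Assumption: one demographics row per city and race (two races here).
    INSERT INTO demographics VALUES
        ('NYC', 'New York', 100, 'A'), ('NYC', 'New York', 100, 'B'),
        ('MIA', 'Miami', 50, 'A'), ('MIA', 'Miami', 50, 'B');
""")

# Direct join: each immigration row matches every race row of its city.
naive_sql = """
    SELECT d.city, COUNT(i.cicid) AS No_Immigrants
    FROM immigration i JOIN demographics d ON i.i94port = d.i94port
    GROUP BY d.city
    ORDER BY No_Immigrants DESC
"""

# Join against one row per city, so each immigration row is counted once.
dedup_sql = """
    SELECT d.city, COUNT(i.cicid) AS No_Immigrants
    FROM immigration i
    JOIN (SELECT DISTINCT i94port, city, foreign_born FROM demographics) d
      ON i.i94port = d.i94port
    GROUP BY d.city
    ORDER BY No_Immigrants DESC
"""

naive = con.execute(naive_sql).fetchall()
dedup = con.execute(dedup_sql).fetchall()
print(naive)  # [('New York', 4), ('Miami', 2)]
print(dedup)  # [('New York', 2), ('Miami', 1)]
```

If the same pattern holds in the real tables, the `SELECT DISTINCT` subquery should carry over to the `spark.sql` call, and the correlation is worth recomputing on the corrected counts.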


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Dimension Tables:

**Temperature:**

| Feature | Description |
|---|---|
| dt | Datetime stamp |
| AverageTemperature | Average temperature of the city in °C |
| AverageTemperatureUncertainty | Uncertainty of the average temperature of the city |
| City | City name |
| Country | Country name |
| Latitude | Latitude coordinates |
| Longitude | Longitude coordinates|
| i94port | Port of entry |

**Demographics:**

| Feature | Description |
|---|---|
| City | City name |
| State | State name|
| Median Age | Median age of residents |
| Male Population | Number of male residents |
| Female Population | Number of female residents |
| Total Population | Number of total residents |
| Number of Veterans | Number of residents that are veterans |
| Foreign-born | Number of residents born outside the United States |
| Average Household Size | Average number of residents per household |
| State Code | Two letter state code |
| Race | Race category that the row refers to |
| Count | Number of residents in that race category |
| i94port | Port of entry |

**Airport Code:**

| Feature | Description |
|---|---|
| ident | Airport identifier code |
| type | Type of airport by size |
| name | Airport name |
| elevation_ft | Elevation of airport in feet |
| continent | Continent of the airport |
| iso_country | Country of airport |
| iso_region | Region of airport within country |
| municipality | Municipality of airport |
| gps_code | GPS code |
| iata_code | IATA code |
| local_code | Local identity code |
| coordinates | Latitude and Longitude of airport |


### Fact Table:

**Immigration:**

| Feature | Description |
|---|---|
| cicid | Record ID |
| i94yr | Four-digit year |
| i94mon | Month |
| i94cit | Country of citizenship |
| i94res | Country of residence |
| i94port | Port of entry |
| arrdate | Arrival date |
| i94mode | Mode of transportation |
| i94addr | State of arrival in USA |
| depdate | Departure date |
| i94bir | Age of respondent in years |
| i94visa | Visa category code (business, pleasure, or student) |
| count | Counter used for summary statistics (always 1 in this data set) |
| dtadfile | Date added to i94 files |
| entdepa | Arrival flag |
| entdepd | Departure flag |
| matflag | Match flag |
| biryear | Birth year |
| dtaddto | Date until which the respondent is admitted to the USA |
| gender | Gender of respondent |
| airline | Airline of entry |
| admnum | Admission number |
| fltno | Flight number |
| visatype | Type of visa held by respondent |
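
The date columns are stored in different encodings. The `arrdate` and `depdate` values in the sample output (for example `20574.0`) look like SAS numeric dates, which count days since 1960-01-01. That is an assumption, but it is consistent with the sample row where `arrdate` 20574.0 appears next to `dtadfile` 20160430. The `dtaddto` strings such as `07282016` appear to use the `MMDDYYYY` layout. A minimal sketch of both conversions, where `sas_to_date` is a hypothetical helper name:

```python
from datetime import date, datetime, timedelta

# Assumption: arrdate/depdate are SAS dates, i.e. days since 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)

def sas_to_date(days):
    """Convert a SAS day count (int or float) to a date; None stays None."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))

arrival = sas_to_date(20574.0)    # arrdate from the sample rows
departure = sas_to_date(20576.0)  # depdate from the sample rows

# Assumption: dtaddto is a string in MMDDYYYY layout.
admitted_until = datetime.strptime("07282016", "%m%d%Y").date()

print(arrival)         # 2016-04-30
print(departure)       # 2016-05-02
print(admitted_until)  # 2016-07-28
```

In the Spark pipeline the same arithmetic could be applied in a UDF or with the built-in date functions, so that the date columns can be compared and filtered as real dates.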




#### Step 5: Complete Project Write Up

##### 5.1 Clearly state the rationale for the choice of tools and technologies for the project.
* Spark was chosen because at least two of the data sets contain more than one million rows and might receive daily updates.

* The immigration data could also arrive as a stream, which Spark can process as well.

* In addition, Spark can cope with rapidly growing data volumes, e.g. if an international conflict leads to a spike in immigration cases.

##### 5.2 Propose how often the data should be updated and why
The fact table should be updated monthly, because new immigration data is published monthly.
The dimension tables change less often. They should be checked every month for inconsistencies and outdated entries, and updated immediately if they are no longer current.

##### 5.3 Write a description of how you would approach the problem differently under the following scenarios:
 - **The data was increased by 100x:** Spark scales horizontally, so the number of worker nodes can be increased to handle the higher data volume.
 - **The data populates a dashboard that must be updated on a daily basis by 7am every day:** 
 If the prototype is accepted by the use case owner, Apache Airflow can be used in the scaled-up version to run the ETL process as a scheduled DAG, started early enough that it finishes by 7am every day.
 - **The database needed to be accessed by 100+ people**:
 If a database is needed, the data could be migrated to Amazon Redshift, which can scale to handle a growing number of concurrent users.

#### 6 Delete Output, if necessary

In [51]:
%rm -rf ./output/