# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project studies which US cities are most popular for immigration. It provides demographic data on the arrivals, such as gender, visa types, and median ages. It also provides climate data for different cities, to study any correlation between temperature and the rate of immigration, and airport data, to identify trends in which airlines are used most and in departure destinations. Spark was used for the ETL pipeline, and the data is stored in Parquet for analysis.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from data_quality_check import data_quality_check
from etl import create_valid_ports, clean_table, clean_i94, add_port
from create_model import create_model

In [2]:
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

**I94 Immigration Data:** This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace.

In [3]:
# Read immigration file
immigration_fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
i94_df = spark.read.format('com.github.saurfang.sas.spark').load(immigration_fname)

In [4]:
# Display immigration file data
i94_df.show(n=5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE
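
The `arrdate` and `depdate` columns are SAS numeric dates, i.e. the number of days since 1960-01-01. The row for cicid 27.0 further below confirms this: `arrdate` 20545.0 lines up with `dtadfile` 20160401. A minimal plain-Python sketch of the conversion (the helper name `sas_to_date` is hypothetical, not part of the project's `etl` module):

```python
from datetime import date, timedelta

# SAS dates count days from this epoch.
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(sas_days):
    """Convert a SAS date (days since 1960-01-01, possibly a float such as 20545.0) to a date."""
    if sas_days is None:
        return None  # depdate is null for travellers who have not left yet
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_to_date(20545.0))  # 2016-04-01
print(sas_to_date(20573.0))  # 2016-04-29
```

In Spark, the same conversion could be expressed column-wise with a SQL expression such as `date_add(to_date('1960-01-01'), cast(arrdate as int))`.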

**World Temperature Data:** This dataset came from Kaggle. 

In [5]:
# Read temperature file
temperature_fname = '../../data2/GlobalLandTemperaturesByCity.csv'
temperature_df = spark.read.format('csv').option('delimiter',',').option('header','true').load(temperature_fname)

In [6]:
temperature_df.show(n=5)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows



**U.S. City Demographic Data:** This data comes from OpenSoft.

In [7]:
# Read demographics file
demographics_fname = 'us-cities-demographics.csv'
demographics_df = spark.read.format('csv').option('delimiter',';').option('header','true').load(demographics_fname)

In [8]:
demographics_df.show(n=5)

+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|      Alabama|      38.5|          38040| 

**Airport Code Table:** This is a simple table of airport codes and corresponding cities.

In [9]:
# Read airport code file
airport_code_fname = 'airport-codes_csv.csv'
airport_code_df = spark.read.format('csv').option('delimiter',',').option('header','true').load(airport_code_fname)

In [10]:
airport_code_df.show(n=5)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.



In [11]:
i94_df.cache()
i94_df.count()

3096313

In [12]:
temperature_df.cache()
temperature_df.count()

8599212

In [13]:
demographics_df.cache()
demographics_df.count()

2891

In [14]:
airport_code_df.cache()
airport_code_df.count()

55075

#### Cleaning Steps
Document steps necessary to clean the data
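
The cleaning helpers `clean_i94` and `clean_table` come from the `etl` module, which is not shown in this notebook. Judging only from their printed output, `clean_i94` removes rows (3,096,313 to 2,995,590) and the mostly empty columns `visapost`, `occup`, `entdepu`, and `insnum`, and `clean_table` then removes rows that still have missing values. The sketch below illustrates that kind of logic on plain Python rows. The function names and the 50% null threshold are assumptions, not the project's actual implementation.

```python
def filter_valid_ports(rows, valid_ports):
    """Keep only rows whose i94port is a known port code (e.g. drop 'XXX')."""
    return [row for row in rows if row.get("i94port") in valid_ports]


def drop_sparse_columns(rows, max_null_fraction=0.5):
    """Remove columns whose share of None values exceeds max_null_fraction."""
    if not rows:
        return rows
    keep = [
        col for col in rows[0]
        if sum(row.get(col) is None for row in rows) / len(rows) <= max_null_fraction
    ]
    return [{col: row.get(col) for col in keep} for row in rows]


def drop_incomplete_rows(rows):
    """Remove rows that still contain any None value."""
    return [row for row in rows if all(v is not None for v in row.values())]


# Hypothetical rows shaped like the I94 sample above.
sample = [
    {"cicid": 6.0, "i94port": "XXX", "occup": None, "gender": None},
    {"cicid": 27.0, "i94port": "BOS", "occup": None, "gender": "M"},
    {"cicid": 28.0, "i94port": "BOS", "occup": None, "gender": None},
]
valid = filter_valid_ports(sample, {"BOS", "ATL"})
slim = drop_sparse_columns(valid)
clean = drop_incomplete_rows(slim)
print(clean)  # [{'cicid': 27.0, 'i94port': 'BOS', 'gender': 'M'}]
```

On a Spark DataFrame, the row-level steps correspond to `df.filter(df.i94port.isin(valid_ports))` and `df.dropna()`.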

In [15]:
# Create a list of valid ports and drop empty columns 
dropped_i94_df = clean_i94(i94_df)

number of rows in i94 before clenaing: 3096313
number of rows in i94 after clenaing: 2995590


In [16]:
# Drop empty rows in i94 dataframe
cleaned_i94_df = clean_table(dropped_i94_df)

start cleaning 9.5367431640625e-07
number of rows in table before removing empty rows: 2995590
number of rows in table after removing empty rows: 2306750
finish cleaning 6.036678075790405
show
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+-------+-------+-------+-------+--------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|entdepa|entdepd|matflag|biryear| dtaddto|gender|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+-------+-------+-------+-------+--------+------+-------+--------------+-----+--------+
| 27.0|2016.0|   4.0| 101.0| 101.0|    BOS|20545.0|    1.0|     MA|20549.0|  58.0|    1.0|  1.0|20160401|      G|      O|      M| 1958.0|04062016|     M|     LH|9.247876383E10|00422|      B1|
| 28.0|2016.0|   4.0| 101.0| 101.0|    

In [None]:
# Drop rows with missing temperatures, then map each city to an I94 port code
temperature_df = temperature_df.filter(temperature_df.AverageTemperature.isNotNull())
temperature_df = temperature_df.filter(temperature_df.AverageTemperature != 'NaN')
temperature_df.cache()
new_temperature_df = add_port(temperature_df)
new_temperature_df.cache()
new_temperature_df.count()  # materialize the cache before cleaning
cleaned_temperature_df = clean_table(new_temperature_df)
cleaned_temperature_df.cache()

Add port
complete adding 0.06515860557556152
+----------+-------------------+-----------------------------+-----+-------+--------+---------+-------+
|        dt| AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|i94port|
+----------+-------------------+-----------------------------+-----+-------+--------+---------+-------+
|1743-11-01|              6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|   null|
|1744-04-01| 5.7879999999999985|           3.6239999999999997|Århus|Denmark|  57.05N|   10.33E|   null|
|1744-05-01|             10.644|           1.2830000000000001|Århus|Denmark|  57.05N|   10.33E|   null|
|1744-06-01| 14.050999999999998|                        1.347|Århus|Denmark|  57.05N|   10.33E|   null|
|1744-07-01|             16.082|                        1.396|Århus|Denmark|  57.05N|   10.33E|   null|
|1744-09-01| 12.780999999999999|                        1.454|Århus|Denmark|  57.05N|   10.33E|   null|
|1744-10-01|       

In [None]:
# Map each city to an I94 port code, drop empty rows, and rename columns to snake_case
new_demographics_df = add_port(demographics_df)
cleaned_demographics_df = clean_table(new_demographics_df)
fixed_cleaned_demographics_df = cleaned_demographics_df.withColumnRenamed('Median Age','median_age') \
                                                        .withColumnRenamed('Male Population', 'male_population') \
                                                        .withColumnRenamed('Female Population', 'female_population') \
                                                        .withColumnRenamed('Total Population', 'total_population') \
                                                        .withColumnRenamed('Number of Veterans', 'number_of_veterans') \
                                                        .withColumnRenamed('Foreign-born', 'foreign_born') \
                                                        .withColumnRenamed('Average Household Size', 'average_household_size') \
                                                        .withColumnRenamed('State Code', 'state_code')

In [None]:
# Drop empty rows in airport code dataframe
cleaned_airport_code_df = clean_table(airport_code_df)

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

| table | columns | description | type |
|---|---|---|---|
| Immigrations | cicid; i94yr; i94mon; i94cit; i94res; i94port; arrdate; i94mode; i94addr; depdate; i94bir; i94visa; count; dtadfile; entdepa; entdepd; matflag; biryear; dtaddto; gender; airline; admnum; fltno; visatype;  | Contains i94 immigration data | Fact Table |
| Temperature | dt; AverageTemperature; AverageTemperatureUncertainty; City; Country; Latitude; Longitude; i94port; | Contains temperature data | Dimension Table |
| Demographics | City; State; median_age; male_population; female_population; total_population; number_of_veterans; foreign_born; average_household_size; state_code; Race; Count; i94port; | Contains demographics data | Dimension Table |
| Airport | ident; type; name; elevation_ft; continent; iso_country; iso_region; municipality; gps_code; iata_code; local_code; coordinates; | Contains airport data | Dimension Table |
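
This is a star schema: the Immigrations fact table links to the dimension tables through the `i94port` port code, which the cleaning step adds to the temperature and demographics data via `add_port`. A plain-Python sketch of that join, counting arrivals per city (the project's core question), is shown below; the rows and the port-to-city mapping are hypothetical.

```python
from collections import Counter

# Hypothetical miniature fact rows and a port -> city dimension lookup.
immigrations = [
    {"cicid": 27.0, "i94port": "BOS"},
    {"cicid": 28.0, "i94port": "BOS"},
    {"cicid": 7.0, "i94port": "ATL"},
    {"cicid": 6.0, "i94port": "XXX"},  # no matching dimension row
]
port_to_city = {"BOS": "Boston", "ATL": "Atlanta"}


def arrivals_per_city(fact_rows, city_by_port):
    """Inner-join fact rows to the city dimension on i94port and count arrivals per city."""
    counts = Counter()
    for row in fact_rows:
        city = city_by_port.get(row["i94port"])
        if city is not None:  # inner join: ports without a dimension row are dropped
            counts[city] += 1
    return counts.most_common()


print(arrivals_per_city(immigrations, port_to_city))  # [('Boston', 2), ('Atlanta', 1)]
```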

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

1. Clean the data.
2. Create the fact table from the immigration file.
3. Create the dimension tables from the temperature, airport code, and demographics files.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
%rm -rf ./output/
create_model(cleaned_i94_df, cleaned_temperature_df, fixed_cleaned_demographics_df, cleaned_airport_code_df)

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
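
`data_quality_check` is imported from a module that is not shown here, so its exact checks are unknown. As a hedged sketch, the three kinds of check listed above might look like the hypothetical functions below, written against plain Python rows. In Spark the same ideas map to `df.count()` and to comparing `df.count()` with `df.select('cicid').distinct().count()`.

```python
def check_not_empty(rows, table):
    """Count check: the table must contain at least one row."""
    if len(rows) == 0:
        raise ValueError(f"{table}: table is empty")


def check_unique_key(rows, key, table):
    """Integrity check: every row must have a distinct, non-null value for `key`."""
    seen = set()
    for row in rows:
        value = row.get(key)
        if value is None:
            raise ValueError(f"{table}: null {key}")
        if value in seen:
            raise ValueError(f"{table}: duplicate {key} {value!r}")
        seen.add(value)


def check_row_count(expected, actual, table):
    """Completeness check: rows read back from Parquet must match rows written."""
    if expected != actual:
        raise ValueError(f"{table}: expected {expected} rows, found {actual}")


# Hypothetical miniature immigration table keyed on cicid.
immigration_rows = [{"cicid": 27.0}, {"cicid": 28.0}]
check_not_empty(immigration_rows, "immigration")
check_unique_key(immigration_rows, "cicid", "immigration")
check_row_count(2, len(immigration_rows), "immigration")
print("all checks passed")
```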
 
Run Quality Checks

In [None]:
data_quality_check(cleaned_i94_df)
data_quality_check(cleaned_temperature_df)
data_quality_check(fixed_cleaned_demographics_df)
data_quality_check(cleaned_airport_code_df)

In [None]:
# Read parquet files
parI94 = spark.read.parquet("./output/immigration_table.parquet")
parTemp = spark.read.parquet("./output/temperature_table.parquet")
parDem = spark.read.parquet("./output/demographics_table.parquet")
parAir = spark.read.parquet("./output/airport_code_table.parquet")

parI94.createOrReplaceTempView("immigration")
parTemp.createOrReplaceTempView("temperature")
parDem.createOrReplaceTempView("demographics")
parAir.createOrReplaceTempView("airport")

parI94.show()
parTemp.show()
parDem.show()
parAir.show()

In [None]:
# Query parquet files to test that the ETL processing succeeded
# Average household size of Los Angeles (demographics table) joined to arrivals born before 1980,
# i.e. roughly 36 or older in 2016. i94bir holds the age, so the birth year is in biryear.
spark.sql("""
    SELECT AVG(demographics.average_household_size)
    FROM immigration
    JOIN demographics ON immigration.i94port = demographics.i94port
    WHERE demographics.city = 'Los Angeles' AND immigration.biryear < 1980.0
""").show()

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Dimension Tables:

**Temperature:**

| Feature | Description |
|---|---|
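| dt | Date of the monthly observation (first day of the month, YYYY-MM-DD) |
| AverageTemperature | Average land temperature of the city for the month, in °C |
| AverageTemperatureUncertainty | Uncertainty of the average temperature, in °C |
| City | City name |
| Country | Country name |
| Latitude | Latitude with hemisphere suffix (e.g., 57.05N) |
| Longitude | Longitude with hemisphere suffix (e.g., 10.33E) |
| i94port | I94 port-of-entry code matched to the city during cleaning (`add_port`) |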

**Demographics:**

| Feature | Description |
|---|---|
| City | City name |
| State | State name |
| median_age | Median age of residents |
| male_population | Number of male residents |
| female_population | Number of female residents |
| total_population | Total number of residents |
| number_of_veterans | Number of residents who are veterans |
| foreign_born | Number of residents born outside the US |
| average_household_size | Average number of people per household |
| state_code | Two-letter state code |
| Race | Race category described by this row (each city has one row per race) |
| Count | Number of residents of the city belonging to that race |
| i94port | I94 port-of-entry code matched to the city during cleaning (`add_port`) |
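
Because each city appears once per `Race` value, the city-level columns (such as `total_population`) repeat on every row for that city, and a join on `i94port` returns one demographics row per race. One way to avoid double counting is to split the table into a one-row-per-city part and a race-count part. A minimal sketch on hypothetical rows (the function name and sample values are illustrative only):

```python
def split_demographics(rows, city_fields=("city", "state", "median_age", "total_population")):
    """Split per-(city, race) rows into one row per city plus a race-count table."""
    cities = {}
    race_counts = []
    for row in rows:
        key = (row["city"], row["state"])
        # City-level fields are identical across a city's rows, so keep the first copy.
        cities.setdefault(key, {field: row[field] for field in city_fields})
        race_counts.append(
            {"city": row["city"], "state": row["state"], "race": row["race"], "count": row["count"]}
        )
    return list(cities.values()), race_counts


# Hypothetical rows: the same city appears once per race.
rows = [
    {"city": "Example City", "state": "Example State", "median_age": 40.0,
     "total_population": 1000, "race": "White", "count": 600},
    {"city": "Example City", "state": "Example State", "median_age": 40.0,
     "total_population": 1000, "race": "Asian", "count": 150},
]
cities, races = split_demographics(rows)
print(len(cities), len(races))  # 1 2
```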


**Airport Code:**

| Feature | Description |
|---|---|
| ident | Unique airport identifier |
| type | Facility type (e.g., heliport, small_airport) |
| name | Airport name |
| elevation_ft | Elevation of the airport in feet |
| continent | Continent code (e.g., NA) |
| iso_country | ISO country code (e.g., US) |
| iso_region | ISO region code, country plus subdivision (e.g., US-PA) |
| municipality | Municipality served by the airport |
| gps_code | GPS code |
| iata_code | IATA airport code (null for many small facilities) |
| local_code | Local identifier code |
| coordinates | Longitude and latitude of the airport, in that order |
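
The sample rows show that `coordinates` lists longitude first (e.g. about -74.93 for Bensalem, PA), and that `iso_region` embeds the state code (`US-PA`), which is what would let airports be matched against the demographics `state_code`. Two hypothetical helpers, not part of the project code, that parse both fields:

```python
def parse_airport_coordinates(coordinates):
    """Split a 'longitude, latitude' string into a (latitude, longitude) tuple of floats."""
    longitude_text, latitude_text = (part.strip() for part in coordinates.split(","))
    return float(latitude_text), float(longitude_text)


def state_from_iso_region(iso_region):
    """Return the subdivision part of an ISO region code, e.g. 'US-KS' -> 'KS'."""
    _, _, subdivision = iso_region.partition("-")
    return subdivision or None


# Hypothetical input in the same 'longitude, latitude' layout as the sample rows.
print(parse_airport_coordinates("-100.5, 38.25"))  # (38.25, -100.5)
print(state_from_iso_region("US-KS"))  # KS
```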


### Fact Table:

**Immigration:**

| Feature | Description |
|---|---|
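| cicid | Record ID |
| i94yr | 4-digit year |
| i94mon | Numeric month |
| i94cit | Country of citizenship (numeric code) |
| i94res | Country of residence (numeric code) |
| i94port | Port of entry (three-character code) |
| arrdate | Arrival date (SAS date: days since 1960-01-01) |
| i94mode | Mode of transportation (1 = Air, 2 = Sea, 3 = Land, 9 = Not reported) |
| i94addr | US state of the respondent's address while in the country |
| depdate | Departure date (SAS date: days since 1960-01-01) |
| i94bir | Age of respondent in years |
| i94visa | Visa category (1 = Business, 2 = Pleasure, 3 = Student) |
| count | Field used for summary statistics |
| dtadfile | Date added to I-94 files (YYYYMMDD) |
| entdepa | Arrival flag |
| entdepd | Departure flag |
| matflag | Match flag (arrival and departure records matched) |
| biryear | 4-digit year of birth |
| dtaddto | Date until which the respondent is admitted to the US (MMDDYYYY) |
| gender | Gender of respondent |
| airline | Airline used to arrive in the US |
| admnum | Admission number |
| fltno | Flight number |
| visatype | Visa class held by the respondent (e.g., B1, B2) |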
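
Several fact-table columns are codes that need the I94 data dictionary to interpret, and Spark reads them as floats (e.g. `2.0`). A minimal decoding sketch, using the code labels from that dictionary; the helper names are hypothetical, and the sketch assumes some `dtaddto` values may not be valid dates, returning `None` for them:

```python
from datetime import datetime

I94MODE_LABELS = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}
I94VISA_LABELS = {1: "Business", 2: "Pleasure", 3: "Student"}


def decode_code(value, labels):
    """Map a numeric code read as a float (e.g. 2.0) to its label; missing or unknown codes give None."""
    if value is None:
        return None
    return labels.get(int(value))


def parse_dtaddto(value):
    """Parse the MMDDYYYY 'admitted until' string (e.g. '10282016'); return None if it is not a date."""
    try:
        return datetime.strptime(value, "%m%d%Y").date()
    except (TypeError, ValueError):
        return None


print(decode_code(1.0, I94MODE_LABELS), decode_code(2.0, I94VISA_LABELS))  # Air Pleasure
print(parse_dtaddto("10282016"))  # 2016-10-28
```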




### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

### Rationale:
Spark can read multiple file formats (SAS, CSV, Parquet) and process files containing millions of rows in parallel. It also integrates with Redshift, and the cluster can be scaled out by adding nodes should requirements change.

### Data Update Schedule 
The update schedule depends on how often the source data itself is updated. The temperature data is recorded monthly, and the I94 data is delivered as monthly files (e.g., `i94_apr16_sub`), so a monthly update cycle is appropriate.

### Scenarios:
- The data was increased by 100x.
    * Using Spark, we can scale up by increasing the number of worker nodes working on the data.
- The data populates a dashboard that must be updated on a daily basis by 7am every day.
    * Apache Airflow can be integrated to run the pipeline as a scheduled DAG every day, starting early enough that the data is refreshed before 7am.
- The database needed to be accessed by 100+ people.
    * Data can be migrated to Redshift to allow auto-scaling capabilities to handle the load of increased access by users.