# Immigration, airports, demographics and temperature data
### Data Engineering Capstone Project

#### Project Summary
This project provides information on immigration to the United States: which cities receive the most arrivals, how the average temperature of those cities has evolved over time, the demographics of the arrival cities, the airports at which arrivals were recorded, and so on. Four data sources were used: the 2016 I94 immigration data, OpenSoft demographic data, Kaggle temperature data and airport code data. I designed 1 fact table, called immigration, and 3 dimension tables, called airports, demographics and temperature.

#### The project follows these steps:

Step 1: Scope the Project and Gather Data <br>
Step 2: Explore and Assess the Data <br>
Step 3: Define the Data Model <br>
Step 4: Run pipelines to Model the Data <br>
Step 5: Complete Project Write Up <br>

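```python
# Do all imports and installs here
import glob
import os
import re
from datetime import datetime, timedelta
from pprint import pprint

import pandas as pd
import psycopg2
from pyspark.sql import SparkSession
# `round` is aliased so it does not shadow Python's built-in round()
from pyspark.sql.functions import (count, col, udf, year, month, avg, round as spark_round,
                                   dayofweek, weekofyear, isnull)
from pyspark.sql.types import StringType, IntegerType

# Local module holding the INSERT statements for the four tables
from sql_queries2 import airport_insert, demographic_insert, immigration_insert, temperature_insert

# Extra Spark packages must be declared before the SparkSession is created (Step 2).
# The _2.10 suffix is the Scala version; it should match the Scala build of Spark
# (the spark-sas7bdat package configured later is built for Scala 2.11).
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-redshift_2.10:2.0.1 pyspark-shell'
```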

### Step 1: Scope the Project and Gather Data

#### Scope 
The aim of this project is to extract data from 4 different sources and build fact and dimension tables so that US immigration can be analyzed against temperature, demographic and airport data.

#### Describe and Gather Data 
1 - I94 Immigration Data: comes from the U.S. National Tourism and Trade Office and contains statistics on international visitor arrivals in the USA. The dataset covers 2016. <br>
2 - World Temperature Data: comes from Kaggle and contains monthly average land temperatures by city. <br>
3 - U.S. City Demographic Data: comes from OpenSoft and contains demographic information about US cities, such as median age and male and female population. <br>
4 - Airport Code Table: a simple table of airport codes and their corresponding cities.

#### I94 immigration data

```python
# I94 immigration data for April 2016 (SAS binary format)
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
i94_df = pd.read_sas(fname, format='sas7bdat', encoding="ISO-8859-1")
i94_df.head()
```

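|   | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 |  |  |  | ... | U |  | 1979.0 | 10282016 |  |  |  | 1897628000.0 |  | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL |  | ... | Y |  | 1991.0 | D/S | M |  |  | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... |  | M | 1961.0 | 09302016 | M |  | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 1988.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 2012.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |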


#### Temperature data

```python
# Temperature data
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
temp_df = pd.read_csv(fname)
temp_df.head(20)
```

|   | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 5 | 1744-04-01 | 5.788 | 3.624 | Århus | Denmark | 57.05N | 10.33E |
| 6 | 1744-05-01 | 10.644 | 1.283 | Århus | Denmark | 57.05N | 10.33E |
| 7 | 1744-06-01 | 14.051 | 1.347 | Århus | Denmark | 57.05N | 10.33E |
| 8 | 1744-07-01 | 16.082 | 1.396 | Århus | Denmark | 57.05N | 10.33E |
| 9 | 1744-08-01 |  |  | Århus | Denmark | 57.05N | 10.33E |


#### Distinct country values in the temperature data

```python
set(temp_df["Country"].values)
```

```
{'Afghanistan',
 'Albania',
 'Algeria',
 'Angola',
 'Argentina',
 'Armenia',
 'Australia',
 'Austria',
 'Azerbaijan',
 'Bahamas',
 'Bahrain',
 'Bangladesh',
 'Belarus',
 'Belgium',
 'Benin',
 'Bolivia',
 'Bosnia And Herzegovina',
 'Botswana',
 'Brazil',
 'Bulgaria',
 'Burkina Faso',
 'Burma',
 'Burundi',
 'Cambodia',
 'Cameroon',
 'Canada',
 'Central African Republic',
 'Chad',
 'Chile',
 'China',
 'Colombia',
 'Congo',
 'Congo (Democratic Republic Of The)',
 'Costa Rica',
 'Croatia',
 'Cuba',
 'Cyprus',
 'Czech Republic',
 "Côte D'Ivoire",
 'Denmark',
 'Djibouti',
 'Dominican Republic',
 'Ecuador',
 'Egypt',
 'El Salvador',
 'Equatorial Guinea',
 'Eritrea',
 'Estonia',
 'Ethiopia',
 'Finland',
 'France',
 'Gabon',
 'Gambia',
 'Georgia',
 'Germany',
 'Ghana',
 'Greece',
 'Guatemala',
 'Guinea',
 'Guinea Bissau',
 'Guyana',
 'Haiti',
 'Honduras',
 'Hong Kong',
 'Hungary',
 'Iceland',
 'India',
 'Indonesia',
 'Iran',
 'Iraq',
 'Ireland',
 'Israel',
 'Italy',
 'Jamaica',
 'Japan',
 'Jorda
```

#### Filter the temperature data to the United States

```python
temp_us_df = temp_df[temp_df["Country"] == "United States"]
temp_us_df.head()
```

|   | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 47555 | 1820-01-01 | 2.101 | 3.217 | Abilene | United States | 32.95N | 100.53W |
| 47556 | 1820-02-01 | 6.926 | 2.853 | Abilene | United States | 32.95N | 100.53W |
| 47557 | 1820-03-01 | 10.767 | 2.395 | Abilene | United States | 32.95N | 100.53W |
| 47558 | 1820-04-01 | 17.989 | 2.202 | Abilene | United States | 32.95N | 100.53W |
| 47559 | 1820-05-01 | 21.809 | 2.036 | Abilene | United States | 32.95N | 100.53W |
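The `Latitude` and `Longitude` columns are strings with a hemisphere suffix (for example `32.95N` and `100.53W`). If numeric coordinates are needed, for instance to match cities by location rather than by name, a small converter can be applied. This is a sketch; `parse_coordinate` is a hypothetical helper, not part of the project scripts.

```python
import pandas as pd


def parse_coordinate(value: str) -> float:
    """Convert a hemisphere-suffixed coordinate such as '32.95N' or '100.53W' to signed degrees."""
    value = value.strip()
    number, hemisphere = float(value[:-1]), value[-1].upper()
    if hemisphere not in ("N", "S", "E", "W"):
        raise ValueError(f"unexpected hemisphere suffix in {value!r}")
    # South and West are negative in the usual signed-degree convention
    return -number if hemisphere in ("S", "W") else number


sample = pd.DataFrame({"City": ["Abilene"], "Latitude": ["32.95N"], "Longitude": ["100.53W"]})
sample["lat"] = sample["Latitude"].map(parse_coordinate)
sample["lon"] = sample["Longitude"].map(parse_coordinate)
print(sample[["City", "lat", "lon"]])
```

For the Abilene rows above this gives a latitude of 32.95 and a longitude of -100.53.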


#### Demographics data

```python
demo_df = pd.read_csv("./us-cities-demographics.csv", delimiter=";")
demo_df.head(20)
```

|   | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |
| 5 | Peoria | Illinois | 33.1 | 56229.0 | 62432.0 | 118661 | 6634.0 | 7517.0 | 2.4 | IL | American Indian and Alaska Native | 1343 |
| 6 | Avondale | Arizona | 29.1 | 38712.0 | 41971.0 | 80683 | 4815.0 | 8355.0 | 3.18 | AZ | Black or African-American | 11592 |
| 7 | West Covina | California | 39.8 | 51629.0 | 56860.0 | 108489 | 3800.0 | 37038.0 | 3.56 | CA | Asian | 32716 |
| 8 | O'Fallon | Missouri | 36.0 | 41762.0 | 43270.0 | 85032 | 5783.0 | 3269.0 | 2.77 | MO | Hispanic or Latino | 2583 |
| 9 | High Point | North Carolina | 35.5 | 51751.0 | 58077.0 | 109828 | 5204.0 | 16315.0 | 2.65 | NC | Asian | 11060 |
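The `Race` and `Count` columns mean that a city can appear once per race group, with the city-level columns repeated on each of those rows. If one row per city is preferred for analysis, the race counts can be pivoted into columns. This is a sketch with made-up numbers; the project itself loads the file unpivoted.

```python
import pandas as pd

# Hypothetical rows in the same shape as us-cities-demographics.csv (values are illustrative only)
demo = pd.DataFrame({
    "City": ["Quincy", "Quincy", "Hoover"],
    "State": ["Massachusetts", "Massachusetts", "Alabama"],
    "Race": ["White", "Asian", "Asian"],
    "Count": [100, 40, 10],
})

# One row per (City, State), one column per race; missing combinations become 0
race_counts = demo.pivot_table(
    index=["City", "State"], columns="Race", values="Count", aggfunc="sum", fill_value=0
).reset_index()
race_counts.columns.name = None  # drop the leftover "Race" axis label
print(race_counts)
```

The city-level columns (population, median age and so on) could then be joined back on `City` and `State` after dropping their duplicates.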


#### Airport data

```python
port_df = pd.read_csv("./airport-codes_csv.csv")
port_df.head(20)
```

|   | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 00A | heliport | Total Rf Heliport | 11.0 |  | US | US-PA | Bensalem | 00A |  | 00A | -74.93360137939453, 40.07080078125 |
| 1 | 00AA | small_airport | Aero B Ranch Airport | 3435.0 |  | US | US-KS | Leoti | 00AA |  | 00AA | -101.473911, 38.704022 |
| 2 | 00AK | small_airport | Lowell Field | 450.0 |  | US | US-AK | Anchor Point | 00AK |  | 00AK | -151.695999146, 59.94919968 |
| 3 | 00AL | small_airport | Epps Airpark | 820.0 |  | US | US-AL | Harvest | 00AL |  | 00AL | -86.77030181884766, 34.86479949951172 |
| 4 | 00AR | closed | Newport Hospital & Clinic Heliport | 237.0 |  | US | US-AR | Newport |  |  |  | -91.254898, 35.6087 |
| 5 | 00AS | small_airport | Fulton Airport | 1100.0 |  | US | US-OK | Alex | 00AS |  | 00AS | -97.8180194, 34.9428028 |
| 6 | 00AZ | small_airport | Cordes Airport | 3810.0 |  | US | US-AZ | Cordes | 00AZ |  | 00AZ | -112.16500091552734, 34.305599212646484 |
| 7 | 00CA | small_airport | Goldstone /Gts/ Airport | 3038.0 |  | US | US-CA | Barstow | 00CA |  | 00CA | -116.888000488, 35.350498199499995 |
| 8 | 00CL | small_airport | Williams Ag Airport | 87.0 |  | US | US-CA | Biggs | 00CL |  | 00CL | -121.763427, 39.427188 |
| 9 | 00CN | heliport | Kitchen Creek Helibase Heliport | 3350.0 |  | US | US-CA | Pine Valley | 00CN |  | 00CN | -116.4597417, 32.7273736 |
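The `coordinates` column packs two numbers into one string. The sample values suggest longitude comes first (about -74.9 for Bensalem, PA) and latitude second. A sketch that splits it into numeric columns, assuming that order holds for every row (the sample frame below copies two rows from the table above):

```python
import pandas as pd

ports = pd.DataFrame({
    "ident": ["00A", "00AA"],
    "coordinates": ["-74.93360137939453, 40.07080078125", "-101.473911, 38.704022"],
})

# Assumption: every value has the form "longitude, latitude"
lon_lat = ports["coordinates"].str.split(",", expand=True)
ports["longitude"] = lon_lat[0].str.strip().astype(float)
ports["latitude"] = lon_lat[1].str.strip().astype(float)
print(ports[["ident", "longitude", "latitude"]])
```

Numeric columns make it possible to filter or join airports by location, which the combined string does not allow.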


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Creating the Spark session

```python
# Create a Spark session with the spark-sas7bdat package, which lets Spark read SAS files
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)
```

#### I94 null count

```python
i94_df.isnull().sum()
```

```
cicid             0
i94yr             0
i94mon            0
i94cit            0
i94res            0
i94port           0
arrdate           0
i94mode         239
i94addr      152372
depdate      142457
i94bir          802
i94visa           0
count             0
dtadfile          1
visapost    1881250
occup       3088187
entdepa         238
entdepd      138429
entdepu     3095921
matflag      138429
biryear         802
dtaddto         477
gender       414269
insnum      2982605
airline       83627
admnum            0
fltno         19549
visatype          0
dtype: int64
```

#### Temperature null count

```python
temp_us_df.isnull().sum()
```

```
dt                                   0
AverageTemperature               25765
AverageTemperatureUncertainty    25765
City                                 0
Country                              0
Latitude                             0
Longitude                            0
dtype: int64
```

#### Airport null count

```python
port_df.isnull().sum()
```

```
ident               0
type                0
name                0
elevation_ft     7006
continent       27719
iso_country       247
iso_region          0
municipality     5676
gps_code        14045
iata_code       45886
local_code      26389
coordinates         0
dtype: int64
```
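Some of these missing values may be a parsing artifact rather than missing data: `pd.read_csv` treats the string `NA` as missing by default, and `NA` is both the continent code for North America and the ISO country code for Namibia. The airport sample above, made up entirely of US airports, shows an empty `continent` for every row, which is consistent with this. A sketch of reading the file so that only empty fields count as missing; the tiny CSV below is a hypothetical stand-in for `airport-codes_csv.csv`.

```python
import io

import pandas as pd

# Hypothetical stand-in for airport-codes_csv.csv (columns trimmed for brevity)
csv_text = "ident,continent,iso_country\n00A,NA,US\n00B,AF,NA\nXYZ,,US\n"

default = pd.read_csv(io.StringIO(csv_text))
# Disable pandas' built-in missing-value markers and treat only empty fields as missing
strict = pd.read_csv(io.StringIO(csv_text), keep_default_na=False, na_values=[""])

print(default.isnull().sum())
print(strict.isnull().sum())
```

If this reading is adopted, the null counts above should be recomputed, since they likely include these North America and Namibia codes.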

#### Demographics null count

```python
demo_df.isnull().sum()
```

```
City                       0
State                      0
Median Age                 0
Male Population            3
Female Population          3
Total Population           0
Number of Veterans        13
Foreign-born              13
Average Household Size    16
State Code                 0
Race                       0
Count                      0
dtype: int64
```
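Raw null counts are hard to compare across tables of very different sizes. A small helper that reports the share of missing values per column makes it easier to decide which columns to drop during cleaning. This is a sketch; `null_report` and its default threshold of 0.9 are illustrative choices, not values used by the project's cleaning script.

```python
import pandas as pd


def null_report(df: pd.DataFrame, threshold: float = 0.9) -> pd.DataFrame:
    """Return null count and null fraction per column, flagging columns above `threshold`."""
    nulls = df.isnull().sum()
    report = pd.DataFrame({"nulls": nulls, "fraction": nulls / len(df)})
    report["mostly_null"] = report["fraction"] > threshold
    return report.sort_values("fraction", ascending=False)


sample = pd.DataFrame({
    "cicid": [1.0, 2.0, 3.0, 4.0],
    "gender": ["M", None, "F", None],
    "occup": [None, None, None, None],
})
print(null_report(sample, threshold=0.75))
```

Applied to the I94 counts above, columns such as `entdepu`, `occup` and `insnum`, which have by far the highest null counts, are the natural candidates to drop.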

#### Cleaning Steps

```python
!python data_cleaning.py
```
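The cleaning logic lives in the separate script `data_cleaning.py`, which is not reproduced here. Judging from the files read in Step 4 (`df_i94_filtered.csv` keeps only a handful of I94 columns, and the sample rows of `clean_port_df.csv` all have an `iata_code`), a plausible outline is sketched below. The function names and rules are assumptions, not the script's actual contents.

```python
import pandas as pd

# I94 columns that the filtered file appears to keep (taken from the Step 4 output)
I94_COLUMNS = ["cicid", "i94yr", "i94mon", "i94cit", "i94res", "i94port", "arrdate"]


def clean_airports(port_df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical rule: keep airports that have an IATA code, one row per code."""
    with_iata = port_df.dropna(subset=["iata_code"])
    return with_iata.drop_duplicates(subset=["iata_code"], keep="first")


def clean_i94(i94_df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical rule: keep the key columns and drop exact duplicate rows."""
    return i94_df[I94_COLUMNS].drop_duplicates()


airports = pd.DataFrame({
    "ident": ["00A", "KATL", "KATL2"],  # illustrative rows
    "iata_code": [None, "ATL", "ATL"],
})
print(clean_airports(airports))
# The results would then be saved, e.g. clean_airports(port_df).to_csv("./clean_port_df.csv")
```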

#### Registering port locations

```python
# Get port codes and locations from the SAS label description file
with open("./I94_SAS_Labels_Descriptions.SAS") as f:
    content = [line.strip() for line in f]

# List indices 302-961 hold the i94port value list ('CODE' = 'CITY, STATE');
# these positions are specific to this version of the file
ports = content[302:962]
splitted_ports = [port.split("=") for port in ports]
port_codes = [x[0].replace("'", "").strip() for x in splitted_ports]
port_locations = [x[1].replace("'", "").strip() for x in splitted_ports]
# City is the text before the first comma, state the text after the last one (spaces stripped)
port_cities = [x.split(",")[0].strip() for x in port_locations]
port_states = [x.split(",")[-1].strip() for x in port_locations]
df_port_locations = pd.DataFrame({"port_code": port_codes, "port_city": port_cities, "port_state": port_states})
df_port_locations.head(20)
```

|   | port_code | port_city | port_state |
|---|---|---|---|
| 0 | ALC | ALCAN | AK |
| 1 | ANC | ANCHORAGE | AK |
| 2 | BAR | BAKER AAF - BAKER ISLAND | AK |
| 3 | DAC | DALTONS CACHE | AK |
| 4 | PIZ | DEW STATION PT LAY DEW | AK |
| 5 | DTH | DUTCH HARBOR | AK |
| 6 | EGL | EAGLE | AK |
| 7 | FRB | FAIRBANKS | AK |
| 8 | HOM | HOMER | AK |
| 9 | HYD | HYDER | AK |
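Slicing `content[302:962]` depends on the exact line positions of the port block in `I94_SAS_Labels_Descriptions.SAS`, so it can break silently if the file changes. A sketch of a position-independent alternative that picks out `'CODE' = 'CITY, ST'` lines with a regular expression. It assumes every port entry follows that quoting pattern and that port codes are exactly three characters, which is meant to keep two-letter codes from other value lists out; `parse_port_lines` is a hypothetical name.

```python
import re

import pandas as pd

# Matches lines such as:  'ALC'	=	'ALCAN, AK             '
PORT_LINE = re.compile(r"^\s*'(?P<code>[A-Z0-9]{3})'\s*=\s*'(?P<location>.*)'")


def parse_port_lines(lines):
    """Return a DataFrame with port_code, port_city and port_state parsed from SAS label lines."""
    rows = []
    for line in lines:
        match = PORT_LINE.match(line)
        if not match:
            continue  # comments, other value lists, blank lines
        parts = match.group("location").strip().split(",")
        rows.append({
            "port_code": match.group("code"),
            "port_city": parts[0].strip(),
            # Unlike the cell above, a location without a comma gets an empty state
            "port_state": parts[-1].strip() if len(parts) > 1 else "",
        })
    return pd.DataFrame(rows, columns=["port_code", "port_city", "port_state"])


sample_lines = ["\t'ALC'\t=\t'ALCAN, AK             '", "'AL'='ALABAMA'"]
print(parse_port_lines(sample_lines))
```

It can be fed the same `content` list read in the cell above, without slicing.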


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The star schema was chosen as the data model because it is simple yet effective: the source data needs no major restructuring for this application, and the fact table can be joined to each dimension table with simple queries to analyze the data.

<img src="data_model_dend_capstone.png">
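As an illustration of the kind of query the star schema is meant to support, the sketch below joins the fact table to the temperature dimension by city. It uses an in-memory SQLite database with a few made-up rows and column names taken from the data dictionary in section 4.3; the real tables are created in PostgreSQL by `create_tables2.py` and their DDL may differ. I94 port cities are upper case (`ATLANTA`) while the temperature data uses title case (`Abilene`), so the join compares upper-cased names.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
# Simplified versions of two tables from the data dictionary (column types are assumptions)
cur.execute("CREATE TABLE immigrations (cicid REAL, iata TEXT, city TEXT)")
cur.execute("CREATE TABLE temperature (average_temperature REAL, city TEXT, country TEXT)")
cur.executemany("INSERT INTO immigrations VALUES (?, ?, ?)", [
    (7.0, "ATL", "ATLANTA"),
    (28.0, "ATL", "ATLANTA"),
    (15.0, "WAS", "WASHINGTON"),
])
cur.executemany("INSERT INTO temperature VALUES (?, ?, ?)", [
    (10.0, "Atlanta", "United States"),
    (20.0, "Atlanta", "United States"),
])

# Arrivals per city together with that city's mean recorded temperature.
# Each side is aggregated before the join so monthly temperature rows do not multiply arrivals.
query = """
SELECT i.city, i.arrivals, t.avg_temp
FROM (SELECT city, COUNT(*) AS arrivals FROM immigrations GROUP BY city) AS i
LEFT JOIN (
    SELECT UPPER(city) AS city_upper, AVG(average_temperature) AS avg_temp
    FROM temperature WHERE country = 'United States' GROUP BY UPPER(city)
) AS t ON t.city_upper = i.city
ORDER BY i.arrivals DESC
"""
rows = cur.execute(query).fetchall()
print(rows)
```

The left join keeps cities that have no temperature match, which shows up as a missing (`None`) average rather than a dropped row.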

#### 3.2 Mapping Out Data Pipelines

1 - Create the tables by running create_tables2.py. <br>
2 - Join the port city, parsed from the SAS label file, to the I94 data. <br>
3 - Insert the data into the fact and dimension tables.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

#### Creating tables

```python
!python create_tables2.py
```

#### Connecting to the database

```python
# After running create_tables2.py, connect to the database to insert the data
conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=student password=student")
cur = conn.cursor()
```

#### Join I94 and port city data

```python
df_i94_filtered = pd.read_csv("./df_i94_filtered.csv")
# Inner join: keeps only I94 rows whose i94port appears in the SAS port list
df_i94_filtered = df_i94_filtered.merge(df_port_locations, left_on="i94port", right_on="port_code")
df_i94_filtered.head()
```

|   | Unnamed: 0 | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | port_code | port_city | port_state |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | ATL | ATLANTA | GA |
| 1 | 13 | 28.0 | 2016.0 | 4.0 | 101.0 | 101.0 | ATL | 20545.0 | ATL | ATLANTA | GA |
| 2 | 14 | 29.0 | 2016.0 | 4.0 | 101.0 | 101.0 | ATL | 20545.0 | ATL | ATLANTA | GA |
| 3 | 15 | 30.0 | 2016.0 | 4.0 | 101.0 | 101.0 | ATL | 20545.0 | ATL | ATLANTA | GA |
| 4 | 16 | 31.0 | 2016.0 | 4.0 | 101.0 | 101.0 | ATL | 20545.0 | ATL | ATLANTA | GA |


```python
# Keep only the columns needed for the immigration fact table
df_i94_filtered = df_i94_filtered[["cicid","i94yr","i94mon","i94cit","i94res","i94port","arrdate","port_city"]]
```

```python
df_i94_filtered.head()
```

|   | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | port_city |
|---|---|---|---|---|---|---|---|---|
| 0 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | ATLANTA |
| 1 | 28.0 | 2016.0 | 4.0 | 101.0 | 101.0 | ATL | 20545.0 | ATLANTA |
| 2 | 29.0 | 2016.0 | 4.0 | 101.0 | 101.0 | ATL | 20545.0 | ATLANTA |
| 3 | 30.0 | 2016.0 | 4.0 | 101.0 | 101.0 | ATL | 20545.0 | ATLANTA |
| 4 | 31.0 | 2016.0 | 4.0 | 101.0 | 101.0 | ATL | 20545.0 | ATLANTA |
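The `merge` above is an inner join, so I94 records whose `i94port` has no entry in the SAS port list are dropped without any warning. Before relying on the join, it can help to count how many rows are lost. A sketch using pandas' `indicator` and `validate` options on a left join, with illustrative rows (`ZZZ` is a hypothetical unmatched code):

```python
import pandas as pd

i94 = pd.DataFrame({"cicid": [7.0, 28.0, 99.0], "i94port": ["ATL", "ATL", "ZZZ"]})
ports = pd.DataFrame({"port_code": ["ATL"], "port_city": ["ATLANTA"], "port_state": ["GA"]})

# how="left" keeps every I94 row; indicator=True adds a "_merge" column describing each match.
# validate="many_to_one" raises an error if a port code appears twice in the lookup table,
# which would otherwise duplicate I94 rows.
checked = i94.merge(ports, left_on="i94port", right_on="port_code",
                    how="left", indicator=True, validate="many_to_one")
unmatched = checked[checked["_merge"] == "left_only"]
print(f"{len(unmatched)} of {len(i94)} rows have no port match")
print(unmatched["i94port"].value_counts())
```

Listing the unmatched codes by frequency shows whether the losses are negligible or point to a parsing problem in the port list.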


#### Selecting airport columns

```python
clean_port_df = pd.read_csv("./clean_port_df.csv")
# Select the airports-table columns in the order the insert below expects (values are passed positionally)
clean_port_df = clean_port_df[["iata_code", "name", "type", "local_code", "coordinates", "elevation_ft", "continent", "iso_country", "iso_region", "municipality", "gps_code"]]
clean_port_df.head(20)
```

|   | iata_code | name | type | local_code | coordinates | elevation_ft | continent | iso_country | iso_region | municipality | gps_code |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 223 | UTK | Utirik Airport | small_airport | 03N | 169.852005, 11.222 | 4.0 | OC | MH | MH-UTI | Utirik Island | K03N |
| 440 | OCA | Ocean Reef Club Airport | small_airport | 07FA | -80.274803161621, 25.325399398804 | 8.0 |  | US | US-FL | Key Largo | 07FA |
| 594 | PQS | Pilot Station Airport | small_airport | 0AK | -162.899994, 61.934601 | 305.0 |  | US | US-AK | Pilot Station |  |
| 673 | CSE | Crested Butte Airpark | small_airport | 0CO2 | -106.928341, 38.851918 | 8980.0 |  | US | US-CO | Crested Butte | 0CO2 |
| 1088 | JCY | LBJ Ranch Airport | small_airport | 0TE7 | -98.62249755859999, 30.251800537100003 | 1515.0 |  | US | US-TX | Johnson City | 0TE7 |


#### Load airport data

```python
# Insert row by row, but commit once after the loop instead of once per row
for index, row in clean_port_df.iterrows():
    cur.execute(airport_insert, list(row.values))
conn.commit()
```

#### Load demographics data

```python
# Insert row by row, but commit once after the loop instead of once per row
for index, row in demo_df.iterrows():
    cur.execute(demographic_insert, list(row.values))
conn.commit()
```

#### Load immigration data

```python
# Insert row by row, but commit once after the loop instead of once per row
for index, row in df_i94_filtered.iterrows():
    cur.execute(immigration_insert, list(row.values))
conn.commit()
```

#### Load temperature data

```python
# Insert row by row, but commit once after the loop instead of once per row
for index, row in temp_us_df.iterrows():
    cur.execute(temperature_insert, list(row.values))
conn.commit()
```
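Executing one `INSERT` per DataFrame row is slow for millions of I94 records. A sketch of a batched loader built on the standard DB-API `executemany`, which converts NaN to SQL NULL and commits once at the end. It is demonstrated with `sqlite3` (which uses `?` placeholders) so it runs anywhere; with psycopg2 the same function accepts the existing `%s`-style queries from `sql_queries2`. psycopg2's documentation notes that its `executemany` is not faster than calling `execute` in a loop, so there the gain comes mainly from the single commit, and `psycopg2.extras.execute_values` is the faster option. `load_dataframe` and `batch_size` are illustrative names.

```python
import sqlite3

import pandas as pd


def load_dataframe(conn, insert_sql, df, batch_size=10_000):
    """Insert all rows of `df` with `insert_sql` in batches, committing once at the end."""
    # itertuples yields plain Python values; NaN is mapped to None so it is stored as SQL NULL
    rows = [
        tuple(None if pd.isna(value) else value for value in row)
        for row in df.itertuples(index=False, name=None)
    ]
    cur = conn.cursor()
    try:
        for start in range(0, len(rows), batch_size):
            cur.executemany(insert_sql, rows[start:start + batch_size])
        conn.commit()
    except Exception:
        conn.rollback()  # leave the table unchanged if any batch fails
        raise
    return len(rows)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE temperature (dt TEXT, average_temperature REAL, city TEXT)")
temps = pd.DataFrame({
    "dt": ["1820-01-01", "1820-02-01"],
    "average_temperature": [2.101, float("nan")],
    "city": ["Abilene", "Abilene"],
})
inserted = load_dataframe(conn, "INSERT INTO temperature VALUES (?, ?, ?)", temps, batch_size=1)
print(inserted)
```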

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

```python
!python quality_check.py
```
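`quality_check.py` is not reproduced in this notebook. As a sketch of the checks listed above, the functions below implement a completeness check (the table is not empty and, optionally, matches the source row count) and a uniqueness check on a key column, using only DB-API calls. The function names are hypothetical. Table and column names are interpolated into the SQL, so they must come from trusted code, never from user input.

```python
import sqlite3


def check_row_count(cur, table, expected=None):
    """Fail if `table` is empty or, when `expected` is given, if its row count differs."""
    cur.execute(f"SELECT COUNT(*) FROM {table}")
    actual = cur.fetchone()[0]
    if actual == 0:
        raise ValueError(f"{table}: no rows loaded")
    if expected is not None and actual != expected:
        raise ValueError(f"{table}: expected {expected} rows, found {actual}")
    return actual


def check_unique(cur, table, column):
    """Fail if `column` contains duplicate non-null values in `table`."""
    cur.execute(
        f"SELECT {column}, COUNT(*) FROM {table} "
        f"WHERE {column} IS NOT NULL GROUP BY {column} HAVING COUNT(*) > 1"
    )
    duplicates = cur.fetchall()
    if duplicates:
        raise ValueError(f"{table}.{column}: {len(duplicates)} duplicated value(s)")


conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE airports (iata_code TEXT, name TEXT)")
cur.executemany("INSERT INTO airports VALUES (?, ?)",
                [("UTK", "Utirik Airport"), ("OCA", "Ocean Reef Club Airport")])
print(check_row_count(cur, "airports", expected=2))
check_unique(cur, "airports", "iata_code")
```

Against the project database, the expected count could come from the source DataFrame, for example `check_row_count(cur, "airports", expected=len(clean_port_df))`.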

#### 4.3 Data dictionary 

#### Fact Table

**immigrations** <br>
&nbsp;&nbsp;&nbsp;&nbsp;cicid : immigration occurrence id <br>
&nbsp;&nbsp;&nbsp;&nbsp;year : immigration occurrence year<br>
&nbsp;&nbsp;&nbsp;&nbsp;month : immigration occurrence month<br>
&nbsp;&nbsp;&nbsp;&nbsp;cit : 3-digit code of the immigrant's source country (country of birth)<br>
&nbsp;&nbsp;&nbsp;&nbsp;res : 3-digit code of the immigrant's country of residence<br>
&nbsp;&nbsp;&nbsp;&nbsp;iata : code of the port of entry (airport) of the immigration occurrence<br>
&nbsp;&nbsp;&nbsp;&nbsp;arrdate : arrival date in the USA, as a SAS date (days since 1960-01-01)<br>
&nbsp;&nbsp;&nbsp;&nbsp;city : immigration occurrence city<br>
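
`arrdate` (like `depdate` in the raw I94 data) is a SAS date number, that is, a count of days since 1960-01-01, which is why the samples show values such as `20545.0`. A sketch converting such values to calendar dates with pandas:

```python
import pandas as pd

SAS_EPOCH = pd.Timestamp("1960-01-01")  # day 0 of SAS date values

arrdate = pd.Series([20545.0, 20551.0, None])
# to_timedelta turns missing values into NaT instead of failing on them
arrival = SAS_EPOCH + pd.to_timedelta(arrdate, unit="D")
print(arrival)
```

For example, 20545 corresponds to 2016-04-01, the first day covered by the April 2016 file.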

#### Dimension tables

**airports** <br>
&nbsp;&nbsp;&nbsp;&nbsp;iata_code : Unique identifier<br>
&nbsp;&nbsp;&nbsp;&nbsp;name : Airport Name<br>
&nbsp;&nbsp;&nbsp;&nbsp;type : Type of the airport<br>
&nbsp;&nbsp;&nbsp;&nbsp;local_code : Local code of the airport<br>
&nbsp;&nbsp;&nbsp;&nbsp;coordinates : GPS coordinates of the airport<br>
&nbsp;&nbsp;&nbsp;&nbsp;elevation_ft : Altitude of the airport<br>
&nbsp;&nbsp;&nbsp;&nbsp;continent : Continent<br>
&nbsp;&nbsp;&nbsp;&nbsp;iso_country : ISO code of the country of the airport<br>
&nbsp;&nbsp;&nbsp;&nbsp;iso_region : ISO code for the region of the airport<br>
&nbsp;&nbsp;&nbsp;&nbsp;municipality : City where the airport is located<br>
&nbsp;&nbsp;&nbsp;&nbsp;gps_code : GPS code of the airport<br>

**demographics** <br>
&nbsp;&nbsp;&nbsp;&nbsp;city : Reference city<br>
&nbsp;&nbsp;&nbsp;&nbsp;state : Reference state<br>
&nbsp;&nbsp;&nbsp;&nbsp;media_age : Median age of reference city<br>
&nbsp;&nbsp;&nbsp;&nbsp;male_population : Male population count of reference city<br>
&nbsp;&nbsp;&nbsp;&nbsp;female_population : Female population count of reference city<br>
&nbsp;&nbsp;&nbsp;&nbsp;total_population : Total population count of reference city<br>
&nbsp;&nbsp;&nbsp;&nbsp;num_veterans : Veteran count of reference city<br>
&nbsp;&nbsp;&nbsp;&nbsp;foreign_born : Foreign born population count of reference city<br>
&nbsp;&nbsp;&nbsp;&nbsp;average_household_size : Average household size of reference city<br>
&nbsp;&nbsp;&nbsp;&nbsp;state_code : State code <br>
&nbsp;&nbsp;&nbsp;&nbsp;race : Race class<br>
&nbsp;&nbsp;&nbsp;&nbsp;count : Number of individuals of each race<br>

**temperature** <br>
&nbsp;&nbsp;&nbsp;&nbsp;timestamp : Measurement date<br>
&nbsp;&nbsp;&nbsp;&nbsp;average_temperature : Average temperature<br>
&nbsp;&nbsp;&nbsp;&nbsp;average_temperature_uncertainty : Average temperature uncertainty<br>
&nbsp;&nbsp;&nbsp;&nbsp;city : City<br>
&nbsp;&nbsp;&nbsp;&nbsp;country : Country<br>
&nbsp;&nbsp;&nbsp;&nbsp;latitude : Latitude<br>
&nbsp;&nbsp;&nbsp;&nbsp;longitude : Longitude <br>

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project. <br>
    Spark was chosen for this project because it processes large amounts of data quickly, scales easily by adding worker nodes, supports many file formats and integrates well with cloud storage. <br>
* Propose how often the data should be updated and why. <br>
    The I94 data is organized by month (this notebook loads the April 2016 file, `i94_apr16_sub.sas7bdat`), so a monthly update cycle matches both the arrival of new data and the reporting cycle. <br>
* Write a description of how you would approach the problem differently under the following scenarios:
    * The data was increased by 100x. <br>
        Run the processing with Spark in distributed mode, for example on an Amazon EMR cluster, and scale out by adding Spark worker nodes or using larger instances. <br>
    * The data populates a dashboard that must be updated on a daily basis by 7am every day. <br>
        Schedule and automate the pipeline with Airflow so that it runs early enough to finish before 7am. Airflow's task retries and built-in monitoring help detect and recover from failures before the deadline. <br>
    * The database needed to be accessed by 100+ people. <br>
        Host the solution in the cloud on Amazon Redshift, whose workload management (WLM) can balance concurrent queries so that performance stays satisfactory as the number of users grows.
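
For the monthly update cycle, each run needs to locate that month's I94 file. The only file used here is `i94_apr16_sub.sas7bdat`; assuming the other months follow the same `i94_<mon><yy>_sub.sas7bdat` pattern in the same directory, a helper could build the path for a given month. Both the naming pattern for other months and the helper `i94_file_for` are assumptions.

```python
from pathlib import Path

I94_DIR = Path("../../data/18-83510-I94-Data-2016")  # directory used in Step 1
# Fixed English abbreviations (calendar.month_abbr would depend on the locale)
MONTHS = ("jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec")


def i94_file_for(year: int, month: int, base_dir: Path = I94_DIR) -> Path:
    """Build the expected I94 file path, assuming the i94_<mon><yy>_sub.sas7bdat naming."""
    if not 1 <= month <= 12:
        raise ValueError(f"month must be 1-12, got {month}")
    return base_dir / f"i94_{MONTHS[month - 1]}{year % 100:02d}_sub.sas7bdat"


print(i94_file_for(2016, 4))
```

A scheduler such as Airflow could pass the run's year and month to this helper, so each monthly run picks up only its own file.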