# I94 - IMMIGRATION AND TEMPERATURES
### Data Engineering Capstone Project

#### Project Summary

The Immigration and Temperature project is an ETL pipeline that reads the different files provided by Udacity, cleans them and stores them in a Data Warehouse for future research on the correlation between temperature and immigration.

The project is divided into two main parts: first, an exploration of the provided data, and second, the creation of the ETL.

For the first part, the first step will be an **exploration of the data**. Each table will be loaded using Spark, its rows will be counted and its schema will be studied and turned into a Data Dictionary. The second step will be testing and cleaning the data: by finding duplicate records, measuring the number of NULL values per column and dropping columns of no interest, it will be possible to design the cleaning transformation. Finally, with all that exploration done, we will be able to define the Data Model the data must have at the end.

The second part of the project is **the ETL creation**. Here the data model designed in the previous steps will be implemented in a script that also loads the data into a Data Warehouse. For this, the data will first be written as parquet files to an S3 bucket and then copied from there to a Redshift cluster. The script must also create the database (dropping the old tables) and run a quality check to prove that everything has run well.

## Import libraries

In [1]:
# Do all imports and installs here
import pandas as pd
import os
import configparser
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import functions
import re

## Load Configuration Data

In [2]:
config = configparser.ConfigParser()
config.read('config.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']
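The cell above expects a `config.cfg` file with an `[AWS]` section holding the two credential keys. A minimal sketch of that layout follows. It is parsed from a string with `configparser.read_string` so it runs without the file, and the placeholder values are hypothetical:

```python
import configparser

# Hypothetical config.cfg contents: only the [AWS] section read by the notebook.
# The values are placeholders; real credentials should never be committed.
SAMPLE_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = <your-access-key-id>
AWS_SECRET_ACCESS_KEY = <your-secret-access-key>
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_CFG)

# Option names are case-insensitive in configparser, so the upper-case
# lookups used in the notebook work even though keys are stored lower-cased.
access_key = config['AWS']['AWS_ACCESS_KEY_ID']
secret_key = config['AWS']['AWS_SECRET_ACCESS_KEY']
```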

## Spark Session Init

In [3]:
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"

spark = SparkSession.builder.getOrCreate()

## Step 1: Scope the Project and Gather Data

### Scope 

The scope of this project is to create a database with the following data: immigration data from the I94 Arrival and Departure form, historic temperature data from the Berkeley Earth data page, US demographics information for all US cities with more than 65,000 inhabitants, provided by the US Census Bureau's 2015 American Community Survey, and an airport IATA code list provided by www.ourairports.com.

This data must be studied to understand the transformations needed before loading it into a database for analytical use by researchers.

The tools used are Jupyter Notebook for testing, Python as the main language and Apache Spark as the dataframe engine for the large datasets we are going to explore. Finally, the AWS services S3 and Redshift will be used for storage.

### Describe and Gather Data 

After the exploration of each file, a data dictionary with the description of each field is included.


### I94 Immigration data

In [4]:
i94data_df = spark.read.load('./sas_data')
i94data_df.limit(5).toPandas()

| | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 5748517.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | CA | 20582.0 | ... | | M | 1976.0 | 10292016 | F | | QF | 94953870000.0 | 11 | B1 |
| 1 | 5748518.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | NV | 20591.0 | ... | | M | 1984.0 | 10292016 | F | | VA | 94955620000.0 | 7 | B1 |
| 2 | 5748519.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | WA | 20582.0 | ... | | M | 1987.0 | 10292016 | M | | DL | 94956410000.0 | 40 | B1 |
| 3 | 5748520.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | WA | 20588.0 | ... | | M | 1987.0 | 10292016 | F | | DL | 94956450000.0 | 40 | B1 |
| 4 | 5748521.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | WA | 20588.0 | ... | | M | 1988.0 | 10292016 | M | | DL | 94956390000.0 | 40 | B1 |


In [5]:
i94data_df.count()

3096313

In [6]:
i94data_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

**Data dictionary: I94 Immigration data**

| Column | Description   |
|--------|---------------|
| CICID | Id |
| I94YR | 4 digit year |
| I94MON | Numeric month |
| I94CIT & I94RES | Code for immigrant country of birth and residence |
| I94PORT | Code for immigrant port of arrival |
| ARRDATE | Arrival Date in the USA. |
| I94MODE | Mode of transport (1 = Air, 2 = Sea, 3 = Land, 9 = Not reported) |
| I94ADDR | Arrival USA state |
| DEPDATE | Departure Date from the USA |
| I94BIR | Age of Respondent in Years |
| I94VISA | Visa codes collapsed into three categories (1 = Business, 2 = Pleasure, 3 = Student)|
| COUNT | Used for summary statistics |
| DTADFILE | Character Date Field - Date added to I-94 Files |
| VISAPOST | Department of State where Visa was issued |
| OCCUP | Occupation that will be performed in U.S. |
| ENTDEPA | Arrival Flag - admitted or paroled into the U.S. |
| ENTDEPD | Departure Flag - Departed, lost I-94 or is deceased |
| ENTDEPU | Update Flag - Either apprehended, overstayed, adjusted to perm residence |
| MATFLAG | Match flag - Match of arrival and departure records |
| BIRYEAR | 4 digit year of birth |
| DTADDTO | Date to which admitted to U.S. (allowed to stay until) |
| GENDER | Non-immigrant sex |
| INSNUM | INS number |
| AIRLINE | Airline used to arrive in U.S. |
| ADMNUM | Admission Number |
| FLTNO | Flight number of Airline used to arrive in U.S. |
| VISATYPE | Class of admission legally admitting the non-immigrant to temporarily stay in U.S. |
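In the sample rows, `arrdate` and `depdate` are plain numbers (e.g. `20574.0`) and `dtaddto` is a string such as `10292016`. A minimal sketch of how they could be turned into real dates follows. It assumes that `arrdate`/`depdate` are SAS numeric dates (days elapsed since 1960-01-01, the SAS epoch) and that `dtaddto` is in MMDDYYYY format. Both formats are assumptions here, and the helper names are hypothetical:

```python
from datetime import date, datetime, timedelta

# Assumption: ARRDATE and DEPDATE are SAS numeric dates, i.e. the number of
# days elapsed since 1960-01-01 (the SAS epoch).
SAS_EPOCH = date(1960, 1, 1)

def sas_to_date(sas_days):
    """Convert a SAS numeric date (float or int) into a datetime.date."""
    return SAS_EPOCH + timedelta(days=int(sas_days))

# Assumption: DTADDTO is a string in MMDDYYYY format, e.g. '10292016'.
def dtaddto_to_date(value):
    """Parse an MMDDYYYY string into a datetime.date."""
    return datetime.strptime(value, '%m%d%Y').date()

# Values from the first row of the sample above
arrival = sas_to_date(20574.0)
departure = sas_to_date(20582.0)
admitted_until = dtaddto_to_date('10292016')
```

With this epoch, `20574.0` becomes 2016-04-30, which matches `i94yr` = 2016 and `i94mon` = 4 in the same row and so supports the assumption.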

### Temperature data

In [11]:
temp_df = spark.read.option("header", True).csv('GlobalLandTemperaturesByCity.csv')
temp_df.where(temp_df.Country == 'United States').limit(5).toPandas()

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1820-01-01 | 2.1010000000000004 | 3.217 | Abilene | United States | 32.95N | 100.53W |
| 1 | 1820-02-01 | 6.926 | 2.853 | Abilene | United States | 32.95N | 100.53W |
| 2 | 1820-03-01 | 10.767 | 2.395 | Abilene | United States | 32.95N | 100.53W |
| 3 | 1820-04-01 | 17.988999999999994 | 2.202 | Abilene | United States | 32.95N | 100.53W |
| 4 | 1820-05-01 | 21.809 | 2.036 | Abilene | United States | 32.95N | 100.53W |


In [8]:
temp_df.count()

8599212

In [9]:
temp_df.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



**Data Dictionary: Temperature data**

| Column | Description |
|--------|---------------|
| dt | Date of the measurement (one record per month, dated on the first day of the month) |
| AverageTemperature | Average temperature in the city for the month, in degrees Celsius |
| AverageTemperatureUncertainty | Uncertainty of the measurement |
| City | City where the measurement was done |
| Country | Country where the measurement was done |
| Latitude & Longitude | Coordinates of the city |
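Latitude and Longitude are stored as strings with a hemisphere suffix (e.g. `32.95N`, `100.53W` in the sample above). Converting them to signed decimal degrees makes them comparable with other coordinates, such as the airport ones. A minimal sketch follows, with a hypothetical helper name:

```python
def parse_coordinate(value):
    """Convert a coordinate such as '32.95N' or '100.53W' into signed
    decimal degrees (south and west become negative)."""
    number, hemisphere = float(value[:-1]), value[-1].upper()
    if hemisphere not in ('N', 'S', 'E', 'W'):
        raise ValueError(f'Unexpected hemisphere in {value!r}')
    return -number if hemisphere in ('S', 'W') else number

latitude = parse_coordinate('32.95N')
longitude = parse_coordinate('100.53W')
```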

### US demographics

In [10]:
demo_df = spark.read.options(header = True, delimiter=';').csv('us-cities-demographics.csv')
demo_df.limit(5).toPandas()

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601 | 41862 | 82463 | 1562 | 30908 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129 | 49500 | 93629 | 4147 | 32935 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040 | 46799 | 84839 | 4819 | 8229 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127 | 87105 | 175232 | 5821 | 33878 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040 | 143873 | 281913 | 5829 | 86253 | 2.73 | NJ | White | 76402 |


In [11]:
demo_df.filter(demo_df.City == "Newark").show()

+------+----------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|  City|     State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race| Count|
+------+----------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|Newark|New Jersey|      34.6|         138040|           143873|          281913|              5829|       86253|                  2.73|        NJ|               White| 76402|
|Newark|New Jersey|      34.6|         138040|           143873|          281913|              5829|       86253|                  2.73|        NJ|Black or African-...|144961|
|Newark|New Jersey|      34.6|         138040|           143873|          281913|              5829|       86253|       

In [12]:
demo_df.count()

2891

In [13]:
demo_df.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



**Data Dictionary: US demographics**

| Column | Description |
|--------|---------------|
| City | Name of the city |
| State | US state of the city |
| Median Age | Median age of the registered population |
| Male population | Registered male population |
| Female population | Registered female population |
| Total population | Total registered population |
| Number of Veterans | Total registered veteran population |
| Foreign-born | Total number of registered population born in a different country |
| Average Household Size | Average number of people living in the same household |
| State Code | US state code of the city |
| Race & Count |  Count of registered population per race |
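As the Newark output shows, the city-level columns are repeated once per `Race` value, so each city appears on several rows. If one row per city is preferred, the Race & Count pairs can be pivoted. In Spark, `groupBy(...).pivot('Race')` is one way to do that. The following is a plain-Python sketch of the same idea, using the two Newark rows shown above and a hypothetical helper name:

```python
from collections import defaultdict

# One row per (City, State, Race), as in us-cities-demographics.csv.
# Only the columns needed for the pivot are kept; values from the Newark output.
rows = [
    {'City': 'Newark', 'State': 'New Jersey', 'Race': 'White', 'Count': '76402'},
    {'City': 'Newark', 'State': 'New Jersey',
     'Race': 'Black or African-American', 'Count': '144961'},
]

def pivot_race_counts(rows):
    """Collapse the per-race rows into {(City, State): {race: count}}."""
    by_city = defaultdict(dict)
    for row in rows:
        # Count is read as a string by Spark's CSV reader, so cast it here
        by_city[(row['City'], row['State'])][row['Race']] = int(row['Count'])
    return dict(by_city)

newark = pivot_race_counts(rows)[('Newark', 'New Jersey')]
```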

### Airport data

In [14]:
airp_df = spark.read.options(header = True).csv('airport-codes_csv.csv')
airp_df.limit(5).toPandas()

| | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 00A | heliport | Total Rf Heliport | 11 | | US | US-PA | Bensalem | 00A | | 00A | -74.93360137939453, 40.07080078125 |
| 1 | 00AA | small_airport | Aero B Ranch Airport | 3435 | | US | US-KS | Leoti | 00AA | | 00AA | -101.473911, 38.704022 |
| 2 | 00AK | small_airport | Lowell Field | 450 | | US | US-AK | Anchor Point | 00AK | | 00AK | -151.695999146, 59.94919968 |
| 3 | 00AL | small_airport | Epps Airpark | 820 | | US | US-AL | Harvest | 00AL | | 00AL | -86.77030181884766, 34.86479949951172 |
| 4 | 00AR | closed | Newport Hospital & Clinic Heliport | 237 | | US | US-AR | Newport | | | | -91.254898, 35.6087 |


In [15]:
airp_df.count()

55075

In [16]:
airp_df.groupby("type").count().show()

+--------------+-----+
|          type|count|
+--------------+-----+
| large_airport|  627|
|   balloonport|   24|
| seaplane_base| 1016|
|      heliport|11287|
|        closed| 3606|
|medium_airport| 4550|
| small_airport|33965|
+--------------+-----+



In [17]:
airp_df.select("gps_code", "local_code").filter(airp_df.gps_code != airp_df.local_code).count()

4797

In [18]:
airp_df.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



**Data Dictionary: Airport data**

| Column | Description |
|--------|---------------|
| Ident | Identification code |
| Type | Type of airport |
| Name | Name of the airport |
| Elevation_ft | Elevation above sea level in feet |
| Continent | Continent of location |
| Iso_country | ISO 3166-1 Alpha-2 code of the country |
| Iso_region | ISO 3166-2 code of the region (e.g. US-PA) |
| Municipality | Municipality where the airport is situated |
| Gps_code | *UNKNOWN* |
| Iata_code | International Air Transport Association airport code |
| Local_code | *UNKNOWN* |
| Coordinates | Longitude and latitude of the airport |
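Two airport columns pack several values into one string. `coordinates` holds "longitude, latitude", and `iso_region` holds the country and the subdivision (e.g. `US-PA`). The subdivision part has the same form as the `State Code` column of the demographics data, which is useful for linking the two tables. The following is a minimal sketch with hypothetical helper names:

```python
def split_coordinates(coordinates):
    """Split an airport 'coordinates' string ('longitude, latitude') into floats."""
    longitude, latitude = (float(part) for part in coordinates.split(','))
    return longitude, latitude

def region_to_state(iso_region):
    """Return the subdivision part of an ISO 3166-2 code, e.g. 'US-PA' -> 'PA'."""
    return iso_region.split('-', 1)[1]

# Values from the first row of the sample above
lon, lat = split_coordinates('-74.93360137939453, 40.07080078125')
state = region_to_state('US-PA')
```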

## Step 2: Explore and Assess the Data
### Explore the Data 
The first step is to find the columns with a low percentage of non-null values and drop them: the Integrity check. Then, we will count the duplicate rows and erase them: the Duplicate check. Finally, we will identify the data that is of no use and delete it: the Useless check.

### Cleaning Steps
Each quality problem explained above will be cleaned right after it is detected, so detection and cleaning are done at once.

#### - First check: Integrity

Drop columns with an integrity (% of rows not null) below 80%.
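`functions.integrity` is part of the project's own `functions.py`, which is not shown here. Judging by the outputs below, it reports the share of non-null values per column, sorted from lowest to highest. The following plain-Python sketch shows that metric and the 80% rule. The helper names and sample rows are hypothetical. In Spark, `F.count(column)` skips nulls, so a single `select` of `F.count` per column would give the same counts.

```python
def integrity(rows, columns):
    """Return {column: % of rows whose value is not None}, lowest first."""
    if not rows:
        return {}
    total = len(rows)
    scores = {
        column: 100.0 * sum(row.get(column) is not None for row in rows) / total
        for column in columns
    }
    return dict(sorted(scores.items(), key=lambda item: item[1]))

def columns_below(scores, threshold=80.0):
    """Columns whose integrity is strictly under the threshold (to be dropped)."""
    return [column for column, score in scores.items() if score < threshold]

# Hypothetical toy rows, not taken from the real dataset
sample = [
    {'cicid': 1, 'gender': 'F', 'occup': None},
    {'cicid': 2, 'gender': None, 'occup': None},
    {'cicid': 3, 'gender': 'M', 'occup': None},
    {'cicid': 4, 'gender': 'M', 'occup': 'STU'},
    {'cicid': 5, 'gender': 'F', 'occup': None},
]
scores = integrity(sample, ['cicid', 'gender', 'occup'])
to_drop = columns_below(scores)
```

Here `gender` sits exactly at 80% and is kept, because only columns *below* the threshold are dropped.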

1. I94 Immigration data 

In [19]:
functions.integrity(i94data_df)

| | Column | Values | % integrity |
|---|---|---|---|
| 18 | entdepu | 392 | 0.01266 |
| 15 | occup | 8126 | 0.262441 |
| 23 | insnum | 113708 | 3.672368 |
| 14 | visapost | 1215063 | 39.242254 |
| 22 | gender | 2682044 | 86.620571 |
| 8 | i94addr | 2943721 | 95.071816 |
| 9 | depdate | 2953856 | 95.399141 |
| 19 | matflag | 2957884 | 95.529231 |
| 17 | entdepd | 2957884 | 95.529231 |
| 24 | airline | 3012686 | 97.299143 |


In [20]:
# Add here the columns that you want to delete
del_cols = ['entdepu', 'occup', 'insnum', 'visapost']
old_i94data_df = i94data_df
i94data_df = i94data_df.drop(*del_cols)

2. Temperature data 

In [21]:
functions.integrity(temp_df)

| | Column | Values | % integrity |
|---|---|---|---|
| 1 | AverageTemperature | 8235082 | 95.765542 |
| 2 | AverageTemperatureUncertainty | 8235082 | 95.765542 |
| 0 | dt | 8599212 | 100.0 |
| 3 | City | 8599212 | 100.0 |
| 4 | Country | 8599212 | 100.0 |
| 5 | Latitude | 8599212 | 100.0 |
| 6 | Longitude | 8599212 | 100.0 |


No column is below 80% integrity.

3. US Demographics data 

In [22]:
functions.integrity(demo_df)

| | Column | Values | % integrity |
|---|---|---|---|
| 8 | Average Household Size | 2875 | 99.446558 |
| 6 | Number of Veterans | 2878 | 99.550329 |
| 7 | Foreign-born | 2878 | 99.550329 |
| 3 | Male Population | 2888 | 99.89623 |
| 4 | Female Population | 2888 | 99.89623 |
| 0 | City | 2891 | 100.0 |
| 1 | State | 2891 | 100.0 |
| 2 | Median Age | 2891 | 100.0 |
| 5 | Total Population | 2891 | 100.0 |
| 9 | State Code | 2891 | 100.0 |


No column is below 80% integrity.

4. Airport data

In [23]:
functions.integrity(airp_df)

| | Column | Values | % integrity |
|---|---|---|---|
| 9 | iata_code | 9189 | 16.684521 |
| 10 | local_code | 28686 | 52.085338 |
| 8 | gps_code | 41030 | 74.498411 |
| 3 | elevation_ft | 48069 | 87.279165 |
| 7 | municipality | 49399 | 89.694054 |
| 0 | ident | 55075 | 100.0 |
| 1 | type | 55075 | 100.0 |
| 2 | name | 55075 | 100.0 |
| 4 | continent | 55075 | 100.0 |
| 5 | iso_country | 55075 | 100.0 |


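Iata_code is kept despite its low integrity. Only 9,189 of the 55,075 airports have an IATA code (many more than the 627 "large_airport" rows, so the code is not exclusive to that type). The IATA code is the main information this dataset brings to the project, so the column shouldn't be deleted.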

In [24]:
del_cols = ['local_code', 'gps_code']
old_airp_df = airp_df
airp_df = airp_df.drop(*del_cols)

#### - Second check: Duplicates
Drop the rows that are duplicated according to the columns that should be unique.
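`functions.d_duplicates` is not shown in this notebook. Based on its output, it counts and removes the rows that share the given key columns. The following plain-Python sketch shows that behaviour, keeping the first occurrence of each key. The helper name is hypothetical, and the repeated `00A` row is invented for illustration. In Spark, `DataFrame.dropDuplicates(subset)` performs the removal step.

```python
def drop_duplicates(rows, key_columns):
    """Keep the first row for each key; return (unique_rows, n_duplicates)."""
    seen = set()
    unique_rows = []
    for row in rows:
        key = tuple(row[column] for column in key_columns)
        if key in seen:
            continue  # a row with this key was already kept
        seen.add(key)
        unique_rows.append(row)
    return unique_rows, len(rows) - len(unique_rows)

# Toy rows: '00A' appears twice on purpose
rows = [
    {'ident': '00A', 'type': 'heliport'},
    {'ident': '00AA', 'type': 'small_airport'},
    {'ident': '00A', 'type': 'heliport'},
]
unique_rows, n_duplicates = drop_duplicates(rows, ['ident'])
print(f'Number of duplicated rows: {n_duplicates}')
```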


1. I94 Immigration data 

In [25]:
col = ['cicid']
i94data_df = functions.d_duplicates(i94data_df, col)

Number of duplicated rows: 0


2. Airport data

In [26]:
col = ['ident']
airp_df = functions.d_duplicates(airp_df, col)

Number of duplicated rows: 0


No duplicate rows were found in either table.

#### - Third check: Useless data
According to the explored data, temp_df has many records without a temperature value (about 4% of the rows, according to the integrity check) that can be deleted. Also, as the demographics information only relates to the US, the temperature records from other countries can be dropped.

In [27]:
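# Drop the rows without a temperature value (read as nulls from the CSV)
temp_df = temp_df.dropna(subset=['AverageTemperature'])
# Keep only US records, as the demographics data only covers the US
temp_df = temp_df.filter(temp_df.Country == 'United States')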

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

The Data Model is going to be a star/snowflake schema that joins the four important pieces of information: Immigration, Airports, Cities and Temperatures. This way, the final user will be able to write queries that extract the desired information.

**Fact Table** 

- Immigration_table

| Column | TYPE | Other |
|--------|----------|----|
| Id | INT | PRIMARY |
| Arrival_date | INT | KEY of Time Table |
| Departure_date | INT | KEY of Time Table |
| Birth_country | TEXT ||
| Residence_country | TEXT ||
| Port | TEXT | KEY of Airport Table |
| Age | INT ||
| Birth year | INT ||
| Visa | TEXT ||
| Gender | CHAR ||
| Mode | TEXT ||
| State | TEXT ||
| Type | TEXT ||

- Temperature_table

| Column | TYPE | Other |
|--------|----------|----|
| Id | INT | PRIMARY |
| Country | TEXT | KEY of Demographics Table |
| City | TEXT | KEY of Demographics Table |
| Date | INT | KEY of Time Table |
| AverageTemp | FLOAT | |
| Uncertainty | FLOAT | |


**Dimension Tables**

- Airport_table

| Column | TYPE | Other |
|--------|----------|----|
| Id | TEXT | PRIMARY |
| Type | TEXT | |
| Name | TEXT | |
| Country | TEXT | KEY of Demographics Table |
| Region | TEXT | KEY of Demographics Table |
| Municipality | TEXT | KEY of Demographics Table |
| Elevation | INT | |

- Demographics_table

| Column | TYPE | Other |
|--------|----------|----|
| Id | INT | Primary |
| City | TEXT ||
| State | TEXT ||
| Median Age | FLOAT ||
| Male Population | ||

- Time_table

| Column | TYPE | Other |
|--------|----------|----|
| Id | INT | Primary |



!!!!!!!!TO COMPLETE!!!!!!!!!!!!!!!

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

1. Create staging tables:
    - staging_i94data
    - staging_airport
    - staging_temperature

2. Create database tables:
    - immigration_table
    - temperature_table
    - airport_table
    - demographics_table
    - time_table
    
3. Import data into the staging tables

4. Replace numeric codes with their text values
    - visatype
    - birth_country
    - residence_country
    
5. Delete non-large airports

6. Delete duplicates

7. Insert the data into the tables
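Step 4 needs a code-to-text lookup. The country codes require a lookup file that is not part of this notebook. The two mappings given in the I94 data dictionary (I94VISA and I94MODE) illustrate the idea. The following is a minimal sketch. The helper name and the `'Unknown'` fallback are assumptions.

```python
# Code -> label mappings taken from the I94 data dictionary
I94VISA_LABELS = {1: 'Business', 2: 'Pleasure', 3: 'Student'}
I94MODE_LABELS = {1: 'Air', 2: 'Sea', 3: 'Land', 9: 'Not reported'}

def label(code, mapping, default='Unknown'):
    """Translate a numeric code (read as a float, e.g. 1.0) into its label."""
    if code is None:
        return default
    return mapping.get(int(code), default)

visa = label(2.0, I94VISA_LABELS)
mode = label(1.0, I94MODE_LABELS)
missing = label(None, I94MODE_LABELS)
```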
   

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
spark = functions.spark_init()
immi_df, temp_df, demo_df, airp_df = functions.create_dataframe(spark)
immi_df, temp_df, demo_df, airp_df = functions.clean_dataframe(spark, immi_df, temp_df, demo_df, airp_df)
time_df = functions.create_time(spark, immi_df, temp_df)
functions.upload_s3(spark, immi_df, temp_df, demo_df, airp_df, time_df)


Spark Session created
i94data_df created, 3096313 rows.
temp_df created, 8599212 rows.
demo_df created, 2891 rows.
airp_df created , 55075 rows.
Number of duplicated rows: 0
Number of duplicated rows: 0
Numeber of airport lines: 9189
Number of temperature lines: 661524
Time table created
Starting upload to S3


In [2]:
functions.create_database()
functions.insert_redshift()

Dropping old tables
Old tables dropped
Creating new tables
New tables created

    COPY immigration FROM 's3://acr-udacity-capstone-bucket-2/immi'
    ACCESS_KEY_ID '<redacted>'
    SECRET_ACCESS_KEY '<redacted>'
    FORMAT AS PARQUET;

=== DONE IN: 0.25 sec


    COPY temperature FROM 's3://acr-udacity-capstone-bucket-2/temp'
    ACCESS_KEY_ID '<redacted>'
    SECRET_ACCESS_KEY '<redacted>'
    FORMAT AS PARQUET;

=== DONE IN: 0.24 sec


    COPY demographics FROM 's3://acr-udacity-capstone-bucket-2/demo'
    ACCESS_KEY_ID '<redacted>'
    SECRET_ACCESS_KEY '<redacted>'
    FORMAT AS PARQUET;

=== DONE IN: 0.25 sec


    COPY airport FROM 's3://acr-udacity-capstone-bucket-2/airp'
    ACCESS_KEY_ID '<redacted>'
    SECRET_ACCESS_KEY '<redacted>'
    FORMAT AS PARQUET;

=== DONE IN: 0.25 sec


    COPY time FROM 's3://ac
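The COPY statements in the log above authenticate with an access key pair that is embedded in the SQL text. Redshift's COPY also accepts an `IAM_ROLE` clause, which lets the cluster assume a role with read access to the bucket, so no keys appear in the statement. The following sketch builds the statements that way. The role ARN is a placeholder and the helper name is hypothetical. The bucket and the table/prefix pairs are the ones shown in the log, and the truncated `time` entry is left out.

```python
# Placeholder ARN: replace with a role attached to the Redshift cluster
IAM_ROLE_ARN = 'arn:aws:iam::123456789012:role/example-redshift-s3-read'
BUCKET = 's3://acr-udacity-capstone-bucket-2'

COPY_TEMPLATE = """
    COPY {table} FROM '{bucket}/{prefix}'
    IAM_ROLE '{role}'
    FORMAT AS PARQUET;
"""

def copy_statement(table, prefix, bucket=BUCKET, role=IAM_ROLE_ARN):
    """Build a Redshift COPY for one parquet prefix without embedding keys."""
    return COPY_TEMPLATE.format(table=table, bucket=bucket, prefix=prefix, role=role)

# Table -> S3 prefix pairs as they appear in the log above
statements = [copy_statement(table, prefix) for table, prefix in
              [('immigration', 'immi'), ('temperature', 'temp'),
               ('demographics', 'demo'), ('airport', 'airp')]]
```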

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here
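The quality-check cell above is still empty. The following is a minimal sketch of the "Source/Count" check listed above, which compares the expected row count of each table with the count found after loading. It assumes the counts are collected beforehand, for example with `df.count()` on the dataframes written to S3 and `SELECT COUNT(*)` on each Redshift table. That collection step is not shown, and the function name and numbers are hypothetical.

```python
def check_row_counts(expected_counts, loaded_counts):
    """Compare expected vs loaded row counts per table.

    Returns a list of failure messages; an empty list means every check passed.
    """
    failures = []
    for table, expected in expected_counts.items():
        loaded = loaded_counts.get(table, 0)
        if loaded == 0:
            failures.append(f'{table}: no rows loaded')
        elif loaded != expected:
            failures.append(f'{table}: expected {expected} rows, found {loaded}')
    return failures

# Toy numbers for illustration only
problems = check_row_counts({'airport': 3, 'demographics': 2},
                            {'airport': 3, 'demographics': 0})
```

Returning messages instead of raising on the first failure lets one run report every table that failed.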

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.