# US Data 
### Data Engineering Capstone Project

#### Overview
The purpose of this project is to apply what I learned in the Data Engineering Nanodegree.

#### Project Summary
The goal of this project is to aggregate large datasets with Spark SQL into a data warehouse.
The warehouse uses a star schema with one fact table and several dimension tables.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import os
import glob
import numpy as np
from pyspark.sql import SparkSession, GroupedData,SQLContext
from pyspark import SparkConf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import *
%matplotlib inline
import matplotlib.pyplot as plt

In [2]:
#Build spark session
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [3]:
# Build a SQLContext object
sqlContext = SQLContext(spark)

### Step 1: Scope the Project and Gather Data

#### Scope 
The scope of this project is to pull data from all three sources and build fact and dimension tables that show immigration movement.
#### Describe and Gather Data

- I94 Immigration Data: comes from the US National Tourism and Trade Office and includes details on incoming immigrants and their ports of entry.

- World Temperature Data: comes from Kaggle and includes monthly average temperatures by city. The sample shown below starts in 1743 and covers cities outside the U.S. as well, so it is filtered to the United States during cleaning.

- Airport Code Table: comes from datahub.io and includes airport codes and the corresponding cities.


In [4]:
# Read in the data here
df_spark=spark.read.format('com.github.saurfang.sas.spark').load("../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat")
temperature=spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")
airport=spark.read.format("csv").option("header", "true").load("airport-codes_csv.csv")

In [4]:
airport.show(10)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

In [5]:
airport.take(1)

[Row(ident='00A', type='heliport', name='Total Rf Heliport', elevation_ft='11', continent='NA', iso_country='US', iso_region='US-PA', municipality='Bensalem', gps_code='00A', iata_code=None, local_code='00A', coordinates='-74.93360137939453, 40.07080078125')]

In [6]:
airport.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [7]:
airport.toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"
5,00AS,small_airport,Fulton Airport,1100,,US,US-OK,Alex,00AS,,00AS,"-97.8180194, 34.9428028"
6,00AZ,small_airport,Cordes Airport,3810,,US,US-AZ,Cordes,00AZ,,00AZ,"-112.16500091552734, 34.305599212646484"
7,00CA,small_airport,Goldstone /Gts/ Airport,3038,,US,US-CA,Barstow,00CA,,00CA,"-116.888000488, 35.350498199499995"
8,00CL,small_airport,Williams Ag Airport,87,,US,US-CA,Biggs,00CL,,00CL,"-121.763427, 39.427188"
9,00CN,heliport,Kitchen Creek Helibase Heliport,3350,,US,US-CA,Pine Valley,00CN,,00CN,"-116.4597417, 32.7273736"


In [20]:
temperature.show(10)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-04-01|5.7879999999999985|           3.6239999999999997|Århus|Denmark|  57.05N|   10.33E|
|1744-05-01|            10.644|           1.2830000000000001|Århus|Denmark|  57.05N|   10.33E|
|1744-06-01|14.050999999999998|                   

In [19]:
temperature.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [13]:
df_spark.show(10)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+-----------+-----------+----------+-----------+-------------+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|validres|delete_days|delete_mexl|delete_dup|delete_visa|delete_recdup|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+-----------+-----------+----------+-----------+-------------+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|  4.0|2016.0|   6.0| 135.0| 135.0|    XXX|20612.0|   null|   null|   null|  59.0|    2.0|  1.0|     1.0|        0.0|    

In [34]:
df_spark.take(1)

[Row(cicid=4.0, i94yr=2016.0, i94mon=6.0, i94cit=135.0, i94res=135.0, i94port='XXX', arrdate=20612.0, i94mode=None, i94addr=None, depdate=None, i94bir=59.0, i94visa=2.0, count=1.0, validres=1.0, delete_days=0.0, delete_mexl=0.0, delete_dup=0.0, delete_visa=0.0, delete_recdup=0.0, dtadfile=None, visapost=None, occup=None, entdepa='Z', entdepd=None, entdepu='U', matflag=None, biryear=1957.0, dtaddto='10032016', gender=None, insnum=None, airline=None, admnum=14938462027.0, fltno=None, visatype='WT')]

### Step 2: Explore and Assess the Data
#### Explore the Data
Identify data quality issues, such as missing values and duplicate data.

#### Cleaning Steps
1-Filter the airport data to "large_airport" and use substring to extract the state code.

2-Drop nulls, filter the temperature data to the United States and the year 2000, and drop duplicates.

3-Drop nulls and convert the i94yr and i94mon codes to integers.

##### 1-Filter airport data for "large_airport"

In [5]:
# Keep large US airports that have an IATA code, then reduce iso_region
# (e.g. "US-PA") to the two-letter state code with substring.
airport_data = airport.filter(airport["type"] == "large_airport")\
    .filter(airport["iso_country"] == "US")\
    .filter(airport.iata_code.isNotNull())\
    .filter(airport.continent.isNotNull())\
    .withColumn("iso_region", substring(airport["iso_region"], 4, 2))\
    .withColumn("elevation_ft", col("elevation_ft").cast("float"))

# Average elevation per state
elevation = airport_data.groupBy("iso_country", "iso_region").avg("elevation_ft")

# Rename the columns for the airport dimension table and sort by state
new_airport = elevation.select(col("iso_country").alias("Country"),
                               col("iso_region").alias("state"),
                               round(col("avg(elevation_ft)"), 1).alias("avg_elevation_ft"))\
    .orderBy("state")

In [6]:
new_airport.show(10) 

+-------+-----+----------------+
|Country|state|avg_elevation_ft|
+-------+-----+----------------+
|     US|   AK|           295.5|
|     US|   AL|           429.8|
|     US|   AR|           365.5|
|     US|   AZ|          1621.0|
|     US|   CA|           342.4|
|     US|   CO|          5431.0|
|     US|   CT|           173.0|
|     US|   DC|           163.5|
|     US|   DE|            24.0|
|     US|   FL|            38.3|
+-------+-----+----------------+
only showing top 10 rows
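
The aggregation in cleaning step 1 can be checked by hand on a few rows. The following is a minimal pure-Python sketch, not the notebook's Spark code: the helper name `avg_elevation_by_state` is hypothetical, the rows are copied from the airport sample shown earlier, and the IATA-code and continent filters are left out for brevity. Because the sample contains no `large_airport` rows, the sketch is run on `small_airport` instead.

```python
from collections import defaultdict

# Rows copied from the airport sample shown earlier: (type, iso_region, elevation_ft)
SAMPLE_AIRPORTS = [
    ("heliport", "US-PA", "11"),
    ("small_airport", "US-KS", "3435"),
    ("small_airport", "US-AK", "450"),
    ("small_airport", "US-AL", "820"),
    ("small_airport", "US-CA", "3038"),
    ("small_airport", "US-CA", "87"),
]


def avg_elevation_by_state(rows, airport_type):
    """Hypothetical helper: average elevation per state for one airport type."""
    by_state = defaultdict(list)
    for kind, iso_region, elevation_ft in rows:
        if kind != airport_type:
            continue
        # Python slicing is 0-based, so [3:5] matches Spark's substring(col, 4, 2)
        state = iso_region[3:5]
        by_state[state].append(float(elevation_ft))
    return {
        state: round(sum(values) / len(values), 1)
        for state, values in sorted(by_state.items())
    }


result = avg_elevation_by_state(SAMPLE_AIRPORTS, "small_airport")
print(result)
```

The two California rows are averaged into a single value, which is the same per-state collapse that `groupBy(...).avg(...)` performs in the Spark cell.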



In [11]:
# Register the cleaned airport table; the raw airport DataFrame has no "state" column
new_airport.createOrReplaceTempView("airport")

In [26]:
# List the distinct states in the airport view
spark.sql('''
          SELECT DISTINCT state
          FROM  airport
          ORDER BY state ASC
          '''
          ).show()

+-----+
|state|
+-----+
|   AK|
|   AL|
|   AR|
|   AZ|
|   CA|
|   CO|
|   CT|
|   DC|
|   DE|
|   FL|
|   GA|
|   HI|
|   IA|
|   ID|
|   IL|
|   IN|
|   KS|
|   KY|
|   LA|
|   MA|
+-----+
only showing top 20 rows



##### 2-Drop nulls, filter temperature data to the United States and the year 2000, and drop duplicates

In [21]:
# Keep US rows from the year 2000 with non-null temperature values,
# then derive year and month columns from dt.
temperature_data = temperature.filter(temperature["Country"] == "United States")\
    .filter(temperature.AverageTemperature.isNotNull())\
    .filter(temperature.AverageTemperatureUncertainty.isNotNull())\
    .filter(year(temperature["dt"]) == 2000)\
    .withColumn("year", year(temperature["dt"]))\
    .withColumn("month", month(temperature["dt"]))

# Round the temperature to one decimal place and drop duplicate rows
new_Temperature = temperature_data.select("year", "month", "City", "Country",
                                          round(col("AverageTemperature"), 1).alias("avg_temp_celcius"))\
    .dropDuplicates()

In [22]:
new_Temperature.show(10)

+----+-----+------------+-------------+----------------+
|year|month|        City|      Country|avg_temp_celcius|
+----+-----+------------+-------------+----------------+
|2000|    7|     Antioch|United States|            18.8|
|2000|    3|      Aurora|United States|             4.4|
|2000|    9|  Carrollton|United States|            26.4|
|2000|   12|  Clearwater|United States|            15.7|
|2000|   10|       Flint|United States|            12.3|
|2000|    2|Fort Collins|United States|             2.5|
|2000|   11|   Fullerton|United States|            14.4|
|2000|    2|     Memphis|United States|             9.8|
|2000|    1|        Mesa|United States|            12.8|
|2000|    9|   New Haven|United States|            17.9|
+----+-----+------------+-------------+----------------+
only showing top 10 rows
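
To make cleaning step 2 concrete, here is a small pure-Python sketch of the same filter, round and de-duplicate logic. It is an illustration only: `clean_temperatures` is a hypothetical helper, the rows are copied from the `temperature.show(10)` sample shown earlier (so they are for Denmark, not the United States), and the last row is a duplicate added on purpose to show de-duplication.

```python
from datetime import date

# Rows copied from the temperature sample shown earlier:
# (dt, AverageTemperature, AverageTemperatureUncertainty, City, Country)
SAMPLE_TEMPERATURES = [
    ("1743-11-01", "6.068", "1.7369999999999999", "Århus", "Denmark"),
    ("1743-12-01", None, None, "Århus", "Denmark"),
    ("1744-04-01", "5.7879999999999985", "3.6239999999999997", "Århus", "Denmark"),
    ("1744-05-01", "10.644", "1.2830000000000001", "Århus", "Denmark"),
    # Duplicate of the previous row, added for illustration only
    ("1744-05-01", "10.644", "1.2830000000000001", "Århus", "Denmark"),
]


def clean_temperatures(rows, country, year):
    """Hypothetical helper mirroring the filter, round and dropDuplicates steps."""
    cleaned = set()  # a set keeps each distinct row once
    for dt, avg_temp, uncertainty, city, row_country in rows:
        if row_country != country or avg_temp is None or uncertainty is None:
            continue
        day = date.fromisoformat(dt)
        if day.year != year:
            continue
        cleaned.add((day.year, day.month, city, row_country, round(float(avg_temp), 1)))
    return sorted(cleaned)


denmark_1744 = clean_temperatures(SAMPLE_TEMPERATURES, "Denmark", 1744)
print(denmark_1744)
```

Note that the values arrive as strings, as the schema above shows, so the sketch converts them with `float` before rounding.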



##### 3-Drop nulls and convert the i94yr and i94mon codes to integers

In [14]:

immigration_data = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df=pd.read_sas(immigration_data, 'sas7bdat', encoding="ISO-8859-1")

In [15]:
df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [17]:
# Drop rows with a null i94addr or i94res and cast i94yr and i94mon to integer
i94_data = df_spark.filter(df_spark.i94addr.isNotNull())\
    .filter(df_spark.i94res.isNotNull())\
    .withColumn("i94yr", col("i94yr").cast("integer"))\
    .withColumn("i94mon", col("i94mon").cast("integer"))\
    .withColumn("city_port_name", df_spark["i94port"])

# Select and rename the columns used by the immigration table
new_Data = i94_data.select(col("i94yr").alias("year"), col("i94mon").alias("month"),
                           "i94port", col("i94addr").alias("state_code"))

In [18]:
new_Data.show(10)

+----+-----+-------+----------+
|year|month|i94port|state_code|
+----+-----+-------+----------+
|2016|    6|    SFR|        CA|
|2016|    6|    SFR|        CA|
|2016|    6|    HOU|        TX|
|2016|    6|    BOS|        MA|
|2016|    6|    NEW|        PA|
|2016|    6|    NYC|        NY|
|2016|    6|    NYC|        NY|
|2016|    6|    NYC|        NY|
|2016|    6|    NYC|        NY|
|2016|    6|    NYC|        OH|
+----+-----+-------+----------+
only showing top 10 rows
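
The effect of cleaning step 3 on individual records can be sketched in plain Python. This is only an illustration: `clean_i94` is a hypothetical helper, and the three rows are copied from the `df.head()` output shown earlier (April 2016), reduced to the columns that the step uses.

```python
# Rows copied from the df.head() output shown earlier:
# (i94yr, i94mon, i94res, i94port, i94addr)
SAMPLE_I94 = [
    (2016.0, 4.0, 692.0, "XXX", None),
    (2016.0, 4.0, 276.0, "ATL", "AL"),
    (2016.0, 4.0, 101.0, "WAS", "MI"),
]


def clean_i94(rows):
    """Hypothetical helper: drop rows without a state or residence code, cast year and month."""
    cleaned = []
    for i94yr, i94mon, i94res, i94port, i94addr in rows:
        if i94addr is None or i94res is None:
            continue
        cleaned.append({
            "year": int(i94yr),    # 2016.0 -> 2016
            "month": int(i94mon),  # 4.0 -> 4
            "i94port": i94port,
            "state_code": i94addr,
        })
    return cleaned


cleaned_i94 = clean_i94(SAMPLE_I94)
print(cleaned_i94)
```

The first sample row has no `i94addr`, so it is dropped, which is why rows with port `XXX` and no state do not appear in the cleaned table.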



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model
#### Star Schema
1-Dimension Tables
* airport_table: country, state, avg_elevation_ft
* temperature_table: country, City, year, month, avg_temp_celcius
* immigration_table: year, month, i94port, state_code

2-Fact Table
* immigration_Fact_table: year, month, avg_elevation_ft, avg_temp_celcius

This schema is simple and easy to use for analysis.

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

1-Dimension tables are created from the cleansed data.

2-The fact table is created with a SQL query that joins the dimension tables.

3-The fact table is converted back to a Spark DataFrame.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [23]:
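# Register the cleaned DataFrames as temporary views for Spark SQL
new_Data.createOrReplaceTempView("immigration")
new_airport.createOrReplaceTempView("airport")
new_Temperature.createOrReplaceTempView("temperature")
# Set the automatic broadcast-join threshold to 0 bytes
# (the Spark documentation gives -1 as the value that disables broadcasting)
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "0")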


In [24]:
# Join state from the airport table with state_code from the immigration table
states=spark.sql('''
          SELECT
          a.Country,
          a.state,
          i.month,
          a.avg_elevation_ft
          FROM airport a JOIN immigration i ON a.state= i.state_code
           
          '''
          )
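
The join above is an inner join, so immigration rows whose `state_code` has no match in the airport table drop out of the result. The sketch below reproduces the same query on a handful of rows with Python's built-in `sqlite3` module, as a stand-in for Spark SQL. The rows are copied from the `new_airport` and `new_Data` samples shown earlier; only three airport rows are loaded, so TX and MA are unmatched here purely because of the small sample. The second query is an addition for illustration and is not part of the notebook.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE airport (Country TEXT, state TEXT, avg_elevation_ft REAL)")
conn.execute(
    "CREATE TABLE immigration (year INTEGER, month INTEGER, i94port TEXT, state_code TEXT)"
)

# Rows copied from the new_airport and new_Data samples shown earlier
conn.executemany(
    "INSERT INTO airport VALUES (?, ?, ?)",
    [("US", "AK", 295.5), ("US", "CA", 342.4), ("US", "FL", 38.3)],
)
conn.executemany(
    "INSERT INTO immigration VALUES (?, ?, ?, ?)",
    [(2016, 6, "SFR", "CA"), (2016, 6, "SFR", "CA"),
     (2016, 6, "HOU", "TX"), (2016, 6, "BOS", "MA")],
)

# Same join as in the notebook cell
joined = conn.execute("""
    SELECT a.Country, a.state, i.month, a.avg_elevation_ft
    FROM airport a JOIN immigration i ON a.state = i.state_code
""").fetchall()

# Illustrative extra query: state codes that the inner join silently drops
unmatched = conn.execute("""
    SELECT DISTINCT i.state_code
    FROM immigration i LEFT JOIN airport a ON a.state = i.state_code
    WHERE a.state IS NULL
    ORDER BY i.state_code
""").fetchall()

print(joined)
print(unmatched)
conn.close()
```

Running a query like the second one against the full tables would show how many immigration records the fact table loses because their state has no large airport.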

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [14]:
# Check for NULL values in Country, state, month and avg_elevation_ft;
# a single all-false row in the output means no column contains a NULL
states.select(isnull('Country').alias('Country'),\
                             isnull('state').alias('state'),\
                             isnull('month').alias('month'),\
                             isnull('avg_elevation_ft').alias('avg_elevation_ft')).dropDuplicates().show()

+-------+-----+-----+----------------+
|Country|state|month|avg_elevation_ft|
+-------+-----+-----+----------------+
|  false|false|false|           false|
+-------+-----+-----+----------------+



In [17]:
# Count the total number of immigration records
spark.sql('SELECT COUNT(*) AS total_num_immig FROM immigration').show()

+---------------+
|total_num_immig|
+---------------+
|        3388925|
+---------------+
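
The two checks above print their results, so a person has to read the output to notice a failure. One possible way to make them fail loudly is sketched below in plain Python. The function names are hypothetical, the first row is taken from the joined sample values shown earlier, and the row containing `None` is made up for illustration. In the notebook the inputs would come from the Spark DataFrames, for example a row count from `DataFrame.count()`.

```python
COLUMNS = ["Country", "state", "month", "avg_elevation_ft"]


def null_patterns(rows, columns):
    """Distinct 'is this column NULL?' patterns, like isnull(...) plus dropDuplicates()."""
    return {tuple(row[c] is None for c in columns) for row in rows}


def check_no_nulls(rows, columns):
    """Raise if any row has a NULL in one of the given columns."""
    all_false = tuple(False for _ in columns)
    bad = null_patterns(rows, columns) - {all_false}
    if bad:
        raise ValueError(f"NULL values found, patterns: {sorted(bad)}")


def check_min_rows(row_count, minimum=1):
    """Raise if a table has fewer rows than expected."""
    if row_count < minimum:
        raise ValueError(f"expected at least {minimum} rows, got {row_count}")


GOOD_ROWS = [{"Country": "US", "state": "CA", "month": 6, "avg_elevation_ft": 342.4}]
# Made-up row with a missing elevation, for illustration only
BAD_ROWS = GOOD_ROWS + [{"Country": "US", "state": "TX", "month": 6, "avg_elevation_ft": None}]

check_no_nulls(GOOD_ROWS, COLUMNS)   # passes silently
check_min_rows(3388925)              # row count reported by the COUNT(*) query above
print(null_patterns(BAD_ROWS, COLUMNS))
```

Raising an exception instead of printing lets a scheduler or test runner mark the pipeline run as failed.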



#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

##### Airport Dataset:
* ident: identifier of the airport
* type: type of the airport
* name: name of the airport
* elevation_ft: elevation in feet
* iso_country: country code
* iso_region: region or state code

##### Temperature Dataset:
* dt: date in the format YYYY-MM-DD
* City: name of the city
* Country: name of the country
* AverageTemperature: average temperature of the city
* Latitude: latitude
* Longitude: longitude

##### I94 Immigration Dataset:
* i94port: 3-character code of the destination city (port of entry)
* i94yr: year
* i94mon: month number
* i94cit: 3-digit code of the country of citizenship
* arrdate: arrival date
* depdate: departure date
* i94mode: 1-digit code for the mode of travel
* i94visa: visa category code (reason for immigration)
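
The `arrdate` and `depdate` columns appear in the samples as plain numbers such as `20612.0`. Since the source is a SAS file, these are most likely SAS date values, which count days from 1960-01-01. That interpretation is an assumption, but the sketch below supports it: the converted values fall in the same months as `i94mon` in the sample rows. The helper name `sas_days_to_date` is hypothetical.

```python
from datetime import date, timedelta

# SAS date values count days from this day (assumed to apply to arrdate/depdate)
SAS_EPOCH = date(1960, 1, 1)


def sas_days_to_date(days):
    """Hypothetical helper: convert a SAS day count to a calendar date, keeping None."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


june_arrival = sas_days_to_date(20612.0)   # arrdate from the June 2016 sample row
april_arrival = sas_days_to_date(20545.0)  # arrdate from the April 2016 sample rows
print(june_arrival.isoformat(), april_arrival.isoformat())
```

Under this assumption, the arrival dates could be stored as real dates in the immigration table instead of raw day counts.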


### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

1-For this project I used Apache Spark to wrangle the data and build the model. Spark scales to large volumes of data, and Spark SQL provides many tools for transforming it. Data persisted as Parquet files can grow to many terabytes without problems.

2-The data should be updated every day so that government agencies and other organizations always work with current data.

3-Under the following scenarios, I would approach the problem as follows:

- If the data were increased by 100x, Apache Spark could still handle it, because it distributes the work across a cluster.

- To keep a dashboard updated daily by 7am, I would use Apache Airflow to schedule the pipeline so that all the data is refreshed before then.

- If the data needed to be accessed by 100+ people, it could be served with Apache Spark or hosted on Amazon Web Services (AWS).