# Project Title
### Data Engineering Capstone Project

#### Project Summary
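This project builds an ETL pipeline for US immigration data, enriched with temperature, demographic, and airport data, and loads the result into one fact table and several dimension tables for analysis.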

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import numpy as np
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, date_add
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum

### Step 1: Scope the Project and Gather Data

#### Scope 
This project works with immigration data for the United States. It builds an ETL pipeline that takes the raw data, processes it, and loads it into a data warehouse. The data warehouse supports analysis of immigration to the US.

#### Describe and Gather Data 
The main data set comes from the US National Tourism and Trade Office.
The project also uses three supporting data sets:
- World Temperature Data: This data set comes from Kaggle. It provides historical monthly average temperatures for cities around the world.
- U.S. City Demographic Data: This data set comes from OpenSoft. It contains demographic information on all US cities with a population greater than 63,000.
- Airport Code Table: A simple table of airport codes and their corresponding cities.

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

We have four data sets to explore and assess:
1. the immigration data
2. the demographic data
3. the airports data
4. the temperature data

### 1-Read the immigration data
This is the main data set of the project. It comes from the US National Tourism and Trade Office. For each citizen ID it contains data such as:
- country of citizenship
- country of residence
- arrival airport
- arrival date
- age
- gender
- visa type

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_spark_immigration = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [3]:
#write to parquet
#df_spark_immigration.write.parquet("sas_data")
df_spark_immigration=spark.read.parquet("sas_data")

In [4]:
df_spark_immigration.head()

Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='CA', depdate=20582.0, i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1976.0, dtaddto='10292016', gender='F', insnum=None, airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1')

In [5]:
df_spark_immigration.show(5)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

In [6]:
df_spark_immigration.count()

3096313

In [7]:
df_spark_immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

### 2- Read the demographic data
This data set contains demographic data for the United States. For each city it provides:
- population
- male and female population
- median age
- average household size
- foreign-born population
- race

In [8]:
df_demographics = pd.read_csv('us-cities-demographics.csv', sep=';')

In [9]:
df_demographics.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [10]:
df_demographics.shape

(2891, 12)

In [11]:
df_demographics.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
City                      2891 non-null object
State                     2891 non-null object
Median Age                2891 non-null float64
Male Population           2888 non-null float64
Female Population         2888 non-null float64
Total Population          2891 non-null int64
Number of Veterans        2878 non-null float64
Foreign-born              2878 non-null float64
Average Household Size    2875 non-null float64
State Code                2891 non-null object
Race                      2891 non-null object
Count                     2891 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 271.1+ KB
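
The `info()` output above already reveals the missing values: a column's missing count is the total row count minus its non-null count. As a small worked example (plain Python, with the figures copied from the output above), the gaps can be tallied like this:

```python
# Non-null counts copied from the df_demographics.info() output above.
TOTAL_ROWS = 2891
non_null = {
    "Male Population": 2888,
    "Female Population": 2888,
    "Number of Veterans": 2878,
    "Foreign-born": 2878,
    "Average Household Size": 2875,
}

# Missing values per column = total rows - non-null entries.
missing = {column: TOTAL_ROWS - count for column, count in non_null.items()}

for column, n_missing in missing.items():
    share = 100 * n_missing / TOTAL_ROWS
    print(f"{column}: {n_missing} missing ({share:.2f}%)")
```

Inside the notebook, `df_demographics.isna().sum()` should report the same counts directly from the DataFrame.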


### 3- Read the airports data
This data set describes airports, keyed by an airport identifier. It includes:
- airport name
- airport type
- airport region
- airport municipality
- airport codes (GPS, local, IATA)
- airport coordinates

In [12]:
df_airports=pd.read_csv('airport-codes_csv.csv')

In [13]:
df_airports.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [14]:
df_airports.shape

(55075, 12)

In [15]:
df_airports.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 12 columns):
ident           55075 non-null object
type            55075 non-null object
name            55075 non-null object
elevation_ft    48069 non-null float64
continent       27356 non-null object
iso_country     54828 non-null object
iso_region      55075 non-null object
municipality    49399 non-null object
gps_code        41030 non-null object
iata_code       9189 non-null object
local_code      28686 non-null object
coordinates     55075 non-null object
dtypes: float64(1), object(11)
memory usage: 5.0+ MB


### 4-Read the temperature data
This data set gives temperature readings by date for different cities:
- average temperature
- average temperature uncertainty
- latitude and longitude of the city


In [16]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temperature = pd.read_csv(fname)

In [17]:
df_temperature.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [18]:
df_temperature.shape

(8599212, 7)

In [19]:
df_temperature.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8599212 entries, 0 to 8599211
Data columns (total 7 columns):
dt                               object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                             object
Country                          object
Latitude                         object
Longitude                        object
dtypes: float64(2), object(5)
memory usage: 459.2+ MB


In [20]:
#convert date to date time format
df_temperature['dt'] = pd.to_datetime(df_temperature.dt)

## Now we have read the four data sets:
- the immigration data set: df_spark_immigration
- the temperature data set: df_temperature
- the demographic data set: df_demographics
- the airports data set: df_airports

## Adding the data dictionaries
Two data dictionaries are included with these data sets:
- country codes: gives the country for each country code found in the data sets
- port codes: gives the airport name and city for each airport code found in the data sets

### 1- Add the country codes dictionary

In [21]:
df_countries_codes = pd.read_csv('194cntyl-codes.csv', sep='=')

In [22]:
df_countries_codes.head()

Unnamed: 0,code','country'
0,582,'MEXICO Air Sea'
1,236,'AFGHANISTAN'
2,101,'ALBANIA'
3,316,'ALGERIA'
4,102,'ANDORRA'


In [23]:
df_countries_codes.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 289 entries, 0 to 288
Data columns (total 2 columns):
code'        289 non-null object
'country'    285 non-null object
dtypes: object(2)
memory usage: 4.6+ KB
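
The output above shows that the column names (`code'`, `'country'`) and the country values still carry stray single quotes from the source file. A minimal sketch of how such labels could be cleaned, using a hypothetical helper `clean_label` and sample values modelled on the output (this is not part of the original pipeline):

```python
def clean_label(raw):
    """Strip surrounding whitespace and single quotes; pass None through."""
    if raw is None:
        return None
    return raw.strip().strip("'").strip()


# Column names and sample values modelled on the output above.
raw_columns = ["code'", "'country'"]
raw_rows = [("582", "'MEXICO Air Sea'"), ("236 ", "'AFGHANISTAN'")]

columns = [clean_label(name) for name in raw_columns]
rows = [(clean_label(code), clean_label(country)) for code, country in raw_rows]

print(columns)  # ['code', 'country']
print(rows)     # [('582', 'MEXICO Air Sea'), ('236', 'AFGHANISTAN')]
```

In pandas the same idea could be applied column-wise with `Series.str.strip`, which would make the country names easier to read in the final tables.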


### 2- Add ports code dictionary

In [24]:
df_ports_codes=pd.read_csv('194port-codes.csv')

In [25]:
df_ports_codes.head()

Unnamed: 0,code,airport,state
0,'ALC','ALCAN,AK
1,'ANC','ANCHORAGE,AK
2,'BAR','BAKER AAF - BAKER ISLAND,AK
3,'DAC','DALTONS CACHE,AK
4,'PIZ','DEW STATION PT LAY DEW,AK


In [26]:
df_ports_codes.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 660 entries, 0 to 659
Data columns (total 3 columns):
code       660 non-null object
airport    660 non-null object
state      584 non-null object
dtypes: object(3)
memory usage: 15.5+ KB




## Cleaning Steps
### The immigration data set:
- keep only entries into the United States made by air travel
- remove rows where the gender is not defined
- compute the age and add it
- convert the arrival and departure dates to a usable format and add them
- map the visa type code to a readable string
### The temperature data set:
- keep only data for the United States and remove data for all other countries
- drop data from before 1960
### Airports:
- drop closed airports, heliports, seaplane bases, and balloon ports, as they are not used in immigration


### the immigration data

In [27]:
#in order to start cleaning the immigration data create the table immig_table
df_spark_immigration.createOrReplaceTempView("immig_table")

In [28]:
spark.sql("""
SELECT *
FROM immig_table""")

DataFrame[cicid: double, i94yr: double, i94mon: double, i94cit: double, i94res: double, i94port: string, arrdate: double, i94mode: double, i94addr: string, depdate: double, i94bir: double, i94visa: double, count: double, dtadfile: string, visapost: string, occup: string, entdepa: string, entdepd: string, entdepu: string, matflag: string, biryear: double, dtaddto: string, gender: string, insnum: string, airline: string, admnum: double, fltno: string, visatype: string]

From the data dictionary, the arrival mode (`i94mode`) values are defined as:
- 1 = 'Air'
- 2 = 'Sea'
- 3 = 'Land'
- 9 = 'Not reported'

In [29]:
# Remove all entries into the united states that weren't via air travel
spark.sql("""
SELECT *
FROM immig_table
WHERE i94mode = 1
""").createOrReplaceTempView("immig_table")

In [30]:
# drop rows where the gender values entered is undefined
spark.sql("""SELECT * FROM immig_table WHERE gender IN ('F', 'M')""").createOrReplaceTempView("immig_table")

In [31]:
# Compute each individual's age as of 2023 from the birth year and add it to the view
spark.sql("""
SELECT *, (2023-biryear) AS age 
FROM immig_table
""").createOrReplaceTempView("immig_table")

In [32]:
# convert the arrival dates into a useable value
spark.sql("SELECT *, date_add(to_date('1960-01-01'), arrdate) AS arrival_date FROM immig_table").createOrReplaceTempView("immig_table")

In [33]:
# convert the departure dates into a useable value
spark.sql("""SELECT *, CASE 
                        WHEN depdate >= 1.0 THEN date_add(to_date('1960-01-01'), depdate)
                        WHEN depdate IS NULL THEN NULL
                        ELSE 'N/A' END AS departure_date 
                        
                FROM immig_table""").createOrReplaceTempView("immig_table")
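
The `arrdate` and `depdate` columns hold SAS date values, that is, day counts starting from 1960-01-01, which is why both queries add them to that date. The conversion can be checked outside Spark with a few lines of plain Python; `sas_to_date` is a hypothetical helper name used only for this sketch:

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(days):
    """Convert a SAS day count (days since 1960-01-01) to a date; None stays None."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


# arrdate and depdate of the first row shown earlier
print(sas_to_date(20574.0))  # 2016-04-30
print(sas_to_date(20582.0))  # 2016-05-08
print(sas_to_date(None))     # None
```

Note that the `CASE` expression above mixes a date with the string `'N/A'`, so `departure_date` ends up as a string column (visible in the DataFrame schema printed in Step 4). Returning `NULL` instead of `'N/A'` would likely keep the column typed as a date.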

I94VISA - Visa codes collapsed into three categories:
-    1 = Business
-    2 = Pleasure
-    3 = Student

In [34]:

# Map the numeric visa code to a readable visa category
spark.sql("""SELECT *, CASE 
                        WHEN i94visa = 1.0 THEN 'Business' 
                        WHEN i94visa = 2.0 THEN 'Pleasure'
                        WHEN i94visa = 3.0 THEN 'Student'
                        ELSE 'N/A' END AS detailed_visa_type 
                        
                FROM immig_table""").createOrReplaceTempView("immig_table")

In [35]:
spark.sql("SELECT * FROM immig_table LIMIT 10").show()

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+----+------------+--------------+------------------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype| age|arrival_date|departure_date|detailed_visa_type|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+----+------------+--------------+------------------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10
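
The `age` column in this output is computed as `2023 - biryear`, so it reflects the traveller's age in 2023 rather than at arrival. If the age at arrival is what the analysis needs, one option is to subtract the birth year from the arrival year instead. The sketch below illustrates that alternative with a hypothetical helper, `age_at_arrival`; it is not what the notebook currently does:

```python
def age_at_arrival(arrival_year, birth_year):
    """Return the age in whole years in the arrival year, or None if an input is missing."""
    if arrival_year is None or birth_year is None:
        return None
    return int(arrival_year) - int(birth_year)


# Values from the first row shown earlier: i94yr=2016.0, biryear=1976.0
print(age_at_arrival(2016.0, 1976.0))  # 40
```

For that row the result matches the `i94bir` value (40.0), which suggests that column already stores the age at arrival. In the SQL above, the equivalent change would be `(i94yr - biryear) AS age`.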

### port codes

In [36]:
# remove all entries with a null state, as they are either unreported or outside the US
df_ports_codes = df_ports_codes[~df_ports_codes.state.isna()].copy()

In [37]:
df_ports_codes.head()

Unnamed: 0,code,airport,state
0,'ALC','ALCAN,AK
1,'ANC','ANCHORAGE,AK
2,'BAR','BAKER AAF - BAKER ISLAND,AK
3,'DAC','DALTONS CACHE,AK
4,'PIZ','DEW STATION PT LAY DEW,AK


In [38]:
# We need to exclude values for airports outside of the US. 
nonUSstates = ['CANADA', 'Canada', 'NETHERLANDS', 'NETH ANTILLES', 'THAILAND', 'ETHIOPIA', 'PRC', 'BERMUDA', 'COLOMBIA', 'ARGENTINA', 'MEXICO', 
               'BRAZIL', 'URUGUAY', 'IRELAND', 'GABON', 'BAHAMAS', 'MX', 'CAYMAN ISLAND', 'SEOUL KOREA', 'JAPAN', 'ROMANIA', 'INDONESIA',
               'SOUTH AFRICA', 'ENGLAND', 'KENYA', 'TURK & CAIMAN', 'PANAMA', 'NEW GUINEA', 'ECUADOR', 'ITALY', 'EL SALVADOR']

In [39]:
df_ports_codes = df_ports_codes[~df_ports_codes.state.isin(nonUSstates)].copy()

In [40]:
df_ports_codes.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 584 entries, 0 to 590
Data columns (total 3 columns):
code       584 non-null object
airport    584 non-null object
state      584 non-null object
dtypes: object(3)
memory usage: 18.2+ KB
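
The row count is 584 both before the `nonUSstates` filter (the non-null `state` count reported earlier) and after it, so the filter appears to have removed no rows. One possible cause is that the `state` values carry surrounding whitespace or different casing, so none of them match the list exactly. The sketch below uses hypothetical rows and a shortened list to show how normalising the value before the membership test changes the result:

```python
NON_US = {"CANADA", "MEXICO", "JAPAN"}  # shortened sample of the nonUSstates list

# Hypothetical rows; the padding imitates untrimmed values read from a CSV file.
ports = [
    {"code": "ANC", "state": " AK   "},
    {"code": "XXX", "state": " CANADA "},
    {"code": "YYY", "state": "MEXICO"},
]

# Exact matching misses the padded ' CANADA ' value.
exact = [p for p in ports if p["state"] not in NON_US]

# Normalising first (strip + upper-case) catches it.
normalised = [p for p in ports if p["state"].strip().upper() not in NON_US]

print([p["code"] for p in exact])       # ['ANC', 'XXX']
print([p["code"] for p in normalised])  # ['ANC']
```

In pandas the same normalisation could be written as `df_ports_codes.state.str.strip().str.upper().isin(nonUSstates)`.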


### temperature





In [41]:
#filter only the temperature of the united states
df_temperature = df_temperature[df_temperature['Country']=='United States']

In [42]:
df_temperature.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
47555,1820-01-01,2.101,3.217,Abilene,United States,32.95N,100.53W
47556,1820-02-01,6.926,2.853,Abilene,United States,32.95N,100.53W
47557,1820-03-01,10.767,2.395,Abilene,United States,32.95N,100.53W
47558,1820-04-01,17.989,2.202,Abilene,United States,32.95N,100.53W
47559,1820-05-01,21.809,2.036,Abilene,United States,32.95N,100.53W


In [43]:
df_temperature.shape

(687289, 7)

In [44]:
df_temperature.tail()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
8439242,2013-05-01,15.544,0.281,Yonkers,United States,40.99N,74.56W
8439243,2013-06-01,20.892,0.273,Yonkers,United States,40.99N,74.56W
8439244,2013-07-01,24.722,0.279,Yonkers,United States,40.99N,74.56W
8439245,2013-08-01,21.001,0.323,Yonkers,United States,40.99N,74.56W
8439246,2013-09-01,17.408,1.048,Yonkers,United States,40.99N,74.56W


### We only need temperatures from 1960 onward

In [45]:
# keep rows dated after 1960-01-01 (the strict > also excludes the reading dated 1960-01-01 itself)
df_temperature=df_temperature[df_temperature['dt']>"1960-01-01"].copy()

In [46]:
# strip whitespace from the city names and convert them to upper case
df_temperature.City = df_temperature.City.str.strip().str.upper()

In [47]:
df_temperature.shape

(165508, 7)

In [48]:
df_temperature.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
49236,1960-02-01,4.995,0.325,ABILENE,United States,32.95N,100.53W
49237,1960-03-01,8.575,0.303,ABILENE,United States,32.95N,100.53W
49238,1960-04-01,18.452,0.282,ABILENE,United States,32.95N,100.53W
49239,1960-05-01,21.709,0.286,ABILENE,United States,32.95N,100.53W
49240,1960-06-01,27.714,0.387,ABILENE,United States,32.95N,100.53W


### Airports

In [49]:
df_airports.groupby('type')['type'].count()

type
balloonport          24
closed             3606
heliport          11287
large_airport       627
medium_airport     4550
seaplane_base      1016
small_airport     33965
Name: type, dtype: int64

In [50]:
# remove closed airports, heliports, seaplane bases and balloon ports, as they are not used in immigration
excludedValues = ['closed', 'heliport', 'seaplane_base', 'balloonport']
df_airports = df_airports[~df_airports['type'].str.strip().isin(excludedValues)].copy()

In [51]:
df_airports.groupby('type')['type'].count()

type
large_airport       627
medium_airport     4550
small_airport     33965
Name: type, dtype: int64

### Adding the dictionary to the immigration data set

In [53]:
# make a schema that will be used to convert the country code dictionary to a Spark DataFrame
from pyspark.sql.types import *
df_schema = StructType([StructField("country_code", StringType(), True)\
                       ,StructField("country_name", StringType(), True)])

In [54]:
# let's convert the country code data dictionary to views in our spark context to perform SQL operations with them
spark_df_country_codes = spark.createDataFrame(df_countries_codes, schema=df_schema)
spark_df_country_codes.createOrReplaceTempView("country_codes")

In [55]:
spark_df_country_codes.show(5)

+------------+-----------------+
|country_code|     country_name|
+------------+-----------------+
|         582| 'MEXICO Air Sea'|
|        236 |    'AFGHANISTAN'|
|        101 |        'ALBANIA'|
|        316 |        'ALGERIA'|
|        102 |        'ANDORRA'|
+------------+-----------------+
only showing top 5 rows



In [56]:
#to find all temp. tables
#spark.catalog.listTables()

In [57]:
# make a schema that will be used to convert the port code dictionary to a Spark DataFrame
df_schema2 = StructType([StructField("code", StringType(), True)\
                       ,StructField("airport", StringType(), True)\
                       ,StructField("state", StringType(), True)])

In [58]:
# let's convert the port code data dictionary to views in our spark context to perform SQL operations with them
spark_df_ports_codes = spark.createDataFrame(df_ports_codes, schema=df_schema2)
spark_df_ports_codes.createOrReplaceTempView("ports_codes")

In [59]:
spark_df_ports_codes.show(5)

+--------+--------------------+----------------+
|    code|             airport|           state|
+--------+--------------------+----------------+
|   'ALC'|              'ALCAN| AK             |
|   'ANC'|          'ANCHORAGE|     AK         |
|   'BAR'|'BAKER AAF - BAKE...|              AK|
|   'DAC'|      'DALTONS CACHE|         AK     |
|   'PIZ'|'DEW STATION PT L...|              AK|
+--------+--------------------+----------------+
only showing top 5 rows



### Add the country name instead of code in the immigration data set

In [60]:
# we use an inner join to drop invalid codes
#country of citizenship

spark.sql("""
SELECT im.*, cc.country_name AS citizenship_country
FROM immig_table im
INNER JOIN country_codes cc
ON im.i94cit = cc.country_code
""").createOrReplaceTempView("immig_table")

In [61]:
#country of residence
spark.sql("""
SELECT im.*, cc.country_name AS residence_country
FROM immig_table im
INNER JOIN country_codes cc
ON im.i94res = cc.country_code
""").createOrReplaceTempView("immig_table")

In [62]:
spark.sql("SELECT * FROM immig_table LIMIT 5").show()

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+----+------------+--------------+------------------+-------------------+-----------------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype| age|arrival_date|departure_date|detailed_visa_type|citizenship_country|residence_country|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+----+------------+--------------+------------------+-------------------+-----------------+
|5761447.0|2016.0|   4.0| 299.0| 299.0|    LOS|20574
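
Both joins above compare a numeric column (`i94cit` and `i94res` are doubles) with the string column `country_code`, so they rely on Spark casting the values implicitly. A more explicit approach is to normalise both sides to the same type before joining. The plain-Python sketch below shows the idea on hypothetical rows; `normalise_code` is an illustrative helper, not part of the notebook:

```python
def normalise_code(value):
    """Turn 245.0, '245' or ' 245 ' into the integer 245; return None if not numeric."""
    if value is None:
        return None
    try:
        return int(float(str(value).strip()))
    except ValueError:
        return None


# Hypothetical lookup built from dictionary rows such as ('236 ', 'AFGHANISTAN').
country_by_code = {
    normalise_code("582"): "MEXICO Air Sea",
    normalise_code("236 "): "AFGHANISTAN",
}

# Immigration rows carry the code as a double, e.g. i94cit=236.0.
immigration_rows = [{"cicid": 1, "i94cit": 236.0}, {"cicid": 2, "i94cit": 999.0}]

# Inner-join semantics: rows whose code has no match are dropped.
joined = [
    {**row, "citizenship_country": country_by_code[normalise_code(row["i94cit"])]}
    for row in immigration_rows
    if normalise_code(row["i94cit"]) in country_by_code
]
print(joined)
```

As with the inner joins in the notebook, the row with the unknown code 999 is dropped from the result.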

In [63]:
#spark.catalog.listTables()

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

We will create five tables: one fact table and four dimension tables.

The fact table is:
- immig_table

The dimension tables are:
- airports
- temperature
- demographics
- dim_time_table
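
The planned tables can also be written down as a small schema map. The sketch below is illustrative only. The column lists are copied from the `SELECT` statements in Step 4. The names `fact_immigration`, `dim_time`, and so on are the DataFrame names used there and stand in for the view names listed above. The only joins recorded are the date joins that the data dictionary mentions.

```python
# Columns copied from the SELECT statements that build each table in Step 4.
tables = {
    "fact_immigration": [
        "cicid", "citizenship_country", "residence_country", "city",
        "arrival_date", "departure_date", "age", "detailed_visa_type",
    ],
    "dim_time": ["date", "year", "month", "day", "week", "weekday", "year_day"],
    "dim_airports": [
        "ident", "type", "name", "elevation_ft", "state", "municipality", "iata_code",
    ],
    "dim_temperature": [
        "date", "City", "average_temperature", "average_termperature_uncertainty",
    ],
    "dim_demographics": [
        "City", "State", "median_age", "male_population", "female_population",
        "total_population", "foreign_born", "average_household_size",
        "state_code", "Race", "Count",
    ],
}

# Joins between the fact table's date columns and the time dimension.
joins = [
    ("fact_immigration.arrival_date", "dim_time.date"),
    ("fact_immigration.departure_date", "dim_time.date"),
]


def column_exists(qualified_name):
    """Check that a 'table.column' reference points at a known column."""
    table, column = qualified_name.split(".")
    return column in tables.get(table, [])


# Sanity check: every join refers to columns that exist in the schema map.
all_joins_valid = all(column_exists(left) and column_exists(right) for left, right in joins)
print(all_joins_valid)  # True
```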

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

### create the fact table

In [64]:
spark.sql("""
SELECT *
FROM immig_table""")

DataFrame[cicid: double, i94yr: double, i94mon: double, i94cit: double, i94res: double, i94port: string, arrdate: double, i94mode: double, i94addr: string, depdate: double, i94bir: double, i94visa: double, count: double, dtadfile: string, visapost: string, occup: string, entdepa: string, entdepd: string, entdepu: string, matflag: string, biryear: double, dtaddto: string, gender: string, insnum: string, airline: string, admnum: double, fltno: string, visatype: string, age: double, arrival_date: date, departure_date: string, detailed_visa_type: string, citizenship_country: string, residence_country: string]

In [65]:
# Insert the immigration fact data into a spark dataframe
fact_immigration = spark.sql("""
                        SELECT 
                            cicid, 
                            citizenship_country,
                            residence_country,
                            TRIM(UPPER (i94port)) AS city,
                            arrival_date,
                            departure_date,
                            age,
                            detailed_visa_type

                        FROM immig_table
""")

In [66]:
fact_immigration.show(5)

+---------+-------------------+-----------------+----+------------+--------------+----+------------------+
|    cicid|citizenship_country|residence_country|city|arrival_date|departure_date| age|detailed_visa_type|
+---------+-------------------+-----------------+----+------------+--------------+----+------------------+
|5761447.0|         'MONGOLIA'|       'MONGOLIA'| LOS|  2016-04-30|    2016-05-29|51.0|          Pleasure|
|5761448.0|         'MONGOLIA'|       'MONGOLIA'| CHI|  2016-04-30|    2016-06-15|49.0|          Pleasure|
|5761451.0|         'MONGOLIA'|       'MONGOLIA'| WAS|  2016-04-30|          null|41.0|           Student|
|5761452.0|         'MONGOLIA'|       'MONGOLIA'| WAS|  2016-04-30|    2016-05-19|33.0|          Pleasure|
|5761453.0|         'MONGOLIA'|       'MONGOLIA'| WAS|  2016-04-30|    2016-05-06|57.0|          Business|
+---------+-------------------+-----------------+----+------------+--------------+----+------------------+
only showing top 5 rows



### the airports table

In [67]:
# read the airports table from the raw CSV (the airport type filter applied to df_airports earlier is not reflected here)
spark_df_airports = spark.read.format("csv").option("header", "true").load('airport-codes_csv.csv')
spark_df_airports.createOrReplaceTempView("airports")

In [68]:
# keep only US airports (rows with a null iso_country are dropped as well)
spark.sql("""
SELECT *
FROM airports
WHERE iso_country IS NOT NULL
AND UPPER(TRIM(iso_country)) LIKE 'US'
""").createOrReplaceTempView("airports")

In [69]:
spark.sql("""
SELECT *
FROM airports
WHERE municipality IS NOT NULL
AND LENGTH(iso_region) = 5
""").createOrReplaceTempView("airports")

In [70]:
dim_airports = spark.sql("""
SELECT TRIM(ident) AS ident, type, name, elevation_ft, SUBSTR(iso_region, 4) AS state, TRIM(UPPER(municipality)) AS municipality, iata_code
FROM airports
""")

In [71]:
dim_airports.show(20)

+-----+-------------+--------------------+------------+-----+------------+---------+
|ident|         type|                name|elevation_ft|state|municipality|iata_code|
+-----+-------------+--------------------+------------+-----+------------+---------+
|  00A|     heliport|   Total Rf Heliport|          11|   PA|    BENSALEM|     null|
| 00AA|small_airport|Aero B Ranch Airport|        3435|   KS|       LEOTI|     null|
| 00AK|small_airport|        Lowell Field|         450|   AK|ANCHOR POINT|     null|
| 00AL|small_airport|        Epps Airpark|         820|   AL|     HARVEST|     null|
| 00AR|       closed|Newport Hospital ...|         237|   AR|     NEWPORT|     null|
| 00AS|small_airport|      Fulton Airport|        1100|   OK|        ALEX|     null|
| 00AZ|small_airport|      Cordes Airport|        3810|   AZ|      CORDES|     null|
| 00CA|small_airport|Goldstone /Gts/ A...|        3038|   CA|     BARSTOW|     null|
| 00CL|small_airport| Williams Ag Airport|          87|   CA|    

### The time dimensional table

In [72]:
# extract all distinct dates from arrival and departure dates to create dimension table
dim_time = spark.sql("""
SELECT DISTINCT arrival_date AS date
FROM immig_table
UNION
SELECT DISTINCT departure_date AS date
FROM immig_table
WHERE departure_date IS NOT NULL
""")
dim_time.createOrReplaceTempView("dim_time_table")

In [73]:
# extract year, month, day, week of year, day of week and day of year from each date to build the dim_time table
dim_time = spark.sql("""
SELECT date, YEAR(date) AS year, MONTH(date) AS month, DAY(date) AS day, WEEKOFYEAR(date) AS week, DAYOFWEEK(date) as weekday, DAYOFYEAR(date) year_day
FROM dim_time_table
ORDER BY date ASC
""")

In [74]:
dim_time.show(5)

+----------+----+-----+---+----+-------+--------+
|      date|year|month|day|week|weekday|year_day|
+----------+----+-----+---+----+-------+--------+
|2012-04-12|2012|    4| 12|  15|      5|     103|
|2015-05-18|2015|    5| 18|  21|      2|     138|
|2015-10-22|2015|   10| 22|  43|      5|     295|
|2016-01-26|2016|    1| 26|   4|      3|      26|
|2016-01-31|2016|    1| 31|   4|      1|      31|
+----------+----+-----+---+----+-------+--------+
only showing top 5 rows
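
The attributes in this table can be cross-checked in plain Python. Spark's `WEEKOFYEAR` follows ISO 8601 week numbering, and `DAYOFWEEK` counts Sunday as 1 through Saturday as 7, so the ISO weekday from Python needs a small shift. The sketch below uses a hypothetical helper, `time_attributes`, to reproduce two rows of the output above:

```python
from datetime import date


def time_attributes(d):
    """Return (year, month, day, week, weekday, year_day) for a date."""
    _iso_year, iso_week, iso_weekday = d.isocalendar()
    # Spark's DAYOFWEEK counts Sunday as 1 ... Saturday as 7,
    # while isocalendar() counts Monday as 1 ... Sunday as 7.
    spark_weekday = iso_weekday % 7 + 1
    return (d.year, d.month, d.day, iso_week, spark_weekday, d.timetuple().tm_yday)


print(time_attributes(date(2016, 1, 31)))  # (2016, 1, 31, 4, 1, 31)
print(time_attributes(date(2012, 4, 12)))  # (2012, 4, 12, 15, 5, 103)
```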



In [75]:
dim_time.printSchema()

root
 |-- date: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- year_day: integer (nullable = true)



### the temperature table

In [76]:
# convert the dataframes from pandas to spark
spark_df_temperature = spark.createDataFrame(df_temperature)
spark_df_temperature.createOrReplaceTempView("temperature")

In [77]:
dim_temperature = spark.sql("""
SELECT
    DISTINCT dt AS date,
    City,
    AVG(AverageTemperature) OVER (PARTITION BY dt, City) AS average_temperature, 
    AVG(AverageTemperatureUncertainty)  OVER (PARTITION BY dt, City) AS average_termperature_uncertainty
    
FROM temperature
""")

In [78]:
dim_temperature.show(5)

+-------------------+--------------+-------------------+--------------------------------+
|               date|          City|average_temperature|average_termperature_uncertainty|
+-------------------+--------------+-------------------+--------------------------------+
|1964-02-01 00:00:00|        MOBILE| 7.6960000000000015|                           0.247|
|1971-04-01 00:00:00|        EDISON|               7.83|                           0.412|
|1979-06-01 00:00:00|   SIOUX FALLS|             19.373|                           0.327|
|1982-05-01 00:00:00|       DETROIT|             17.233|                           0.241|
|1982-07-01 00:00:00|SAN BERNARDINO|              26.93|                           0.409|
+-------------------+--------------+-------------------+--------------------------------+
only showing top 5 rows
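
The query for `dim_temperature` combines `DISTINCT` with window averages partitioned by `dt` and `City`, which yields one row per date and city. A plain `GROUP BY dt, City` expresses the same aggregation more directly. The sketch below demonstrates the grouped form on hypothetical readings using Python's built-in `sqlite3` module instead of Spark, so it runs anywhere:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE temperature (dt TEXT, City TEXT, AverageTemperature REAL)")

# Hypothetical readings; two rows share the same date and city.
conn.executemany(
    "INSERT INTO temperature VALUES (?, ?, ?)",
    [
        ("1960-02-01", "ABILENE", 4.0),
        ("1960-02-01", "ABILENE", 6.0),
        ("1960-03-01", "ABILENE", 8.5),
    ],
)

# One output row per (date, city) pair, with the readings averaged.
rows = conn.execute(
    """
    SELECT dt AS date, City, AVG(AverageTemperature) AS average_temperature
    FROM temperature
    GROUP BY dt, City
    ORDER BY dt, City
    """
).fetchall()
conn.close()

print(rows)  # [('1960-02-01', 'ABILENE', 5.0), ('1960-03-01', 'ABILENE', 8.5)]
```

The same `GROUP BY` statement should also work in `spark.sql` against the `temperature` view.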



In [79]:
#fact_immigration.show(5)

### the demographics table

In [80]:
# convert the dataframes from pandas to spark
spark_df_demographics = spark.createDataFrame(df_demographics)
spark_df_demographics.createOrReplaceTempView("demographics")

In [81]:
# insert data into the demographics dim table
dim_demographics = spark.sql("""
                                SELECT  City, 
                                        State, 
                                        `Median Age` AS median_age, 
                                        `Male Population` AS male_population, 
                                        `Female Population` AS female_population, 
                                        `Total Population` AS total_population, 
                                        `Foreign-born` AS foreign_born, 
                                        `Average Household Size` AS average_household_size, 
                                        `State Code` AS state_code, 
                                        Race, 
                                        Count
                                FROM demographics
""")

In [82]:
dim_demographics.show(5)

+----------------+-------------+----------+---------------+-----------------+----------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|median_age|male_population|female_population|total_population|foreign_born|average_household_size|state_code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|      33.8|        40601.0|          41862.0|           82463|     30908.0|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0|        44129.0|          49500.0|           93629|     32935.0|                  2.39|        MA|               White|58723|
|          Hoover|      Alabama|      38.5|        38040.0|          46799.0|           84839|      8229.0|                  2.58|        AL|              

### Let's show the created tables with the loaded data

In [83]:
#fact_immigration.show(5)

In [84]:
#dim_demographics.show(5)

In [85]:
#dim_airports.show(5)

In [86]:
#dim_temperature.show(5)

In [87]:
#dim_time.show(5)

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.


## We created the fact and dimension tables by selecting data from the Spark temporary views and loading it into them; we now have five tables:
- fact_immigration table: the main table, containing data from the US National Tourism and Trade Office. Its primary key is the citizen id ("cicid"). It describes each citizen, including citizenship, country of residence, gender, age, and visa type.
- dim_time table: contains date data, breaking each date into year, month, day, week, weekday, and day of the year. It can be joined with the fact_immigration table on the date.
- dim_demographics table: contains demographic data for each city, such as male population, female population, median age, and foreign-born population. It can be joined with the fact_immigration table on the city and the state.
- dim_airports table: contains airport data, filtered to only the ports used for immigration into the United States. Its primary key is "ident". It includes information such as the airport type, name, state, and municipality.
- dim_temperature table: contains temperature data for cities around the world, recorded on different dates and at different locations within each city. We use the average temperature for each city on a given date.
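
The table-level notes above can be extended to the field level that the data dictionary asks for. The following is a minimal sketch, assuming a plain Python mapping is an acceptable place to keep the descriptions. `DATA_DICTIONARY` and `render_markdown` are hypothetical names, only two of the five tables are shown, and the field descriptions are inferred from the column names rather than taken from the source files.

```python
# Hypothetical field-level data dictionary for two of the five tables.
# Descriptions are inferred from the column names (an assumption), and the
# "source" values repeat the dataset names given in Step 1.
DATA_DICTIONARY = {
    "dim_demographics": {
        "source": "U.S. City Demographic Data",
        "fields": {
            "City": "City name",
            "State": "Full state name",
            "state_code": "State abbreviation",
            "median_age": "Median age of the city's residents",
            "male_population": "Number of male residents",
            "female_population": "Number of female residents",
            "total_population": "Total number of residents",
            "foreign_born": "Number of foreign-born residents",
            "average_household_size": "Average number of people per household",
            "Race": "Race category the Count column refers to",
            "Count": "Number of residents in that race category",
        },
    },
    "dim_temperature": {
        "source": "World Temperature Data",
        "fields": {
            "date": "Date of the observation",
            "City": "City name",
            "average_temperature": "Average temperature for the city on that date",
        },
    },
}


def render_markdown(data_dictionary):
    """Render the mapping as one markdown table per model table."""
    lines = []
    for table, spec in data_dictionary.items():
        lines.append(f"##### {table} (source: {spec['source']})")
        lines.append("| Field | Description |")
        lines.append("| --- | --- |")
        for field, description in spec["fields"].items():
            lines.append(f"| {field} | {description} |")
        lines.append("")  # blank line between tables
    return "\n".join(lines)


print(render_markdown(DATA_DICTIONARY))
```

Keeping the descriptions in one structure makes it easy to paste the rendered tables into a markdown cell or to write them to a separate file.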


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Check that columns used as primary or secondary keys don't contain null values
 * Count checks to ensure the fact_immigration table is loaded completely and that the number of distinct primary key ("cicid") values equals the number of rows

 
Run Quality Checks

In [88]:
#Let's check some things in our data
dim_demographics.createOrReplaceTempView("dim_demographics")
dim_time.createOrReplaceTempView("dim_time")
dim_airports.createOrReplaceTempView("dim_airports")
dim_temperature.createOrReplaceTempView("dim_temperature")
fact_immigration.createOrReplaceTempView("fact_immigration")

In [89]:
#check for null values in columns that should never contain them
def nullValueCheck(spark_ctxt, tables_to_check):
    #tables_to_check maps each table name to the list of columns to check
    for table in tables_to_check:
        print(f"Performing data quality check on table {table}...")
        for column in tables_to_check[table]:
            returnedVal = spark_ctxt.sql(f"""SELECT COUNT(*) as nbr FROM {table} WHERE {column} IS NULL""")
            if returnedVal.head()[0] > 0:
                #fail fast: any NULL in a key column stops the check
                raise ValueError(f"Data quality check failed! Found NULL values in column {column} of table {table}!")
        print(f"Table {table} passed.")

In [90]:
#dictionary of tables and columns to be checked
tables_to_check = {
    'fact_immigration': ['cicid'],
    'dim_time': ['date'],
    'dim_demographics': ['City', 'state_code'],
    'dim_airports': ['ident'],
    'dim_temperature': ['date', 'City'],
}

#We call our function with the spark session
nullValueCheck(spark, tables_to_check)

Performing data quality check on table fact_immigration...
Table fact_immigration passed.
Performing data quality check on table dim_time...
Table dim_time passed.
Performing data quality check on table dim_demographics...
Table dim_demographics passed.
Performing data quality check on table dim_airports...
Table dim_airports passed.
Performing data quality check on table dim_temperature...
Table dim_temperature passed.


In [91]:
#distinct primary key count in the source immigration table (immig_table)
spark.sql("""
SELECT count(distinct cicid) 
FROM immig_table
""").show()


+---------------------+
|count(DISTINCT cicid)|
+---------------------+
|              2100733|
+---------------------+



In [92]:
#should match the primary key count from the fact table (2100733 expected)
spark.sql("""
SELECT count(distinct cicid) - 2100733
FROM fact_immigration
""").show()

+-------------------------------------------------+
|(count(DISTINCT cicid) - CAST(2100733 AS BIGINT))|
+-------------------------------------------------+
|                                                0|
+-------------------------------------------------+



In [93]:
#and should match the row count from the fact table since it is also the primary key (2100733 expected)
spark.sql("""
SELECT count(*) - 2100733
FROM fact_immigration
""").show()

+------------------------------------+
|(count(1) - CAST(2100733 AS BIGINT))|
+------------------------------------+
|                                   0|
+------------------------------------+



#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

# How I would approach the problem differently under the following scenarios:

### The data was increased by 100x.
In this case I would run the ETL process on Amazon EMR, so that the Spark job can be spread across more nodes, and write the output directly to an S3 bucket.

### The data populates a dashboard that must be updated on a daily basis by 7am every day.
In this case I would use Apache Airflow and schedule the pipeline to run daily, early enough that it finishes before 7am.

### The database needed to be accessed by 100+ people.
In this case I suggest creating an Amazon Redshift cluster and moving the data to it so that it is easily accessible to all users.