# Project Title
### Data Engineering Capstone Project

#### Project Summary
In this project we will be working with Spark to create an ETL Pipeline for **I94 Immigration Data** the main purpose of this project is to create an DWH using star schema for the Immigration to US. <br>
We will be using two more datasets to help us improve our output analysis by the end of the ETL process 
1. U.S. City Demographic Data 
 - The US City Demography provide us more info about the US states and cities population and provide some statistics so we can ask questions and find pattern between the number of immigrants and the state that he will go to and why some states or cities have more immigrants that the others <br>
2. Airport Code Table
 - The Airport code table provide more info about the Airports codes and types and locations and continent and region so we can find patterns between the immigration and the type of airports or the regions of them.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

#### Import Needed Libiraries 

In [2]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql.functions import  udf , col
from pyspark.sql.types import StringType


#### Create Spark Session

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
At the Beginning we need to learn more about the Datasets that we are going to use 
Let's have alook on the table below and feel free to follow the links to learn more about the Data

#### Describe and Gather Data 

|Dataset Name| Format| Path| Description |
|--|--|--|--|
|I94 Immigration Data| sas7bdat | '../../data/18-83510-I94-Data-2016/*.sas7bdat' |This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace. This is where the data comes from. There's a sample file so you can take a look at the data in csv format before reading it all in. You do not have to use the entire dataset, just use what you need to accomplish the goal you set at the beginning of the project.|
| U.S. City Demographic Data | CSV| 'us-cities-demographics.csv' | This data comes from OpenSoft. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/). |
| Airport Code Table | CSV| 'airport-codes_csv.csv' | This is a simple table of airport codes and corresponding cities. It comes from [here](https://datahub.io/core/airport-codes#data). |

The Steps that we are going to do is as Follows : 
1. Use Spark to load I94 Immigration Data
2. Take a look on the table and identify whick columns we are going to use
3. Use Spark to load U.S. City Demographic Data
4. Take a look on the data and the columns
5. Use Spark to load Airport Code Table
6. Take a look on the data and the columns

- After That we will dive deep into each table transforming data and cleaning data 
- From There we will be able to create out DWH Schema 
- After creating the schema we will use spark to create the Fact table and the Dim table
- And at the end we will define our own Data Quality Checks to test the quality of the data

### Initial Dataset loads with quick view
#### 1) I94 Immigration Data


In [4]:
# Read csv
# read in the data
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
immigration_df = spark.read.format('com.github.saurfang.sas.spark').load(fname)

In [5]:
immigration_df.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [5]:
immigration_df.count()

3096313

| Col | Description |
|--|--|
cicid | Unique record ID 
i94yr | 4 digit year
i94mon | Numeric month
i94cit | 3 digit code for immigrant country of birth
i94res | 3 digit code for immigrant country of residence
i94port | Port of admission
arrdate | Arrival Date in the USA
i94mode | Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)
i94addr | USA State of arrival
depdate | Departure Date from the USA
i94bir | Age of Respondent in Years
i94visa | Visa codes collapsed into three categories
count | Field used for summary statistics
dtadfile | Character Date Field - Date added to I-94 Files
visapost | Department of State where where Visa was issued
occup | Occupation that will be performed in U.S
entdepa | Arrival Flag - admitted or paroled into the U.S.
entdepd | Departure Flag - Departed, lost I-94 or is deceased
entdepu | Update Flag - Either apprehended, overstayed, adjusted to perm residence
matflag | Match flag - Match of arrival and departure records
biryear | 4 digit year of birth
dtaddto | Character Date Field - Date to which admitted to U.S. (allowed to stay until)
gender | Non-immigrant sex
insnum | INS number
airline | Airline used to arrive in U.S.
admnum | Admission Number
fltno | Flight number of Airline used to arrive in U.S.
visatype | Class of admission legally admitting the non-immigrant to temporarily stay in U.S.

#### 2) US Cities Demographics Data

In [6]:
cities_df = spark.read.option("delimiter"  , ";"  ).option("Header" , True).csv('us-cities-demographics.csv' )
cities_df.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


In [7]:
cities_df.count()

2891

|Col |Description |
|--|--|
City | City Name
State | US State where city is located
Median Age | Median age of the population
Male Population | Count of male population
Female Population | Count of female population
Total Population | Count of total population
Number of Veterans | Count of total Veterans
Foreign born | Count of residents of the city that were not born in the city
Average Household Size | Average city household size
State Code | Code of the US state
Race | Respondent race
Count | Count of city's individual per race

#### 3) Airport Codes Data

In [8]:
Airport_df = spark.read.option("Header" , True).csv('airport-codes_csv.csv' )
Airport_df.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [9]:
Airport_df.count()

55075

### Step 2: Explore and Assess the Data
#### Explore and Clearning the Datasets
In this Step we will check each table and search for null values and useless columns that will not help us in the analysis 
After Cleaning the data we will do some transformations if needed to be able to JOIN the tables 
> Here we will be search for NULL values and useless columns and duplicates to keep only useful data

#### 1) I94 Immigration Data

In [10]:
# Load data from april 2016
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
immigration_df = spark.read.format('com.github.saurfang.sas.spark').load(fname)

In [11]:
immigration_df.columns

['cicid',
 'i94yr',
 'i94mon',
 'i94cit',
 'i94res',
 'i94port',
 'arrdate',
 'i94mode',
 'i94addr',
 'depdate',
 'i94bir',
 'i94visa',
 'count',
 'dtadfile',
 'visapost',
 'occup',
 'entdepa',
 'entdepd',
 'entdepu',
 'matflag',
 'biryear',
 'dtaddto',
 'gender',
 'insnum',
 'airline',
 'admnum',
 'fltno',
 'visatype']

In [12]:
# Check Flags
immigration_df.select( 'entdepa','entdepd','entdepu','matflag').limit(10).toPandas()

Unnamed: 0,entdepa,entdepd,entdepu,matflag
0,T,,U,
1,G,,Y,
2,T,O,,M
3,O,O,,M
4,O,O,,M
5,O,O,,M
6,O,K,,M
7,O,K,,M
8,O,O,,M
9,O,O,,M


> Not giving us detailed info so we will not be using them 
#### Now let's select the columns that we are intersted in the most

In [13]:
# Select Needed Columns and renaming them while loading using Exp
Img_Clean_df = immigration_df.selectExpr('cicid as Img_ID'  , 'i94yr as Img_Year' ,'i94mon as Img_Month', 'i94cit as Img_BCountry',
                     'i94res as Img_ResidenceCountry','i94port as Img_City', 'i94mode as Transportation_Mode',
                      'i94addr as State_Arrival', 'i94bir as Age' ,'i94visa as Visa_Codes', 
                     'occup as OccupationState','biryear as Img_BYear',  'gender as Img_gender',
                      'airline as Airline' , 'fltno as Flight_Number',  'visatype as Visa_Type' , 'visapost as Visa_Post' , 'insnum as INS_Number')

Img_Clean_df.limit(5).toPandas()


Unnamed: 0,Img_ID,Img_Year,Img_Month,Img_BCountry,Img_ResidenceCountry,Img_City,Transportation_Mode,State_Arrival,Age,Visa_Codes,OccupationState,Img_BYear,Img_gender,Airline,Flight_Number,Visa_Type,Visa_Post,INS_Number
0,6.0,2016.0,4.0,692.0,692.0,XXX,,,37.0,2.0,,1979.0,,,,B2,,
1,7.0,2016.0,4.0,254.0,276.0,ATL,1.0,AL,25.0,3.0,,1991.0,M,,296.0,F1,SEO,
2,15.0,2016.0,4.0,101.0,101.0,WAS,1.0,MI,55.0,2.0,,1961.0,M,OS,93.0,B2,,
3,16.0,2016.0,4.0,101.0,101.0,NYC,1.0,MA,28.0,2.0,,1988.0,,AA,199.0,B2,,
4,17.0,2016.0,4.0,101.0,101.0,NYC,1.0,MA,4.0,2.0,,2012.0,,AA,199.0,B2,,


In [14]:
Img_Clean_df.count()

3096313

In [16]:
# Check for NaN values for Visa_Post , OccupationState , Img_gender
Img_Clean_df.select('Visa_Post').groupBy('Visa_Post').count().sort("count", ascending=False).limit(10).toPandas()

Unnamed: 0,Visa_Post,count
0,,1881250
1,MEX,84720
2,SPL,65678
3,BNS,62032
4,GUZ,48298
5,BGT,46074
6,CRS,37137
7,BEJ,36703
8,SHG,35507
9,GDL,30970


In [17]:
f"None % = {1881250/3096313 *100}%"

'None % = 60.75774639062653%'

> Nearly 60% of the data is None 

In [18]:
Img_Clean_df.select('INS_Number').groupBy('INS_Number').count().sort("count", ascending=False).limit(10).toPandas()

Unnamed: 0,INS_Number,count
0,,2982605
1,3692.0,2155
2,3697.0,2033
3,3703.0,1986
4,3893.0,1866
5,3661.0,1820
6,3693.0,1690
7,3939.0,1680
8,3672.0,1678
9,3882.0,1673


In [19]:
f"None % = {2916398/3096313 *100}%"

'None % = 94.18937943289325%'

> Nearly 94% of the data is None so we will delete this column

In [20]:
Img_Clean_df.select('OccupationState').groupBy('OccupationState').count().sort("count", ascending=False).limit(10).toPandas()

Unnamed: 0,OccupationState,count
0,,3088187
1,STU,4719
2,OTH,661
3,NRR,345
4,MKT,280
5,EXA,196
6,GLS,189
7,ULS,175
8,ADM,125
9,TIE,124


In [21]:
f"None % = {3088187/3096313 *100}%"

'None % = 99.73755883206898%'

In [22]:
Img_Clean_df.select('Img_gender').groupBy('Img_gender').count().sort("count", ascending=False).limit(10).toPandas()

Unnamed: 0,Img_gender,count
0,M,1377224
1,F,1302743
2,,414269
3,X,1610
4,U,467


> As per our Exploration we will exclude OccupationState and Visa_Department columns

In [23]:
columns_to_drop = ['OccupationState','INS_Number']
Img_Clean_df = Img_Clean_df.drop(*columns_to_drop)

In [24]:
Img_Clean_df.where(col("Airline").isNull()).count()

83627

In [25]:
Img_Clean_df.limit(5).toPandas()

Unnamed: 0,Img_ID,Img_Year,Img_Month,Img_BCountry,Img_ResidenceCountry,Img_City,Transportation_Mode,State_Arrival,Age,Visa_Codes,Img_BYear,Img_gender,Airline,Flight_Number,Visa_Type,Visa_Post
0,6.0,2016.0,4.0,692.0,692.0,XXX,,,37.0,2.0,1979.0,,,,B2,
1,7.0,2016.0,4.0,254.0,276.0,ATL,1.0,AL,25.0,3.0,1991.0,M,,296.0,F1,SEO
2,15.0,2016.0,4.0,101.0,101.0,WAS,1.0,MI,55.0,2.0,1961.0,M,OS,93.0,B2,
3,16.0,2016.0,4.0,101.0,101.0,NYC,1.0,MA,28.0,2.0,1988.0,,AA,199.0,B2,
4,17.0,2016.0,4.0,101.0,101.0,NYC,1.0,MA,4.0,2.0,2012.0,,AA,199.0,B2,


In [26]:
Img_Clean_df = Img_Clean_df.na.drop(subset=["Airline"])

In [27]:
Img_Clean_df.where(col("Airline").isNull()).count()

0

#### 2) U.S. City Demographic Data

In [28]:
# Read CSV
cities_df = spark.read.option("delimiter"  , ";"  ).option("Header" , True).csv('us-cities-demographics.csv' )
cities_df.limit(5).toPandas()

#Rename Columns
cities_df = cities_df.withColumnRenamed("Median Age","Median_Age") \
    .withColumnRenamed("Male Population","Male_Population") \
    .withColumnRenamed("Female Population","Female_Population") \
    .withColumnRenamed("Total Population","Total_Population") \
    .withColumnRenamed("Number of Veterans","Num_Veterans") \
    .withColumnRenamed("Foreign-born","Foreign_born") \
    .withColumnRenamed("Average Household Size","Average_Household_Size") \
    .withColumnRenamed("State Code","State_Code")


In [29]:
cities_df.limit(5).toPandas()

Unnamed: 0,City,State,Median_Age,Male_Population,Female_Population,Total_Population,Num_Veterans,Foreign_born,Average_Household_Size,State_Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


### We will group our data by State and take the avg of the statistics 

In [30]:
cities_df.createOrReplaceTempView("city_view")

cities_df = spark.sql("""
SELECT  State_Code ,
        MAX(State) AS State_Name ,
        COUNT(*) AS Num_Cities ,
        AVG(Median_Age) AS AVG_Median_Age ,
        AVG(Male_Population) AS AVG_Male_Population,
        AVG(Female_Population) AS AVG_Female_Population ,
        INT(SUM(Total_Population)) AS Total_Population , 
        INT(SUM(Num_Veterans)) AS Num_Veterans,
        INT(SUM(Foreign_born)) AS Foreign_born,
        AVG(Average_Household_Size) AS Average_Household_Size
FROM  city_view
GROUP BY  State_Code
LIMIT 5
""")

In [31]:
cities_df.limit(10).toPandas()

Unnamed: 0,State_Code,State_Name,Num_Cities,AVG_Median_Age,AVG_Male_Population,AVG_Female_Population,Total_Population,Num_Veterans,Foreign_born,Average_Household_Size
0,AZ,Arizona,80,35.0375,139215.9375,142005.4375,22497710,1322525,3411565,2.774375
1,SC,South Carolina,24,33.825,52720.458333,55070.208333,2586976,163334,134019,2.469583
2,LA,Louisiana,40,34.625,78374.75,84199.625,6502975,348855,417095,2.465
3,MN,Minnesota,54,35.57963,64422.277778,66025.222222,7044165,321738,1069888,2.496852
4,NJ,New Jersey,57,35.254386,60053.210526,61543.701754,6931024,146632,2327750,2.960877


### 3) Airport Code Table

In [22]:
Airport_df = spark.read.option("Header" , True).csv('airport-codes_csv.csv' )
Airport_df.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


##### Data Exploration

In [32]:
Airport_df.groupBy('continent').count().toPandas()

Unnamed: 0,continent,count
0,,27719
1,SA,7709
2,AS,5350
3,AN,28
4,OC,3067
5,EU,7840
6,AF,3362


In [33]:
# Check if gps_code and ident and local_Code are the same 
Airport_df.select('gps_code' , 'ident','local_code').limit(10).toPandas()

Unnamed: 0,gps_code,ident,local_code
0,00A,00A,00A
1,00AA,00AA,00AA
2,00AK,00AK,00AK
3,00AL,00AL,00AL
4,,00AR,
5,00AS,00AS,00AS
6,00AZ,00AZ,00AZ
7,00CA,00CA,00CA
8,00CL,00CL,00CL
9,00CN,00CN,00CN


In [34]:
# Check Duplicates
if Airport_df.count() > Airport_df.dropDuplicates(['ident']).count():
    raise ValueError('Data has duplicates')

#### Check iata_code

In [35]:
Airport_df.where(col('iata_code').isNull()).count()

45886

In [36]:
Airport_df.count()

55075

> So we will drop gps_code , local_code  ,iata_code

In [37]:
columns_to_drop = ['gps_code','local_code' , 'iata_code']
Airport_df = Airport_df.drop(*columns_to_drop)

In [38]:
Airport_df.groupBy('type').count().toPandas()

Unnamed: 0,type,count
0,large_airport,627
1,balloonport,24
2,seaplane_base,1016
3,heliport,11287
4,closed,3606
5,medium_airport,4550
6,small_airport,33965


##### Compare Airport code from the two datasets 

In [39]:
Airport_df.select("ident").limit(5).toPandas()

Unnamed: 0,ident
0,00A
1,00AA
2,00AK
3,00AL
4,00AR


In [40]:
Airport_df[Airport_df['ident'] == 'AA'].toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,coordinates


In [41]:
Airport_df[Airport_df['ident'] == '00AA'].toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,coordinates
0,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,"-101.473911, 38.704022"


#### So we Need to transform All the Airline codes in Img dataset from for example 'AA' to '00AA'

In [42]:
AirlineFormat_udf = udf(lambda code: "00"+code if len(code) == 2 else code, StringType())

In [43]:
Img_Clean_df = Img_Clean_df.withColumn('Airline' , AirlineFormat_udf(Img_Clean_df['Airline']))

In [44]:
Img_Clean_df.limit(5).toPandas()

Unnamed: 0,Img_ID,Img_Year,Img_Month,Img_BCountry,Img_ResidenceCountry,Img_City,Transportation_Mode,State_Arrival,Age,Visa_Codes,Img_BYear,Img_gender,Airline,Flight_Number,Visa_Type,Visa_Post
0,15.0,2016.0,4.0,101.0,101.0,WAS,1.0,MI,55.0,2.0,1961.0,M,00OS,93,B2,
1,16.0,2016.0,4.0,101.0,101.0,NYC,1.0,MA,28.0,2.0,1988.0,,00AA,199,B2,
2,17.0,2016.0,4.0,101.0,101.0,NYC,1.0,MA,4.0,2.0,2012.0,,00AA,199,B2,
3,18.0,2016.0,4.0,101.0,101.0,NYC,1.0,MI,57.0,1.0,1959.0,,00AZ,602,B1,
4,19.0,2016.0,4.0,101.0,101.0,NYC,1.0,NJ,63.0,2.0,1953.0,,00AZ,602,B2,


In [45]:
Airport_df.limit(3).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,"-151.695999146, 59.94919968"


### Here we will define function for each Dataset to Clean the data
This is just a summary to the cells above so we don't need to run it one by one

In [77]:
def CleanImmiData(path):
    # Load data 
    immigration_df = spark.read.format('com.github.saurfang.sas.spark').load(path)
    # Select Needed Columns and renaming them while loading using Exp
    Img_Clean_df = immigration_df.selectExpr('cicid as Img_ID'  , 'i94yr as Img_Year' ,'i94mon as Img_Month', 'i94cit as Img_BCountry',
                     'i94res as Img_ResidenceCountry','i94port as Img_City', 'i94mode as Transportation_Mode',
                      'i94addr as State_Arrival', 'i94bir as Age' ,'i94visa as Visa_Codes', 
                     'occup as OccupationState','biryear as Img_BYear',  'gender as Img_gender',
                      'airline as Airline' , 'fltno as Flight_Number',  'visatype as Visa_Type' , 'visapost as Visa_Post' , 'insnum as INS_Number')
    # Drop Useless Columns
    columns_to_drop = ['OccupationState','INS_Number']
    Img_Clean_df = Img_Clean_df.drop(*columns_to_drop)
    # Remove NULL values from Airline Column
    Img_Clean_df = Img_Clean_df.na.drop(subset=["Airline"])
    # Reformat the Airline Column from for Example : AA -> 00AA
    # Define udf
    AirlineFormat_udf = udf(lambda code: "00"+code if len(code) == 2 else code, StringType())
    # Run udf of Airline Column
    Img_Clean_df = Img_Clean_df.withColumn('Airline' , AirlineFormat_udf(Img_Clean_df['Airline']))
    
    # Return cleaned dataframe
    return Img_Clean_df

In [78]:
def CleanCitiesData(path):
    # Read the CSV
    cities_df = spark.read.option("delimiter"  , ";"  ).option("Header" , True).csv(path )
    cities_df.limit(5).toPandas()

    # Rename Columns with easier names
    cities_df = cities_df.withColumnRenamed("Median Age","Median_Age") \
        .withColumnRenamed("Male Population","Male_Population") \
        .withColumnRenamed("Female Population","Female_Population") \
        .withColumnRenamed("Total Population","Total_Population") \
        .withColumnRenamed("Number of Veterans","Num_Veterans") \
        .withColumnRenamed("Foreign-born","Foreign_born") \
        .withColumnRenamed("Average Household Size","Average_Household_Size") \
        .withColumnRenamed("State Code","State_Code")

    # Create view to run sql query on the data
    cities_df.createOrReplaceTempView("city_view")

    # Create new table with states summarization not for each city
    states_df = spark.sql("""
        SELECT  State_Code ,
                MAX(State) AS State_Name ,
                COUNT(*) AS Num_Cities ,
                AVG(Median_Age) AS AVG_Median_Age ,
                AVG(Male_Population) AS AVG_Male_Population,
                AVG(Female_Population) AS AVG_Female_Population ,
                INT(SUM(Total_Population)) AS Total_Population , 
                INT(SUM(Num_Veterans)) AS Num_Veterans,
                INT(SUM(Foreign_born)) AS Foreign_born,
                AVG(Average_Household_Size) AS Average_Household_Size
        FROM  city_view
        GROUP BY  State_Code
        LIMIT 5
        """)
    # Return cleaned dataframe
    return states_df

In [80]:
def CleanAirlineData(path):
    Airport_df = spark.read.option("Header" , True).csv('airport-codes_csv.csv' )
    
    # drop useless columns
    columns_to_drop = ['gps_code','local_code' , 'iata_code']
    Airport_df = Airport_df.drop(*columns_to_drop)
    
    # Return cleaned dataframe
    return Airport_df

## Saving Cleaned Datasets 

In [46]:
Img_Clean_df.write.parquet("/Output_Clean/Clean_I94_Immigration_Data.parquet")

In [47]:
Airport_df.write.parquet("/Output_Clean/Clean_AirportCode.parquet") 

In [48]:
cities_df.write.parquet("/Output_Clean/Clean_US_CityDemographic.parquet") 

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The Conceptual Data Model is as Follows:  
![](Star_Proj.png)
We chose the star schema to be able to do analysis easier and by having the Airline Data as dimension table we will be able to do our analysis based on the airline and the Airline type,name etc.
Just as the Airline dim table we have State dim table which will provide our analysis with deeper understanding about the states info and cities in each state.
and also the visa and Immigrants Dim table give us more personal info about the Immigrants


#### 3.2 Mapping Out Data Pipelines
Steps Needed to Create our Tables is : 
1. Load Cleaned Img Data frame to create :
- Fact_l94_Immigration_Table
- Dim_Immigrant_Table
- Dim_Visa_Table
2. Load Modified Airport_df to create :
- Dim_AirLine_Table
3. Load Modified cities_df to create : 
- Dim_States_Table

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

#### 1) Read Saved files from stages 1,2

In [49]:
# Read parquet Files
Img_Clean_df=spark.read.parquet("/Output_Clean/Clean_I94_Immigration_Data.parquet")
Airport_df=spark.read.parquet("/Output_Clean/Clean_AirportCode.parquet")
cities_df=spark.read.parquet("/Output_Clean/Clean_US_CityDemographic.parquet")

#### 2) Create Fact Table 

In [50]:
Fact_l94_Immigration_Table =Img_Clean_df.select('Img_ID','Img_Year','Img_Month','Img_City','Transportation_Mode',
                                                 'State_Arrival','Airline','Flight_Number','Visa_Type')

In [51]:
Fact_l94_Immigration_Table.limit(5).toPandas()

Unnamed: 0,Img_ID,Img_Year,Img_Month,Img_City,Transportation_Mode,State_Arrival,Airline,Flight_Number,Visa_Type
0,878176.0,2016.0,4.0,LOS,1.0,CA,00CI,6,B2
1,878177.0,2016.0,4.0,LOS,1.0,CA,00PR,102,B2
2,878178.0,2016.0,4.0,LOS,1.0,CA,00PR,102,B2
3,878179.0,2016.0,4.0,LOS,1.0,CA,00KE,11,B2
4,878180.0,2016.0,4.0,LOS,1.0,CA,00KE,11,B2


#### 3) Create Dimension Tables

##### 3.1) Dim_Immigrant

In [52]:
Dim_Immigrant_Table = Img_Clean_df.select('Img_ID','Img_BCountry','Img_ResidenceCountry','Age','Img_BYear','Img_gender')

In [53]:
Dim_Immigrant_Table.limit(5).toPandas()

Unnamed: 0,Img_ID,Img_BCountry,Img_ResidenceCountry,Age,Img_BYear,Img_gender
0,878176.0,260.0,260.0,33.0,1983.0,F
1,878177.0,260.0,260.0,62.0,1954.0,M
2,878178.0,260.0,260.0,61.0,1955.0,F
3,878179.0,260.0,260.0,33.0,1983.0,F
4,878180.0,260.0,260.0,9.0,2007.0,F


##### 3.2) Dim_Visa

In [55]:
Dim_Visa_Table = Img_Clean_df.select('Visa_Type' ,'Visa_Codes','Visa_Post')

In [56]:
Dim_Visa_Table.limit(5).toPandas()

Unnamed: 0,Visa_Type,Visa_Codes,Visa_Post
0,B2,2.0,MNL
1,B2,2.0,MNL
2,B2,2.0,MNL
3,B2,2.0,MNL
4,B2,2.0,MNL


##### 3.2) Dim_States

In [57]:
Dim_States_Table = cities_df.select('State_Code','State_Name','Num_Cities','AVG_Median_Age','AVG_Male_Population',
                                     'AVG_Female_Population','Total_Population','Num_Veterans','Foreign_born',
                                                    'Average_Household_Size')

In [58]:
Dim_States_Table.limit(5).toPandas()

Unnamed: 0,State_Code,State_Name,Num_Cities,AVG_Median_Age,AVG_Male_Population,AVG_Female_Population,Total_Population,Num_Veterans,Foreign_born,Average_Household_Size
0,AZ,Arizona,80,35.0375,139215.9375,142005.4375,22497710,1322525,3411565,2.774375
1,SC,South Carolina,24,33.825,52720.458333,55070.208333,2586976,163334,134019,2.469583
2,LA,Louisiana,40,34.625,78374.75,84199.625,6502975,348855,417095,2.465
3,MN,Minnesota,54,35.57963,64422.277778,66025.222222,7044165,321738,1069888,2.496852
4,NJ,New Jersey,57,35.254386,60053.210526,61543.701754,6931024,146632,2327750,2.960877


##### 3.2) Dim_AirLines

In [59]:
Dim_AirLine_Table = Airport_df.selectExpr("ident AS Airline_ID" , "type AS Al_Type" ,'name AS Al_Name', 'elevation_ft AS Al_Elevation_FT',
                      'continent AS Al_Continent','iso_country AS Al_Iso_Country','iso_region AS Al_Iso_Region',
                      'municipality AS Al_Municipality','coordinates AS Al_Coordinates')

In [60]:
Dim_AirLine_Table.limit(5).toPandas()

Unnamed: 0,Airline_ID,Al_Type,Al_Name,Al_Elevation_FT,Al_Continent,Al_Iso_Country,Al_Iso_Region,Al_Municipality,Al_Coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,"-91.254898, 35.6087"


## Here we will define a Full ETL Pipeline Function that takes the raw data and transform it to the Fact/Dim tables

This is a summarization for all of the above steps in the notebook so we don't need to go through each cell one by one


In [81]:
def ETL_Immi_Fact(Originalpath):
    # Cleaning Function
    Clean_df = CleanImmiData(Originalpath)
    # Select the needed Columns
    Fact_l94_Immigration_Table =Clean_df.select('Img_ID','Img_Year','Img_Month','Img_City','Transportation_Mode',
                                                 'State_Arrival','Airline','Flight_Number','Visa_Type')
    # Return Fact Table Immigration
    return Fact_l94_Immigration_Table

In [85]:
def ETL_Immigrant_Dim(Originalpath):
    # Cleaning Function
    Clean_df = CleanImmiData(Originalpath)
    # Select Needed Columns
    Dim_Immigrant_Table = Clean_df.select('Img_ID','Img_BCountry','Img_ResidenceCountry','Age','Img_BYear','Img_gender')
    
    # Return Dim Table Immigrant
    return Dim_Immigrant_Table

In [84]:
def ETL_Visa_Dim(Originalpath):
    # Cleaning Function
    Clean_df = CleanImmiData(Originalpath)
    # Select Needed Columns
    Dim_Visa_Table = Clean_df.select('Visa_Type' ,'Visa_Codes','Visa_Post')
    # Return Dim table Visa
    return Dim_Visa_Table

In [86]:
def ETL_State_Dim(Originalpath):
    # Cleaning Function
    Clean_df = CleanCitiesData(Originalpath)
    # Select Needed Columns
    Dim_States_Table = Clean_df.select('State_Code','State_Name','Num_Cities','AVG_Median_Age','AVG_Male_Population',
                                     'AVG_Female_Population','Total_Population','Num_Veterans','Foreign_born',
                                                    'Average_Household_Size')
    # Return States Dim table
    return Dim_States_Table

In [87]:
def ETL_Airline_Dim(Originalpath):
    # Cleaning Function
    Clean_df = CleanAirlineData(Originalpath)
    # Select Needed Columns
    Dim_AirLine_Table = Clean_df.selectExpr("ident AS Airline_ID" , "type AS Al_Type" ,'name AS Al_Name', 'elevation_ft AS Al_Elevation_FT',
                      'continent AS Al_Continent','iso_country AS Al_Iso_Country','iso_region AS Al_Iso_Region',
                      'municipality AS Al_Municipality','coordinates AS Al_Coordinates')
    
    return Dim_AirLine_Table

#### 4) Save Output Tables 

Save Fact Table 

In [61]:
Fact_l94_Immigration_Table.write.parquet("/Output_Dim_Fact_Tables/Fact_l94_Immigration_Table.parquet") 

Save Dim Tables

In [62]:
Dim_AirLine_Table.write.parquet("/Output_Dim_Fact_Tables/Dim_AirLine_Table.parquet") 

In [63]:
Dim_States_Table.write.parquet("/Output_Dim_Fact_Tables/Dim_States_Table.parquet") 

In [64]:
Dim_Visa_Table.write.parquet("/Output_Dim_Fact_Tables/Dim_Visa_Table.parquet") 

In [65]:
Dim_Immigrant_Table.write.parquet("/Output_Dim_Fact_Tables/Dim_Immigrant_Table.parquet") 

#### 4.2 Data Quality Checks
For the Data Quality Check we will be creating to Functions :
 * Function to check if the Table are empty or not
 * Function to check if the column has duplicate values of not

In case the Check failed it will raise error for the user
So we will have two checks one on the table level and the other on the Columns level 

#### Let's Start by Defining our Functions


In [66]:
# Define Quality Check Function 1
def Check_DF_rows(df ,name):
    """
        Function Takes df and it's name and count number of rows 
        if rows == 0 
            return value error
        else
            test Passed
    """
    if df.count == 0:
        raise ValueError('Empty Table!!')
    else :
        print("Quality Check Passed!!")


In [67]:
# Define Quality Check Function 2
def Check_DF_Dup(df ,col):
    """
        Function Takes dataframe and a column name check if the column has duplicate values
        if has duplicate values 
            Raise Value error
        Else
            Print Check Passed !
    
    """
    if df.count() > df.dropDuplicates([col]).count():
        raise ValueError('Data has duplicates')
    else :
        print("Quality Check Passed!!")
    

In [68]:
# Read Saved Tables
Dim_AirLine_Table=spark.read.parquet("/Output_Dim_Fact_Tables/Dim_AirLine_Table.parquet")

In [69]:
Dim_AirLine_Table.columns

['Airline_ID',
 'Al_Type',
 'Al_Name',
 'Al_Elevation_FT',
 'Al_Continent',
 'Al_Iso_Country',
 'Al_Iso_Region',
 'Al_Municipality',
 'Al_Coordinates']

In [70]:
Check_DF_rows(Dim_AirLine_Table , "Dim_AirLine")

Quality Check Passed!!


In [71]:
Check_DF_Dup(Dim_AirLine_Table ,'Airline_ID' )

Quality Check Passed!!


### 4.3 Data dictionary 
#### 1) Fact_l94_Immigration
| Column Name |  Description |
|--|--|
|Img_ID | Unique record ID  |
| Img_Year | 4 digit year |
|Img_Month |  Numeric month |
|Img_City | Port of admission |
|Transportation_Mode | Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported) |
|State_Arrival | USA State of arrival |
|Airline | Airline used to arrive in U.S. |
|Flight_Number | Flight number of Airline used to arrive in U.S.  |
|Visa_Type |  Class of admission legally admitting the non-immigrant to temporarily stay in U.S. |

#### 2) Dim_Immigrant_Table

| Column Name |  Description |
|--|--|
|Img_ID|Unique record ID  |
|Img_BCountry| 3 digit code for immigrant country of birth  |
|Img_ResidenceCountry| 3 digit code for immigrant country of residence  |
|Age|  Age of Respondent in Years  |
|Img_BYear|  4 digit year of birth |
|Img_gender|Non-immigrant sex |


#### 3) Dim_Visa_Table
| Column Name |  Description |
|--|--|
|Visa_Type|    Class of admission legally admitting the non-immigrant to temporarily stay in U.S.    |
|Visa_Codes|   Visa codes collapsed into three categories    |
|Visa_Post|    Department of State where where Visa was issued    |


#### 4) Dim_AirLine_Table
| Column Name |  Description |
|--|--|
|Airline_ID|     Airline Unique ID  |
|Al_Type|     Airline Type      |
|Al_Name|       Airline Name   |
|Al_Elevation_FT|      Airline Elevation with FT    |
|Al_Continent|       Airline Continent   |
|Al_Iso_Country|       Airline Country   |
|Al_Iso_Region|        Airline Region  |
|Al_Municipality|        Airline Municipality  |
|Al_Coordinates|     Airline Coordinates     |

#### 5) Dim_States_Table
| Column Name |  Description |
|--|--|
|State_Code|   State Unique Code          |
|State_Name|          State Name   |
|Num_Cities|          Number of Cities in the State   |
|AVG_Median_Age|          AVG of the Median Age in the State   |
|AVG_Male_Population|         AVG of Male Population in the State    |
|AVG_Female_Population|           AVG OF Female Population in the State  |
|Total_Population|           Total Population  |
|Num_Veterans|           Number of Veterans in the state  |
|Foreign_born|           Number of Foreign Born in the State  |
|Average_Household_Size |       Average Household Size in the State      |

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
I Used Spark because:
 - it's able to handle multiple file formats with large amounts of data.
 - It can also scale up or down based on the need for future increase of data.
 - Apache Spark is easy to use and give you the freedom to use python or sql to do your proccesses

* Propose how often the data should be updated and why.
 - Monthly because the dataset is given for each month so we will need to the monthly added data to our DWH to be able to do our analysis based on the newest trends
 
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
  - In this case i will be using AWS EC2 clusters to run spark on and Redshift as my DWH and the Dataset will be on S3 by doing so we will have the full freedom of the cloud based solution 
  
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
  - In this case we will be using Airflow DAGS to create our pipeline and schedule it to run everyday so we don't have to do it manually 
  
 * The database needed to be accessed by 100+ people.
  - Using Cloud will help us in that sitiuation and using Redshift as a DWH and even we can have Data marts for each group of peaple that needs to access the data

## Step 6 : Test Data Model

In this section we will try to query our data to prove that our data model works fine

In [88]:
Dim_Airline = ETL_Airline_Dim('airport-codes_csv.csv')
Fact_Immi = ETL_Immi_Fact('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [89]:
# Create views 
Fact_Immi.createOrReplaceTempView("FactImmi")
Dim_Airline.createOrReplaceTempView("DimAirline")

In [90]:
spark.sql("""
SELECT I.Airline , COUNT(*) as CountImmigrants
FROM FactImmi as I JOIN DimAirline A ON I.Airline = A.Airline_id
GROUP BY  I.Airline
ORDER BY CountImmigrants DESC
limit 10
""").toPandas()

Unnamed: 0,Airline,CountImmigrants
0,00AA,310091
1,00LA,43111
2,00CA,26012
3,00NK,20419
4,00AR,18052
5,00AZ,17610
6,00MT,14253
7,00VA,11899
8,00TN,3602
9,00WN,3418


Our Query shows the top 10 Airline that have the most number of immigrants so we can further investigate why specially Airline 00AA has such a very high number <br>
Also Our Query Proived that our Data Model Works Just as Expected