# Project Title
### Data Engineering Capstone Project

#### Project Summary
**Tools:**
- PySpark
- AWS S3
- AWS Redshift

**Raw data used:**
- Immigration data
- Airport-code data
- Cities demographics data
- Global temperature (not used)


**Pipeline:**

![Pipeline](./images/pipeline.png)
![AirFlow Pipeline](./images/airflow.png)

```
Fact Table: (us_immgration)
    cic_id FLOAT,
	i94_year FLOAT,
	i94_mode FLOAT,
	i94_port VARCHAR,
	i94_age FLOAT,
	birth_year FLOAT,
	gender VARCHAR(1),
	i94_address VARCHAR,
	i94_visa FLOAT,
	visa_type VARCHAR,
	arrival_date DATE,
	departure_date VARCHAR,
	airport_name VARCHAR,
	state_code VARCHAR,
	state_name VARCHAR,
	airport_city VARCHAR,
	total_population INT,
	PRIMARY KEY (cic_id)
```

```
Dim Table: (city_demographic)
    city varchar,
	"state" varchar,
	median_age FLOAT,
	male_population INT,
	female_population INT,
	total_Population INT,
	state_code varchar,
	PRIMARY KEY (state_code)
```

```
Dim Table: (airport)
    ident varchar,
	"type" varchar,
	"name" varchar,
	continent varchar,
	iso_country varchar,
	municipality varchar,
	iata_code varchar,
	"state" varchar,
	PRIMARY KEY (iata_code)
```

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import configparser
import os
from time import time

import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, date_format, dayofmonth, dayofweek, hour,
    monotonically_increasing_id, month, udf, weekofyear, when, year,
)
from pyspark.sql.types import TimestampType

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

In [2]:
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

#### Read Immigration sas data

In [3]:

df_imm=spark.read.parquet("sas_data")
df_imm.printSchema()
print('Total Immigration records: ')
print(df_imm.count())
df_imm.limit(20).toPandas()


root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,...,,M,1976.0,10292016,F,,QF,94953870000.0,11,B1
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,...,,M,1984.0,10292016,F,,VA,94955620000.0,7,B1
2,5748519.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20582.0,...,,M,1987.0,10292016,M,,DL,94956410000.0,40,B1
3,5748520.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1987.0,10292016,F,,DL,94956450000.0,40,B1
4,5748521.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1988.0,10292016,M,,DL,94956390000.0,40,B1
5,5748522.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20579.0,...,,M,1959.0,10292016,M,,NZ,94981800000.0,10,B2
6,5748523.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20586.0,...,,M,1950.0,10292016,F,,NZ,94979690000.0,10,B2
7,5748524.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20586.0,...,,M,1975.0,10292016,F,,NZ,94979750000.0,10,B2
8,5748525.0,2016.0,4.0,245.0,464.0,HOU,20574.0,1.0,FL,20581.0,...,,M,1989.0,10292016,M,,NZ,94973250000.0,28,B2
9,5748526.0,2016.0,4.0,245.0,464.0,LOS,20574.0,1.0,CA,20581.0,...,,M,1990.0,10292016,F,,NZ,95013550000.0,2,B2


#### Read airport csv file

In [4]:
df_airport = spark.read.csv('./airport-codes_csv.csv', header=True, inferSchema=True)

In [5]:
df_airport.printSchema()
print('Total airport records: ')
print(df_airport.count())
df_airport = df_airport.drop('local_code', 'coordinates', 'gps_code', 'elevation_ft')
df_airport.limit(20).toPandas()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)

Total airport records: 
55075


Unnamed: 0,ident,type,name,continent,iso_country,iso_region,municipality,iata_code
0,00A,heliport,Total Rf Heliport,,US,US-PA,Bensalem,
1,00AA,small_airport,Aero B Ranch Airport,,US,US-KS,Leoti,
2,00AK,small_airport,Lowell Field,,US,US-AK,Anchor Point,
3,00AL,small_airport,Epps Airpark,,US,US-AL,Harvest,
4,00AR,closed,Newport Hospital & Clinic Heliport,,US,US-AR,Newport,
5,00AS,small_airport,Fulton Airport,,US,US-OK,Alex,
6,00AZ,small_airport,Cordes Airport,,US,US-AZ,Cordes,
7,00CA,small_airport,Goldstone /Gts/ Airport,,US,US-CA,Barstow,
8,00CL,small_airport,Williams Ag Airport,,US,US-CA,Biggs,
9,00CN,heliport,Kitchen Creek Helibase Heliport,,US,US-CA,Pine Valley,


#### Read us-cities demographics csv file

In [7]:
df_city = spark.read.csv('./us-cities-demographics.csv', header=True, inferSchema=True, sep=';')

In [8]:
df_city = df_city.withColumnRenamed('City', 'city').withColumnRenamed('State', 'state').withColumnRenamed('Median Age', 'median_age') \
.withColumnRenamed('Male Population', 'male_population').withColumnRenamed('Female Population', 'female_population') \
.withColumnRenamed('Total Population', 'total_Population').withColumnRenamed('State Code', 'state_code').withColumnRenamed('Race', 'race')
df_city.printSchema()
print('Total us-cities records: ')
print(df_city.count())
city_columns = ['city', 'state', 'median_age', 'male_population', 'female_population', 'total_Population', 'state_code', 'race']
df_city = df_city.select(city_columns)  # select() accepts a list of column names
df_city.limit(20).toPandas()


root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- male_population: integer (nullable = true)
 |-- female_population: integer (nullable = true)
 |-- total_Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- state_code: string (nullable = true)
 |-- race: string (nullable = true)
 |-- Count: integer (nullable = true)

Total us-cities records: 
2891


Unnamed: 0,city,state,median_age,male_population,female_population,total_Population,state_code,race
0,Silver Spring,Maryland,33.8,40601,41862,82463,MD,Hispanic or Latino
1,Quincy,Massachusetts,41.0,44129,49500,93629,MA,White
2,Hoover,Alabama,38.5,38040,46799,84839,AL,Asian
3,Rancho Cucamonga,California,34.5,88127,87105,175232,CA,Black or African-American
4,Newark,New Jersey,34.6,138040,143873,281913,NJ,White
5,Peoria,Illinois,33.1,56229,62432,118661,IL,American Indian and Alaska Native
6,Avondale,Arizona,29.1,38712,41971,80683,AZ,Black or African-American
7,West Covina,California,39.8,51629,56860,108489,CA,Asian
8,O'Fallon,Missouri,36.0,41762,43270,85032,MO,Hispanic or Latino
9,High Point,North Carolina,35.5,51751,58077,109828,NC,Asian


#### Read World Temperature csv file

In [8]:
df_temp = spark.read.csv('../../data2/GlobalLandTemperaturesByCity.csv', header=True, inferSchema=True)

In [9]:
df_temp.printSchema()
print('Total Temperature records: ')
print(df_temp.count())
df_temp.filter("Country = 'United States'").limit(5).toPandas()

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)

Total Temperature records: 
8599212


Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1820-01-01,2.101,3.217,Abilene,United States,32.95N,100.53W
1,1820-02-01,6.926,2.853,Abilene,United States,32.95N,100.53W
2,1820-03-01,10.767,2.395,Abilene,United States,32.95N,100.53W
3,1820-04-01,17.989,2.202,Abilene,United States,32.95N,100.53W
4,1820-05-01,21.809,2.036,Abilene,United States,32.95N,100.53W


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

In [9]:
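def count_null_column(spark_df):
    """Print the total row count and the number of null values in each column."""
    print('Total rows: ' + str(spark_df.count()))
    print('=== Count null value of each column ===')
    for column in spark_df.columns:
        # Backticks keep the filter valid for column names containing spaces.
        print(column + ': ' + str(spark_df.filter("`{}` is NULL".format(column)).count()))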

#### 2.1 Airport data exploration

In [10]:
count_null_column(df_airport)

Total rows: 55075
=== Count null value of each column ===
ident: 0
type: 0
name: 0
continent: 0
iso_country: 0
iso_region: 0
municipality: 5676
iata_code: 45886


=> The [iata_code] column has 45886 null values: 45886 / 55075 total records = 83.3%, so 83.3% of the airport rows have no [iata_code] => **Solution 1: remove rows where [iata_code] is null**
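The share of missing values quoted above is a simple ratio. A minimal plain-Python sketch of the arithmetic, using the counts printed by `count_null_column`; the helper name `null_share` is hypothetical and not part of the notebook:

```python
def null_share(null_count: int, total_rows: int) -> float:
    """Return the percentage of rows whose value is null."""
    if total_rows <= 0:
        raise ValueError("total_rows must be positive")
    return 100.0 * null_count / total_rows

# Counts reported above for the airport data.
iata_missing = null_share(45886, 55075)
municipality_missing = null_share(5676, 55075)
print(f"iata_code missing: {iata_missing:.1f}%")
print(f"municipality missing: {municipality_missing:.1f}%")
```

A column that is mostly null but required as a key, as `iata_code` is here, is usually a candidate for row filtering rather than imputation.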

In [11]:
# Solution 1: remove rows where [iata_code] is null
print('Total airport records before: ')
print(df_airport.count())
df_airport = df_airport.dropna(how='any', subset=['iata_code'])  # drop rows with a null iata_code
print('Total airport records after: ')
print(df_airport.count())
df_airport.limit(5).toPandas()

Total airport records before: 
55075
Total airport records after: 
9189


Unnamed: 0,ident,type,name,continent,iso_country,iso_region,municipality,iata_code
0,03N,small_airport,Utirik Airport,OC,MH,MH-UTI,Utirik Island,UTK
1,07FA,small_airport,Ocean Reef Club Airport,,US,US-FL,Key Largo,OCA
2,0AK,small_airport,Pilot Station Airport,,US,US-AK,Pilot Station,PQS
3,0CO2,small_airport,Crested Butte Airpark,,US,US-CO,Crested Butte,CSE
4,0TE7,small_airport,LBJ Ranch Airport,,US,US-TX,Johnson City,JCY


In [12]:
df_airport.createOrReplaceTempView("air_table")

In [13]:
# Solution 2: Select only US airport
df_airport = spark.sql("""SELECT *
                FROM air_table WHERE LEFT(iso_region, 2) = 'US'""")
df_airport.count()

2019

In [14]:
df_airport.createOrReplaceTempView("air_table")

In [15]:
# Solution 3: Convert iso_region to US state
df_airport = spark.sql("""SELECT *, SUBSTRING (iso_region, 4) as state
                FROM air_table""")

In [16]:
df_airport = df_airport.drop('iso_region')
df_airport.show(5)

+-----+-------------+--------------------+---------+-----------+-------------+---------+-----+
|ident|         type|                name|continent|iso_country| municipality|iata_code|state|
+-----+-------------+--------------------+---------+-----------+-------------+---------+-----+
| 07FA|small_airport|Ocean Reef Club A...|       NA|         US|    Key Largo|      OCA|   FL|
|  0AK|small_airport|Pilot Station Air...|       NA|         US|Pilot Station|      PQS|   AK|
| 0CO2|small_airport|Crested Butte Air...|       NA|         US|Crested Butte|      CSE|   CO|
| 0TE7|small_airport|   LBJ Ranch Airport|       NA|         US| Johnson City|      JCY|   TX|
| 13MA|small_airport|Metropolitan Airport|       NA|         US|       Palmer|      PMX|   MA|
+-----+-------------+--------------------+---------+-----------+-------------+---------+-----+
only showing top 5 rows
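The two SQL steps above rely on the `iso_region` layout `<country>-<subdivision>` (for example `US-PA`). A plain-Python sketch of the same parsing, outside Spark; `split_iso_region` is a hypothetical helper used only for illustration:

```python
from typing import Optional, Tuple

def split_iso_region(iso_region: str) -> Tuple[str, Optional[str]]:
    """Split 'US-PA' into ('US', 'PA'); the subdivision is None if absent."""
    country, sep, subdivision = iso_region.partition("-")
    return country, (subdivision if sep else None)

regions = ["US-PA", "US-AK", "MH-UTI"]
# Keep US rows only, then take the state part, mirroring
# LEFT(iso_region, 2) = 'US' and SUBSTRING(iso_region, 4).
us_states = [
    split_iso_region(region)[1]
    for region in regions
    if split_iso_region(region)[0] == "US"
]
print(us_states)
```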



In [17]:
df_airport.createOrReplaceTempView("air_table")

In [18]:
spark.sql("""
SELECT iata_code, name, COUNT (name) as count
FROM air_table
GROUP BY iata_code, name
HAVING COUNT > 1
""").show()

+---------+--------------------+-----+
|iata_code|                name|count|
+---------+--------------------+-----+
|      AHT|Amchitka Army Air...|    2|
+---------+--------------------+-----+



The airport data contains duplicated rows => **Solution 4: drop duplicated values**

In [19]:
# Solution 4: drop some duplicated value
print('Total airport records before: ')
print(df_airport.count())
df_airport = df_airport.dropDuplicates(["name"])
print('Total records after dropping duplicates: ')
print(df_airport.count())


Total airport records before: 
2019
Total records after dropping duplicates: 
2006
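The duplicate check groups by `(iata_code, name)`, while `dropDuplicates(["name"])` deduplicates on `name` alone, so it can also drop airports that share a name but have different IATA codes. A plain-Python sketch of the difference on made-up rows (the sample values and the `dedupe` helper are illustrative, not taken from the dataset or the notebook):

```python
from typing import Dict, Iterable, List, Tuple

Row = Dict[str, str]

def dedupe(rows: Iterable[Row], keys: Tuple[str, ...]) -> List[Row]:
    """Keep the first row seen for each distinct combination of `keys`."""
    seen = set()
    kept = []
    for row in rows:
        signature = tuple(row[k] for k in keys)
        if signature not in seen:
            seen.add(signature)
            kept.append(row)
    return kept

rows = [
    {"iata_code": "AAA", "name": "Municipal Airport"},
    {"iata_code": "AAA", "name": "Municipal Airport"},  # exact duplicate
    {"iata_code": "BBB", "name": "Municipal Airport"},  # same name, other code
]
by_name = dedupe(rows, ("name",))
by_code_and_name = dedupe(rows, ("iata_code", "name"))
print(len(by_name), len(by_code_and_name))
```

If `iata_code` is meant to be the primary key of the airport table, deduplicating on that key (or on both columns) may be closer to the intent than deduplicating on `name`.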


In [20]:
# Check no duplicated values in airport table
df_airport.createOrReplaceTempView("air_table_final")

In [21]:
spark.sql("""
SELECT iata_code, name, COUNT (name) as count
FROM air_table_final
GROUP BY iata_code, name
HAVING COUNT > 1
""").show()

+---------+----+-----+
|iata_code|name|count|
+---------+----+-----+
+---------+----+-----+



In [22]:
df_airport.show(5)
df_airport.printSchema()

+-----+--------------+--------------------+---------+-----------+------------+---------+-----+
|ident|          type|                name|continent|iso_country|municipality|iata_code|state|
+-----+--------------+--------------------+---------+-----------+------------+---------+-----+
| PAGQ| small_airport|    Big Lake Airport|       NA|         US|    Big Lake|      BGQ|   AK|
| KBNO|medium_airport|Burns Municipal A...|       NA|         US|       Burns|      BNO|   OR|
| PAFV| small_airport|   Five Mile Airport|       NA|         US|   Five Mile|      FMC|   AK|
| KFTW| large_airport|Fort Worth Meacha...|       NA|         US|  Fort Worth|      FTW|   TX|
|  FEW|      heliport|Francis E Warren ...|       NA|         US|    Cheyenne|      FEW|   WY|
+-----+--------------+--------------------+---------+-----------+------------+---------+-----+
only showing top 5 rows

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- c

##### Conclusion for the raw airport data: four cleaning steps were applied:
- Solution 1: remove rows where [iata_code] is null
- Solution 2: select only US airports
- Solution 3: convert iso_region to a US state code
- Solution 4: drop duplicated values

Final airport DataFrame: df_airport

#### 2.2 Us-cities demographics data exploration

In [23]:
count_null_column(df_city)
df_city.show(2)

Total rows: 2891
=== Count null value of each column ===
city: 0
state: 0
median_age: 0
male_population: 3
female_population: 3
total_Population: 0
state_code: 0
race: 0
+-------------+-------------+----------+---------------+-----------------+----------------+----------+------------------+
|         city|        state|median_age|male_population|female_population|total_Population|state_code|              race|
+-------------+-------------+----------+---------------+-----------------+----------------+----------+------------------+
|Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|        MD|Hispanic or Latino|
|       Quincy|Massachusetts|      41.0|          44129|            49500|           93629|        MA|             White|
+-------------+-------------+----------+---------------+-----------------+----------------+----------+------------------+
only showing top 2 rows



Three records have null values for both male_population and female_population.

Let's look at these three records.

In [24]:
df_city.createOrReplaceTempView("city_table")

In [25]:
spark.sql("""
SELECT * FROM city_table
WHERE male_population IS NULL OR female_population IS NULL
""").show()

+------------+-------+----------+---------------+-----------------+----------------+----------+--------------------+
|        city|  state|median_age|male_population|female_population|total_Population|state_code|                race|
+------------+-------+----------+---------------+-----------------+----------------+----------+--------------------+
|The Villages|Florida|      70.5|           null|             null|           72590|        FL|  Hispanic or Latino|
|The Villages|Florida|      70.5|           null|             null|           72590|        FL|Black or African-...|
|The Villages|Florida|      70.5|           null|             null|           72590|        FL|               White|
+------------+-------+----------+---------------+-----------------+----------------+----------+--------------------+



The Villages, Florida, has total_Population = 72590 repeated across three race rows, as the table above shows, so every non-race column is duplicated once per race => **Solution 2: remove the [race] column**

In [26]:
# Solution 2: remove the [race] column
city_columns = ['city', 'state', 'median_age', 'male_population', 'female_population', 'total_Population', 'state_code']
df_city = df_city.select(city_columns)
df_city.limit(2).toPandas()

Unnamed: 0,city,state,median_age,male_population,female_population,total_Population,state_code
0,Silver Spring,Maryland,33.8,40601,41862,82463,MD
1,Quincy,Massachusetts,41.0,44129,49500,93629,MA


In [27]:
df_city.createOrReplaceTempView("city_table2")

In [28]:
spark.sql("""
SELECT city, state, median_age, male_population, female_population, total_Population, state_code, count(*) as count
FROM city_table2
GROUP BY city, state, median_age, male_population, female_population, total_Population, state_code
HAVING COUNT >1
""").createOrReplaceTempView("temp1")

spark.sql("""
SELECT city, state, median_age, male_population, female_population, total_Population, state_code, count(*) as count
FROM city_table2
GROUP BY city, state, median_age, male_population, female_population, total_Population, state_code
HAVING COUNT = 1
""").createOrReplaceTempView("temp2")

spark.sql("""
SELECT * FROM temp1
""").show(5)

spark.sql("""
SELECT * FROM temp2
""").show()

spark.sql("""
SELECT count(*) as total_distinct_value from temp2
""").show()

spark.sql("""
SELECT count(*) as total_duplicated_value from temp1
""").show()

+----------+----------+----------+---------------+-----------------+----------------+----------+-----+
|      city|     state|median_age|male_population|female_population|total_Population|state_code|count|
+----------+----------+----------+---------------+-----------------+----------------+----------+-----+
|     Bryan|     Texas|      29.4|          41761|            40345|           82106|        TX|    5|
|Fort Smith|  Arkansas|      34.9|          43346|            44849|           88195|        AR|    5|
|   Houston|     Texas|      32.6|        1149686|          1148942|         2298628|        TX|    5|
|   Phoenix|   Arizona|      33.8|         786833|           776168|         1563001|        AZ|    5|
|    Pomona|California|      32.1|          74945|            78307|          153252|        CA|    5|
+----------+----------+----------+---------------+-----------------+----------------+----------+-----+
only showing top 5 rows

+-------+-----------+----------+---------------+

We got 594 rows in total_duplicated_value and 2 rows in total_distinct_value => 594 + 2 = 596 distinct records => **Solution 3: drop duplicated values; the expected number of distinct records is 596**

In [29]:
#  Solution 3: drop duplicated values and expected total distinct records = 596
print('Total demographics records before: ')
print(df_city.count())
df_city = df_city.dropDuplicates(city_columns)
print('Total records after dropping duplicates: ')
print(df_city.count())

Total demographics records before: 
2891
Total records after dropping duplicates: 
596
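Dropping `race` leaves one identical row per race for each city, which is why the row count collapses after deduplication. A plain-Python sketch using a subset of the columns from the three `The Villages` rows shown earlier (the tuple layout is an assumption made for illustration):

```python
# (city, state, median_age, total_Population, state_code, race)
raw_rows = [
    ("The Villages", "Florida", 70.5, 72590, "FL", "Hispanic or Latino"),
    ("The Villages", "Florida", 70.5, 72590, "FL", "Black or African-American"),
    ("The Villages", "Florida", 70.5, 72590, "FL", "White"),
]

# Drop the last field (race), then keep distinct tuples in first-seen order.
without_race = [row[:-1] for row in raw_rows]
distinct_rows = list(dict.fromkeys(without_race))
print(len(raw_rows), "->", len(distinct_rows))
```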


In [30]:
df_city.printSchema()

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- male_population: integer (nullable = true)
 |-- female_population: integer (nullable = true)
 |-- total_Population: integer (nullable = true)
 |-- state_code: string (nullable = true)



##### Conclusion for the raw demographics data: three cleaning steps were applied:
- Solution 1: rename the columns and keep only the ones needed
- Solution 2: remove unused columns, including the [race] column
- Solution 3: drop duplicated values => 596 records in total

Final demographics DataFrame: df_city

#### 2.3 Global temperature data exploration

In [26]:
df_temp.show(3)

+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01 00:00:00|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 3 rows



In [27]:
# Define min time and max time of dataset
df_temp.agg(F.min("dt"), F.max("dt")).show()

+-------------------+-------------------+
|            min(dt)|            max(dt)|
+-------------------+-------------------+
|1743-11-01 00:00:00|2013-09-01 00:00:00|
+-------------------+-------------------+



The latest year in the temperature dataset is 2013 => no temperature data is available for the period covered by the immigration dataset (2016 only).

**=> This dataset is not used in the capstone project.**

#### 2.4 Immigration data exploration

In [71]:
df_imm.show(2)
df_imm.count()

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

3096313

In [33]:
df_imm.createOrReplaceTempView("imm_table")

In [34]:
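# Solution 1: convert arrdate and depdate (day counts from 1960-01-01) to dates in new columns arrival_date and departure_date
# The explicit CAST to INT keeps date_add valid on Spark versions that do not implicitly cast doubles.
spark.sql("SELECT *, date_add(to_date('1960-01-01'), CAST(arrdate AS INT)) AS arrival_date FROM imm_table").createOrReplaceTempView("imm_table")
# departure_date ends up as a string column because the ELSE branch returns 'N/A'.
df_imm = spark.sql("""SELECT *, CASE 
                        WHEN depdate >= 1.0 THEN date_add(to_date('1960-01-01'), CAST(depdate AS INT))
                        WHEN depdate IS NULL THEN NULL
                        ELSE 'N/A' END AS departure_date 
                FROM imm_table""")
df_imm.createOrReplaceTempView("imm_table")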
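`arrdate` and `depdate` are day counts from 1960-01-01 (the SAS date epoch), which is why the SQL adds them to that date. A plain-Python sketch of the same conversion; `sas_days_to_date` is a hypothetical helper name, and the two inputs are the `arrdate` and `depdate` values of the first row shown earlier:

```python
from datetime import date, timedelta
from typing import Optional

SAS_EPOCH = date(1960, 1, 1)

def sas_days_to_date(days: Optional[float]) -> Optional[date]:
    """Convert a SAS day count to a calendar date; None stays None."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))

arrival = sas_days_to_date(20574.0)
departure = sas_days_to_date(20582.0)
print(arrival, departure)
```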

In [35]:
df_imm.show(3)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+------------+--------------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|arrival_date|departure_date|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+------------+--------------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|  2016-04-30|

In [38]:
# Solution 2: rename columns and remove unused ones
df_imm = df_imm.withColumnRenamed('cicid', 'cic_id').withColumnRenamed('i94yr', 'i94_year').withColumnRenamed('i94mode', 'i94_mode').withColumnRenamed('i94port', 'i94_port') \
.withColumnRenamed('i94addr', 'i94_address').withColumnRenamed('i94bir', 'i94_age').withColumnRenamed('biryear', 'birth_year') \
.withColumnRenamed('i94visa', 'i94_visa').withColumnRenamed('visatype', 'visa_type')
imm_columns = ['cic_id', 'i94_year', 'i94_mode', 'i94_port', 'i94_age', 'birth_year', 'gender', 'i94_address', 'i94_visa', 'visa_type', 'arrival_date', 'departure_date']
df_imm = df_imm.select(imm_columns)  # select() accepts a list of column names
df_imm.limit(5).toPandas()


Unnamed: 0,cic_id,i94_year,i94_mode,i94_port,i94_age,birth_year,gender,i94_address,i94_visa,visa_type,arrival_date,departure_date
0,5748517.0,2016.0,1.0,LOS,40.0,1976.0,F,CA,1.0,B1,2016-04-30,2016-05-08
1,5748518.0,2016.0,1.0,LOS,32.0,1984.0,F,NV,1.0,B1,2016-04-30,2016-05-17
2,5748519.0,2016.0,1.0,LOS,29.0,1987.0,M,WA,1.0,B1,2016-04-30,2016-05-08
3,5748520.0,2016.0,1.0,LOS,29.0,1987.0,F,WA,1.0,B1,2016-04-30,2016-05-14
4,5748521.0,2016.0,1.0,LOS,28.0,1988.0,M,WA,1.0,B1,2016-04-30,2016-05-14


In [39]:
# Solution 3: Ensure that departure_date >= arrival_date
df_imm = df_imm.filter("departure_date >= arrival_date")
df_imm.count()

2953481

i94mode definition:
- 1 = 'Air'
- 2 = 'Sea'
- 3 = 'Land'
- 9 = 'Not reported'

=> **Keep only Air arrivals so the rows can be mapped to the airport data**
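The code list above maps naturally onto a lookup table. A small sketch (the names `I94_MODE` and `describe_mode` are hypothetical) that also shows what the filter on mode `1` keeps:

```python
I94_MODE = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}

def describe_mode(code: float) -> str:
    """Translate an i94mode code (stored as a double in the data) to a label."""
    return I94_MODE.get(int(code), "Unknown")

modes = [1.0, 2.0, 3.0, 9.0, 1.0]
air_only = [mode for mode in modes if describe_mode(mode) == "Air"]
print(air_only)
```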

In [40]:
df_imm = df_imm.filter("i94_mode == 1")

In [41]:
df_imm.printSchema()
df_imm.count()

root
 |-- cic_id: double (nullable = true)
 |-- i94_year: double (nullable = true)
 |-- i94_mode: double (nullable = true)
 |-- i94_port: string (nullable = true)
 |-- i94_age: double (nullable = true)
 |-- birth_year: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- i94_address: string (nullable = true)
 |-- i94_visa: double (nullable = true)
 |-- visa_type: string (nullable = true)
 |-- arrival_date: date (nullable = true)
 |-- departure_date: string (nullable = true)



2871184

In [42]:
df_imm.createOrReplaceTempView("imm_table")

In [43]:
spark.sql("""
SELECT gender, count(*) as count
FROM imm_table
GROUP BY gender
""").show()

+------+-------+
|gender|  count|
+------+-------+
|     F|1190456|
|  null| 405106|
|     M|1274898|
|     U|     13|
|     X|    711|
+------+-------+



In [44]:
# Solution 4: Keep only Male and Female gender
df_imm = df_imm.filter("gender in('M', 'F')").dropDuplicates()

In [45]:
print(df_imm.count())
df_imm.show(3)
df_imm.printSchema()

2465354
+------+--------+--------+--------+-------+----------+------+-----------+--------+---------+------------+--------------+
|cic_id|i94_year|i94_mode|i94_port|i94_age|birth_year|gender|i94_address|i94_visa|visa_type|arrival_date|departure_date|
+------+--------+--------+--------+-------+----------+------+-----------+--------+---------+------------+--------------+
| 206.0|  2016.0|     1.0|     NYC|   27.0|    1989.0|     M|         CO|     2.0|       WT|  2016-04-01|    2016-04-09|
| 358.0|  2016.0|     1.0|     ORL|   36.0|    1980.0|     M|         FL|     2.0|       WT|  2016-04-01|    2016-04-17|
| 529.0|  2016.0|     1.0|     PHI|   30.0|    1986.0|     M|         NV|     2.0|       WT|  2016-04-01|    2016-04-08|
+------+--------+--------+--------+-------+----------+------+-----------+--------+---------+------------+--------------+
only showing top 3 rows

root
 |-- cic_id: double (nullable = true)
 |-- i94_year: double (nullable = true)
 |-- i94_mode: double (nullable = tru

##### Conclusion for the immigration data: four cleaning steps were applied:
- Solution 1: convert arrdate and depdate to dates in new columns arrival_date and departure_date
- Solution 2: rename columns and remove unused ones
- Solution 3: ensure that departure_date >= arrival_date
- Solution 4: keep only Male and Female gender values

Final immigration DataFrame: df_imm

#### Cleaning Steps
Document steps necessary to clean the data

The cleaning steps are documented in the exploration sections above (2.1 to 2.4).

#### Upload to Staging S3


In [46]:
config_file_path = 'dwh_p.cfg'
config = configparser.ConfigParser()
with open(config_file_path) as config_file:  # close the file once it has been read
    config.read_file(config_file)
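The configuration file read above holds AWS credentials, so it is safer to report masked values than to echo them into the notebook output. A minimal sketch, assuming the section and option names used in this notebook (`AWS`/`key`, `AWS`/`secret`, `S3`/`S3_BUCKET_OUTPUT`); the sample values and the `mask` helper are made up for illustration:

```python
import configparser

SAMPLE_CFG = """
[AWS]
key = EXAMPLEKEY1234
secret = example-secret-value

[S3]
S3_BUCKET_OUTPUT = example-bucket
"""

def mask(value: str, visible: int = 4) -> str:
    """Hide all but the last `visible` characters of a secret."""
    if len(value) <= visible:
        return "*" * len(value)
    return "*" * (len(value) - visible) + value[-visible:]

config = configparser.ConfigParser()
config.read_string(SAMPLE_CFG)  # the notebook reads dwh_p.cfg from disk instead

masked_key = mask(config.get("AWS", "key"))
output_data = "s3a://{}/".format(config.get("S3", "S3_BUCKET_OUTPUT"))
print(masked_key, output_data)
```

Any credential that has already appeared in a saved notebook or repository should be treated as compromised and rotated.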

In [47]:
os.environ['AWS_ACCESS_KEY_ID'] = config.get('AWS', 'key')
os.environ['AWS_SECRET_ACCESS_KEY'] = config.get('AWS', 'secret')
S3_BUCKET_OUTPUT = config.get("S3", "S3_BUCKET_OUTPUT")
output_data = "s3a://{}/".format(S3_BUCKET_OUTPUT)

# Do not print credentials; only confirm that both values were loaded.
print(all(os.environ.get(name) for name in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY')))


In [48]:
df_imm.limit(40).write.mode('overwrite').parquet('output_data_workspace/df_imm')

In [134]:
df_airport.limit(40).write.mode('overwrite').parquet('output_data_workspace/df_airport')

In [99]:
df_city.limit(40).write.mode('overwrite').parquet('output_data_workspace/df_city')

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

Run pipeline

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
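The checks listed above can be sketched in plain Python as follows. This is an illustration under assumptions, not the checks the Airflow DAG actually runs: the function names are hypothetical, and rows are modelled as dictionaries rather than Redshift tables.

```python
from typing import Sequence

def check_not_empty(table: str, rows: Sequence[dict]) -> None:
    """Completeness check: the table must contain at least one row."""
    if len(rows) == 0:
        raise ValueError(f"Data quality check failed: {table} is empty")

def check_unique_key(table: str, rows: Sequence[dict], key: str) -> None:
    """Integrity check: `key` must be non-null and unique across rows."""
    values = [row.get(key) for row in rows]
    if any(value is None for value in values):
        raise ValueError(f"Data quality check failed: {table}.{key} has nulls")
    if len(values) != len(set(values)):
        raise ValueError(f"Data quality check failed: {table}.{key} has duplicates")

airport_rows = [{"iata_code": "OCA"}, {"iata_code": "PQS"}]
check_not_empty("airport", airport_rows)
check_unique_key("airport", airport_rows, "iata_code")
print("All checks passed")
```

Raising an exception on failure lets a scheduler mark the task as failed instead of silently loading incomplete data.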
 
Run Quality Checks

Run pipeline

#### 4.3 Data dictionary 
![Pipeline](./images/pipeline.png)

1. Create or reuse an IAM user with the attached policies ```AdministratorAccess```, ```AmazonRedshiftFullAccess```, ```AmazonS3FullAccess```
2. Add that user's AWS access key and secret key to the ```dwh_p.cfg``` file
3. Run ```python aws_helper/create_cluster.py``` to create the Redshift cluster (about 3 minutes)
4. Run ```python aws_helper/create_s3.py``` to create the S3 bucket
5. The Redshift cluster host name is filled into ```dwh_p.cfg``` automatically (or copy it from the log)
6. Read and run ```CapstoneProject Template.ipynb``` to explore and clean the data
7. Start the Airflow UI with ```/opt/airflow/start.sh```, then click the ```Access Airflow``` button
8. Configure the Redshift and AWS credentials manually
9. Run the Airflow DAG file

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

**Clearly state the rationale for the choice of tools and technologies for the project.**
Based on Projects 4 and 5 of the Udacity course, I decided to apply a data lake and a data pipeline to practice the data engineering material.

**Propose how often the data should be updated and why.**
The immigration data should be updated monthly, which matches how it is delivered today.

**The data was increased by 100x.**
PySpark can scale out to handle this volume, and the AWS Redshift cluster can be given more nodes to keep queries fast.

**The data populates a dashboard that must be updated on a daily basis by 7am every day.**
The Airflow scheduler can run the pipeline daily, early enough for it to finish before 7am.

**The database needed to be accessed by 100+ people.**
According to https://docs.aws.amazon.com/redshift/latest/mgmt/amazon-redshift-limits.html, the maximum number of connections that can be created using the query editor v2 in an account in the current Region is 500, so AWS Redshift can support this number of users.