# Project Title
### Data Engineering Capstone Project

#### Project Summary
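This project builds an ETL pipeline with Apache Spark and pandas that cleans U.S. I94 immigration, temperature, and city demographic data, models it as a star schema, and writes the resulting tables to an output folder for analysis.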

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# standard library
import configparser
import datetime as dt
import logging
import os
from datetime import datetime, timedelta, date

# third-party
import pandas as pd
import pyspark
import pyspark.sql.types as t
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, count, date_format, format_number, isnan, isnull,
    monotonically_increasing_id, month, to_date, udf, when, year,
)
from pyspark.sql.types import IntegerType, FloatType, StringType, StructType




### Step 1: Scope the Project and Gather Data

#### Scope 
- Build a data warehouse with Apache Spark: clean the data, run an ETL pipeline, write the results to Parquet files, and store them in an output folder.
- A star schema will be created that contains 1 fact table and 3 dimension tables.
- Other tools, such as pandas, will be used to prepare the data.
- The goal of the project is to analyze U.S. immigration data from several perspectives.

#### Describe and Gather Data 

- I94 Immigration Data: This data comes from the US National Tourism and Trade Office. There's a sample file so you can take a look at the data in csv format before reading it all in. You do not have to use the entire dataset, just use what you need to accomplish the goal you set at the beginning of the project.
- World Temperature Data: This dataset came from Kaggle. 
- U.S. City Demographic Data: This data comes from OpenDataSoft. 
- Airport Code Table: This is a simple table of airport codes and corresponding cities. 

### Load Configuration Data

In [2]:
#config = configparser.ConfigParser()
#config.read('config.cfg')

#os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
#os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

- ### Immigration Data

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_immigration = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [4]:
#write to parquet
#df_immigration.write.parquet("sas_data")
#df_immigration=spark.read.parquet("sas_data")


#df_immigration.write.mode('overwrite').parquet("sas_data")
#df_immigration=spark.read.parquet("sas_data")
#df_immigration.limit(10).show

In [5]:
df_immigration.count()

3096313

In [6]:
#print its head
df_immigration.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [7]:
print((df_immigration.count(), len(df_immigration.columns)))

(3096313, 28)


In [8]:
df_immigration.summary("count").show()

+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+--------+-----+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+-------+--------+
|summary|  cicid|  i94yr| i94mon| i94cit| i94res|i94port|arrdate|i94mode|i94addr|depdate| i94bir|i94visa|  count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto| gender|insnum|airline| admnum|  fltno|visatype|
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+--------+-----+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+-------+--------+
|  count|3096313|3096313|3096313|3096313|3096313|3096313|3096313|3096074|2943721|2953856|3095511|3096313|3096313| 3096312| 1215063| 8126|3096075|2957884|    392|2957884|3095511|3095836|2682044|113708|3012686|3096313|3076764| 3096313|
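+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+--------+-----+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+-------+--------+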

As shown by the previous output:
- Some column names are not clear enough.
- Some columns have a high share of missing values ("visapost", "occup", "entdepu" and "insnum"). These columns need to be dropped.
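The share of missing values can be worked out directly from the non-null counts printed above. The following is a minimal sketch in plain Python; the counts are copied from the `summary("count")` output for a subset of columns, while the 0.5 cut-off and the helper name `missing_fraction` are illustrative assumptions rather than values used by the project.

```python
# Non-null counts copied from the summary("count") output above (subset of columns).
TOTAL_ROWS = 3096313
non_null_counts = {
    "i94addr": 2943721,
    "depdate": 2953856,
    "visapost": 1215063,
    "occup": 8126,
    "entdepu": 392,
    "gender": 2682044,
    "insnum": 113708,
    "airline": 3012686,
}


def missing_fraction(non_null, total):
    """Return the share of rows in which a column is null."""
    return 1 - non_null / total


# Assumption: a column counts as "mostly missing" above this illustrative cut-off.
THRESHOLD = 0.5

mostly_missing = sorted(
    name
    for name, non_null in non_null_counts.items()
    if missing_fraction(non_null, TOTAL_ROWS) > THRESHOLD
)
print(mostly_missing)  # ['entdepu', 'insnum', 'occup', 'visapost']
```

With this cut-off, the flagged columns match the four named in the list above.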

In [9]:
#check for duplicates
df_immigration.distinct().count() == df_immigration.count()

True

As shown, the dataset does not contain any duplicate rows.

In [10]:
#print the type of each column
df_immigration.dtypes

[('cicid', 'double'),
 ('i94yr', 'double'),
 ('i94mon', 'double'),
 ('i94cit', 'double'),
 ('i94res', 'double'),
 ('i94port', 'string'),
 ('arrdate', 'double'),
 ('i94mode', 'double'),
 ('i94addr', 'string'),
 ('depdate', 'double'),
 ('i94bir', 'double'),
 ('i94visa', 'double'),
 ('count', 'double'),
 ('dtadfile', 'string'),
 ('visapost', 'string'),
 ('occup', 'string'),
 ('entdepa', 'string'),
 ('entdepd', 'string'),
 ('entdepu', 'string'),
 ('matflag', 'string'),
 ('biryear', 'double'),
 ('dtaddto', 'string'),
 ('gender', 'string'),
 ('insnum', 'string'),
 ('airline', 'string'),
 ('admnum', 'double'),
 ('fltno', 'string'),
 ('visatype', 'string')]

As shown, the date columns (`arrdate` and `depdate`) are stored as doubles, so their type needs to be fixed.
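One way to fix the date type is sketched below. It assumes that `arrdate` and `depdate` are SAS numeric dates, that is, day counts since 1960-01-01; the source file is a SAS file, but the notebook itself does not state the encoding, so treat this as an assumption. The helper name `sas_days_to_date` is hypothetical.

```python
from datetime import date, timedelta

# Assumption: the columns hold SAS numeric dates (days since 1960-01-01).
SAS_EPOCH = date(1960, 1, 1)


def sas_days_to_date(days):
    """Convert a SAS day count (int or float) to a date; None stays None."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


# 20545.0 is an arrdate value from the preview of the April 2016 file above.
print(sas_days_to_date(20545.0))  # 2016-04-01
```

A function like this could be wrapped in a Spark `udf` to convert the columns. Note that a missing date replaced by 0 would convert to 1960-01-01, so missing values are better left as nulls before the conversion.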

- ### Temperature Data

In [11]:
#read the data
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temperature = pd.read_csv(fname)

In [12]:
#read the data's head
df_temperature.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [13]:
print(df_temperature['Country'].unique())

['Denmark' 'Turkey' 'Kazakhstan' 'China' 'Spain' 'Germany' 'Nigeria' 'Iran'
 'Russia' 'Canada' "Côte D'Ivoire" 'United Kingdom' 'Saudi Arabia' 'Japan'
 'United States' 'India' 'Benin' 'United Arab Emirates' 'Mexico'
 'Venezuela' 'Ghana' 'Ethiopia' 'Australia' 'Yemen' 'Indonesia' 'Morocco'
 'Pakistan' 'France' 'Libya' 'Burma' 'Brazil' 'South Africa' 'Syria'
 'Egypt' 'Algeria' 'Netherlands' 'Malaysia' 'Portugal' 'Ecuador' 'Italy'
 'Uzbekistan' 'Philippines' 'Madagascar' 'Chile' 'Belgium' 'El Salvador'
 'Romania' 'Peru' 'Colombia' 'Tanzania' 'Tunisia' 'Turkmenistan' 'Israel'
 'Eritrea' 'Paraguay' 'Greece' 'New Zealand' 'Vietnam' 'Cameroon' 'Iraq'
 'Afghanistan' 'Argentina' 'Azerbaijan' 'Moldova' 'Mali'
 'Congo (Democratic Republic Of The)' 'Thailand' 'Central African Republic'
 'Bosnia And Herzegovina' 'Bangladesh' 'Switzerland' 'Equatorial Guinea'
 'Cuba' 'Lebanon' 'Mozambique' 'Serbia' 'Angola' 'Somalia' 'Norway' 'Nepal'
 'Poland' 'Ukraine' 'Guinea Bissau' 'Malawi' 'Burkina Faso' 'Slova

- As shown, the `Country` column contains data for countries other than the United States.

In [14]:
df_temperature.shape

(8599212, 7)

In [15]:
print(df_temperature.count())

dt                               8599212
AverageTemperature               8235082
AverageTemperatureUncertainty    8235082
City                             8599212
Country                          8599212
Latitude                         8599212
Longitude                        8599212
dtype: int64


- As shown by the previous output, `AverageTemperature` and `AverageTemperatureUncertainty` have missing values.

In [16]:
#check for duplicates
duplicateRows = df_temperature[df_temperature.duplicated()]
print(duplicateRows)

Empty DataFrame
Columns: [dt, AverageTemperature, AverageTemperatureUncertainty, City, Country, Latitude, Longitude]
Index: []


- As shown, the data does not contain any duplicate rows.

In [17]:
#print the type of each column
df_temperature.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8599212 entries, 0 to 8599211
Data columns (total 7 columns):
dt                               object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                             object
Country                          object
Latitude                         object
Longitude                        object
dtypes: float64(2), object(5)
memory usage: 459.2+ MB


- The `dt` column is stored as `object` and needs to be converted to datetime.

- ### U.S. City Demographic Data

In [18]:
#read the data
df_demographic = pd.read_csv('us-cities-demographics.csv', sep=';')

In [19]:
#read the head of the data
df_demographic.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [20]:
df_demographic.shape

(2891, 12)

In [21]:
#print the count of data in each column
print(df_demographic.count())

City                      2891
State                     2891
Median Age                2891
Male Population           2888
Female Population         2888
Total Population          2891
Number of Veterans        2878
Foreign-born              2878
Average Household Size    2875
State Code                2891
Race                      2891
Count                     2891
dtype: int64


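As shown by the previous output, the columns "Male Population", "Female Population", "Number of Veterans", "Foreign-born" and "Average Household Size" have some missing values.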

In [22]:
#check for duplicates
duplicateRows = df_demographic[df_demographic.duplicated()]
print(duplicateRows)

Empty DataFrame
Columns: [City, State, Median Age, Male Population, Female Population, Total Population, Number of Veterans, Foreign-born, Average Household Size, State Code, Race, Count]
Index: []


As shown, the data does not contain any duplicate rows.

In [23]:
#print the type of each column
df_demographic.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
City                      2891 non-null object
State                     2891 non-null object
Median Age                2891 non-null float64
Male Population           2888 non-null float64
Female Population         2888 non-null float64
Total Population          2891 non-null int64
Number of Veterans        2878 non-null float64
Foreign-born              2878 non-null float64
Average Household Size    2875 non-null float64
State Code                2891 non-null object
Race                      2891 non-null object
Count                     2891 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 271.1+ KB


As shown, all the column types are correctly coded.

- In the temperature dataset, drop all rows for countries other than the US.

### Step 2: Explore and Assess the Data
#### Explore the Data 
- ### Immigration Data
- Some column names are not clear enough.
- Some columns have a high share of missing values ("visapost", "occup", "entdepu" and "insnum"). These columns need to be dropped.
- The type of the date columns should be fixed.

- ### Temperature Data
- Some columns have missing values.
- The type of the date column needs to be fixed.
- The `Country` column contains data for countries other than the United States.
- The name of the `dt` column is not clear.

- ### U.S. City Demographic Data
- Some columns have missing values.

#### Cleaning Steps
1- Drop the "visapost", "occup" and "insnum" columns from the immigration dataset.

2- Replace missing values and NaN in the other columns with zero.

3- Convert the type of some columns to integer.

4- Rename the unclearly named columns in the immigration dataset.

5- Change the type of the date columns to DateTime in the immigration dataset.

6- Drop the rows with missing data from the temperature dataset.

7- Drop the rows for countries other than the United States from the temperature dataset.

8- Change the type of dt to datetime in the temperature dataset.

9- Rename the dt column.

10- Drop the rows with missing data from the demographic dataset.


In [24]:
# drop the columns that are mostly missing from the immigration dataset
clean_immigration = df_immigration.drop('visapost', 'occup', 'insnum')

In [25]:
#check the previous code
print(list(clean_immigration.columns))

['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count', 'dtadfile', 'entdepa', 'entdepd', 'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'airline', 'admnum', 'fltno', 'visatype']


In [26]:
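#replace missing values and NaN with zero
#note: with a numeric fill value, Spark's fillna only touches numeric columns; nulls in string columns are left as they are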
clean_immigration = clean_immigration.fillna(0)

In [27]:
#keep the fact-table columns, drop duplicate rows and rename the unclearly named columns
#(cicid keeps its name, so it needs no rename)

clean_immigration = clean_immigration.select(['cicid', 'i94yr', 'i94mon', 'i94port', \
                                              'arrdate', 'i94addr', 'depdate']).distinct()\
.withColumnRenamed("i94yr","year")\
.withColumnRenamed("i94mon","month")\
.withColumnRenamed("i94port","Port_of_admission")\
.withColumnRenamed("arrdate","arrival_date")\
.withColumnRenamed("i94addr","state_code")\
.withColumnRenamed("depdate","departure_date")


In [28]:
#check the previous code
print(list(clean_immigration.columns))

['cicid', 'year', 'month', 'Port_of_admission', 'arrival_date', 'state_code', 'departure_date']


In [29]:
#convert the type of some columns to integer

clean_immigration = clean_immigration.withColumn("year",clean_immigration.year.cast(IntegerType()))\
                  .withColumn("month",clean_immigration.month.cast(IntegerType()))\
                  .withColumn("arrival_date",clean_immigration.arrival_date.cast(IntegerType()))\
                  .withColumn("departure_date",clean_immigration.departure_date.cast(IntegerType()))


In [30]:
#check the previous code
print(clean_immigration.dtypes)

[('cicid', 'double'), ('year', 'int'), ('month', 'int'), ('Port_of_admission', 'string'), ('arrival_date', 'int'), ('state_code', 'string'), ('departure_date', 'int')]


In [31]:
clean_immigration.limit(5).toPandas()

Unnamed: 0,cicid,year,month,Port_of_admission,arrival_date,state_code,departure_date
0,181.0,2016,4,WAS,20545,MD,20554
1,394.0,2016,4,MIA,20545,FL,20559
2,596.0,2016,4,NAS,20545,FL,20547
3,790.0,2016,4,ATL,20545,FL,20559
4,1112.0,2016,4,NEW,20545,NY,20554


In [32]:
#drop the missing data rows
clean_temperature = df_temperature.dropna()

In [33]:
print(clean_temperature.count())

dt                               8235082
AverageTemperature               8235082
AverageTemperatureUncertainty    8235082
City                             8235082
Country                          8235082
Latitude                         8235082
Longitude                        8235082
dtype: int64


In [34]:
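#keep only the rows for the United States
#.copy() makes the result an independent DataFrame, so later column assignments do not trigger SettingWithCopyWarning
clean_temperature = clean_temperature.loc[clean_temperature['Country'] == 'United States'].copy()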

In [35]:
#check the previous code
print(clean_temperature['Country'].unique())

['United States']


In [36]:
#change the type of dt to datetime
clean_temperature['dt']= pd.to_datetime(clean_temperature['dt'])

In [37]:
#check the previous code
clean_temperature.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 661524 entries, 47555 to 8439246
Data columns (total 7 columns):
dt                               661524 non-null datetime64[ns]
AverageTemperature               661524 non-null float64
AverageTemperatureUncertainty    661524 non-null float64
City                             661524 non-null object
Country                          661524 non-null object
Latitude                         661524 non-null object
Longitude                        661524 non-null object
dtypes: datetime64[ns](1), float64(2), object(4)
memory usage: 40.4+ MB


In [38]:
#rename the dt column
clean_temperature = clean_temperature.rename(columns={'dt': 'Date_time'})
clean_temperature.head()

Unnamed: 0,Date_time,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
47555,1820-01-01,2.101,3.217,Abilene,United States,32.95N,100.53W
47556,1820-02-01,6.926,2.853,Abilene,United States,32.95N,100.53W
47557,1820-03-01,10.767,2.395,Abilene,United States,32.95N,100.53W
47558,1820-04-01,17.989,2.202,Abilene,United States,32.95N,100.53W
47559,1820-05-01,21.809,2.036,Abilene,United States,32.95N,100.53W


In [39]:
#drop the missing data rows
clean_demographic = df_demographic.dropna()

In [42]:
#check the previous code
print(clean_demographic.count())

City                      2875
State                     2875
Median Age                2875
Male Population           2875
Female Population         2875
Total Population          2875
Number of Veterans        2875
Foreign-born              2875
Average Household Size    2875
State Code                2875
Race                      2875
Count                     2875
dtype: int64


### Step 3: Define the Data Model
1- fact_immigration (fact table)

cicid --> Unique identifier

i94yr --> year

i94mon --> month

i94port --> Port_of_admission

arrdate --> arrival_date

i94addr --> state_code

depdate --> departure_date


#### Dimension tables

1- dim_visa

cicid --> Unique identifier

i94mode --> mode of transportation

i94visa --> visa

airline --> airline

fltno --> flight number

visatype --> visa type


2- dim_flag

cicid --> Unique identifier

entdepa --> Arrival_Flag

entdepd --> Departure_Flag

entdepu --> Update_Flag

matflag --> Match_Flag

3- dim_immigrant

cicid --> Unique identifier

i94cit --> citizenship_code

i94res --> residence_code

i94bir -->  birthday

biryear --> birthday_year

gender --> gender

4- dim_city : City ,State, State_Code, Race

5- dim_Population : Median_Age, Male_Population, Female_Population, Total_Population, Number_of_Veterans, Foreign_born, Average_Household_Size, State_Code, Race

6- dim_temperature: dt (date time), Average Temperature, Average Temperature Uncertainty, City, Country
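The source-to-target column mapping of the fact table listed above can also be written down as data. The sketch below is plain Python and only illustrative: the dictionary name `FACT_IMMIGRATION_COLUMNS` and the helper `project_and_rename` are hypothetical, and the sample record reuses values from one row of the immigration preview shown earlier.

```python
# Source column -> target column, taken from the fact table definition above.
FACT_IMMIGRATION_COLUMNS = {
    "cicid": "cicid",
    "i94yr": "year",
    "i94mon": "month",
    "i94port": "Port_of_admission",
    "arrdate": "arrival_date",
    "i94addr": "state_code",
    "depdate": "departure_date",
}


def project_and_rename(record, mapping):
    """Keep only the mapped source columns and return them under their target names."""
    return {target: record[source] for source, target in mapping.items()}


# Values taken from one row of the immigration preview (other columns omitted).
raw_row = {
    "cicid": 15.0, "i94yr": 2016.0, "i94mon": 4.0, "i94cit": 101.0,
    "i94port": "WAS", "arrdate": 20545.0, "i94addr": "MI",
    "depdate": 20691.0, "visatype": "B2",
}

fact_row = project_and_rename(raw_row, FACT_IMMIGRATION_COLUMNS)
print(fact_row)
```

Keeping the mapping in one place means the same dictionary could drive the column selection and renaming in the Spark job, so the data model and the ETL code cannot drift apart.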

#### 3.2 Mapping Out Data Pipelines

- Load the datasets, apply the cleaning steps above, then write the resulting tables to Parquet files using Spark.

### Step 4: Run Pipelines to Model the Data 


1- Create an IAM user on AWS and add its access key and secret key to dl.cfg.

2- Create an EMR cluster on AWS and edit the output path in etl.py.

3- Run the etl.py file.
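Step 1 above stores the AWS keys in a configuration file. A minimal sketch of loading them is shown below; it assumes an `[AWS]` section with the two key names used in the commented-out configuration cell earlier. The file content is a placeholder, and `load_aws_credentials` is a hypothetical helper, not a function from etl.py.

```python
import configparser

# Placeholder content standing in for dl.cfg; real keys must never be committed.
EXAMPLE_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = example-access-key-id
AWS_SECRET_ACCESS_KEY = example-secret-access-key
"""


def load_aws_credentials(cfg_text, environ):
    """Parse the [AWS] section and copy both keys into the given mapping."""
    config = configparser.ConfigParser()
    config.read_string(cfg_text)
    for key in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"):
        environ[key] = config["AWS"][key]
    return environ


# A plain dict is used here; in the real pipeline os.environ would be passed instead.
env = load_aws_credentials(EXAMPLE_CFG, {})
print(sorted(env))  # ['AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY']
```

Against a real file, `config.read('dl.cfg')` would replace `read_string`. Passing the target mapping in as a parameter keeps the helper easy to test without touching the process environment.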

#### 4.2 Data Quality Checks

In [44]:
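#read the created tables in spark
#note: if the files were written with a header row, pass header=True so that it is not counted as a data row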
dim_city = spark.read.csv('dim_city')
dim_flag = spark.read.csv('dim_flag')
dim_immigrant = spark.read.csv('dim_immigrant')
dim_Population = spark.read.csv('dim_Population')
dim_temperature = spark.read.csv('dim_temperature')
fact_immigration = spark.read.csv('fact_immigration')

In [45]:
#list the tables and the table names
table = [dim_city, dim_flag, dim_immigrant, dim_Population, dim_temperature, fact_immigration]
table_name = ['dim_city', 'dim_flag', 'dim_immigrant', 'dim_Population', 'dim_temperature', 'fact_immigration']

In [46]:
#first quality check: make sure that no table is empty.

def quality_check(table, table_name):
    """Raise ValueError if the table has no rows; otherwise report its row count."""
    print(f'Checking {table_name}')
    rows_num = table.count()
    if rows_num == 0:
        raise ValueError(f'{table_name} table has no rows')
    print(f'{table_name} table has {rows_num} rows')

#the function prints its own report and returns None, so it is not wrapped in print()
quality_check(dim_city, "dim_city")
quality_check(dim_flag, "dim_flag")
quality_check(dim_immigrant, "dim_immigrant")
quality_check(dim_Population, "dim_Population")
quality_check(dim_temperature, "dim_temperature")
quality_check(fact_immigration, "fact_immigration")


Checking dim_city
dim_city table has 2892 rows
Checking dim_flag
dim_flag table has 236 rows
Checking dim_immigrant
dim_immigrant table has 2681244 rows
Checking dim_Population
dim_Population table has 2876 rows
Checking dim_temperature
dim_temperature table has 661525 rows
Checking fact_immigration
fact_immigration table has 2817745 rows


In [47]:
#quality check 2: make sure that no table contains duplicate rows.

def quality_check_2(table, table_name):
    """Raise ValueError if the table contains fully identical rows."""
    is_unique = table.distinct().count() == table.count()
    if not is_unique:
        raise ValueError(f'{table_name} table has duplicates')
    print(f'{table_name} doesn\'t have duplicates')

#the function prints its own report and returns None, so it is not wrapped in print()
quality_check_2(dim_city, "dim_city")
quality_check_2(dim_flag, "dim_flag")
quality_check_2(dim_immigrant, "dim_immigrant")
quality_check_2(dim_Population, "dim_Population")
quality_check_2(dim_temperature, "dim_temperature")
quality_check_2(fact_immigration, "fact_immigration")


dim_city doesn't have duplicates
dim_flag doesn't have duplicates
dim_immigrant doesn't have duplicates
dim_Population doesn't have duplicates
dim_temperature doesn't have duplicates
fact_immigration doesn't have duplicates
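The duplicate check above compares whole rows, so two rows that share a key but differ in another column would still pass. A possible third check, sketched here in plain Python on a small in-memory sample, looks at the key column only. The helper name `find_duplicate_keys` is hypothetical, and the third sample row is made up to trigger the check.

```python
from collections import Counter


def find_duplicate_keys(rows, key):
    """Return the sorted key values that appear in more than one row."""
    counts = Counter(row[key] for row in rows)
    return sorted(value for value, n in counts.items() if n > 1)


sample_rows = [
    {"cicid": 181.0, "state_code": "MD"},
    {"cicid": 394.0, "state_code": "FL"},
    # Made-up row: same key as the first row but a different state code.
    {"cicid": 181.0, "state_code": "NY"},
]

print(find_duplicate_keys(sample_rows, "cicid"))  # [181.0]
```

On a Spark DataFrame, the same idea could be expressed as `groupBy("cicid").count().filter("count > 1")`, which avoids collecting the rows to the driver.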


#### 4.3 Data dictionary 
Please refer to the data dictionary file.

### Step 5: Complete Project Write Up

#### Tools

- Spark processes the data efficiently in a distributed way on EMR.
- pandas makes cleaning the data quick and convenient.
- Parquet's columnar storage and partitioning of the data allow queries to be optimized for analysis.

#### Updating the data

Since the fact table and 3 of the dimension tables are extracted from the immigration data, I recommend updating the data monthly to keep it current.
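For a monthly update, the job needs to know which input file to read. The sketch below derives a file name from a date. It assumes that every monthly file follows the same naming pattern as the `i94_apr16_sub.sas7bdat` file loaded earlier, which the notebook does not confirm, and the helper name `i94_file_name` is hypothetical.

```python
from datetime import date

# Fixed English abbreviations, so the result does not depend on the system locale.
MONTH_ABBREVIATIONS = (
    "jan", "feb", "mar", "apr", "may", "jun",
    "jul", "aug", "sep", "oct", "nov", "dec",
)


def i94_file_name(month_start):
    """Build the assumed I94 file name for the month that contains the given date."""
    abbreviation = MONTH_ABBREVIATIONS[month_start.month - 1]
    return f"i94_{abbreviation}{month_start:%y}_sub.sas7bdat"


print(i94_file_name(date(2016, 4, 1)))  # i94_apr16_sub.sas7bdat
```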

#### Hypothetical Scenarios

- Increase the size of the data by 100 times: In this case, I recommend increasing the number of nodes in the cluster.

- The data fills a dashboard that needs to be updated daily by 7am: In this case, it is more helpful to define the pipeline as a DAG and schedule it with Airflow.

- The database needs to be accessed by more than 100 people: In this case, I recommend hosting the data on AWS.


