# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project prepares data for a source-of-truth database by gathering and combining data from multiple sources.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [5]:
# Do all imports and installs here
import pandas as pd
import pyspark

### Step 1: Scope the Project and Gather Data

#### Scope 
This project uses two datasets, the I94 immigration data and the U.S. city demographic data, to build a data warehouse based on a dimensional model.

**Data Sets**

- I94 Immigration Data
- U.S. City Demographic Data

**Tools and libraries:**

- *AWS S3*: data storage
- *Python libraries:*
    - *Pandas*: used to explore and process the small sample dataset
    - *PySpark*: used to explore and process the large full dataset

#### Describe the dataset 
| Dataset             |Format |Description|
|---------------------|-------|-----------|
|I94 Immigration Data |SAS    |Records of visitors arriving in the U.S., with attributes such as gender, age and visa type.|
|U.S. City Demographic Data|CSV|This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000.|

### Step 2: Explore and Assess the Data
#### Explore the dataset: I94 Immigration Data

In [2]:
# Read in the data here
df_immi = pd.read_csv("immigration_data_sample.csv")

In [3]:
pd.set_option('display.max_columns', None)

In [4]:
df_immi.head()

| | Unnamed: 0 | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | i94bir | i94visa | count | dtadfile | visapost | occup | entdepa | entdepd | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2027561 | 4084316.0 | 2016.0 | 4.0 | 209.0 | 209.0 | HHW | 20566.0 | 1.0 | HI | 20573.0 | 61.0 | 2.0 | 1.0 | 20160422 | | | G | O | | M | 1955.0 | 7202016 | F | | JL | 56582670000.0 | 00782 | WT |
| 1 | 2171295 | 4422636.0 | 2016.0 | 4.0 | 582.0 | 582.0 | MCA | 20567.0 | 1.0 | TX | 20568.0 | 26.0 | 2.0 | 1.0 | 20160423 | MTR | | G | R | | M | 1990.0 | 10222016 | M | | \*GA | 94362000000.0 | XBLNG | B2 |
| 2 | 589494 | 1195600.0 | 2016.0 | 4.0 | 148.0 | 112.0 | OGG | 20551.0 | 1.0 | FL | 20571.0 | 76.0 | 2.0 | 1.0 | 20160407 | | | G | O | | M | 1940.0 | 7052016 | M | | LH | 55780470000.0 | 00464 | WT |
| 3 | 2631158 | 5291768.0 | 2016.0 | 4.0 | 297.0 | 297.0 | LOS | 20572.0 | 1.0 | CA | 20581.0 | 25.0 | 2.0 | 1.0 | 20160428 | DOH | | G | O | | M | 1991.0 | 10272016 | M | | QR | 94789700000.0 | 00739 | B2 |
| 4 | 3032257 | 985523.0 | 2016.0 | 4.0 | 111.0 | 111.0 | CHM | 20550.0 | 3.0 | NY | 20553.0 | 19.0 | 2.0 | 1.0 | 20160406 | | | Z | K | | M | 1997.0 | 7042016 | F | | | 42322570000.0 | LAND | WT |


In [5]:
df_immi.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 29 columns):
Unnamed: 0    1000 non-null int64
cicid         1000 non-null float64
i94yr         1000 non-null float64
i94mon        1000 non-null float64
i94cit        1000 non-null float64
i94res        1000 non-null float64
i94port       1000 non-null object
arrdate       1000 non-null float64
i94mode       1000 non-null float64
i94addr       941 non-null object
depdate       951 non-null float64
i94bir        1000 non-null float64
i94visa       1000 non-null float64
count         1000 non-null float64
dtadfile      1000 non-null int64
visapost      382 non-null object
occup         4 non-null object
entdepa       1000 non-null object
entdepd       954 non-null object
entdepu       0 non-null float64
matflag       954 non-null object
biryear       1000 non-null float64
dtaddto       1000 non-null object
gender        859 non-null object
insnum        35 non-null float64
airline       967 non

From the non-null counts above, we can drop the columns that are mostly null: *visapost, occup, entdepu, insnum*. Selected remaining columns are then split into the fact and dimension tables.
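
A minimal pandas sketch of this screening step, assuming an illustrative cut-off of 50% missing values (the threshold and the helper name `mostly_null_columns` are assumptions, not part of the original notebook). With that cut-off, the non-null counts in the `info()` output above would flag exactly *visapost* (61.8% missing), *occup*, *entdepu* and *insnum* among the listed columns.

```python
import pandas as pd


def mostly_null_columns(df: pd.DataFrame, threshold: float = 0.5) -> list:
    """Return the columns whose share of missing values is above `threshold` (hypothetical helper)."""
    null_ratio = df.isnull().mean()  # fraction of NaN/None values per column
    return null_ratio[null_ratio > threshold].index.tolist()


# Toy frame with made-up values, standing in for df_immi
sample = pd.DataFrame({
    "cicid": [1.0, 2.0, 3.0, 4.0],
    "occup": [None, None, None, "STU"],
    "entdepu": [None, None, None, None],
    "gender": ["F", "M", None, "M"],
})

print(mostly_null_columns(sample))  # ['occup', 'entdepu']
```

On the real sample, `df_immi.drop(columns=mostly_null_columns(df_immi))` would then remove the flagged columns in one step.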

In [6]:
fact_immigration = df_immi[['cicid','i94port','i94addr','arrdate','fltno','visatype']].copy()
fact_immigration.columns = ['cic_id','city_code','state_code','arrival_date','flight_number','visa_type']
fact_immigration['stay_duration'] = df_immi.depdate - df_immi.arrdate
fact_immigration.head(5)

| | cic_id | city_code | state_code | arrival_date | flight_number | visa_type | stay_duration |
|---|---|---|---|---|---|---|---|
| 0 | 4084316.0 | HHW | HI | 20566.0 | 00782 | WT | 7.0 |
| 1 | 4422636.0 | MCA | TX | 20567.0 | XBLNG | B2 | 1.0 |
| 2 | 1195600.0 | OGG | FL | 20551.0 | 00464 | WT | 20.0 |
| 3 | 5291768.0 | LOS | CA | 20572.0 | 00739 | B2 | 9.0 |
| 4 | 985523.0 | CHM | NY | 20550.0 | LAND | WT | 3.0 |


In [7]:
arrival_date = list(df_immi.arrdate)
arrival_datetime = [pd.Timestamp('1960-1-1') + pd.to_timedelta(i,unit='D') for i in arrival_date]
arrival_year = list(pd.DatetimeIndex(arrival_datetime).year)
arrival_month = list(pd.DatetimeIndex(arrival_datetime).month)
arrival_day = list(pd.DatetimeIndex(arrival_datetime).day)
dim_arrival_date = pd.DataFrame({'arrival_date':arrival_date,'arrival_datetime':arrival_datetime,'year':arrival_year,'month':arrival_month,'day':arrival_day})
dim_arrival_date.head()

| | arrival_date | arrival_datetime | year | month | day |
|---|---|---|---|---|---|
| 0 | 20566.0 | 2016-04-22 | 2016 | 4 | 22 |
| 1 | 20567.0 | 2016-04-23 | 2016 | 4 | 23 |
| 2 | 20551.0 | 2016-04-07 | 2016 | 4 | 7 |
| 3 | 20572.0 | 2016-04-28 | 2016 | 4 | 28 |
| 4 | 20550.0 | 2016-04-06 | 2016 | 4 | 6 |
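
The list comprehension above converts one value at a time. A vectorized pandas sketch of the same conversion, relying on the same assumption as the code above that SAS dates count days from 1960-01-01, adds a whole `Timedelta` series to the epoch and reads the date parts through the `.dt` accessor (the three input values are copied from the sample; `dim_arrival_date_vec` is a hypothetical name):

```python
import pandas as pd

# SAS dates count days since 1960-01-01; these values come from the sample above
arrdate = pd.Series([20566.0, 20567.0, 20551.0])

# Timestamp + Series of Timedeltas -> Series of datetimes, computed in one vectorized step
arrival_datetime = pd.Timestamp("1960-01-01") + pd.to_timedelta(arrdate, unit="D")

dim_arrival_date_vec = pd.DataFrame({
    "arrival_date": arrdate,
    "arrival_datetime": arrival_datetime,
    "year": arrival_datetime.dt.year,
    "month": arrival_datetime.dt.month,
    "day": arrival_datetime.dt.day,
})
print(dim_arrival_date_vec)
```

On the full sample, `df_immi.arrdate` would take the place of the hard-coded series; missing dates come out as `NaT`.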


In [8]:
dim_visa = df_immi[['i94visa','visatype']].drop_duplicates().reset_index(drop=True)
dim_visa.columns = ['visa_purpose','visa_type']
dim_visa.head()

| | visa_purpose | visa_type |
|---|---|---|
| 0 | 2.0 | WT |
| 1 | 2.0 | B2 |
| 2 | 2.0 | CP |
| 3 | 1.0 | B1 |
| 4 | 2.0 | GMT |
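
`visa_purpose` holds a numeric code. The sketch below attaches a readable label, assuming the three categories that `I94_SAS_Labels_Descriptions.SAS` lists for I94VISA (1 = Business, 2 = Pleasure, 3 = Student); check the mapping against your copy of the file. `VISA_PURPOSE_LABELS` and `label_visa_purpose` are hypothetical names.

```python
import math

# Assumed labels for the I94VISA code, as listed in the I94 label description file
VISA_PURPOSE_LABELS = {1: "Business", 2: "Pleasure", 3: "Student"}


def label_visa_purpose(code):
    """Map a numeric i94visa code (possibly a float such as 2.0) to its label, or None if missing/unknown."""
    if code is None or (isinstance(code, float) and math.isnan(code)):
        return None
    return VISA_PURPOSE_LABELS.get(int(code))


# (visa_type, visa_purpose) pairs like those in dim_visa
rows = [("WT", 2.0), ("B1", 1.0), ("F1", 3.0)]
for visa_type, purpose in rows:
    print(visa_type, label_visa_purpose(purpose))
```

In the notebook the helper could be applied with `dim_visa['visa_purpose'].map(label_visa_purpose)`.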


In [9]:
dim_personal = df_immi[['cicid','i94cit','i94res','biryear','gender']].copy()
dim_personal.columns = ['cic_id','country_of_birth','country_of_residence','year_of_birth','gender']
dim_personal.head(5)

| | cic_id | country_of_birth | country_of_residence | year_of_birth | gender |
|---|---|---|---|---|---|
| 0 | 4084316.0 | 209.0 | 209.0 | 1955.0 | F |
| 1 | 4422636.0 | 582.0 | 582.0 | 1990.0 | M |
| 2 | 1195600.0 | 148.0 | 112.0 | 1940.0 | M |
| 3 | 5291768.0 | 297.0 | 297.0 | 1991.0 | M |
| 4 | 985523.0 | 111.0 | 111.0 | 1997.0 | F |


In [10]:
dim_airline = df_immi[['fltno','airline']].copy()
dim_airline.columns = ['flight_number','airline_brand']
dim_airline.drop_duplicates(inplace=True)
dim_airline.head(5)

| | flight_number | airline_brand |
|---|---|---|
| 0 | 00782 | JL |
| 1 | XBLNG | \*GA |
| 2 | 00464 | LH |
| 3 | 00739 | QR |
| 4 | LAND | |


#### Explore the dataset: U.S. City Demographic Data

In [11]:
df_demography = pd.read_csv('us-cities-demographics.csv',sep=';')
df_demography.head(5)

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |


In [12]:
dim_location = df_demography[['City', 'State','State Code', 'Median Age', 'Male Population', 'Female Population',
       'Total Population', 'Number of Veterans', 'Foreign-born',
       'Average Household Size']].copy()
dim_location.columns = ['city_name','state_name','state_code','median_age','male_population','female_population',
                       'total_population','number_of_veterans','foreign_born','avg_household_size']
dim_location.head(5)

| | city_name | state_name | state_code | median_age | male_population | female_population | total_population | number_of_veterans | foreign_born | avg_household_size |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | MD | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 |
| 1 | Quincy | Massachusetts | MA | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 |
| 2 | Hoover | Alabama | AL | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 |
| 3 | Rancho Cucamonga | California | CA | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 |
| 4 | Newark | New Jersey | NJ | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 |


#### Cleaning Steps

Map each port code (`i94port`, stored as `city_code`) to a city name using `I94_SAS_Labels_Descriptions.SAS`, so that fact_immigration can be joined to dim_location on the city name.

In [15]:
with open("I94_SAS_Labels_Descriptions.SAS") as f:
    contents = f.readlines()

In [16]:
city_dict = {}
for city in contents[303:962]:
    code = city.split("'")[1]
    city_name = city.split("'")[3].split(",")[0]
    city_dict[code] = city_name
pd.DataFrame.from_dict(city_dict,orient='index',columns=['city']).head()

| | city |
|---|---|
| ANC | ANCHORAGE |
| BAR | BAKER AAF - BAKER ISLAND |
| DAC | DALTONS CACHE |
| PIZ | DEW STATION PT LAY DEW |
| DTH | DUTCH HARBOR |
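
Slicing `contents[303:962]` ties the parser to fixed line numbers in the label file, so any edit to the file silently shifts the result. A sketch of a more robust parser, assuming the port codes live in a block that starts with a `value $i94prtl` header and ends with a `;` (check this against your copy of `I94_SAS_Labels_Descriptions.SAS`); `parse_port_codes` is a hypothetical helper, and it also strips the padding spaces some labels carry:

```python
import re

# Matches value-label lines such as:   'ANC'	=	'ANCHORAGE, AK'
PORT_LINE = re.compile(r"^\s*'(?P<code>[^']+)'\s*=\s*'(?P<label>[^']*)'")


def parse_port_codes(lines, block_name="i94prtl"):
    """Build {port_code: city_name} from one SAS `value` block (hypothetical helper)."""
    ports = {}
    inside = False
    for line in lines:
        if not inside:
            # Start collecting only after the header line of the requested block
            inside = line.strip().lower().startswith("value") and block_name in line
            continue
        match = PORT_LINE.match(line)
        if match:
            # Keep the text before the first comma ("ANCHORAGE, AK" -> "ANCHORAGE") and drop padding
            ports[match.group("code")] = match.group("label").split(",")[0].strip()
        if line.rstrip().endswith(";"):
            break  # a semicolon closes the SAS value block
    return ports


sample_lines = [
    "value i94cntyl\n",
    "   582 =  'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'\n",
    ";\n",
    "value $i94prtl\n",
    "\t'ANC'\t=\t'ANCHORAGE, AK'\n",
    "\t'DTH'\t=\t'DUTCH HARBOR, AK       '\n",
    ";\n",
    "value $i94addrl\n",
    "\t'AL'='ALABAMA'\n",
    ";\n",
]
print(parse_port_codes(sample_lines))  # {'ANC': 'ANCHORAGE', 'DTH': 'DUTCH HARBOR'}
```

With the real file this would become `city_dict = parse_port_codes(contents)`.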


In [17]:
# Series.map returns NaN for port codes missing from city_dict instead of raising a KeyError
fact_immigration['city_name'] = fact_immigration['city_code'].map(city_dict)
fact_immigration.head()

| | cic_id | city_code | state_code | arrival_date | flight_number | visa_type | stay_duration | city_name |
|---|---|---|---|---|---|---|---|---|
| 0 | 4084316.0 | HHW | HI | 20566.0 | 00782 | WT | 7.0 | HONOLULU |
| 1 | 4422636.0 | MCA | TX | 20567.0 | XBLNG | B2 | 1.0 | MCALLEN |
| 2 | 1195600.0 | OGG | FL | 20551.0 | 00464 | WT | 20.0 | KAHULUI - MAUI |
| 3 | 5291768.0 | LOS | CA | 20572.0 | 00739 | B2 | 9.0 | LOS ANGELES |
| 4 | 985523.0 | CHM | NY | 20550.0 | LAND | WT | 3.0 | CHAMPLAIN |


In [18]:
# Upper-case the city names in dim_location so they match the city names in fact_immigration
dim_location['city_name'] = dim_location['city_name'].str.upper()
dim_location.head(5)

| | city_name | state_name | state_code | median_age | male_population | female_population | total_population | number_of_veterans | foreign_born | avg_household_size |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | SILVER SPRING | Maryland | MD | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 |
| 1 | QUINCY | Massachusetts | MA | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 |
| 2 | HOOVER | Alabama | AL | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 |
| 3 | RANCHO CUCAMONGA | California | CA | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 |
| 4 | NEWARK | New Jersey | NJ | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 |


Drop rows with a duplicate primary key in each table

In [19]:
fact_immigration.drop_duplicates(inplace=True,subset=['cic_id'])
dim_airline.drop_duplicates(inplace=True,subset=['flight_number'])
dim_arrival_date.drop_duplicates(inplace=True,subset=['arrival_date'])
dim_location.drop_duplicates(inplace=True,subset=['city_name'])
dim_personal.drop_duplicates(inplace=True,subset=['cic_id'])
dim_visa.drop_duplicates(inplace=True,subset=['visa_type'])

Drop rows with a missing (NaN) primary key in each table

In [20]:
fact_immigration.dropna(inplace=True,subset=['cic_id'])
dim_airline.dropna(inplace=True,subset=['flight_number'])
dim_arrival_date.dropna(inplace=True,subset=['arrival_date'])
dim_location.dropna(inplace=True,subset=['city_name'])
dim_personal.dropna(inplace=True,subset=['cic_id'])
dim_visa.dropna(inplace=True,subset=['visa_type'])
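
The two cleaning passes above can be folded into one helper applied per table. A minimal sketch, assuming each table has a single-column primary key (`clean_on_key` is a hypothetical name). Dropping missing keys first gives the same rows as the order above, because `drop_duplicates` would keep one NaN-keyed row that `dropna` then removes anyway:

```python
import pandas as pd


def clean_on_key(df: pd.DataFrame, key: str) -> pd.DataFrame:
    """Drop rows whose key is missing, then keep the first row for each key value (hypothetical helper)."""
    return df.dropna(subset=[key]).drop_duplicates(subset=[key]).reset_index(drop=True)


# Toy dim_visa with a repeated and a missing visa_type (made-up rows)
dim_visa_raw = pd.DataFrame({
    "visa_purpose": [2.0, 2.0, 1.0, 2.0],
    "visa_type": ["WT", "WT", "B1", None],
})

cleaned = clean_on_key(dim_visa_raw, "visa_type")
print(cleaned)
```

Each table would then be cleaned with one call, e.g. `dim_location = clean_on_key(dim_location, 'city_name')`.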

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

![schema](images/schema.png)

**dim_personal**: The dimension table with personal information such as gender, year of birth, country of birth and country of residence. It describes each immigrant or traveler in more detail.

**dim_arrival_date**: The date on which immigrants or travelers arrive in the U.S. The table splits the date into separate units (year, month, day) so that stakeholders can query it more easily.

**dim_visa**: The dimension table containing the visa type and visa purpose.

**dim_location**: The dimension table describing the location where immigrants arrive. It provides demographic information about the place immigrants or travelers visit.

**dim_airline**: The dimension table with the airline brand and flight number that immigrants or travelers chose.

**fact_immigration**: The fact table, whose columns link to the dimension tables and record the stay duration of each immigrant or traveler. It helps stakeholders research which factors attract immigrants and travelers.

#### 3.2 Mapping Out Data Pipelines

1. Load the datasets from the S3 bucket
2. From the I94 Immigration dataset, create fact_immigration, dim_personal, dim_arrival_date, dim_visa and dim_airline
3. From the demography dataset, create dim_location
4. Apply the cleaning steps from Step 2: add city names to fact_immigration, upper-case the city names in dim_location, and drop duplicate and missing primary keys in every table
5. Write the fact and dimension tables back to the S3 bucket

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model. Please refer to elt.py for the complete pipeline.

In [1]:
import os
import logging
import configparser
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import udf, col, expr, monotonically_increasing_id, to_timestamp, upper
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']

spark = SparkSession.builder.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0").getOrCreate()


In [3]:
# Load the immigration data (spark.read.load defaults to the Parquet format)
df_immi = spark.read.load('./sas_data')

In [4]:
#transform and loading data into fact_immigration
fact_immigration = df_immi.withColumn('stay_duration', expr('depdate - arrdate'))\
        .selectExpr('cicid as cic_id',
                    'i94port as city_code',
                    'i94addr as state_code',
                    'arrdate as arrival_date',
                    'fltno as flight_number',
                    'visatype as visa_type',
                    'stay_duration')

with open("I94_SAS_Labels_Descriptions.SAS") as f:
    contents = f.readlines()

# create dictionary of city_code and city_name
city_dict = {}
for city in contents[303:962]:
    code = city.split("'")[1]
    city_name = city.split("'")[3].split(",")[0]
    city_dict[code] = city_name

# create udf of transforming between city_code into city_name
get_city_name = udf(
    lambda z: city_dict[z] if z in city_dict.keys() else None)

fact_immigration = fact_immigration\
        .withColumn('city_name', get_city_name(col('city_code')))\
        .drop_duplicates(subset=['cic_id']).dropna(subset=(['cic_id']))

fact_immigration.limit(5).toPandas()

| | cic_id | city_code | state_code | arrival_date | flight_number | visa_type | stay_duration | city_name |
|---|---|---|---|---|---|---|---|---|
| 0 | 299.0 | NYC | NY | 20545.0 | 87 | WT | 5.0 | NEW YORK |
| 1 | 305.0 | NYC | NY | 20545.0 | 87 | WT | 10.0 | NEW YORK |
| 2 | 496.0 | CHI | IL | 20545.0 | 65 | WB | 3.0 | CHICAGO |
| 3 | 558.0 | SFR | CA | 20545.0 | 454 | WB | 2.0 | SAN FRANCISCO |
| 4 | 596.0 | NAS | FL | 20545.0 | 221 | WT | 2.0 | NASSAU |


In [5]:
# create the get_datetime UDF to convert a SAS date (days since 1960-01-01) into a datetime
get_datetime = udf(lambda z: datetime(
    1960, 1, 1) + timedelta(days=int(z)) if z is not None else None, TimestampType())

# extract SAS arrival_date from staging table
dim_arrival_date = df_immi.selectExpr('arrdate as arrival_date')

# create new column with datetime type
dim_arrival_date = dim_arrival_date.withColumn(
    'arrival_datetime', get_datetime('arrival_date'))

# wrangling datetime type into different units
dim_arrival_date = dim_arrival_date.selectExpr('arrival_date',
                                               'arrival_datetime',
                                               'year(arrival_datetime) as year',
                                               'month(arrival_datetime) as month',
                                               'dayofmonth(arrival_datetime) as day'
                                               ).drop_duplicates(subset=['arrival_date']).dropna(subset=(['arrival_date']))
dim_arrival_date.limit(5).toPandas()

| | arrival_date | arrival_datetime | year | month | day |
|---|---|---|---|---|---|
| 0 | 20550.0 | 2016-04-06 | 2016 | 4 | 6 |
| 1 | 20556.0 | 2016-04-12 | 2016 | 4 | 12 |
| 2 | 20553.0 | 2016-04-09 | 2016 | 4 | 9 |
| 3 | 20551.0 | 2016-04-07 | 2016 | 4 | 7 |
| 4 | 20565.0 | 2016-04-21 | 2016 | 4 | 21 |


In [6]:
# extract columns into dim_visa
dim_visa = df_immi.selectExpr('visatype as visa_type',
                              'i94visa as visa_purpose'
                              ).drop_duplicates(subset=['visa_type']).dropna(subset=(['visa_type']))
dim_visa.limit(5).toPandas()

| | visa_type | visa_purpose |
|---|---|---|
| 0 | F2 | 3.0 |
| 1 | GMB | 1.0 |
| 2 | B2 | 2.0 |
| 3 | F1 | 3.0 |
| 4 | CPL | 2.0 |


In [7]:
# extract columns into dim_personal
dim_personal = df_immi.selectExpr('cicid as cic_id',
                                      'i94cit as country_of_birth',
                                      'i94res as country_of_residence',
                                      'biryear as year_of_birth',
                                      'gender'
                                      ).drop_duplicates(subset=['cic_id']).dropna(subset=(['cic_id']))
dim_personal.limit(5).toPandas()

| | cic_id | country_of_birth | country_of_residence | year_of_birth | gender |
|---|---|---|---|---|---|
| 0 | 299.0 | 103.0 | 103.0 | 1962.0 | |
| 1 | 305.0 | 103.0 | 103.0 | 1953.0 | |
| 2 | 496.0 | 103.0 | 103.0 | 1952.0 | |
| 3 | 558.0 | 103.0 | 103.0 | 1974.0 | M |
| 4 | 596.0 | 103.0 | 103.0 | 1992.0 | M |


In [8]:
# extract columns into dim_airline
dim_airline = df_immi.selectExpr('fltno as flight_number',
                                     'airline as airline_brand'
                                     ).drop_duplicates(subset=['flight_number']).dropna(subset=(['flight_number']))
dim_airline.limit(5).toPandas()

| | flight_number | airline_brand |
|---|---|---|
| 0 | 00332 | YNT |
| 1 | 00456 | LH |
| 2 | 00530 | DL |
| 3 | 00556 | DL |
| 4 | 0059C | UA |


In [9]:
# Read the U.S. city demographics data (semicolon-delimited CSV)
df_demographic = spark.read.format('csv')\
        .options(header=True, delimiter=';')\
        .load("us-cities-demographics.csv")

In [10]:
# Extract and loading into dim_location
dim_location = df_demographic.selectExpr('upper(City) as city_name',
                                             'State as state_name',
                                             '`State Code` as state_code',
                                             '`Median Age` as median_age',
                                             '`Male Population` as male_population',
                                             '`Female Population` as female_population',
                                             '`Total Population` as total_population',
                                             '`Number of Veterans` as number_of_veterans',
                                             '`Foreign-born` as foreign_born',
                                             '`Average Household Size` as avg_household_size'
                                             ).drop_duplicates(subset=['city_name']).dropna(subset=(['city_name']))
dim_location.limit(5).toPandas()

| | city_name | state_name | state_code | median_age | male_population | female_population | total_population | number_of_veterans | foreign_born | avg_household_size |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | CHINO | California | CA | 36.5 | 50989 | 34610 | 85599 | 4186 | 18666 | 3.62 |
| 1 | STAMFORD | Connecticut | CT | 35.4 | 64941 | 63936 | 128877 | 2269 | 44003 | 2.7 |
| 2 | YAKIMA | Washington | WA | 34.0 | 45147 | 48553 | 93700 | 4705 | 15717 | 2.74 |
| 3 | AMARILLO | Texas | TX | 33.8 | 99391 | 100260 | 199651 | 11008 | 21124 | 2.64 |
| 4 | MUNCIE | Indiana | IN | 27.4 | 32446 | 37255 | 69701 | 2968 | 1062 | 2.35 |


In [38]:
# write the output tables into the S3 bucket (output_data was never defined before this cell)
output_data = 's3a://vuongnd9-udacity-output/'
fact_immigration.write.mode("overwrite").partitionBy('city_code').parquet(path=output_data + 'fact_immigration')
dim_arrival_date.write.mode("overwrite").parquet(path=output_data + 'dim_arrival_date')
dim_visa.write.mode("overwrite").parquet(path=output_data + 'dim_visa')
dim_personal.write.mode("overwrite").parquet(path=output_data + 'dim_personal')
dim_airline.write.mode("overwrite").parquet(path=output_data + 'dim_airline')
dim_location.write.mode("overwrite").parquet(path=output_data + 'dim_location')

**Example queries on the data model**

In [55]:
fact_immigration.createOrReplaceTempView("fact_immigration")
dim_airline.createOrReplaceTempView("dim_airline")
dim_arrival_date.createOrReplaceTempView("dim_arrival_date")
dim_location.createOrReplaceTempView("dim_location")
dim_personal.createOrReplaceTempView("dim_personal")
dim_visa.createOrReplaceTempView("dim_visa")

1. Check the top 5 airline brands chosen by immigrants

In [63]:
spark.sql(""" SELECT a.airline_brand, COUNT(*) as people
              FROM fact_immigration f JOIN dim_airline a
                  on f.flight_number = a.flight_number 
              GROUP BY a.airline_brand 
              ORDER BY people DESC
              LIMIT 5""").show()

+-------------+------+
|airline_brand|people|
+-------------+------+
|           DL|388471|
|           AA|384786|
|           UA|273248|
|           BA|190958|
|           AV|125686|
+-------------+------+



2. Check the top 5 locations where immigrants stay the longest (by total days of stay)

In [69]:
spark.sql(""" SELECT f.city_name, SUM(f.stay_duration) as day, COUNT(*) as visit_turn
              FROM fact_immigration f JOIN dim_location l
                  on f.city_name = l.city_name 
              GROUP BY f.city_name
              ORDER BY day DESC
              LIMIT 5""").show()

+-------------+---------+----------+
|    city_name|      day|visit_turn|
+-------------+---------+----------+
|     NEW YORK|6181103.0|    485916|
|  LOS ANGELES|4617642.0|    310163|
|        MIAMI|4535697.0|    343941|
|SAN FRANCISCO|2344869.0|    152586|
|      CHICAGO|2093979.0|    130564|
+-------------+---------+----------+
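
Ranking by the summed `stay_duration` mostly rewards busy ports, since the total grows with the number of visits. Dividing the two printed columns gives the average stay per visit. Using only the five rows shown above (so this re-ranks those five cities, not all cities), the order changes:

```python
# (city_name, total stay days, visit count) copied from the query output above
rows = [
    ("NEW YORK", 6181103.0, 485916),
    ("LOS ANGELES", 4617642.0, 310163),
    ("MIAMI", 4535697.0, 343941),
    ("SAN FRANCISCO", 2344869.0, 152586),
    ("CHICAGO", 2093979.0, 130564),
]

# Average stay per visit, highest first
avg_stay = sorted(
    ((city, total / visits) for city, total, visits in rows),
    key=lambda pair: pair[1],
    reverse=True,
)
for city, days in avg_stay:
    print(f"{city:<14} {days:5.1f}")
# CHICAGO         16.0
# SAN FRANCISCO   15.4
# LOS ANGELES     14.9
# MIAMI           13.2
# NEW YORK        12.7
```

In Spark SQL the same figure could be computed with `AVG(f.stay_duration)`. Note that `AVG` skips rows whose `stay_duration` is null (travelers without a recorded departure date), while `SUM(...) / COUNT(*)` keeps them in the denominator, so the two results can differ.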



3. Check the top 3 days of the month that attract the most immigrants

In [72]:
spark.sql(""" SELECT d.day, SUM(f.stay_duration) as day, COUNT(*) as visit_turn
              FROM fact_immigration f JOIN dim_arrival_date d
                  on f.arrival_date = d.arrival_date 
              GROUP BY d.day
              ORDER BY day DESC
              LIMIT 3""").show()

+---+---------+----------+
|day|      day|visit_turn|
+---+---------+----------+
| 30|1612870.0|    127155|
| 29|1553943.0|    128267|
| 28|1540164.0|    120971|
+---+---------+----------+



#### 4.2 Data Quality Checks
The quality check verifies that every table has records, that its primary key is unique, and that the primary key has no missing values. Refer to quality_check.py for the complete script.

In [22]:
# quality_check.py: check every output table stored in S3
import os
import logging
import configparser
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


# AWS configuration
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']


def create_spark_session():
    """ Create spark session to process data """
    spark = SparkSession.builder\
                        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")\
                        .getOrCreate()
    return spark


def check_records(spark, table_name, s3_bucket, primary_key):
    """ Check that a table is non-empty and that its primary key is unique and not null
    ----------------------------
    Args:
        spark: Spark Session
        table_name: name of table
        s3_bucket: s3 bucket endpoint
        primary_key: name of the primary key column

    Return:
        None (raises ValueError when a check fails)
     """
    # pathlib.Path cannot list s3a:// URIs; Spark lists the prefix itself and
    # discovers partition folders (e.g. city_code=...), so read the whole table at once
    df = spark.read.parquet(f"{s3_bucket}{table_name}")
    record_num = df.count()
    if record_num == 0:
        raise ValueError(
            f"{table_name} table is empty - quality check fail!")
    elif record_num > df.select(primary_key).distinct().count():
        raise ValueError(
            f"{table_name} table duplicates the unique key {primary_key} - quality check fail!")
    elif df.where(col(primary_key).isNull()).count() != 0:
        raise ValueError(
            f"{table_name} has missing values in {primary_key} - quality check fail!")
    else:
        print(f"{table_name} have {record_num} rows without duplication - quality check pass!")


def main():
    logging.info("Data quality check starting with create spark session")
    spark = create_spark_session()
    s3_bucket = "s3a://vuongnd9-udacity-output/"
    table_list = ['fact_immigration', 'dim_visa', 'dim_arrival_date', 'dim_airline', 'dim_location', 'dim_personal']
    primary_key_list = ['cic_id','visa_type','arrival_date', 'flight_number','city_name','cic_id']
    for table, primary_key in zip(table_list, primary_key_list):
        check_records(spark, table, s3_bucket,primary_key)

In [25]:
main()

fact_immigration have 3096313 rows without duplication - quality check pass!
dim_visa have 17 rows without duplication - quality check pass!
dim_arrival_date have 30 rows without duplication - quality check pass!
dim_airline have 7153 rows without duplication - quality check pass!
dim_location have 567 rows without duplication - quality check pass!
dim_personal have 3096313 rows without duplication - quality check pass!
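
The three conditions in `check_records` do not depend on Spark, so they can be unit-tested without an S3 bucket. A plain-Python sketch of the same rules over in-memory rows (`check_rows` is a hypothetical helper). Like Spark's `distinct()`, a Python `set` counts a missing key once, so two null keys trip the duplicate check first, matching the order in `check_records`:

```python
def check_rows(table_name, rows, primary_key):
    """Apply the empty / duplicate-key / null-key checks to a list of dict rows (hypothetical helper)."""
    keys = [row.get(primary_key) for row in rows]
    if not keys:
        raise ValueError(f"{table_name} table is empty - quality check fail!")
    if len(keys) > len(set(keys)):
        raise ValueError(f"{table_name} table duplicates the unique key {primary_key} - quality check fail!")
    if any(key is None for key in keys):
        raise ValueError(f"{table_name} has missing values in {primary_key} - quality check fail!")
    return len(keys)  # number of rows that passed


good = [{"visa_type": "WT"}, {"visa_type": "B1"}]
bad = [{"visa_type": "WT"}, {"visa_type": "WT"}]

print(check_rows("dim_visa", good, "visa_type"))  # 2
try:
    check_rows("dim_visa", bad, "visa_type")
except ValueError as err:
    print(err)  # dim_visa table duplicates the unique key visa_type - quality check fail!
```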


#### 4.3 Data dictionary 


Fact table: fact_immigration

|**Attributes**|**Description**|
|:--------------|:---------------|
|cic_id|record ID
|city_code|city code of admission
|city_name|city name of admission
|state_code|state code of arrival
|arrival_date|date of arrival
|flight_number|flight number of the airline used to arrive in the U.S.
|visa_type|visa type (links to dim_visa)
|stay_duration|length of stay in days (departure date minus arrival date)


Dimension table: dim_arrival_date

|**Attributes**|**Description**|
|:--------------|:---------------|
|arrival_date|arrival date in SAS format
|arrival_datetime|arrival date in datetime format
|year|year of arrival
|month|month of arrival
|day|day of arrival

Dimension table: dim_visa
    
|**Attributes**|**Description**|
|:--------------|:---------------|
|visa_type|type of visa
|visa_purpose|common purpose of visa


Dimension table: dim_personal

|**Attributes**|**Description**|
|:--------------|:---------------|
|cic_id|record ID
|country_of_birth|immigrant country of birth
|country_of_residence|immigrant country of residence
|year_of_birth|Year of birth
|gender|immigrant sex

Dimension table: dim_airline

|**Attributes**|**Description**|
|:--------------|:---------------|
|flight_number|Flight number of Airline used to arrive in U.S.
|airline_brand|Airline used to arrive in U.S.


Dimension table: dim_location

|**Attributes**|**Description**|
|:--------------|:---------------|
|city_name|name of city
|state_name|name of state
|state_code|code of state
|median_age|median age of the location
|male_population|male population
|female_population|female population
|total_population|total population
|number_of_veterans|number of veterans
|foreign_born|number of foreign-born residents
|avg_household_size|average household size

### Step 5: Complete Project Write Up

#### Tools and technologies
- AWS S3 to store the output of the pipeline
- Pandas to explore the small sample data
- Spark to build the pipeline and process the large dataset

#### Data update frequency:

- *Immigration data*: the dataset is partitioned by month, so the pipeline should be scheduled monthly to keep up with new source data.
- *Demographic data*: this dataset appears to be updated annually, since some of its attributes take time and are costly to estimate and compile.

#### Solutions for other scenarios:

- *The data was increased by 100x:*

    For much larger data, we can run Spark on an Amazon EMR cluster with enough compute capacity; EMR's elastic scaling lets the cluster grow with the data.


- *The data populates a dashboard that must be updated on a daily basis by 7am every day:*
    
    For scheduled processing, we can orchestrate the pipeline with Airflow: define it as a DAG that runs daily and starts early enough to finish before 7am, so that it runs automatically on schedule.
    
- *The data needed to be accessed by 100+ people*
    
    To serve many concurrent users, we should load the data from S3 into Redshift. Redshift allows up to 500 connections, which covers 100+ users.
    
    
    