# Project Title
### Data Engineering Capstone Project

#### Project Summary

The goal of this project is to build an ETL pipeline that extracts I94 immigration data and U.S. city demographic data and loads them into fact and dimension tables. From these tables we can answer questions such as which ports (cities) are the most popular and what the gender distribution of arrivals is.

PySpark is used to perform the ETL operations. Results are stored as parquet files in AWS S3 buckets.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [2]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from datetime import datetime, timedelta
from pyspark.sql import types as T
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
import os
import configparser

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project, we create a data lake for US immigration data. First we load the data, then perform exploratory analysis and cleaning. Finally we create fact and dimension tables and save them as parquet files on AWS S3.

#### Tools
- Use pandas for the sample data. 
- Use PySpark to load and process the data.

#### Describe and Gather Data 
- I94 Immigration Data: This data comes from the US National Tourism and Trade Office. Form I-94, the Arrival-Departure Record Card, is a form used by U.S. Customs and Border Protection (CBP) intended to keep track of the arrival and departure to/from the United States of people who are not United States citizens or lawful permanent residents (with the exception of those who are entering using the Visa Waiver Program or Compact of Free Association, using Border Crossing Cards, re-entering via automatic visa revalidation, or entering temporarily as crew members). Data can be accessed in a folder with the following path: ../../data/18-83510-I94-Data-2016/. There's a file for each month of the year. We only use April data for this project.

- Cities demographic data: This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. It comes from the US Census Bureau's 2015 American Community Survey.


In [None]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']
output_data = config['AWS']['OUTPUT_DATA_PATH']
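
The cell above assumes a `dl.cfg` file with an `[AWS]` section holding three keys. As a minimal sketch of that layout, the snippet below parses an in-memory copy with `read_string` so it runs without a file. The key names come from the cell above; every value (including the bucket name) is a made-up placeholder, not the project's real configuration.

```python
import configparser

# Hypothetical contents of dl.cfg; all values are placeholders.
SAMPLE_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = EXAMPLE_KEY_ID
AWS_SECRET_ACCESS_KEY = EXAMPLE_SECRET
OUTPUT_DATA_PATH = s3a://example-bucket/capstone/
"""

sample_config = configparser.ConfigParser()
sample_config.read_string(SAMPLE_CFG)

# configparser matches option names case-insensitively.
sample_output_path = sample_config['AWS']['OUTPUT_DATA_PATH']
print(sample_output_path)
```

Keeping the credentials in a separate file like this means they do not have to appear in the notebook itself.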

#### Immigration sample file

In [2]:
df = pd.read_csv('immigration_data_sample.csv')

In [3]:
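# Use the full option name; the short form 'max_columns' can be ambiguous in newer pandas versions
pd.set_option('display.max_columns', None)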
df.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,76.0,2.0,1.0,20160407,,,G,O,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,25.0,2.0,1.0,20160428,DOH,,G,O,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,19.0,2.0,1.0,20160406,,,Z,K,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


#### Data Library (CIC use)
- cicid: CIC ID
- i94yr: 4-digit-year
- i94mon: Numeric month
- i94cit: Country of citizenship
- i94res: Country of residence
- i94port: 3-letter city code
- arrdate: Arrival date in the USA
- i94mode: Mode of transportation
- i94addr: State code
- depdate: Departure date from the USA
- i94bir:  Age of Respondent in Years
- i94visa: Visa type codes
- count: Used for summary statistics
- matflag: Match flag - Match of arrival and departure records
- biryear: 4 digit year of birth
- gender: Non-immigrant sex
- insnum: INS number
- airline: Airline used to arrive in U.S.
- admnum: Admission Number
- fltno: Flight number of Airline used to arrive in U.S.
- visatype: Class of admission legally admitting the non-immigrant to temporarily stay in U.S.

#### Full immigration data

Create a Spark session and read the full immigration data.

In [3]:
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [5]:
df_spark.count()

3096313

In [6]:
df_spark.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)

#### Cities demographic data

In [8]:
df_demographic = spark.read.csv('us-cities-demographics.csv',header=True, sep=';')

In [9]:
df_demographic.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [10]:
df_demographic.count()

2891

In [11]:
df_demographic.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


### Step 2: Explore and Assess the Data

#### 1. Cities demographic data

Since the city names in the I94 label descriptions are in all capital letters, we convert the city names here to upper case as well.

In [12]:
df_demographic = df_demographic.withColumn('City', F.upper(F.col('City')))

In [13]:
df_demographic.select('City').show(5)

+----------------+
|            City|
+----------------+
|   SILVER SPRING|
|          QUINCY|
|          HOOVER|
|RANCHO CUCAMONGA|
|          NEWARK|
+----------------+
only showing top 5 rows



Check for unique city names

In [14]:
df_demographic.select(F.countDistinct('City', 'State')).show()

+---------------------------+
|count(DISTINCT City, State)|
+---------------------------+
|                        596|
+---------------------------+



In [15]:
df_demographic.select('City').distinct().show(5)

+--------+
|    City|
+--------+
|STAMFORD|
|   CHINO|
|  YAKIMA|
|AMARILLO|
|  MUNCIE|
+--------+
only showing top 5 rows



Let's take a look at the city Stamford for example 

In [16]:
df_demographic.where(df_demographic.City == 'STAMFORD').toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,STAMFORD,Connecticut,35.4,64941,63936,128877,2269,44003,2.7,CT,Asian,11013
1,STAMFORD,Connecticut,35.4,64941,63936,128877,2269,44003,2.7,CT,Hispanic or Latino,33197
2,STAMFORD,Connecticut,35.4,64941,63936,128877,2269,44003,2.7,CT,Black or African-American,24329
3,STAMFORD,Connecticut,35.4,64941,63936,128877,2269,44003,2.7,CT,White,85620
4,STAMFORD,Connecticut,35.4,64941,63936,128877,2269,44003,2.7,CT,American Indian and Alaska Native,1416


All 5 rows have identical information except for the Race and Count columns; we drop these two columns for the purpose of this project.

In [17]:
df_demographic_clean = df_demographic.drop('Race', 'Count')

In [18]:
df_demographic_clean = df_demographic_clean.dropDuplicates()

In [19]:
df_demographic_clean.count()

596
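
To see why dropping `Race` and `Count` brings the table down to one row per city, here is a small plain-Python sketch using three of the STAMFORD rows shown above, reduced to a few fields. The tuple layout is only for illustration and is not how Spark stores the rows.

```python
# Three of the STAMFORD rows shown above, as
# (city, state_code, total_population, race, count).
stamford_rows = [
    ("STAMFORD", "CT", 128877, "Asian", 11013),
    ("STAMFORD", "CT", 128877, "Hispanic or Latino", 33197),
    ("STAMFORD", "CT", 128877, "White", 85620),
]

# Dropping the last two fields (race, count) leaves identical tuples,
# so de-duplicating yields a single row for the city.
without_race = [row[:3] for row in stamford_rows]
unique_rows = sorted(set(without_race))
print(unique_rows)
```

This mirrors the `drop('Race', 'Count')` followed by `dropDuplicates()` in the cells above.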

In [20]:
df_demographic_clean.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df_demographic_clean.columns]).show()

+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+
|City|State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|
+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+
|   0|    0|         0|              1|                1|               0|                 7|           7|                     8|         0|
+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+



Rename the columns to snake_case

In [21]:
df_demographic_clean = df_demographic_clean.withColumnRenamed('Median Age','median_age')\
                                            .withColumnRenamed('Male Population', 'male_population')\
                                            .withColumnRenamed('Female Population', 'female_population')\
                                            .withColumnRenamed('Total Population', 'total_population')\
                                            .withColumnRenamed('Number of Veterans', 'number_of_veterans')\
                                            .withColumnRenamed('Foreign-born', 'foreign_born')\
                                            .withColumnRenamed('Average Household Size', 'average_household_size')\
                                            .withColumnRenamed('State Code', 'state_code')

In [22]:
df_demographic_clean.select('state_code').distinct().show(60)

+----------+
|state_code|
+----------+
|        AZ|
|        SC|
|        LA|
|        MN|
|        NJ|
|        DC|
|        OR|
|        VA|
|        RI|
|        KY|
|        NH|
|        MI|
|        NV|
|        WI|
|        ID|
|        CA|
|        CT|
|        NE|
|        MT|
|        NC|
|        MD|
|        DE|
|        MO|
|        IL|
|        ME|
|        WA|
|        ND|
|        MS|
|        AL|
|        IN|
|        OH|
|        TN|
|        NM|
|        IA|
|        PA|
|        SD|
|        NY|
|        TX|
|        GA|
|        MA|
|        KS|
|        FL|
|        CO|
|        AK|
|        AR|
|        OK|
|        PR|
|        UT|
|        HI|
+----------+



#### 2. Country code

In [4]:
df_country_code = spark.read.options(header='False').csv("country_code.csv")

In [10]:
df_country_code.limit(5).toPandas()

Unnamed: 0,_c0
0,"582 = 'MEXICO Air Sea, and Not Reported (I-94..."
1,236 = 'AFGHANISTAN'
2,101 = 'ALBANIA'
3,316 = 'ALGERIA'
4,102 = 'ANDORRA'


In [25]:
df_country_code = df_country_code.withColumn('_c0', F.regexp_replace('_c0', "'", ''))

In [26]:
df_country_code_clean = df_country_code.withColumn("code", F.split(F.col("_c0"), "=").getItem(0)) \
                        .withColumn("country", F.split(F.col("_c0"), "=").getItem(1)).drop("_c0")

In [27]:
df_country_code_clean.limit(5).toPandas()

Unnamed: 0,code,country
0,582,"MEXICO Air Sea, and Not Reported (I-94, no land arrivals)"
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


In [28]:
df_country_code_clean.printSchema()

root
 |-- code: string (nullable = true)
 |-- country: string (nullable = true)
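
Splitting on `=` alone keeps the spaces around the separator, so a code can come out as `'582 '` rather than `'582'`, which would not match an unpadded code in a join. The sketch below shows the intended parse for one line in plain Python. The function name `parse_country_label` is hypothetical, and the regular expression assumes each line looks like the samples printed above.

```python
import re

# Matches lines such as: 236 = 'AFGHANISTAN'
_LABEL_RE = re.compile(r"^\s*(\d+)\s*=\s*'?(.*?)'?\s*$")

def parse_country_label(line):
    """Return (code, country) with outer whitespace removed, or None if the line does not match."""
    match = _LABEL_RE.match(line)
    if match is None:
        return None
    code, country = match.groups()
    return code, country.strip()

print(parse_country_label("236 = 'AFGHANISTAN'"))
print(parse_country_label("687 = 'ARGENTINA '"))
```

In Spark, a similar effect could probably be achieved by wrapping each `getItem(...)` result in `F.trim(...)`.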



#### 3. Port code

In [29]:
df_port = spark.read.options(header='False').csv("port.csv")

In [30]:
df_port.limit(5).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3
0,'ALC',=,"'ALCAN, AK '",
1,'ANC',=,"'ANCHORAGE, AK '",
2,'BAR',=,"'BAKER AAF - BAKER ISLAND, AK'",
3,'DAC',=,"'DALTONS CACHE, AK '",
4,'PIZ',=,"'DEW STATION PT LAY DEW, AK'",


In [31]:
df_port_clean = df_port.withColumn('port_code', F.regexp_replace('_c0', "['$ ]", ''))\
                .withColumn('port', F.regexp_replace('_c2', "['$ ]", '')).drop('_c0','_c1','_c2','_c3')

In [32]:
df_port_clean = df_port_clean.withColumn("city", F.split(F.col("port"), ",").getItem(0)) \
                        .withColumn("state", F.split(F.col("port"), ",").getItem(1)).drop("port")

In [33]:
df_port_clean.limit(5).toPandas()

Unnamed: 0,port_code,city,state
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK
2,BAR,BAKERAAF-BAKERISLAND,AK
3,DAC,DALTONSCACHE,AK
4,PIZ,DEWSTATIONPTLAYDEW,AK
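
Note that the pattern `['$ ]` removes every space, including the ones inside city names (`DALTONS CACHE` becomes `DALTONSCACHE`), while the demographic data keeps them (`RANCHO CUCAMONGA`). Multi-word cities therefore cannot match in a later join on city name. The following is a sketch of a parse that strips only the quotes and the outer whitespace. The function name `parse_port_label` is hypothetical, and the handling of labels without a comma is an assumption.

```python
def parse_port_label(label):
    """Split a label such as "'ALCAN, AK '" into (city, state_code).

    Only the surrounding quotes and outer whitespace are removed, so
    multi-word city names keep their internal spaces.
    """
    cleaned = label.strip().strip("'").strip()
    if "," not in cleaned:
        # Assumption: a label without a comma has no state part.
        return cleaned, None
    # Split on the last comma so commas inside the name are preserved.
    city, state = cleaned.rsplit(",", 1)
    return city.strip(), state.strip()

print(parse_port_label("'DALTONS CACHE, AK '"))
```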


In [34]:
df_port_clean.select('port_code').distinct().count()

660

#### 4. Full Immigration data

First we will pick the columns we are interested in

In [35]:
df_immi = df_spark.select('cicid','i94cit','i94res', 'i94port','arrdate', 'i94mode', 'biryear', 'gender', 'i94visa')

Check for null values in each column

In [36]:
df_immi.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df_immi.columns]).toPandas()

Unnamed: 0,cicid,i94cit,i94res,i94port,arrdate,i94mode,biryear,gender,i94visa
0,0,0,0,0,0,239,802,414269,0


Check whether `cicid` can be used as the primary key by comparing the row count with the number of distinct values

In [37]:
df_immi.count()

3096313

In [38]:
df_immi.select('cicid').distinct().count()

3096313
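
Both counts agree, so `cicid` is unique in this file. The same comparison can be wrapped in a reusable check, sketched below. The function name `unique_key_check` is hypothetical; it only assumes that the object passed in offers `count()` and `select(key).distinct().count()`, as a Spark DataFrame does.

```python
def unique_key_check(table_name, data_frame, key):
    """Raise ValueError if `key` has duplicate values in `data_frame`.

    `data_frame` is expected to behave like a Spark DataFrame, i.e. to
    provide count() and select(key).distinct().count().
    """
    total = data_frame.count()
    distinct = data_frame.select(key).distinct().count()
    if total != distinct:
        raise ValueError(
            '{} table: {} has {} distinct values for {} rows'.format(
                table_name, key, distinct, total))
    return total
```

A call such as `unique_key_check('df_immi', df_immi, 'cicid')` would then return the row count or fail loudly if duplicates ever appear in a later month's file.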

Next, we convert the SAS numeric date (days since 1960-01-01) to a Spark date type

In [39]:
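def convert_datetime(x):
    """Convert a SAS date (days since 1960-01-01) to a Python date; None if invalid."""
    try:
        start = datetime(1960, 1, 1)
        return (start + timedelta(days=int(x))).date()
    except (TypeError, ValueError, OverflowError):
        # None, NaN, or out-of-range values cannot be converted
        return None
udf_datetime_from_sas = udf(convert_datetime, T.DateType())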

In [40]:
df_immi = df_immi.withColumn('arrival_date', udf_datetime_from_sas('arrdate'))

In [41]:
df_immi.limit(5).toPandas()

Unnamed: 0,cicid,i94cit,i94res,i94port,arrdate,i94mode,biryear,gender,i94visa,arrival_date
0,6.0,692.0,692.0,XXX,20573.0,,1979.0,,2.0,2016-04-29
1,7.0,254.0,276.0,ATL,20551.0,1.0,1991.0,M,3.0,2016-04-07
2,15.0,101.0,101.0,WAS,20545.0,1.0,1961.0,M,2.0,2016-04-01
3,16.0,101.0,101.0,NYC,20545.0,1.0,1988.0,,2.0,2016-04-01
4,17.0,101.0,101.0,NYC,20545.0,1.0,2012.0,,2.0,2016-04-01
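
As a quick sanity check of the conversion, the plain-Python sketch below applies the same day-count arithmetic outside Spark to two `arrdate` values from the table above. The names `SAS_EPOCH` and `sas_days_to_date` are illustrative only.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS counts days from this date

def sas_days_to_date(days):
    """Convert a SAS day count (possibly a float or None) to a date."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))

print(sas_days_to_date(20545.0))  # 2016-04-01, as in the arrival_date column above
print(sas_days_to_date(20573.0))  # 2016-04-29
```

On large data a Python UDF can be slow. A built-in expression such as `F.expr("date_add(to_date('1960-01-01'), cast(arrdate as int))")` may be a faster alternative, though it has not been run against this dataset.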


In [42]:
df_immi.select('i94port').distinct().count()

299

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

In [5]:
from IPython.display import Image
Image(url="capstone.png", width=1000, height=800)

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

1. Load all datasets
2. Fix data types
3. Create `fact_immigration` table from `df_immi`
4. Create `dim_visa` table
5. Create `dim_country` table from `df_country_code_clean`
6. Extract arrival_date from `df_immi` and create `dim_date` table
7. Create `dim_port` table from `df_port_clean` table
8. Create `dim_demographics` table from `df_demographic_clean` and `dim_port` table
9. Create `dim_travel_mode` table
10. Write tables to parquet files

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

`fact_immigration` table 

In [44]:
fact_immigration = df_immi.select(
                            F.col('cicid').cast('int').alias('id'),
                            F.col('i94cit').cast('int').alias('citizenship_code'),
                            F.col('i94res').cast('int').alias('residence_code'),
                            F.col('i94port').alias('port_code'),
                            F.col('arrival_date'),
                            F.col('i94mode').cast('int').alias('transportation_mode'),
                            F.col('biryear').cast('int').alias('birth_year'),
                            F.col('gender'),
                            F.col('i94visa').cast('int').alias('visa_code')
).dropDuplicates(['id'])

In [45]:
fact_immigration = fact_immigration.withColumn('citizenship_code', F.col('citizenship_code').cast('string'))\
                                .withColumn('residence_code', F.col('residence_code').cast('string'))              

In [47]:
fact_immigration.show(5)

+---+----------------+--------------+---------+------------+-------------------+----------+------+---------+
| id|citizenship_code|residence_code|port_code|arrival_date|transportation_mode|birth_year|gender|visa_code|
+---+----------------+--------------+---------+------------+-------------------+----------+------+---------+
|148|             103|           103|      NEW|  2016-04-01|                  1|      1995|     F|        2|
|463|             103|           103|      MIA|  2016-04-01|                  1|      1991|  null|        2|
|471|             103|           103|      MIA|  2016-04-01|                  2|      1953|     M|        2|
|496|             103|           103|      CHI|  2016-04-01|                  1|      1952|  null|        1|
|833|             104|           104|      BOS|  2016-04-01|                  1|      2000|     F|        2|
+---+----------------+--------------+---------+------------+-------------------+----------+------+---------+
only showing top 5 rows

`dim_visa` table

In [48]:
visa_data = [[1, 'Business'], [2, 'Pleasure'], [3, 'Student']]
visa_pd = pd.DataFrame(visa_data, columns = ['visa_code', 'visa']) 

In [49]:
mySchema = StructType([StructField("visa_code", IntegerType(), True)\
                       ,StructField("visa", StringType(), True)])

In [50]:
dim_visa = spark.createDataFrame(visa_pd,schema=mySchema)
dim_visa.show()

+---------+--------+
|visa_code|    visa|
+---------+--------+
|        1|Business|
|        2|Pleasure|
|        3| Student|
+---------+--------+



`dim_country` table

In [53]:
dim_country = df_country_code_clean.select(
                                    F.col('code').alias('country_code'),
                                    F.col('country')).dropDuplicates(['country_code'])

In [54]:
dim_country.show(5)

+------------+--------------------+
|country_code|             country|
+------------+--------------------+
|        245 |          CHINA, PRC|
|        152 |          AZERBAIJAN|
|        687 |          ARGENTINA |
|        849 |  No Country Code...|
|        213 |               INDIA|
+------------+--------------------+
only showing top 5 rows



`dim_date` table

In [56]:
dim_date = fact_immigration.select(F.col('arrival_date'),
                       F.dayofmonth('arrival_date').alias('day'),
                       F.weekofyear('arrival_date').alias('week'),
                       F.month('arrival_date').alias('month'),
                       F.year('arrival_date').alias('year'),
                       F.date_format('arrival_date','E').alias('weekofday')
                      ).distinct()

In [57]:
dim_date.show(5)

+------------+---+----+-----+----+---------+
|arrival_date|day|week|month|year|weekofday|
+------------+---+----+-----+----+---------+
|  2016-04-05|  5|  14|    4|2016|      Tue|
|  2016-04-17| 17|  15|    4|2016|      Sun|
|  2016-04-09|  9|  14|    4|2016|      Sat|
|  2016-04-06|  6|  14|    4|2016|      Wed|
|  2016-04-21| 21|  16|    4|2016|      Thu|
+------------+---+----+-----+----+---------+
only showing top 5 rows
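
The attributes in `dim_date` can be cross-checked in plain Python. The sketch below derives the same fields for a single date; Spark's `weekofyear` uses ISO 8601 week numbering, which `date.isocalendar()` also follows. The helper name `date_parts` is hypothetical, and the `weekofday` key is kept as in the table even though it holds the abbreviated day of the week.

```python
from datetime import date

DAY_NAMES = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']

def date_parts(d):
    """Return the dim_date attributes for one date as a dict."""
    iso_year, iso_week, iso_weekday = d.isocalendar()
    return {
        'arrival_date': d,
        'day': d.day,
        'week': iso_week,  # ISO 8601 week number
        'month': d.month,
        'year': d.year,
        'weekofday': DAY_NAMES[iso_weekday - 1],  # Monday is weekday 1
    }

print(date_parts(date(2016, 4, 5)))
```

For 2016-04-05 this gives day 5, week 14, month 4, year 2016 and `Tue`, in line with the first row of the output above.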



`dim_port` table

In [60]:
dim_port = df_port_clean.select(F.col('port_code'),
                               F.col('city'),
                               F.col('state').alias('state_code')).dropDuplicates(['port_code'])

In [61]:
dim_port.show(5)

+---------+---------+----------+
|port_code|     city|state_code|
+---------+---------+----------+
|      BGM|   BANGOR|        ME|
|      FMY|FORTMYERS|        FL|
|      LEB|  LEBANON|        NH|
|      DNS| DUNSEITH|        ND|
|      EGL|    EAGLE|        AK|
+---------+---------+----------+
only showing top 5 rows



`dim_demographics` table

In [63]:
df_demographic_clean.createOrReplaceTempView("demographic_data")
dim_port.createOrReplaceTempView("port")

In [87]:
dim_demographics = spark.sql("""SELECT
                                DISTINCT port_code,
                                d.City AS city,
                                median_age,
                                male_population,
                                female_population,
                                number_of_veterans,
                                average_household_size,
                                d.state_code AS state_code 
                                FROM demographic_data AS d
                                JOIN port AS p
                                ON d.City = p.city
                                AND d.state_code = p.state_code
                                
""")

In [88]:
dim_demographics = dim_demographics.withColumn('median_age', F.col('median_age').cast('float'))\
                                   .withColumn('male_population', F.col('male_population').cast('int')) \
                                   .withColumn('female_population', F.col('female_population').cast('int')) \
                                   .withColumn('number_of_veterans', F.col('number_of_veterans').cast('int')) \
                                   .withColumn('average_household_size', F.col('average_household_size').cast('float')) 

In [90]:
dim_demographics = dim_demographics.drop('city', 'state_code')

In [91]:
dim_demographics.show(5)

+---------+----------+---------------+-----------------+------------------+----------------------+
|port_code|median_age|male_population|female_population|number_of_veterans|average_household_size|
+---------+----------+---------------+-----------------+------------------+----------------------+
|      BOS|      31.8|         322149|           347320|             18350|                  2.38|
|      GAR|      38.1|          35876|            41478|              3952|                  2.35|
|      BUF|      33.1|         124537|           133529|             11231|                  2.27|
|      OMA|      34.2|         218789|           225098|             24503|                  2.47|
|      PHI|      34.1|         741270|           826172|             61995|                  2.61|
+---------+----------+---------------+-----------------+------------------+----------------------+
only showing top 5 rows



`dim_travel_mode` table

In [69]:
mode_data = [[1, 'Air'], [2, 'Sea'], [3, 'Land'], [9, 'Not reported']]
mode_pd = pd.DataFrame(mode_data, columns = ['mode_code', 'travel_mode']) 

In [70]:
mySchema = StructType([StructField("mode_code", IntegerType(), True)\
                       ,StructField("travel_mode", StringType(), True)])

In [71]:
dim_travel_mode = spark.createDataFrame(mode_pd,schema=mySchema)
dim_travel_mode.show()

+---------+------------+
|mode_code| travel_mode|
+---------+------------+
|        1|         Air|
|        2|         Sea|
|        3|        Land|
|        9|Not reported|
+---------+------------+



#### 4.2 Data Quality Checks
 * Data type checks
 * Count checks to ensure completeness
 
Run Quality Checks

1. Check table counts to ensure no table is empty.

In [81]:
def count_check(table_name, data_frame):
    number = data_frame.count()
    if number == 0:
        raise ValueError('Quality check failed. {} table is empty'.format(table_name))
    else:
        print('Quality check passed for {} table with {} counts'.format(table_name, number))

In [94]:
df_tables = {
    "fact_immigration": fact_immigration,
    "dim_visa": dim_visa,
    "dim_port": dim_port,
    "dim_country": dim_country,
    "dim_date": dim_date,
    "dim_demographics": dim_demographics,
    "dim_travel_mode": dim_travel_mode
}

In [83]:
for table_name, data_frame in df_tables.items():
    count_check(table_name, data_frame)

Quality check passed for fact_immigration table with 3096313 counts
Quality check passed for dim_visa table with 3 counts
Quality check passed for dim_port table with 660 counts
Quality check passed for dim_country table with 289 counts
Quality check passed for dim_date table with 30 counts
Quality check passed for dim_demographics table with 86 counts
Quality check passed for dim_travel_mode table with 4 counts


2. Check data types

In [95]:
for table_name, data_frame in df_tables.items():
    print(table_name)
    data_frame.printSchema()

fact_immigration
root
 |-- id: integer (nullable = true)
 |-- citizenship_code: string (nullable = true)
 |-- residence_code: string (nullable = true)
 |-- port_code: string (nullable = true)
 |-- arrival_date: date (nullable = true)
 |-- transportation_mode: integer (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- visa_code: integer (nullable = true)

dim_visa
root
 |-- visa_code: integer (nullable = true)
 |-- visa: string (nullable = true)

dim_port
root
 |-- port_code: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state_code: string (nullable = true)

dim_country
root
 |-- country_code: string (nullable = true)
 |-- country: string (nullable = true)

dim_date
root
 |-- arrival_date: date (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekofday: string (nullable = true)

dim_demographics
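
Printing the schemas leaves the comparison to the reader. The sketch below turns it into an automated check against the expected types. The function name `dtype_check` and the shape of the `expected` mapping are assumptions; the check relies only on the DataFrame `dtypes` attribute, a list of `(column, type string)` pairs such as `('visa_code', 'int')`.

```python
def dtype_check(table_name, data_frame, expected):
    """Compare actual column types against `expected` ({column: type string}).

    Raises ValueError listing every column whose type differs or is missing.
    """
    actual = dict(data_frame.dtypes)
    mismatches = {
        column: (wanted, actual.get(column))
        for column, wanted in expected.items()
        if actual.get(column) != wanted
    }
    if mismatches:
        raise ValueError('Quality check failed for {} table: {}'.format(
            table_name, mismatches))
    print('Data type check passed for {} table'.format(table_name))
```

For example, `dtype_check('dim_visa', dim_visa, {'visa_code': 'int', 'visa': 'string'})` should pass for the table built earlier.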

#### 4.3 Write tables to parquet files

In [None]:
fact_immigration.write.partitionBy("arrival_date", "gender").parquet(output_data + "immigration.parquet", 'overwrite')
dim_visa.write.parquet(output_data + "visa.parquet",'overwrite')
dim_port.write.partitionBy("state_code").parquet(output_data + "port.parquet", 'overwrite')
dim_country.write.parquet(output_data + "country.parquet", 'overwrite')
dim_date.write.parquet(output_data + "date.parquet", 'overwrite')
dim_demographics.write.parquet(output_data + "demographics.parquet", 'overwrite')
dim_travel_mode.write.parquet(output_data + "mode.parquet", 'overwrite')
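
The paths above are built by plain string concatenation, so they are only correct when `OUTPUT_DATA_PATH` ends with a slash. A small helper, sketched here under the hypothetical name `table_path`, makes the result independent of that detail. The bucket name in the example is a placeholder.

```python
def table_path(output_root, table_name):
    """Build '<root>/<table>.parquet' whether or not the root ends with a slash."""
    return '{}/{}.parquet'.format(output_root.rstrip('/'), table_name)

print(table_path('s3a://example-bucket/capstone', 'visa'))
print(table_path('s3a://example-bucket/capstone/', 'visa'))
```

Both calls print the same path, so a write such as `dim_visa.write.parquet(table_path(output_data, 'visa'), 'overwrite')` would not depend on how the config value is written.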

#### 4.4 Data dictionary 

See README.md for the data dictionary.

### Step 5: Complete Project Write Up
* Rationale for the choice of tools and technologies for the project
    - In this project we use pandas to take a first look at the sample data, since it works well for small datasets. We then use PySpark for the rest of the work, since the full immigration data contains about 3 million rows and Spark is designed for large datasets. Finally we use AWS S3 to store our fact and dimension tables.

* Propose how often the data should be updated and why.
    - Since the I94 immigration data (the source) is updated monthly, the ETL should run monthly too.

* Write a description of how you would approach the problem differently under the following scenarios:
     * The data was increased by 100x.
          - We can run the ETL on an AWS EMR cluster, adding nodes if necessary.
     * The data populates a dashboard that must be updated on a daily basis by 7am every day.
          - We can use Airflow to schedule and run the ETL so that it finishes before 7am.
     * The database needed to be accessed by 100+ people.
          - We can load the tables into Amazon Redshift as a shared data warehouse.