# Immigration data model
### Data Engineering Capstone Project

#### Project Summary
I chose the Udacity-provided project, which focuses on an immigration data model. The goal is to find out which U.S. states immigrants choose as destinations and to relate those destinations to demographic information about them.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [117]:
# Do all imports and installs here
import pandas as pd
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import concat, col, lit
from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import *

### Step 1: Scope the Project and Gather Data

#### Scope 
- The main purpose of this project is to build a data schema that can be queried to answer questions such as where immigrants choose to live or visit, and what the population of those places is.
- The following data sets are used:
    - I94 Immigration Data    
    - U.S. City Demographic Data
- The project runs locally in the workspace, and Spark is used for pre-processing and ETL.

#### Describe and Gather Data 
- _I94 Immigration Data_: This data comes from the US National Tourism and Trade Office. You can read more about it [here](https://travel.trade.gov/research/reports/i94/historical/2016.html)
    - You can access the immigration data in a folder with the following path: ../../data/18-83510-I94-Data-2016/. 
    - There's a file for each month of the year. An example file name is i94_apr16_sub.sas7bdat. Each file has a three-letter abbreviation for the month name. So a full file path for June would look like this: ../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat. 
    - Below is what it looks like to import the April file into pandas:

In [118]:
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")
df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2
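The file-naming convention described above can be captured in a small helper. This is only a sketch: `i94_path` and `MONTHS` are illustrative names that are not part of the provided workspace, and the directory is the one quoted in the text.

```python
# Directory quoted in the description above
I94_DIR = '../../data/18-83510-I94-Data-2016/'

# Three-letter month abbreviations used in the file names
MONTHS = ['jan', 'feb', 'mar', 'apr', 'may', 'jun',
          'jul', 'aug', 'sep', 'oct', 'nov', 'dec']


def i94_path(month, base_dir=I94_DIR):
    """Return the path of the I94 file for a month number (1-12).

    Hypothetical helper, written only to illustrate the naming scheme.
    """
    if not 1 <= month <= 12:
        raise ValueError(f"month must be in 1..12, got {month}")
    return f"{base_dir}i94_{MONTHS[month - 1]}16_sub.sas7bdat"


print(i94_path(6))
```

Building the path from a month number avoids typing the twelve file names by hand when only some months are needed.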


- _U.S. City Demographic Data_: You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)
    - The data is already uploaded to the workspace. Below is what it looks like to read the file into a pandas dataframe.

In [119]:
fname = 'us-cities-demographics.csv'
df = pd.read_csv(fname, delimiter=';')
df.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


### Step 2: Explore and Assess the Data

In [120]:
# Use Spark to process the data, e.g. dropping duplicates and cleaning values
# Create SparkSession with configuration to enable loading sas7bdat data
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

#### 2.1. Working with I94 Immigration Data

In [121]:
# List all immigration files in the attached disk
files = os.listdir('../../data/18-83510-I94-Data-2016/')
files

['i94_apr16_sub.sas7bdat',
 'i94_sep16_sub.sas7bdat',
 'i94_nov16_sub.sas7bdat',
 'i94_mar16_sub.sas7bdat',
 'i94_jun16_sub.sas7bdat',
 'i94_aug16_sub.sas7bdat',
 'i94_may16_sub.sas7bdat',
 'i94_jan16_sub.sas7bdat',
 'i94_oct16_sub.sas7bdat',
 'i94_jul16_sub.sas7bdat',
 'i94_feb16_sub.sas7bdat',
 'i94_dec16_sub.sas7bdat']

In [122]:
# Read all sas7bdat files from the folder and write them to the sas_data folder in the workspace
# Run this cell only once: the write uses append mode, so re-running it would duplicate the data
for file in files:
    df_i94 = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/' + file)
    df_i94.write.mode('append').parquet("sas_data")
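Because the loop appends, running it a second time would write every row into `sas_data` again. One way to make the "first time only" rule explicit is a guard like the following sketch. `convert_once` is a hypothetical helper, and the `convert` callable stands in for the Spark read/write loop above.

```python
import os


def convert_once(out_dir, convert):
    """Call convert() only when out_dir is missing or empty.

    Returns True if the conversion ran and False if it was skipped.
    Hypothetical helper; `convert` stands in for the SAS-to-parquet loop.
    """
    if os.path.isdir(out_dir) and os.listdir(out_dir):
        return False
    convert()
    return True
```

Called as `convert_once("sas_data", run_conversion)`, where `run_conversion` is whatever function wraps the loop, the cell becomes safe to re-run.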

In [123]:
# Read i94 dataframe 
df_i94 = spark.read.parquet("sas_data")
print(df_i94.count())

40790529


In [124]:
df_i94.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

_Data dictionary_

- cicid: Unique record ID
- i94yr: 4 digit year
- i94mon: Numeric month
- i94cit: 3 digit code for immigrant country of citizenship
- i94res: 3 digit code for immigrant country of residence
- i94port: Port of admission
- arrdate: Arrival Date in the USA
- i94mode: Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)
- i94addr: USA State of arrival
- depdate: Departure Date from the USA
- i94bir: Age of Respondent in Years
- i94visa: Visa codes collapsed into three categories
- count: Field used for summary statistics
- dtadfile: Character Date Field - Date added to I-94 Files
- visapost: Department of State where Visa was issued
- occup: Occupation that will be performed in U.S.
- entdepa: Arrival Flag - admitted or paroled into the U.S.
- entdepd: Departure Flag - Departed, lost I-94 or is deceased
- entdepu: Update Flag - Either apprehended, overstayed, adjusted to perm residence
- matflag: Match flag - Match of arrival and departure records
- biryear: 4 digit year of birth
- dtaddto: Character Date Field - Date to which admitted to U.S. (allowed to stay until)
- gender: sex of immigrant
- insnum: INS number
- airline: Airline used to arrive in U.S.
- admnum: Admission Number
- fltno: Flight number of Airline used to arrive in U.S.
- visatype: Class of admission legally admitting the non-immigrant to temporarily stay in U.S.
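The `arrdate` and `depdate` values shown earlier (for example 20545.0) look like SAS numeric dates, which count days from 1960-01-01. Assuming that is the encoding used here, a conversion could be sketched as follows. `sas_to_date` is an illustrative name.

```python
from datetime import date, timedelta

# SAS stores dates as the number of days since this epoch
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(days):
    """Convert a SAS day count to a date; missing values (None/NaN) give None.

    Assumes arrdate/depdate are SAS numeric dates.
    """
    if days is None or days != days:  # NaN is the only value not equal to itself
        return None
    return SAS_EPOCH + timedelta(days=int(days))


print(sas_to_date(20545.0))
```

Under that assumption, 20545.0 falls on the first day of April 2016, which is consistent with the value coming from the April file.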


_For I94 Immigration data, we will:_
- Drop rows with missing values in cicid, i94port, i94addr
- Check the validity of the port of admission and the U.S. state of arrival (i94port, i94addr)

In [125]:
# Drop rows with a missing value in any of the key columns
df_i94 = df_i94.dropna(how='any', subset=['cicid', 'i94port', 'i94addr'])
print(df_i94.count())

38762603


In [126]:
# Build a list of valid I94 ports from the text file, then keep only the rows whose port is in that list
valid_port = []
with open("i94_port_valid.txt") as f:
    for line in f:        
        valid_port.append(line[4:7])  

df_i94 = df_i94.filter(df_i94["i94port"].isin(list(valid_port)))

print(df_i94.count())


38735974


In [127]:
# Build a list of valid I94 states from the text file, then keep only the rows whose state is in that list
valid_state = []
with open("i94_state_valid.txt") as f:
    for line in f:        
        valid_state.append(line[2:4])  

df_i94 = df_i94.filter(df_i94["i94addr"].isin(list(valid_state)))

print(df_i94.count())

38334545


_<u>Conclusion</u>: After removing rows with missing or invalid values, 38,334,545 of the original 40,790,529 rows remain (about 94%)._
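The fixed slices `line[4:7]` and `line[2:4]` depend on the exact column layout of the lookup files. A somewhat more tolerant sketch extracts the first quoted code on each line instead. The sample lines below are assumptions about the file layout, not copied from `i94_port_valid.txt` or `i94_state_valid.txt`, and `parse_codes` is a hypothetical helper.

```python
import re

# First quoted token of 2 or 3 upper-case letters or digits, e.g. 'NYC' or 'NY'
CODE_RE = re.compile(r"'([A-Z0-9]{2,3})'")


def parse_codes(lines):
    """Collect the first quoted 2-3 character code found on each line."""
    codes = set()
    for line in lines:
        match = CODE_RE.search(line)
        if match:
            codes.add(match.group(1))
    return codes


# Assumed layouts, for illustration only
sample = ["   'ALC'\t=\t'ALCAN, AK'", "\t'AL'='ALABAMA'", ""]
print(sorted(parse_codes(sample)))
```

Lines without a quoted code, such as blank lines, are skipped rather than contributing an empty or partial entry to the list.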

#### 2.2. Working with U.S. City Demographic Data

In [128]:
# Read Demographic dataframe 
df_city = spark.read.csv("us-cities-demographics.csv", sep=';', header=True, inferSchema=True)
df_city.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



In [129]:
df_city.count()

2891

_For U.S. City Demographic Data, we will:_
- Drop rows with missing values in City, State, State Code, Male Population, Female Population, Foreign-born, Median Age
- Check the validity of State Code

In [130]:
# Drop rows with a missing value in any of the required columns
df_city = df_city.dropna(how='any', subset=['City', 'State', 'State Code', 'Male Population',
                                            'Female Population', 'Foreign-born', 'Median Age'])

print(df_city.count())

2875


In [131]:
# Build a list of valid I94 states from the text file, then keep only the rows whose state code is in that list
valid_state = []
with open("i94_state_valid.txt") as f:
    for line in f:        
        valid_state.append(line[2:4])  

df_city = df_city.filter(df_city["State Code"].isin(list(valid_state)))

print(df_city.count())

2875


_<u>Conclusion</u>: After removing rows with missing or invalid values, 2,875 of the original 2,891 rows remain._

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

The first dimension table (dim_immigration) contains personal information about immigrants: record id, year and month of entering the US, country of residence, country of citizenship, birth year and gender.
- immigration_id: unique id for the immigration dimension table 
- cicid: Unique record ID (possibly unique only within a month)
- i94yr: 4 digit year
- i94mon: Numeric month
- i94cit: 3 digit code for immigrant country of citizenship
- i94res: 3 digit code for immigrant country of residence
- biryear: 4 digit year of birth
- gender: sex of immigrant

The second dimension table (dim_visa) maps the visa code to three visa categories.
- i94visa: visa code
- visa_type: string
    - 1 = Business
    - 2 = Pleasure
    - 3 = Student

The third dimension table (dim_city) contains all information from the U.S. cities demographics data.
- City
- State
- median_age
- male_population
- female_population
- total_population
- number_of_veterans
- foreign_born
- average_household_size
- state_code
- race 
- count

The fourth dimension table (dim_state) contains population figures aggregated by state.
- State
- state_code
- male_population
- female_population
- total_population
- foreign_born
    
The fact table will contain these columns (fact_immigration)
- immigration_id: unique id for the immigration entry
- i94visa: visa code
- i94port: 3 character code of the port of admission
- i94addr: USA State of arrival

#### 3.2 Mapping Out Data Pipelines

The pipeline steps are described below:

- Clean the I94 data as described in Step 2 to create the Spark dataframe df_i94
- Clean the U.S. City Demographic data as described in Step 2 to create the Spark dataframe df_city
- Create the dimension tables by selecting the relevant columns from df_i94 (dim_immigration, dim_visa) and df_city (dim_city, dim_state), and write them to parquet files
- Create the fact table by selecting the relevant columns from df_i94 and write it to a parquet file


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.


In [132]:
# Add unique id for the immigration dimension table
df_i94 = df_i94.withColumn('id', monotonically_increasing_id())
df_i94 = df_i94.withColumn('immigration_id', concat(col("id"), lit("immi")))
df_i94 = df_i94.drop('id')
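`monotonically_increasing_id` guarantees unique, increasing ids, but not consecutive ones: per the Spark documentation, the partition index goes in the upper bits and the row's position within the partition in the lower 33 bits. The plain-Python sketch below imitates that layout to show what the resulting `immigration_id` strings can look like. `spark_like_id` is an illustrative function, not a Spark API.

```python
def spark_like_id(partition_index, row_in_partition):
    """Imitate the documented layout: partition index above the lower 33 bits."""
    return (partition_index << 33) + row_in_partition


# Two partitions with three rows each
ids = [spark_like_id(p, r) for p in range(2) for r in range(3)]

# Same string construction as the concat(col("id"), lit("immi")) call above
immigration_ids = [f"{i}immi" for i in ids]
print(immigration_ids)
```

The ids of the second partition start at 2**33, so the values jump between partitions. That is harmless for a surrogate key, but the ids should not be read as row numbers.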

In [133]:
df_i94.take(5)

[Row(cicid=5680949.0, i94yr=2016.0, i94mon=7.0, i94cit=117.0, i94res=117.0, i94port='NYC', arrdate=20659.0, i94mode=1.0, i94addr='NY', depdate=None, i94bir=30.0, i94visa=3.0, count=1.0, dtadfile='20160724', visapost='NPL', occup=None, entdepa='G', entdepd=None, entdepu=None, matflag=None, biryear=1986.0, dtaddto='D/S', gender='F', insnum=None, airline='IG', admnum=2947450085.0, fltno='3940', visatype='F1', immigration_id='0immi'),
 Row(cicid=5680950.0, i94yr=2016.0, i94mon=7.0, i94cit=245.0, i94res=245.0, i94port='DET', arrdate=20659.0, i94mode=1.0, i94addr='IL', depdate=20679.0, i94bir=46.0, i94visa=2.0, count=1.0, dtadfile='20160813', visapost=None, occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1970.0, dtaddto='01232017', gender='M', insnum='78652', airline='DL', admnum=2947451085.0, fltno='188', visatype='B2', immigration_id='1immi'),
 Row(cicid=5680953.0, i94yr=2016.0, i94mon=7.0, i94cit=245.0, i94res=245.0, i94port='SEA', arrdate=20659.0, i94mode=1.0, i9

##### 4.1.1. Dimension Table: dim_immigration 

In [134]:
# Extract columns for immigration dimension table
dim_immigration = df_i94.select(["immigration_id", "cicid", "i94yr", "i94mon", "i94cit", "i94res", "biryear", "gender"])

In [135]:
dim_immigration.take(5)

[Row(immigration_id='0immi', cicid=5680949.0, i94yr=2016.0, i94mon=7.0, i94cit=117.0, i94res=117.0, biryear=1986.0, gender='F'),
 Row(immigration_id='1immi', cicid=5680950.0, i94yr=2016.0, i94mon=7.0, i94cit=245.0, i94res=245.0, biryear=1970.0, gender='M'),
 Row(immigration_id='2immi', cicid=5680953.0, i94yr=2016.0, i94mon=7.0, i94cit=245.0, i94res=245.0, biryear=1980.0, gender='F'),
 Row(immigration_id='3immi', cicid=5680954.0, i94yr=2016.0, i94mon=7.0, i94cit=135.0, i94res=135.0, biryear=1999.0, gender='F'),
 Row(immigration_id='4immi', cicid=5680956.0, i94yr=2016.0, i94mon=7.0, i94cit=213.0, i94res=213.0, biryear=1993.0, gender='M')]

In [136]:
# Write immigration dimension table to "results/dim_immigration"
dim_immigration.write.mode("overwrite").parquet("results/dim_immigration")

##### 4.1.2. Dimension Table: dim_visa 

In [137]:
# Extract column i94visa for visa dimension table
dim_visa = df_i94.select(["i94visa"]).distinct()

In [138]:
def visafunc(value):
    # Map the numeric i94visa code to its category; any other code yields None
    if value == 1:
        return 'Business'
    elif value == 2:
        return 'Pleasure'
    elif value == 3:
        return 'Student'

# Convert to a UDF by passing in the function and its return type
udf_visafunc = F.udf(visafunc, StringType())
dim_visa = dim_visa.withColumn("visa_type", udf_visafunc("i94visa"))

In [139]:
dim_visa.show()

+-------+---------+
|i94visa|visa_type|
+-------+---------+
|    1.0| Business|
|    3.0|  Student|
|    2.0| Pleasure|
+-------+---------+
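The same lookup can be written as a dictionary, which keeps the code-to-label mapping in one place and makes the handling of missing or unexpected codes explicit. This is a plain-Python sketch with illustrative names (`VISA_TYPES`, `visa_label`); it mirrors `visafunc` above, which also returns `None` for any other code.

```python
# Visa categories as listed in the data model
VISA_TYPES = {1: 'Business', 2: 'Pleasure', 3: 'Student'}


def visa_label(value):
    """Return the visa category for a code, or None if the code is unknown.

    Float codes such as 1.0 work because 1.0 and 1 are equal and hash alike.
    """
    return VISA_TYPES.get(value)


print(visa_label(1.0), visa_label(3), visa_label(9.0))
```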



In [140]:
# Write visa dimension table to "results/dim_visa"
dim_visa.write.mode("overwrite").parquet("results/dim_visa")

##### 4.1.3. Dimension Table: dim_city

In [141]:
# Copy the whole dataframe df_city for city dimension table and rename it
dim_city = df_city
dim_city = dim_city.withColumnRenamed('Median Age','median_age') \
                .withColumnRenamed('Male Population', 'male_population') \
                .withColumnRenamed('Female Population', 'female_population') \
                .withColumnRenamed('Total Population', 'total_population') \
                .withColumnRenamed('Number of Veterans', 'number_of_veterans') \
                .withColumnRenamed('Foreign-born', 'foreign_born') \
                .withColumnRenamed('Average Household Size', 'average_household_size') \
                .withColumnRenamed('State Code', 'state_code')

In [142]:
dim_city.take(5)

[Row(City='Silver Spring', State='Maryland', median_age=33.8, male_population=40601, female_population=41862, total_population=82463, number_of_veterans=1562, foreign_born=30908, average_household_size=2.6, state_code='MD', Race='Hispanic or Latino', Count=25924),
 Row(City='Quincy', State='Massachusetts', median_age=41.0, male_population=44129, female_population=49500, total_population=93629, number_of_veterans=4147, foreign_born=32935, average_household_size=2.39, state_code='MA', Race='White', Count=58723),
 Row(City='Hoover', State='Alabama', median_age=38.5, male_population=38040, female_population=46799, total_population=84839, number_of_veterans=4819, foreign_born=8229, average_household_size=2.58, state_code='AL', Race='Asian', Count=4759),
 Row(City='Rancho Cucamonga', State='California', median_age=34.5, male_population=88127, female_population=87105, total_population=175232, number_of_veterans=5821, foreign_born=33878, average_household_size=3.18, state_code='CA', Race='Blac

In [143]:
# Add unique id for city dimension table
dim_city = dim_city.withColumn('id', monotonically_increasing_id())

In [144]:
# Write city dimension table to "results/dim_city"
dim_city.write.mode("overwrite").parquet("results/dim_city")

##### 4.1.4. Dimension Table: dim_state

In [145]:
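# One row per city and race in the source: keep one row per city before summing
dim_state = dim_city.dropDuplicates(["City", "state_code"]).select(["State", "state_code", "male_population", "female_population", "total_population", "foreign_born"])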

In [146]:

dim_state = dim_state.groupBy(["State", "state_code"]).sum()
dim_state.printSchema()

root
 |-- State: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- sum(male_population): long (nullable = true)
 |-- sum(female_population): long (nullable = true)
 |-- sum(total_population): long (nullable = true)
 |-- sum(foreign_born): long (nullable = true)



In [147]:
dim_state = dim_state.withColumnRenamed('sum(male_population)', 'male_population') \
                .withColumnRenamed('sum(female_population)', 'female_population') \
                .withColumnRenamed('sum(total_population)', 'total_population') \
                .withColumnRenamed('sum(foreign_born)', 'foreign_born') 

In [148]:
dim_state.printSchema()

root
 |-- State: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- male_population: long (nullable = true)
 |-- female_population: long (nullable = true)
 |-- total_population: long (nullable = true)
 |-- foreign_born: long (nullable = true)
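This aggregation needs some care: the demographics file has one row per city and race (see the `Race` and `Count` columns), so the city-level population columns repeat on each of those rows. Summing them without first keeping one row per city would count a city once per race. The sketch below uses made-up rows to show the effect; all names and numbers in it are illustrative.

```python
from collections import defaultdict

# Made-up rows: (city, state_code, race, total_population)
rows = [
    ("Springfield", "XX", "White", 1000),
    ("Springfield", "XX", "Asian", 1000),
    ("Shelbyville", "XX", "White", 500),
]


def state_totals(rows, dedupe):
    """Sum total_population per state, optionally counting each city once."""
    totals = defaultdict(int)
    seen = set()
    for city, state, _race, population in rows:
        if dedupe:
            if (city, state) in seen:
                continue  # this city was already counted
            seen.add((city, state))
        totals[state] += population
    return dict(totals)


print(state_totals(rows, dedupe=False))
print(state_totals(rows, dedupe=True))
```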



In [149]:
# Write state dimension table to "results/dim_state"
dim_state.write.mode("overwrite").parquet("results/dim_state")

##### 4.1.5. Fact Table: fact_immigration

In [150]:
# Extract columns for immigration fact table
fact_immigration = df_i94.select(["immigration_id", "i94visa", "i94port", "i94addr"])
fact_immigration = fact_immigration.withColumn("id", monotonically_increasing_id())

In [151]:
fact_immigration.take(5)

[Row(immigration_id='0immi', i94visa=3.0, i94port='NYC', i94addr='NY', id=0),
 Row(immigration_id='1immi', i94visa=2.0, i94port='DET', i94addr='IL', id=1),
 Row(immigration_id='2immi', i94visa=2.0, i94port='SEA', i94addr='WA', id=2),
 Row(immigration_id='3immi', i94visa=2.0, i94port='ORL', i94addr='FL', id=3),
 Row(immigration_id='4immi', i94visa=2.0, i94port='MIA', i94addr='FL', id=4)]
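The fact table is what answers the question from the project summary: which states do arrivals name as their destination? As a plain-Python sketch over the five rows shown above (in Spark this would be a `groupBy` on `i94addr` followed by a count), the tally per state looks like this:

```python
from collections import Counter

# (immigration_id, i94addr) pairs taken from the five rows shown above
fact_rows = [
    ("0immi", "NY"),
    ("1immi", "IL"),
    ("2immi", "WA"),
    ("3immi", "FL"),
    ("4immi", "FL"),
]

# Count arrivals per destination state
arrivals_by_state = Counter(state for _id, state in fact_rows)
print(arrivals_by_state.most_common(1))
```

Joining the result to `dim_state` on the state code would then add the population figures for each destination.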

In [152]:
# Write fact table to "results/fact_immigration"
fact_immigration.write.mode("overwrite").parquet("results/fact_immigration")

#### 4.2 Data Quality Checks
The data quality checks ensure that the ETL has created fact and dimension tables that contain records.

In [116]:
df_tables = {
    'dim_immigration': dim_immigration,
    'dim_visa': dim_visa,
    'dim_city':  dim_city,
    'dim_state': dim_state,
    'fact_immigration': fact_immigration
}


for df_name, df_table in df_tables.items():
    total_rows = df_table.count()    
    if total_rows == 0:
        print(f"Data quality check failed for {df_name} with zero records!")
    else:
        print(f"Data quality check passed for {df_name} with {total_rows:,} records.")    
    

Data quality check passed for dim_immigration with 2,915,506 records.
Data quality check passed for dim_visa with 3 records.
Data quality check passed for dim_city with 2,875 records.
Data quality check passed for dim_state with 48 records.
Data quality check passed for fact_immigration with 2,915,506 records.
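A row count only shows that a table is not empty. A second check worth considering is that key columns are unique, for example `immigration_id` in `dim_immigration`. The sketch below expresses the idea over a plain list of keys. `find_duplicate_keys` is a hypothetical helper; in Spark the same check could be done by comparing `count()` before and after `dropDuplicates` on the key column.

```python
from collections import Counter


def find_duplicate_keys(keys):
    """Return the sorted list of keys that occur more than once."""
    counts = Counter(keys)
    return sorted(key for key, n in counts.items() if n > 1)


# Illustrative keys only
print(find_duplicate_keys(["0immi", "1immi", "2immi"]))
print(find_duplicate_keys(["0immi", "1immi", "1immi"]))
```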


#### 4.3 Data dictionary 
__dim_immigration table__
- immigration_id: unique id for the immigration dimension table
- cicid: Unique record ID (possibly unique only within a month)
- i94yr: 4 digit year
- i94mon: Numeric month
- i94cit: 3 digit code for immigrant country of citizenship
- i94res: 3 digit code for immigrant country of residence
- biryear: 4 digit year of birth
- gender: sex of immigrant


__dim_visa table__
- i94visa: visa code
- visa_type: string
    - Business (i94visa = 1)
    - Pleasure (i94visa = 2)
    - Student (i94visa = 3)

__dim_city__
- City: City's name
- State: State's name
- median_age: median age of city
- male_population: number of men in the city
- female_population: number of women in the city
- total_population: number of persons in the city
- number_of_veterans: number of veterans in the city
- foreign_born: number of foreign-born persons in the city
- average_household_size: average household size
- state_code: two-letter code for state
- race: White, Hispanic or Latino, Asian, Black or African-American, or American Indian and Alaska Native
- count: number of people of that race in the city

__dim_state__

- State: State's name
- state_code: two-letter code for state
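- male_population: number of men, summed over the state's cities in the data set
- female_population: number of women, summed over the state's cities in the data set
- total_population: number of persons, summed over the state's cities in the data set
- foreign_born: number of foreign-born persons, summed over the state's cities in the data set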

__fact_immigration__
- immigration_id: unique id for the immigration entry
- i94visa: visa code
- i94port: 3 character code of the port of admission
- i94addr: USA State of arrival


### Step 5: Complete Project Write Up
- Clearly state the rationale for the choice of tools and technologies for the project.
    - Spark is used in this project because:
        - It scales to large data sets (the I94 data has more than 40 million rows)
        - It can read and write different file formats, here sas7bdat (through a package), CSV and parquet
    - Python is used because it makes working with data quick and easy and offers APIs for the tools used here, such as pandas and PySpark
    
- Propose how often the data should be updated and why.
    - The data should be updated monthly, because the I94 immigration data is provided as one file per month
    
- Write a description of how you would approach the problem differently under the following scenarios:
    - The data was increased by 100x.
        - In this case, running Spark on a cloud cluster such as AWS EMR and storing the data on AWS S3 could be a good choice
    - The data populates a dashboard that must be updated on a daily basis by 7am every day.
        - Apache Airflow can be used to schedule the workflow so that it finishes before 7am
    - The database needed to be accessed by 100+ people.
        - HDFS can be used in this scenario, because it is a distributed file system that provides fault tolerance