# I94 Immigration Data Lake

## Project Summary
The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, IntegerType, LongType, StringType, TimestampType
from pyspark.sql.functions import col, udf, date_format, upper, to_date, year, month

pd.options.display.max_columns = None

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

# If using S3 bucket
# output_key = 's3a://capstone-datalake/'

# If testing locally
output_key = './datamart/'

## Step 1: Scope the Project and Gather Data

### Scope 
This project uses three datasources: I94 immigration data, world temperature data and US demographic data to setup a Data Lake with fact and dimension tables.

#### Tools and Technologies
1. AWS S3 for data storage.
2. Pandas to explore and analyze the data.
3. PySpark for the main ETL process.

### Describe and Gather Data 

#### I94 Immigration Data
This data comes from the US National Tourism and Trade Office. [This](https://www.trade.gov/national-travel-and-tourism-office) is where the data comes from.

The immigration data is stored in a folder with the following path: `../../data/18-83510-I94-Data-2016/`. There's a file for each month of the year. An example file name is `i94_apr16_sub.sas7bdat`. Each file has a three-letter abbreviation for the month name. So a full file path for June would look like this: `../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat`.

#### World Temperature Data
This dataset came from this Kaggle [link](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).

The temperature data is stored in a folder with the following path: `../../data2/`. There's just one file in that folder, called `GlobalLandTemperaturesByCity.csv`.

#### U.S. City Demographic Data
This dataset came from this OpenSoft [link](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).

The demographic data is just 1 csv file in this folder: `./us-cities-demographics.csv`.

## Step 2: Explore and Assess the Data
### Explore the Data

#### I94 Immigration Data

##### Preview data

In [2]:
# Read in the data here
df_immi = pd.read_csv("immigration_data_sample.csv")
df_immi.columns

Index(['Unnamed: 0', 'cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port',
       'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa',
       'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd',
       'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum',
       'airline', 'admnum', 'fltno', 'visatype'],
      dtype='object')

In [3]:
df_immi.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,76.0,2.0,1.0,20160407,,,G,O,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,25.0,2.0,1.0,20160428,DOH,,G,O,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,19.0,2.0,1.0,20160406,,,Z,K,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


##### Check if `cicid` is a primary key candidate

In [4]:
df_immi['cicid'].apply(float.is_integer).all()

True

In [5]:
df_immi['cicid'].duplicated().any()

False

Column `cicid` is whole float (integer) and distinct, so `cicid` can be used as a primary key for tables related to `df_immi`. However, I need to convert `cicid` (along with other float columns) to integer first.

#### World Temperature Data

##### Preview data

In [6]:
# Read in the data here
df_temp = pd.read_csv("../../data2/GlobalLandTemperaturesByCity.csv")
df_temp.columns

Index(['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City',
       'Country', 'Latitude', 'Longitude'],
      dtype='object')

In [7]:
# Only take USA data
df_temp = df_temp[df_temp['Country'] == 'United States']
df_temp.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
47555,1820-01-01,2.101,3.217,Abilene,United States,32.95N,100.53W
47556,1820-02-01,6.926,2.853,Abilene,United States,32.95N,100.53W
47557,1820-03-01,10.767,2.395,Abilene,United States,32.95N,100.53W
47558,1820-04-01,17.989,2.202,Abilene,United States,32.95N,100.53W
47559,1820-05-01,21.809,2.036,Abilene,United States,32.95N,100.53W


##### Check if `(dt, City)` is a primary key candidate

In [8]:
df_temp.duplicated(subset=['dt', 'City']).any()

True

The set of `(dt, City)` is not distinct, we need to clean this.

#### U.S. City Demographic Data

##### Preview data

In [9]:
# Read in the data here
df_demo = pd.read_csv("us-cities-demographics.csv", delimiter=';')
df_demo.columns

Index(['City', 'State', 'Median Age', 'Male Population', 'Female Population',
       'Total Population', 'Number of Veterans', 'Foreign-born',
       'Average Household Size', 'State Code', 'Race', 'Count'],
      dtype='object')

In [10]:
df_demo.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


##### Check if `(City, State)` is a primary key candidate

In [11]:
df_demo.duplicated(subset=['City', 'State']).any()

True

The set of `(City, State)` is not distinct, we need to clean this.

### Cleaning Steps

#### I94 Immigration Data

In [12]:
# Performing cleaning tasks here
df_immi = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
df_immi = df_immi.withColumn("cicid", df_immi["cicid"].cast(IntegerType())) \
                 .withColumn("i94yr", df_immi["i94yr"].cast(IntegerType())) \
                 .withColumn("i94mon", df_immi["i94mon"].cast(IntegerType())) \
                 .withColumn("i94cit", df_immi["i94cit"].cast(IntegerType())) \
                 .withColumn("i94res", df_immi["i94res"].cast(IntegerType())) \
                 .withColumn("arrdate", df_immi["arrdate"].cast(IntegerType())) \
                 .withColumn("depdate", df_immi["depdate"].cast(IntegerType())) \
                 .withColumn("i94mode", df_immi["i94mode"].cast(IntegerType())) \
                 .withColumn("i94visa", df_immi["i94visa"].cast(IntegerType())) \
                 .withColumn("biryear", df_immi["biryear"].cast(IntegerType())) \
                 .withColumn("admnum", df_immi["admnum"].cast(LongType()))

In [13]:
# for record in df_immi.head(2):
#     print(record)

#### World Temperature Data

In [14]:
# Performing cleaning tasks here
df_temp = spark.read.option("header", "true").csv('../../data2/GlobalLandTemperaturesByCity.csv')
df_temp = df_temp.filter(df_temp['Country'] == 'United States')
df_temp = df_temp.dropDuplicates(subset=['dt', 'City'])
df_temp = df_temp.withColumn("AverageTemperature", df_temp["AverageTemperature"].cast(DoubleType())) \
                 .withColumn("AverageTemperatureUncertainty", df_temp["AverageTemperatureUncertainty"].cast(DoubleType()))

In [15]:
# for record in df_temp.head(2):
#     print(record)

#### U.S. City Demographic Data

In [16]:
# Performing cleaning tasks here
df_demo = spark.read.option("header", "true").option("delimiter", ";").csv('us-cities-demographics.csv')
df_demo = df_demo.dropDuplicates(subset=['City', 'State'])
df_demo = df_demo.withColumn("Male Population", df_demo["Male Population"].cast(IntegerType())) \
                 .withColumn("Female Population", df_demo["Female Population"].cast(IntegerType())) \
                 .withColumn("Number of Veterans", df_demo["Number of Veterans"].cast(IntegerType())) \
                 .withColumn("Foreign-born", df_demo["Foreign-born"].cast(IntegerType())) \
                 .withColumn("Median Age", df_demo["Median Age"].cast(DoubleType())) \
                 .withColumn("Average Household Size", df_demo["Average Household Size"].cast(DoubleType()))

In [17]:
# for record in df_demo.head(2):
#     print(record)

## Step 3: Define the Data Model
### 3.1 Conceptual Data Model
For an OLAP or BI application, it is best to use a Star schema for our data model.

<img src="./images/datamart.png" width="600" height="400">

### 3.2 Mapping Out Data Pipelines

We have only checked for primary key candidate and filter out some unnecessary data such as `Country` data in `df_temp`. However, we still have a lot of work to transform the raw dataframes into what are designed.

1. Rename the columns into what are designed.

## Step 4: Run Pipelines to Model the Data

### 4.1 Create the data model

#### Fact Immigration

In [18]:
def parse_i94port(filepath, startline, endline, comma_sep=False):
    code_name_dict = {}
    
    with open(filepath) as file:
        file_content = file.readlines()
    
    for line in file_content[startline:endline]:
        code, name = line.split('=')
        code, name = code.strip().strip("'"), name.strip().strip("'")
        if comma_sep:
            name = name.split(',')[0].strip()
        code_name_dict[code] = name
    
    return code_name_dict

In [19]:
# Rename the columns
rename_mapping = dict(zip(
    ['cicid', 'i94yr', 'i94mon', 'i94port', 'i94addr', 'arrdate', 'depdate',
     'i94mode', 'i94visa', 'i94cit', 'i94res', 'biryear', 'gender', 'insnum',
     'airline', 'admnum', 'fltno', 'visatype'],
    ['cic_id', 'year', 'month', 'city', 'state', 'arrival_date', 'departure_date',
     'mode', 'visa', 'citizen_country', 'residence_country', 'birth_year', 'gender',
     'ins_num', 'airline', 'admin_num', 'flight_number', 'visa_type']
))
fact_immi = df_immi.select([col(c).alias(rename_mapping.get(c)) for c in df_immi.columns if c in rename_mapping.keys()])

# Map city
code_city_mapping = parse_i94port('I94_SAS_Labels_Descriptions.SAS', 302, 962, comma_sep=True)

@udf(returnType=StringType())
def map_city(code):
    return code_city_mapping.get(code, 'other')

fact_immi = fact_immi.withColumn('city', map_city(col('city')))

# Map state
code_state_mapping = parse_i94port('I94_SAS_Labels_Descriptions.SAS', 981, 1036)

@udf(returnType=StringType())
def map_state(code):
    return code_state_mapping.get(code, 'other')

fact_immi = fact_immi.withColumn('state', map_state(col('state')))

# Process arrival_date
fact_immi = fact_immi.withColumn('arrival_date', (col("arrival_date") * 24 * 3600).cast(TimestampType()))
fact_immi = fact_immi.withColumn("arrival_date", date_format(col("arrival_date"), "yyyy-MM-dd"))

# Process arrival_date
fact_immi = fact_immi.withColumn('departure_date', (col("departure_date") * 24 * 3600).cast(TimestampType()))
fact_immi = fact_immi.withColumn("departure_date", date_format(col("departure_date"), "yyyy-MM-dd"))

In [20]:
for record in fact_immi.head(5):
    print(record)

Row(cic_id=6, year=2016, month=4, citizen_country=692, residence_country=692, city='NOT REPORTED/UNKNOWN', arrival_date='2026-04-30', mode=None, state='other', departure_date=None, visa=2, birth_year=1979, gender=None, ins_num=None, airline=None, admin_num=1897628485, flight_number=None, visa_type='B2')
Row(cic_id=7, year=2016, month=4, citizen_country=254, residence_country=276, city='ATLANTA', arrival_date='2026-04-08', mode=1, state='ALABAMA', departure_date=None, visa=3, birth_year=1991, gender='M', ins_num=None, airline=None, admin_num=3736796330, flight_number='00296', visa_type='F1')
Row(cic_id=15, year=2016, month=4, citizen_country=101, residence_country=101, city='WASHINGTON DC', arrival_date='2026-04-02', mode=1, state='MICHIGAN', departure_date='2026-08-26', visa=2, birth_year=1961, gender='M', ins_num=None, airline='OS', admin_num=666643185, flight_number='93', visa_type='B2')
Row(cic_id=16, year=2016, month=4, citizen_country=101, residence_country=101, city='NEW YORK', a

In [21]:
# Write code here
fact_immi.write.mode('overwrite').partitionBy('year', 'month').parquet(output_key + 'fact_immi')

#### Dim Temperature

In [22]:
# Rename the columns
rename_mapping = dict(zip(
    ['dt', 'City', 'AverageTemperature', 'AverageTemperatureUncertainty'],
    ['dt', 'city', 'avg_temp', 'avg_temp_uncertainty']
))
dim_temp = df_temp.select([col(c).alias(rename_mapping.get(c)) for c in df_temp.columns if c in rename_mapping.keys()])

# Convert city to uppercase
dim_temp = dim_temp.withColumn('city', upper(col('city')))

# Extract month and year
dim_temp = dim_temp.withColumn('ts', to_date(col('dt'),'yyyy-MM-dd')) \
                   .withColumn('year', year(col('ts'))) \
                   .withColumn('month', month(col('ts'))) \
                   .drop('ts')

# Take only April to match Fact table
dim_temp = dim_temp.filter(col('month') == 4)

In [23]:
for record in dim_temp.head(5):
    print(record)

Row(dt='1745-04-01', avg_temp=10.864, avg_temp_uncertainty=1.432, city='ARLINGTON', year=1745, month=4)
Row(dt='1745-04-01', avg_temp=5.973000000000001, avg_temp_uncertainty=1.005, city='LOWELL', year=1745, month=4)
Row(dt='1746-04-01', avg_temp=None, avg_temp_uncertainty=None, city='LEXINGTON FAYETTE', year=1746, month=4)
Row(dt='1749-04-01', avg_temp=None, avg_temp_uncertainty=None, city='VIRGINIA BEACH', year=1749, month=4)
Row(dt='1750-04-01', avg_temp=7.822999999999999, avg_temp_uncertainty=1.08, city='HARTFORD', year=1750, month=4)


In [24]:
# Write code here
dim_temp.write.mode('overwrite').partitionBy('year', 'month').parquet(output_key + 'dim_temp')

#### Dim Demographics

In [25]:
# Rename the columns
rename_mapping = dict(zip(
    ['City', 'State', 'Male Population', 'Female Population', 'Number of Veterans',
     'Foreign-born', 'Median Age', 'Average Household Size'],
    ['city', 'state', 'male_population', 'female_population', 'num_veterans',
     'foreign_born', 'median_age', 'avg_household_size']
))
dim_demo = df_demo.select([col(c).alias(rename_mapping.get(c)) for c in df_demo.columns if c in rename_mapping.keys()])

# Convert city to uppercase
dim_demo = dim_demo.withColumn('city', upper(col('city')))

# Convert state to uppercase
dim_demo = dim_demo.withColumn('state', upper(col('state')))

In [26]:
for record in dim_demo.head(5):
    print(record)

Row(city='CINCINNATI', state='OHIO', median_age=32.7, male_population=143654, female_population=154883, num_veterans=13699, foreign_born=16896, avg_household_size=2.08)
Row(city='KANSAS CITY', state='KANSAS', median_age=33.4, male_population=74606, female_population=76655, num_veterans=8139, foreign_born=25507, avg_household_size=2.71)
Row(city='LYNCHBURG', state='VIRGINIA', median_age=28.7, male_population=38614, female_population=41198, num_veterans=4322, foreign_born=4364, avg_household_size=2.48)
Row(city='AUBURN', state='WASHINGTON', median_age=37.1, male_population=36837, female_population=39743, num_veterans=5401, foreign_born=14842, avg_household_size=2.73)
Row(city='DAYTON', state='OHIO', median_age=32.8, male_population=66631, female_population=73966, num_veterans=8465, foreign_born=7381, avg_household_size=2.26)


In [27]:
# Write code here
dim_demo.write.mode('overwrite').parquet(output_key + 'dim_demo')

### 4.2 Data Quality Checks
After each processing steps, it is obvious that I always print out and make sure everything is done perfectly. Therefore, the data quality check can be as simple as just re-read the data.

#### Fact Immigration

In [28]:
# Perform quality checks here
spark.read.parquet(output_key + 'fact_immi').head(2)

[Row(cic_id=5748517, citizen_country=245, residence_country=438, city='LOS ANGELES', arrival_date='2026-05-01', mode=1, state='CALIFORNIA', departure_date='2026-05-09', visa=1, birth_year=1976, gender='F', ins_num=None, airline='QF', admin_num=94953870030, flight_number='00011', visa_type='B1', year=2016, month=4),
 Row(cic_id=5748518, citizen_country=245, residence_country=438, city='LOS ANGELES', arrival_date='2026-05-01', mode=1, state='NEVADA', departure_date='2026-05-18', visa=1, birth_year=1984, gender='F', ins_num=None, airline='VA', admin_num=94955622830, flight_number='00007', visa_type='B1', year=2016, month=4)]

#### Dim Temperature

If you see no output in `temp` data, it's because the machine has no memory left instead of the code fault.

In [29]:
# Perform quality checks here
spark.read.parquet(output_key + 'dim_temp').head(2)

[Row(dt='1933-04-01', avg_temp=8.173, avg_temp_uncertainty=0.304, city='ALBUQUERQUE', year=1933, month=4),
 Row(dt='1933-04-01', avg_temp=19.369, avg_temp_uncertainty=0.26899999999999996, city='GAINESVILLE', year=1933, month=4)]

#### Dim Demographics

In [30]:
# Perform quality checks here
spark.read.parquet(output_key + 'dim_demo').head(2)

[Row(city='AMES', state='IOWA', median_age=23.0, male_population=33814, female_population=31238, num_veterans=2265, foreign_born=8606, avg_household_size=2.16),
 Row(city='BIRMINGHAM', state='ALABAMA', median_age=35.6, male_population=102122, female_population=112789, num_veterans=13212, foreign_born=8258, avg_household_size=2.21)]

### 4.3 Data dictionary

#### Fact Immigration
* `cic_id (Integer)`: CIC ID
* `year (Integer)`: immigration year
* `month (Integer)`: immigration month
* `city (String)`: uppercase immigration destination city
* `state (String)`: uppercase immigration destination state
* `mode (Integer)`: traffic method
* `visa (Integer)`: VISA category
* `arrival_date (String)`: arrival date
* `departure_date (String)`: departure date
* `citizen_country (String)`: country of citizenship
* `residence_country (String)`: country of residence
* `birth_year (Integer)`: birth year
* `gender (String)`: 1 character, 'M' or 'F'
* `ins_num (Integer)`: INS number
* `airline (String)`: airline used to arrive in USA
* `admin_num (Long)`: admission number
* `flight_num (Integer)`: flight number
* `visa_type (Integer)`: class of legal immigration admission

#### Dim Temperature
* `dt (Timestamp)`: temperature date in format `YYYY-MM-dd`
* `city (String)`: uppercase temperature city
* `avg_temp (Double)`: Average temperature
* `avg_temp_uncertainty (Double)`: Average temperature uncertainty
* `year (Integer)`: temperature year extracted from `dt`
* `month (Integer)`: temperature month extracted from `dt`

#### Dim Demographics
* `city (String)`: uppercase demographics city
* `state (String)`: uppercase demographics state
* `male_population (Integer)`: city male population
* `female_population (Integer)`: city female population
* `num_veterans (Integer)`: number of veterans
* `foreign_born (Integer)`: number of foreign-born babies
* `median_age (Double)`: city median age
* `avg_household_size (Double)`: city average household size

## Step 5: Complete Project Write Up

#### Tools and Technologies
1. AWS S3 for data storage seems to be the best choice for storing Data Lake.
2. Pandas to explore and analyze the data as it is very good with small scale datasets (we only analyze with a small proportion of data).
3. PySpark for the main ETL process since this requires enormous processing power.

#### Data Update Frequency
1. Since the data is distributed and loaded to the system by month, the pipeline should be scheduled monthly to build `Fact Immigration` and `Dim Temperature`.
2. `Dim Demographics` can be updated annually since it is not that necessary to update demographic data realtime.
3. All tables should be updated in an `append-only` mode.

#### Future Design Considerations
1. The data was increased by 100x.
    If Spark with standalone server mode can not process 100x data set, we could consider using [AWS EMR](https://us-west-2.console.aws.amazon.com/elasticmapreduce/home?region=us-west-2#) for processing, yet still use [AWS S3](https://s3.console.aws.amazon.com/s3/get-started?region=us-west-2) for storage.

2. The data populate a dashboard that must be updated on a daily basis by 7am every day.
    [Apache Airflow](https://airflow.apache.org) could be used for building a data pipeline to regularly update the data which populate the dashboard.

3. The database needed to be accessed by 100+ people.
    [AWS Redshift](https://us-east-1.console.aws.amazon.com/redshiftv2/home?region=us-east-1#landing) can handle up to 500 connections. Therefore, Redshift should find it easy to handle a workload of 100+ connections at a point of time.
    
In general, when it comes to scaling the pipeline, the most feasible and efficient solution seems to be *cloudify* it. This could be done using AWS, Microsoft Azure or any trustworthy Cloud service platform. 