# U.S. Immigration
### Data Engineering Project

According to the Gallup World Poll survey (2013-2016), the United States is one of the most desired destinations for potential migrants. According to the survey, around 147 million people worldwide would like to immigrate to the U.S. This project creates a data lake to help people learn more about the country's demographics. In addition, officers who inspect and examine passengers at U.S. ports of entry can retrieve insights from the I94 visitor arrival/departure records.

#### Here are the datasets used to build the data lake:
* I94 Visitor Arrival/Departure Records: This data comes from the US National Tourism and Trade Office. You can read more about it [here](https://travel.trade.gov/research/reports/i94/historical/2016.html).
* U.S. Cities Demographic Data: This data comes from OpenDataSoft. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).

#### Here are the insights that can be retrieved from the I94 records:
* Top 10 countries where visitors are from
* Top 3 busiest ports of entry
* Number of visitors arriving at U.S. ports per day

#### Here are the questions that might help people choose where to live:
* Which group is the racial majority of each city? 


### Step 1: Explore and Assess the Data
#### Explore the Data 
The U.S. Cities Demographic Data has one column for the total city population, one column for race, and one column for the race count. However, the sum of the race counts in each city does not equal the total city population. Assuming that people may identify as members of more than one race, the sum of race counts is used as the denominator when calculating the racial majority. The goal is only a general picture of the racial/ethnic composition of each city.
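
The calculation described above can be sketched in plain Python, independent of Spark. This is only an illustration: `race_shares` is a hypothetical helper name, and the counts are made-up numbers, not values from the dataset.

```python
def race_shares(race_counts):
    """Return each race's share (in percent) of the summed race counts.

    The denominator is the sum of race counts rather than the total city
    population, because people may identify with more than one race.
    """
    total = sum(race_counts.values())
    if total == 0:
        return {}
    return {race: round(count / total * 100, 2) for race, count in race_counts.items()}


# Hypothetical counts for one city (illustrative only, not real data).
example_counts = {"White": 600, "Asian": 300, "Hispanic or Latino": 100}
shares = race_shares(example_counts)
largest_group = max(shares, key=shares.get)
print(largest_group, shares[largest_group])
```

Because the shares are computed against the summed race counts, they always add up to roughly 100% per city, even when that sum differs from the reported total population.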



### Step 2: Define the Data Model
#### 2.1 Conceptual Data Model

A snowflake schema is created to optimize analytical queries on the I94 visitor arrival/departure records.
* Fact table: 
    - visitor_arrival_departure_records \
    year: arrival year - 4 digits, Integer \
    month: arrival month, Integer \
    arrdate_date: arrival date, Date \
    depdate_date: departure date, Date \
    arrival_port_code: arrival port code, String \
    file_create_date: file creation date, String \
    cicid: visitor client id, Integer 

    
* Dimension tables: 
    - visitors \
    cicid: visitor client id, Integer \
    from_country_code: code for the country that visitor comes from, Integer \
    arrival_transportation_code: arrival transportation code, Integer \
    address_state: state code, String \
    occupation: visitor's occupation, String \
    birthyear: visitor's birthyear, Integer \
    gender: visitor's gender, String \
    visit_purpose_code: code for the visiting purpose, Integer \
    visatype: type of visa, String
    - country_codes \
    code: country code, Integer \
    name: country name, String
    - port_codes \
    code: port code, String \
    location: port of entry location, String 
    - visit_purpose_codes (1 - Business, 2 - Pleasure, 3 - Student)\
    code: code, Integer \
    purpose: purpose of visiting U.S., String 
    - arrival_transportation_codes (1 - Air, 2 - Sea, 3 - Land, 9 - Unknown)\
    code: transportation code, Integer \
    transportation: String
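
To show how the small code dimensions are meant to be used, here is a minimal plain-Python sketch that replaces the coded columns of one `visitors` row with readable labels. The mappings come from the lists above; `describe_visitor` and the sample row are hypothetical and only illustrate the lookup.

```python
# Code-to-label mappings as listed in the data model above.
VISIT_PURPOSE_CODES = {1: "Business", 2: "Pleasure", 3: "Student"}
ARRIVAL_TRANSPORTATION_CODES = {1: "Air", 2: "Sea", 3: "Land", 9: "Unknown"}


def describe_visitor(visitor):
    """Replace the coded columns of one `visitors` row with readable labels."""
    return {
        "cicid": visitor["cicid"],
        "visit_purpose": VISIT_PURPOSE_CODES.get(visitor["visit_purpose_code"]),
        "arrival_transportation": ARRIVAL_TRANSPORTATION_CODES.get(
            visitor["arrival_transportation_code"]
        ),
    }


# A hypothetical row; only the coded columns matter for this sketch.
row = {"cicid": 1, "visit_purpose_code": 2, "arrival_transportation_code": 1}
print(describe_visitor(row))
```

In the data lake itself the same lookup is a join between `visitors` and the corresponding code table.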


A star schema is created for optimizing queries on U.S. Cities Demographic analysis.
* Fact table:
    - cities \
    city: name of city, String \
    state_code: state abbreviation, String \
    total_population: Integer \
    race_code: Integer \
    race_count: Integer 
* Dimension tables:
    - state_codes \
    state_code: state abbreviation, String \
    state: state name, String
    - race_codes \
    code: Integer \
    race: String


### Step 3: Run Pipelines to Model the Data 
#### 3.1 Create the data model
* Step 1: load i94 visitor arrival/departure raw records to staging table `border_arrival_departure_records_df` and change data type of some columns
* Step 2: extract columns from staging table to create `visitor_arrival_departure_records` table and save it in parquet file(s)
* Step 3: extract columns from staging table to create `visitors` table and save it in parquet file(s)
* Step 4: load country codes txt file, create `country_codes` table, and save it in parquet file(s)
* Step 5: load i94 port location mapping txt file, create `port_codes` table, and save it in parquet file(s)
* Step 6: load arrival transportation codes txt file and create `arrival_transportation_codes` table
* Step 7: load visit purpose codes txt file and create `visit_purpose_codes` table
* Step 8: load U.S. cities demographics csv file to staging table `us_cities_demographics_staging_df` and change data type of some columns
* Step 9: extract columns from staging table to create `race_codes` table and save it in parquet file(s)
* Step 10: extract columns from staging table to create `us_cities_demographics` table and save it in parquet file(s)
* Step 11: extract columns from staging table to create `state_codes` table and save it in parquet file(s)



In [57]:
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date, LongType as Long, TimestampType as Ts
from pyspark.sql.functions import udf, col, row_number, length, sum as Fsum, round
from pyspark.sql import SparkSession
from pyspark.sql.window import Window as W

In [2]:
def create_spark_session():
    spark_session = SparkSession \
        .builder \
        .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.5")\
        .enableHiveSupport()\
        .getOrCreate()

    return spark_session

In [3]:
spark = create_spark_session()

In [4]:
####################################################################################################
#
# Step 1: load i94 visitor arrival/departure raw records to staging table `border_arrival_departure_records_df` and change data type of some columns
#
####################################################################################################

In [5]:
border_arrival_departure_records_in_parquet = './raw/sas_data'

In [6]:
border_arrival_departure_records_df = spark.read.parquet(border_arrival_departure_records_in_parquet)

In [7]:
# Columns that arrive as doubles in the raw data and should be stored as integers.
int_columns = ['cicid', 'i94yr', 'i94mon', 'i94res', 'i94mode', 'i94bir', 'i94visa', 'biryear', 'arrdate', 'depdate']

for column_name in int_columns:
    border_arrival_departure_records_df = border_arrival_departure_records_df.withColumn(column_name, col(column_name).cast(Int()))

In [8]:
from datetime import timedelta, datetime

add_days = udf (lambda x: datetime.strptime('19600101', '%Y%m%d').date() + timedelta(days=x) if x is not None else None, Date())

border_arrival_departure_records_df = border_arrival_departure_records_df.withColumn('arrdate_date', add_days(col('arrdate')))
border_arrival_departure_records_df = border_arrival_departure_records_df.withColumn('depdate_date', add_days(col('depdate')))
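
The `add_days` UDF above relies on the SAS convention that a numeric date is a count of days since 1960-01-01. A minimal plain-Python sketch of the same conversion follows; `sas_days_to_date` is a hypothetical helper name, and the input value 20574 is an illustrative number chosen for this sketch rather than a value shown in the notebook.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS counts dates as days since this day


def sas_days_to_date(days):
    """Convert a SAS numeric date (days since 1960-01-01) to a `date`.

    None is passed through unchanged, mirroring the null handling of the UDF.
    """
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


print(sas_days_to_date(20574))  # 2016-04-30
```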

In [9]:
#########################################################################################################################################################
#
# Step 2: extract columns from staging table to create `visitor_arrival_departure_records` table and save it in parquet file(s)
#
#########################################################################################################################################################

In [10]:
cols = ['i94yr as year', 'i94mon as month', 'arrdate_date', 'depdate_date', 'i94port as arrival_port_code', 'dtadfile as file_create_date', 'cicid']
arrival_departure_records_df = border_arrival_departure_records_df.selectExpr(*cols)
arrival_departure_records_df.write.partitionBy('year', 'month').parquet('./visitor-arrival-departure-records', 'append')

In [11]:
arrival_departure_records_df.show(2)

+----+-----+------------+------------+-----------------+----------------+-------+
|year|month|arrdate_date|depdate_date|arrival_port_code|file_create_date|  cicid|
+----+-----+------------+------------+-----------------+----------------+-------+
|2016|    4|  2016-04-30|  2016-05-08|              LOS|        20160430|5748517|
|2016|    4|  2016-04-30|  2016-05-17|              LOS|        20160430|5748518|
+----+-----+------------+------------+-----------------+----------------+-------+
only showing top 2 rows
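
Writing with `partitionBy('year', 'month')` makes Spark lay the files out in Hive-style `column=value` directories, which lets later reads skip partitions they do not need. The sketch below only illustrates what such a directory name looks like; `partition_path` is a hypothetical helper and is not part of the pipeline.

```python
from pathlib import PurePosixPath


def partition_path(base_dir, **partition_values):
    """Build the Hive-style partition directory (`column=value/...`) for one partition."""
    path = PurePosixPath(base_dir)
    for column, value in partition_values.items():
        path = path / f"{column}={value}"
    return str(path)


print(partition_path("visitor-arrival-departure-records", year=2016, month=4))
# visitor-arrival-departure-records/year=2016/month=4
```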



In [12]:
#########################################################################################################################################################
#
# Step 3: extract columns from staging table to create `visitors` table and save it in parquet file(s)
#
#########################################################################################################################################################

In [13]:
cols = ['cicid', 'i94res as from_country_code', 'i94mode as arrival_transportation_code', 'i94addr as address_state', 'occup as occupation', 'biryear as birthyear', 'gender', 'i94visa as visit_purpose_code', 'visatype']
visitors_df = border_arrival_departure_records_df.selectExpr(*cols)
visitors_df.write.parquet('./visitors', 'append')

In [14]:
visitors_df.show(2)

+-------+-----------------+---------------------------+-------------+----------+---------+------+------------------+--------+
|  cicid|from_country_code|arrival_transportation_code|address_state|occupation|birthyear|gender|visit_purpose_code|visatype|
+-------+-----------------+---------------------------+-------------+----------+---------+------+------------------+--------+
|5748517|              438|                          1|           CA|      null|     1976|     F|                 1|      B1|
|5748518|              438|                          1|           NV|      null|     1984|     F|                 1|      B1|
+-------+-----------------+---------------------------+-------------+----------+---------+------+------------------+--------+
only showing top 2 rows



In [15]:
#########################################################################################################################################################
#
# Step 4: load country codes txt file, create `country_codes` table, and save it in parquet file(s)
#
#########################################################################################################################################################

In [16]:
country_codes_in_txt = './raw/country-codes.txt'

country_codes_schema = R([
    Fld("code",Int()),
    Fld("name",Str()),
])

country_codes_df = spark.read.csv(country_codes_in_txt, header=False, sep=';', schema=country_codes_schema)

country_codes_df.write.parquet('./codes/country-codes', 'overwrite')

country_codes_df.show(2, False)

+----+-----------+
|code|name       |
+----+-----------+
|582 |MEXICO     |
|236 |AFGHANISTAN|
+----+-----------+
only showing top 2 rows
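
For readers without a Spark session at hand, the same `code;name` parsing can be sketched with the standard library. The two sample lines are shaped like the rows shown above; the exact layout of the txt file is an assumption, and `parse_code_file` is a hypothetical helper.

```python
import csv
import io


def parse_code_file(text):
    """Parse `code;name` lines (no header) into a {code: name} dictionary."""
    reader = csv.reader(io.StringIO(text), delimiter=";")
    # Skip blank lines so a trailing empty line does not break the parse.
    return {int(row[0]): row[1].strip() for row in reader if row}


# Two lines shaped like the rows shown above (assumed file layout).
sample = "582;MEXICO\n236;AFGHANISTAN\n"
print(parse_code_file(sample))
```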



In [17]:
#########################################################################################################################################################
#
# Step 5: load i94 port location mapping txt file, create `port_codes` table, and save it in parquet file(s)
#
#########################################################################################################################################################

In [18]:
ports_in_txt = './raw/i94-port-location-mapping.txt'

ports_schema = R([
    Fld("code",Str()),
    Fld("location",Str()),
])

ports_df = spark.read.csv(ports_in_txt, header=False, sep=';', schema=ports_schema)

ports_df.write.parquet('./codes/port-codes', 'overwrite')

ports_df.show(2, False)

+----+-------------+
|code|location     |
+----+-------------+
|ALC |ALCAN, AK    |
|ANC |ANCHORAGE, AK|
+----+-------------+
only showing top 2 rows



In [19]:
#########################################################################################################################################################
#
# Step 6: load arrival transportation codes txt file and create `arrival_transportation_codes` table
#
#########################################################################################################################################################

In [20]:
arrival_transportation_codes_in_txt = './codes/arrival-transportation-codes.txt'

arrival_transportation_codes_schema = R([
    Fld("code",Int()),
    Fld("transportation",Str()),
])

arrival_transportation_codes = spark.read.csv(arrival_transportation_codes_in_txt, header=False, sep=';', schema=arrival_transportation_codes_schema)

arrival_transportation_codes.show(5, False)

+----+--------------+
|code|transportation|
+----+--------------+
|1   |Air           |
|2   |Sea           |
|3   |Land          |
|9   |Unknown       |
+----+--------------+



In [21]:
#########################################################################################################################################################
#
# Step 7: load visit purpose codes txt file and create `visit_purpose_codes` table
#
#########################################################################################################################################################

In [22]:
visit_purpose_codes_in_txt = './codes/visit-purpose-codes.txt'

visit_purpose_codes_schema = R([
    Fld("code",Int()),
    Fld("purpose",Str()),
])

visit_purpose_codes = spark.read.csv(visit_purpose_codes_in_txt, header=False, sep=';', schema=visit_purpose_codes_schema)

visit_purpose_codes.show(5, False)

+----+--------+
|code|purpose |
+----+--------+
|1   |Business|
|2   |Pleasure|
|3   |Student |
+----+--------+



In [23]:
#########################################################################################################################################################
#
# Step 8: load U.S. cities demographics csv file to staging table `us_cities_demographics_staging_df` and change data type of some columns
#
#########################################################################################################################################################

In [24]:
def generate_us_cities_demographics_schema():    
    schema = R([
        Fld("city",Str()),
        Fld("state",Str()),
        Fld("median_age",Dbl()),
        Fld("male_population",Int()),
        Fld("female_population",Int()),
        Fld("total_population",Int()),
        Fld("number_of_veterans",Int()),
        Fld("number_of_foreign_born",Int()),
        Fld("average_household_size",Dbl()),
        Fld("state_code",Str()),
        Fld("race",Str()),
        Fld("count",Int()),
    ])
    return schema

In [25]:
us_cities_demographics_schema = generate_us_cities_demographics_schema()

In [26]:
us_cities_demographics_in_s3 = './raw/us-cities-demographics.csv'
us_cities_demographics_staging_df = spark.read.csv(us_cities_demographics_in_s3, header=True, sep=';', schema=us_cities_demographics_schema)
us_cities_demographics_staging_df.show(3, False)

+-------------+-------------+----------+---------------+-----------------+----------------+------------------+----------------------+----------------------+----------+------------------+-----+
|city         |state        |median_age|male_population|female_population|total_population|number_of_veterans|number_of_foreign_born|average_household_size|state_code|race              |count|
+-------------+-------------+----------+---------------+-----------------+----------------+------------------+----------------------+----------------------+----------+------------------+-----+
|Silver Spring|Maryland     |33.8      |40601          |41862            |82463           |1562              |30908                 |2.6                   |MD        |Hispanic or Latino|25924|
|Quincy       |Massachusetts|41.0      |44129          |49500            |93629           |4147              |32935                 |2.39                  |MA        |White             |58723|
|Hoover       |Alabama      |38.5  

In [27]:
#########################################################################################################################################################
#
# Step 9: extract columns from staging table to create `race_codes` table and save it in parquet file(s)
#
#########################################################################################################################################################

In [29]:
race_codes_df = us_cities_demographics_staging_df.select(['race']).distinct()
windowSpec = W.orderBy('race')
race_codes_df = race_codes_df.withColumn('code', row_number().over(windowSpec))
race_codes_df.coalesce(1).write.parquet('./codes/race-codes', 'overwrite')

In [30]:
race_codes_df.show(10, False)

+---------------------------------+----+
|race                             |code|
+---------------------------------+----+
|American Indian and Alaska Native|1   |
|Asian                            |2   |
|Black or African-American        |3   |
|Hispanic or Latino               |4   |
|White                            |5   |
+---------------------------------+----+
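
The surrogate codes above come from numbering the distinct race values in alphabetical order. A minimal plain-Python sketch of that idea, with `assign_codes` as a hypothetical helper name:

```python
def assign_codes(values):
    """Give each distinct value a 1-based code, in alphabetical order."""
    return {value: code for code, value in enumerate(sorted(set(values)), start=1)}


races = [
    "White",
    "Hispanic or Latino",
    "Asian",
    "Black or African-American",
    "American Indian and Alaska Native",
    "White",  # duplicates collapse, like `distinct()` in the Spark version
]
print(assign_codes(races))
```

One consequence of this approach is that the codes depend on the set of values present: if a new race category appeared in the source data, the numbering of the categories after it would shift.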



In [31]:
#########################################################################################################################################################
#
# Step 10: extract columns from staging table to create us_cities_demographics table and save it in parquet file(s)
#
#########################################################################################################################################################

In [32]:
# Attach the numeric race code to every staging row; all staging columns are kept.
us_cities_demographics_staging_df = us_cities_demographics_staging_df.join(race_codes_df, race_codes_df['race'] == us_cities_demographics_staging_df['race'], how = 'left').select(us_cities_demographics_staging_df['*'], race_codes_df['code'])

In [33]:
cols = ['city', 'state_code', 'total_population', 'code as race_code', 'count as race_count']
us_cities_demographics_df = us_cities_demographics_staging_df.selectExpr(*cols)
us_cities_demographics_df.coalesce(1).write.parquet('./cities', 'overwrite')

In [34]:
us_cities_demographics_df.show(10, False)

+----------------+----------+----------------+---------+----------+
|city            |state_code|total_population|race_code|race_count|
+----------------+----------+----------------+---------+----------+
|Silver Spring   |MD        |82463           |4        |25924     |
|Quincy          |MA        |93629           |5        |58723     |
|Hoover          |AL        |84839           |2        |4759      |
|Rancho Cucamonga|CA        |175232          |3        |24437     |
|Newark          |NJ        |281913          |5        |76402     |
|Peoria          |IL        |118661          |1        |1343      |
|Avondale        |AZ        |80683           |3        |11592     |
|West Covina     |CA        |108489          |2        |32716     |
|O'Fallon        |MO        |85032           |4        |2583      |
|High Point      |NC        |109828          |2        |11060     |
+----------------+----------+----------------+---------+----------+
only showing top 10 rows



In [35]:
#########################################################################################################################################################
#
# Step 11: extract columns from staging table to create state_codes table and save it in parquet file(s)
#
#########################################################################################################################################################

In [36]:
state_codes_df = us_cities_demographics_staging_df.select(['state_code', 'state']).drop_duplicates() 
state_codes_df.coalesce(1).write.parquet('./codes/state-codes', 'overwrite')

In [37]:
state_codes_df.show(4)

+----------+--------------+
|state_code|         state|
+----------+--------------+
|        MT|       Montana|
|        NC|North Carolina|
|        MD|      Maryland|
|        CO|      Colorado|
+----------+--------------+
only showing top 4 rows



### Step 4: Run Data Quality Checks

In [38]:
check_if_table_has_records = [
    {'df': spark.read.parquet('./codes/state-codes'), 'table': 'state_codes'},
    {'df': spark.read.parquet('./codes/country-codes'), 'table': 'country_codes'},
    {'df': spark.read.parquet('./codes/race-codes'), 'table': 'race_codes'},
    {'df': spark.read.parquet('./codes/port-codes'), 'table': 'port_codes'},
    {'df': spark.read.parquet('./cities'), 'table': 'cities'},
    {'df': spark.read.parquet('./visitors'), 'table': 'visitors'},
    {'df': spark.read.parquet('./visitor-arrival-departure-records'), 'table': 'visitor_arrival_departure_records'},
]

for test in check_if_table_has_records:
    if test['df'].count() == 0:
        raise ValueError(f"Data quality check failed. {test['table']} has no records")
print("check_if_table_has_records tests passed")

check_if_table_has_records tests passed
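
The check above only verifies that each table is non-empty. A further check that might be worth adding is that key columns are unique and never null. The following is a plain-Python sketch on a list of dictionaries, not part of the original notebook; `check_unique_non_null` is a hypothetical helper, and the sample rows are shaped like the `state_codes` output shown earlier.

```python
from collections import Counter


def check_unique_non_null(rows, key, table):
    """Raise ValueError if `key` is missing/None or duplicated in any row."""
    values = [row.get(key) for row in rows]
    if any(value is None for value in values):
        raise ValueError(f"Data quality check failed. {table}.{key} contains nulls")
    duplicates = [value for value, n in Counter(values).items() if n > 1]
    if duplicates:
        raise ValueError(f"Data quality check failed. {table}.{key} has duplicates: {duplicates}")


# Rows shaped like the state_codes table shown earlier.
state_codes = [
    {"state_code": "MT", "state": "Montana"},
    {"state_code": "NC", "state": "North Carolina"},
]
check_unique_non_null(state_codes, "state_code", "state_codes")
print("check_unique_non_null tests passed")
```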


### Step 5: Create Analytical Tables

In [39]:
#########################################################################################################################################################
#
# Top 10 countries where visitors are from
#
#########################################################################################################################################################

In [40]:
visitors_in_file = './visitors'
country_codes_in_file = './codes/country-codes'

visitors_df = spark.read.parquet(visitors_in_file)
country_codes_df = spark.read.parquet(country_codes_in_file)

# Count visitors per country of residence.
country_code_count = visitors_df.groupby(visitors_df.from_country_code).count()
# Sort after the join: a join does not guarantee that an earlier sort order is preserved.
top_10_countries_where_visitors_are_from = country_code_count.join(country_codes_df, country_code_count.from_country_code == country_codes_df.code, how = 'left').select(country_codes_df['name'], country_code_count['count']).orderBy(col('count').desc())

top_10_countries_where_visitors_are_from.show(10, False)

+--------------+------+
|name          |count |
+--------------+------+
|UNITED KINGDOM|368421|
|JAPAN         |249167|
|CHINA, PRC    |185609|
|FRANCE        |185339|
|MEXICO        |179603|
|GERMANY       |156613|
|SOUTH KOREA   |136312|
|BRAZIL        |134907|
|AUSTRALIA     |112407|
|INDIA         |107193|
+--------------+------+
only showing top 10 rows



In [45]:
#########################################################################################################################################################
#
# Top 3 busiest ports of entry
#
#########################################################################################################################################################

In [46]:
visitor_arrival_departure_records_in_file = './visitor-arrival-departure-records' 
port_codes_in_file = './codes/port-codes'

visitor_arrival_departure_records_df = spark.read.parquet(visitor_arrival_departure_records_in_file)
port_codes_df = spark.read.parquet(port_codes_in_file)

arrival_ports_count = visitor_arrival_departure_records_df.groupby(col('arrival_port_code')).count().sort(col("count").desc()).limit(3)

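# Re-sort after the join so the three ports are shown from busiest to least busy.
arrival_ports_count.join(port_codes_df, arrival_ports_count.arrival_port_code == port_codes_df.code, how = 'left').select(port_codes_df['*'], arrival_ports_count['count']).orderBy(col('count').desc()).show()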

+----+---------------+------+
|code|       location| count|
+----+---------------+------+
| NYC|   NEW YORK, NY|485916|
| MIA|      MIAMI, FL|343941|
| LOS|LOS ANGELES, CA|310163|
+----+---------------+------+
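
The "count, sort descending, keep the first N" pattern used for both the top countries and the busiest ports can be sketched in plain Python with `collections.Counter`. The list of port codes below is made up for illustration and does not reflect the real counts; `top_n` is a hypothetical helper.

```python
from collections import Counter


def top_n(values, n):
    """Return the `n` most frequent values with their counts, most frequent first."""
    return Counter(values).most_common(n)


# A tiny made-up list of arrival port codes (illustrative only).
arrivals = ["NYC", "MIA", "NYC", "LOS", "NYC", "MIA", "ANC", "LOS", "MIA", "NYC"]
print(top_n(arrivals, 3))
```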



In [47]:
#########################################################################################################################################################
#
# Number of visitors arriving at U.S. ports per day
#
#########################################################################################################################################################

In [49]:
visitor_arrival_departure_records_df.groupby(col('arrdate_date')).count().sort(col("arrdate_date").asc()).show()

+------------+------+
|arrdate_date| count|
+------------+------+
|  2016-04-01|108407|
|  2016-04-02|103196|
|  2016-04-03| 99972|
|  2016-04-04| 97653|
|  2016-04-05| 91514|
|  2016-04-06| 88273|
|  2016-04-07| 99763|
|  2016-04-08|103660|
|  2016-04-09|105930|
|  2016-04-10|104394|
|  2016-04-11| 98737|
|  2016-04-12| 85600|
|  2016-04-13| 91173|
|  2016-04-14|107557|
|  2016-04-15|114803|
|  2016-04-16|114970|
|  2016-04-17|106474|
|  2016-04-18|100493|
|  2016-04-19| 86068|
|  2016-04-20| 95428|
+------------+------+
only showing top 20 rows



In [50]:
#########################################################################################################################################################
#
# Which group is the racial majority of each city?
#
#########################################################################################################################################################

In [84]:
us_cities_demographics_in_file = './cities'
us_cities_demographics_df = spark.read.parquet(us_cities_demographics_in_file)

race_codes_in_file = './codes/race-codes'
race_codes_df = spark.read.parquet(race_codes_in_file)

windowSpec = W.partitionBy('city', 'state_code')
us_cities_demographics_df = us_cities_demographics_df.withColumn("race_percentage", round(col('race_count') / Fsum("race_count").over(windowSpec) * 100, 2).cast("double"))
# Rank the races within each city by share and keep the top one.
# An orderBy followed by groupby().agg(first) is not guaranteed to be deterministic in Spark.
rankSpec = windowSpec.orderBy(col('race_percentage').desc())
racial_majority_df = us_cities_demographics_df \
                      .withColumn('rank', row_number().over(rankSpec)) \
                      .filter(col('rank') == 1) \
                      .select('city', 'state_code', col('race_code').alias('racial_majority'))

racial_majority_df = racial_majority_df.join(race_codes_df, race_codes_df.code == racial_majority_df.racial_majority, how='left') \
                      .select(racial_majority_df['city'], racial_majority_df['state_code'], race_codes_df['race'])
                    
                    
racial_majority_df.show(20, False)

+--------------+----------+-------------------------+
|city          |state_code|race                     |
+--------------+----------+-------------------------+
|Norwalk       |CA        |Hispanic or Latino       |
|Mesa          |AZ        |White                    |
|Asheville     |NC        |White                    |
|Waukegan      |IL        |White                    |
|Mountain View |CA        |White                    |
|Mount Pleasant|SC        |White                    |
|Minneapolis   |MN        |White                    |
|Arvada        |CO        |White                    |
|South Jordan  |UT        |White                    |
|Ontario       |CA        |Hispanic or Latino       |
|Fontana       |CA        |Hispanic or Latino       |
|Deltona       |FL        |White                    |
|Saint Cloud   |MN        |White                    |
|Lee's Summit  |MO        |White                    |
|Santa Clara   |CA        |Asian                    |
|Lansing       |MI        |W

#### Other Comments

The reasons for building the data lake with Spark:
* Spark can cache data across multiple nodes and transform data quickly.
* Because the data is stored in its original format, it is flexible to query any records or to process the data with different approaches.
* With the schema-on-read feature, the format of the data source can be flexible.

In a production environment, the script should be run on a schedule, for example by a cron job. It is even better to build an ELT pipeline with Airflow.

For the following scenarios:
 * If the data is increased by 100x: \
     I will use a cloud service to run the script and save the data in S3.
 * If the data populates a dashboard that must be updated on a daily basis by 7am every day: \
     Either set the cron job or create an Airflow task to run early enough each day that it finishes before 7am.
 * If the database needed to be accessed by 100+ people: \
     I will create the data lake in S3.
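
For the 7am dashboard scenario, the start time has to leave room for the run itself. The sketch below derives a daily cron expression from a deadline and a runtime budget. `daily_cron_for_deadline` is a hypothetical helper, and the two-hour budget is an assumed figure, not a measured runtime of this pipeline.

```python
from datetime import date, datetime, time, timedelta


def daily_cron_for_deadline(deadline, runtime_budget):
    """Return a daily cron expression that starts `runtime_budget` before `deadline`."""
    # The calendar date is arbitrary; only the resulting hour and minute are used.
    start = datetime.combine(date(2000, 1, 1), deadline) - runtime_budget
    return f"{start.minute} {start.hour} * * *"


# Assumed budget: the pipeline needs at most two hours (a made-up figure).
print(daily_cron_for_deadline(time(7, 0), timedelta(hours=2)))  # 0 5 * * *
```

The resulting expression could be used in a crontab entry or as the schedule of an Airflow DAG.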