# US Immigration Analysis

#### Project Summary
Please refer to the [README.md](./README.md) for the project summary.

### Import Libraries

In [1]:
import pandas as pd
import os
import glob
import re
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, col, udf, year, month, avg, round, dayofweek, weekofyear, isnull
from pyspark.sql.types import StringType, IntegerType

### Set Environment Variables

In [2]:
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"

### Create Spark Session

In [3]:
spark = SparkSession.builder.getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
Please refer to the [README.md](./README.md) for the project scope.

#### Dataset Descriptions
Please refer to the [README.md](./README.md) for the dataset descriptions.

#### Load Data

In [4]:
# Read I94 immigration data
immigration_df = spark.read.load('./sas_data')

In [5]:
# Display first few entries of immigration data
immigration_df.limit(3).toPandas().head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,...,,M,1976.0,10292016,F,,QF,94953870000.0,11,B1
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,...,,M,1984.0,10292016,F,,VA,94955620000.0,7,B1
2,5748519.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20582.0,...,,M,1987.0,10292016,M,,DL,94956410000.0,40,B1


In [6]:
# Read temperature data
temp_fname = "../../data2/GlobalLandTemperaturesByCity.csv"
temp_df = spark.read.format("csv").option("delimiter", ",").option("header", "true").load(temp_fname)

In [7]:
# Display first few entries of temperature data
temp_df.limit(3).toPandas().head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E


In [8]:
# Read demographics data
demo_fname = "us-cities-demographics.csv"
demo_df = spark.read.format("csv").option("delimiter", ";").option("header", "true").load(demo_fname)

In [9]:
# Display first few entries of demographics data
demo_df.limit(3).toPandas().head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759


### Step 2: Explore and Assess the Data
#### Explore and Clean Immigration Data

In [10]:
# Create list of valid ports
i94_sas_label_descriptions_fname = "I94_SAS_Labels_Descriptions.SAS"
with open(i94_sas_label_descriptions_fname) as f:
    lines = f.readlines()

re_compiled = re.compile(r"\'(.*)\'.*\'(.*)\'")
valid_airports = {}
for line in lines[302:961]:
    results = re_compiled.search(line)
    valid_airports[results.group(1)] = results.group(2)
print(len(valid_airports))

659


In [11]:
# Create list of valid states
valid_states = demo_df.toPandas()["State Code"].unique()
print(len(valid_states))
print(valid_states)

49
['MD' 'MA' 'AL' 'CA' 'NJ' 'IL' 'AZ' 'MO' 'NC' 'PA' 'KS' 'FL' 'TX' 'VA' 'NV'
 'CO' 'MI' 'CT' 'MN' 'UT' 'AR' 'TN' 'OK' 'WA' 'NY' 'GA' 'NE' 'KY' 'SC' 'LA'
 'NM' 'IA' 'RI' 'PR' 'DC' 'WI' 'OR' 'NH' 'ND' 'DE' 'OH' 'ID' 'IN' 'AK' 'MS'
 'HI' 'SD' 'ME' 'MT']


In [12]:
# Convert SAS date to PySpark date function
@udf(StringType())
def conv_date(sas_date):
    if sas_date:
        return (datetime(1960, 1, 1).date() + timedelta(sas_date)).isoformat()
    return None

In [13]:
# State validation function
@udf(StringType())
def validate_state(state):  
    if state in valid_states:
        return state
    return 'other'

In [14]:
# Remove any missing values
clean_immigration_df = immigration_df.dropna(how="any", subset=["i94port", "i94addr", "gender"])

# Extract valid states 
clean_immigration_df = clean_immigration_df.withColumn("i94addr", validate_state(clean_immigration_df.i94addr))

# Convert arrival_date (SAS format) to PySpark format
clean_immigration_df = clean_immigration_df.withColumn("arrdate", conv_date(clean_immigration_df.arrdate))

# only keep us related immigration data
clean_immigration_df = clean_immigration_df.filter(clean_immigration_df.i94addr != 'other')

stage_immigration_df = clean_immigration_df.select(col("cicid").alias("id"),
                                       col("arrdate").alias("date"),
                                       col("i94port").alias("city_code"),
                                       col("i94addr").alias("state_code"),
                                       col("i94bir").alias("age"),
                                       col("gender").alias("gender"),
                                       col("i94visa").alias("visa_type"),
                                       "count").drop_duplicates()

stage_immigration_df.limit(3).toPandas()

Unnamed: 0,id,date,city_code,state_code,age,gender,visa_type,count
0,5925997.0,2016-04-09,BUF,NY,,U,2.0,1.0
1,5928645.0,2016-04-25,HHW,HI,,U,2.0,1.0
2,158100.0,2016-04-01,MIA,FL,0.0,F,2.0,1.0


In [15]:
stage_immigration_df.printSchema()

root
 |-- id: double (nullable = true)
 |-- date: string (nullable = true)
 |-- city_code: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- age: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- visa_type: double (nullable = true)
 |-- count: double (nullable = true)



#### Explore and Clean Temperature Data

In [16]:
temp_df.count()

8599212

In [17]:
temp_df.limit(3).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E


In [18]:
# Map full city name to city port abbreviation
@udf(StringType())
def city_to_airport(city):
    for key in valid_airports:
        if city.lower() in valid_airports[key].lower():
            return key

In [19]:
# Only use temperatures from United States
# Map full name to city port abbreviation
# Remove invalid ports
clean_temp_df = temp_df.filter(temp_df["Country"] == "United States") \
    .withColumn("year", year(temp_df['dt'])) \
    .withColumn("month", month(temp_df["dt"])) \
    .withColumn("i94port", city_to_airport(temp_df["City"])) \
    .withColumn("AverageTemperature", col("AverageTemperature").cast("float")) \
    .dropna(how='any', subset=["i94port"])

# Only use temperatures from 2013 (the latest year in the dataset)
clean_temp_df = clean_temp_df.filter(clean_temp_df["year"] == 2013)

stage_temp_df = clean_temp_df.select(col("year"), col("month"), col("i94port").alias("city_code"),
                                         round(col("AverageTemperature"), 1).alias("avg_temperature"),
                                         col("Latitude").alias("lat"), col("Longitude").alias("long")).drop_duplicates()

print(stage_temp_df.count())
stage_temp_df.limit(3).toPandas()

1044


Unnamed: 0,year,month,city_code,avg_temperature,lat,long
0,2013,4,COL,16.9,32.95N,85.21W
1,2013,1,DAB,0.5,39.38N,83.24W
2,2013,1,ONT,6.8,34.56N,116.76W


In [20]:
stage_temp_df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- city_code: string (nullable = true)
 |-- avg_temperature: float (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)



#### Explore and Clean Demographics Data

In [21]:
demo_df.count()

2891

In [22]:
demo_df.limit(3).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759


In [23]:
# Calculate percentages of numeric columns and create new ones
clean_demo_df = demo_df.withColumn("median_age", demo_df['Median Age']) \
    .withColumn("pct_male_pop", (demo_df['Male Population'] / demo_df['Total Population']) * 100) \
    .withColumn("pct_female_pop", (demo_df['Female Population'] / demo_df['Total Population']) * 100) \
    .withColumn("pct_veterans", (demo_df['Number of Veterans'] / demo_df['Total Population']) * 100) \
    .withColumn("pct_foreign_born", (demo_df['Foreign-born'] / demo_df['Total Population']) * 100) \
    .withColumn("pct_race", (demo_df['Count'] / demo_df['Total Population']) * 100) \
    .withColumn("city_code", city_to_airport(demo_df["City"])) \
    .dropna(how='any', subset=["city_code"])

clean_demo_df = clean_demo_df.select(col("City").alias("city_name"), col("State Code").alias("state_code"), 
                         "median_age", "pct_male_pop", "pct_female_pop","pct_veterans", 
                         "pct_foreign_born", col("Total Population").alias("total_pop"), 
                         col("Race").alias("race"), "pct_race").drop_duplicates()

clean_demo_df.count()

883

In [24]:
# Pivot the race column
pivot_demo_df = clean_demo_df.groupBy("city_name", "state_code", "median_age", "pct_male_pop",
                                        "pct_female_pop","pct_veterans", "pct_foreign_born", "total_pop").pivot("Race").avg("pct_race")

pivot_demo_df = pivot_demo_df.withColumn("city_code", city_to_airport(pivot_demo_df["city_name"])) \
    .dropna(how='any', subset=["city_code"])

stage_demo_df = pivot_demo_df.select("city_code", "state_code", "city_name", "median_age",
                                    round(col("pct_male_pop"), 1).alias("pct_male_pop"),
                                    round(col("pct_female_pop"), 1).alias("pct_female_pop"),
                                    round(col("pct_veterans"), 1).alias("pct_veterans"),
                                    round(col("pct_veterans"), 1).alias("pct_foreign_born"),
                                    round(col("American Indian and Alaska Native"), 1).alias("pct_native_american"),
                                    round(col("Asian"), 1).alias("pct_asian"),
                                    round(col("Black or African-American"), 1).alias("pct_black"),
                                    round(col("Hispanic or Latino"), 1).alias("pct_hispanic_or_latino"),
                                    round(col("White"), 1).alias("pct_white"), "total_pop")
print(stage_demo_df.count())
stage_demo_df.limit(3).toPandas()

180


Unnamed: 0,city_code,state_code,city_name,median_age,pct_male_pop,pct_female_pop,pct_veterans,pct_foreign_born,pct_native_american,pct_asian,pct_black,pct_hispanic_or_latino,pct_white,total_pop
0,TUC,AZ,Tucson,33.6,49.8,50.2,7.2,7.2,4.6,4.6,6.4,43.5,76.1,531674
1,MCA,TX,Allen,37.2,52.3,47.7,3.6,3.6,0.2,16.1,13.4,10.8,71.2,98138
2,CRP,TX,Corpus Christi,35.0,49.5,50.5,7.7,7.7,0.9,2.8,4.6,61.9,90.3,324082


In [25]:
stage_demo_df.printSchema()

root
 |-- city_code: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- city_name: string (nullable = true)
 |-- median_age: string (nullable = true)
 |-- pct_male_pop: double (nullable = true)
 |-- pct_female_pop: double (nullable = true)
 |-- pct_veterans: double (nullable = true)
 |-- pct_foreign_born: double (nullable = true)
 |-- pct_native_american: double (nullable = true)
 |-- pct_asian: double (nullable = true)
 |-- pct_black: double (nullable = true)
 |-- pct_hispanic_or_latino: double (nullable = true)
 |-- pct_white: double (nullable = true)
 |-- total_pop: string (nullable = true)



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

- I chose a star schema data model because of its simplicity and efficacy.
- Users can write simple queries by joing fact and dimension tables to analyze the data.

Please refer to the [README.md](./README.md) for a detailed database schema.

#### 3.2 Mapping Out Data Pipelines
##### ETL Steps:

Please refer to the [README.md](./README.md) for ETL steps.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [26]:
# Create dimension table for immigrant
immigrant_df = stage_immigration_df.select("id", "gender", "age", "visa_type").drop_duplicates()

In [27]:
immigrant_df.count()

2435922

In [28]:
immigrant_df.limit(3).toPandas()

Unnamed: 0,id,gender,age,visa_type
0,488015.0,M,6.0,2.0
1,3885998.0,M,11.0,2.0
2,3305770.0,M,13.0,2.0


In [29]:
# Create dimension table for city
city_df = stage_demo_df.join(stage_temp_df, "city_code") \
    .select("city_code", "state_code", "city_name", "median_age", "pct_male_pop", "pct_female_pop", "pct_veterans",
           "pct_foreign_born", "pct_native_american", "pct_asian", "pct_black",
           "pct_hispanic_or_latino", "pct_white", "total_pop", "lat", "long").drop_duplicates()

In [30]:
city_df.count()

142

In [31]:
city_df.limit(3).toPandas()

Unnamed: 0,city_code,state_code,city_name,median_age,pct_male_pop,pct_female_pop,pct_veterans,pct_foreign_born,pct_native_american,pct_asian,pct_black,pct_hispanic_or_latino,pct_white,total_pop,lat,long
0,BRO,TX,Brownsville,30.6,47.7,52.3,2.3,2.3,0.6,0.9,0.7,92.5,95.0,183888,26.52N,96.72W
1,HSV,WI,Madison,30.7,49.2,50.8,3.9,3.9,0.9,9.6,8.2,7.9,82.1,248956,34.56N,85.62W
2,ATL,GA,Atlanta,33.8,48.3,51.7,4.0,4.0,1.0,5.2,52.9,4.0,42.3,463875,34.56N,83.68W


In [32]:
# Create dimension table for monthly city temperature
monthly_city_temp_df = stage_temp_df.select("city_code", "year", "month", "avg_temperature").drop_duplicates()

In [33]:
monthly_city_temp_df.count()

1043

In [34]:
monthly_city_temp_df.limit(3).toPandas()

Unnamed: 0,city_code,year,month,avg_temperature
0,SAA,2013,6,18.6
1,PHI,2013,5,16.6
2,BOS,2013,5,14.3


In [35]:
# Create dimension table for time
time_df = stage_immigration_df.withColumn("dayofweek", dayofweek("date"))\
                .withColumn("weekofyear", weekofyear("date"))\
                .withColumn("month", month("date"))
                        
time_df = time_df.select("date", "dayofweek", "weekofyear", "month").drop_duplicates()

In [36]:
time_df.count()

30

In [37]:
time_df.limit(3).toPandas()

Unnamed: 0,date,dayofweek,weekofyear,month
0,2016-04-23,7,16,4
1,2016-04-22,6,16,4
2,2016-04-08,6,14,4


In [38]:
# Create fact table for immigration
immigration_df = stage_immigration_df.select("id", "state_code", "city_code", "date", "count").drop_duplicates()

In [39]:
immigration_df.count()

2435922

In [40]:
immigration_df.limit(3).toPandas()

Unnamed: 0,id,state_code,city_code,date,count
0,4750990.0,HI,HHW,2016-04-25,1.0
1,4308064.0,HI,HHW,2016-04-23,1.0
2,4232231.0,FL,ATL,2016-04-22,1.0


In [41]:
# Write dimension tables to parquet
immigrant_df.write.mode("overwrite").partitionBy("gender", "age").parquet("immigrants")
city_df.write.mode("overwrite").partitionBy("state_code").parquet("cities")
monthly_city_temp_df.write.mode("overwrite").parquet("monthly_city_temps")
time_df.write.mode("overwrite").parquet("time")
# Write fact table to parquet
# immigration_df.write.mode("overwrite").partitionBy("state_code", "city_code").parquet("immigration")

#### 4.2 Data Quality Checks
1. Check to see if tables exist.
2. Check to see if tables are empty.
 
##### Run Quality Checks:

In [42]:
def table_exists(df):
    if df is not None:
        return True
    else:
        return False
        
if table_exists(immigrant_df) & table_exists(city_df) & table_exists(monthly_city_temp_df) & table_exists(time_df) & table_exists(immigration_df):
    print("dimension tables and fact table exist")
else:
    print("data quality check failed: table missing")

dimension tables and fact table exist


In [43]:
def table_not_empty(df):
    return df.count() != 0 

if table_not_empty(immigrant_df) & table_not_empty(city_df) & table_not_empty(monthly_city_temp_df) & table_not_empty(time_df) & table_not_empty(immigration_df):
    print("dimension tables and fact table contain records")
else:
    print("data quality check failed: null records")

dimension tables and fact table contain records


#### 4.3 Data dictionary

Please refer to the [README.md](./README.md) for the data dictionary.

#### Step 5: Project Write Up

Please refer to the [README.md](./README.md) for the project write up.