### Data Exploration and Prepare Raw Data

In [1]:
import os
import datetime
import pandas as pd
import configparser
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, date_format
from pyspark.sql.types import StructField, StructType, StringType, LongType, IntegerType, FloatType

In [2]:
spark = SparkSession.builder.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0").getOrCreate()

##### I94 Immigration Data Exploration
1. Use Spark to explore the I94 Immigration Data located in: "data/sas_data/\*.parquet"  
    1.1 select features which are necessary for further analysis   
    1.2 drop duplicated records  
    1.3 convert type, name of the features   
    1.4 sort the records by feature 'arrival_date' descending  

In [3]:
df_immigration_raw = spark.read.parquet("data/sas_data/*.parquet")

In [4]:
df_immigration_raw.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [11]:
# select necessary columns
# admnum: admission number (one person has only one admission number)
# i94bir: age 
# gender : gender
# i94yr: arrival year
# i94mon: arrival month
# arrdate : arrival date
# depdate : departure date
# i94res: nationality
# i94port: destination usa city
# i94visa: the reason for immigration
# i94mode: traval tools 
selected_column = ["admnum", "i94bir", "gender",  "i94yr", "i94mon", "arrdate", "depdate", "i94res", "i94port", "i94visa", "i94mode"]
df_immigration = df_immigration_raw.select(selected_column)

In [12]:
# Funtion to convert SAS arrival date to datetime format 
get_date = udf(lambda x: (datetime.datetime(1960, 1, 1).date() + datetime.timedelta(x)).isoformat() if x else None)

In [13]:
# convert some columns' name to make them more readable 
# convert numbers to integer 
# covert SAS date to datetime format 
df_immigration = df_immigration.selectExpr('cast(admnum as int) AS admission_number',
                          'cast(i94bir as int) AS age',
                          'gender',
                          'cast(i94yr as int) AS arrival_year',
                          'cast(i94mon as int) AS arrival_month',
                          'cast(arrdate as int) AS arrival_date', 
                          'cast(depdate as int) AS departure_date',
                          'cast(i94res as int) AS from_country_code', 
                          'i94port AS usa_port_code',
                          'cast(i94visa as int) AS visa_code',
                          'cast(i94mode as int) AS travel_way')

In [14]:
df_immigration = df_immigration.withColumn("arrival_date", get_date(df_immigration.arrival_date)).\
                                withColumn("departure_date", get_date(df_immigration.departure_date))

In [16]:
df_immigration = df_immigration.withColumn("arrival_day", date_format(df_immigration.arrival_date, "d"))
df_immigration = df_immigration.withColumn("arrival_day", df_immigration.arrival_day.cast("int"))

In [17]:
# order by arrival_date descending 
df_immigration = df_immigration.orderBy('arrival_date', ascending=False)
df_immigration.show(3)

+----------------+---+------+------------+-------------+------------+--------------+-----------------+-------------+---------+----------+-----------+
|admission_number|age|gender|arrival_year|arrival_month|arrival_date|departure_date|from_country_code|usa_port_code|visa_code|travel_way|arrival_day|
+----------------+---+------+------------+-------------+------------+--------------+-----------------+-------------+---------+----------+-----------+
|      2147483647| 42|  null|        2016|            4|  2016-04-30|    2016-05-01|              101|          WAS|        2|         1|         30|
|      2147483647| 25|     M|        2016|            4|  2016-04-30|    2016-05-14|              101|          NYC|        2|         1|         30|
|      2147483647| 54|  null|        2016|            4|  2016-04-30|    2016-05-20|              101|          TAM|        2|         1|         30|
+----------------+---+------+------------+-------------+------------+--------------+----------------

In [18]:
# check the maximal and minimal arrival date
# maybe it is a good feature to partition immigarion data 
print("max arrival date: {}".format(df_immigration.select("arrival_date").rdd.max()[0]))
print("min arrival date: {}".format(df_immigration.select("arrival_date").rdd.min()[0]))

max arrival date: 2016-04-30
min arrival date: 2016-04-01


In [82]:
# check duplicates
df_immigration.count(), df_immigration.drop_duplicates().count()

(3096302, 2610963)

In [19]:
# assum that one person only has one admnum 
# some rows have the same admission number, assum that one person have multiple immigration-records 
df_immigration.select("admission_number").drop_duplicates().count()

80404

In [20]:
# drop duplicate rows 
df_immigration = df_immigration.drop_duplicates()

In [21]:
df_immigration.printSchema()

root
 |-- admission_number: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- arrival_year: integer (nullable = true)
 |-- arrival_month: integer (nullable = true)
 |-- arrival_date: string (nullable = true)
 |-- departure_date: string (nullable = true)
 |-- from_country_code: integer (nullable = true)
 |-- usa_port_code: string (nullable = true)
 |-- visa_code: integer (nullable = true)
 |-- travel_way: integer (nullable = true)
 |-- arrival_day: integer (nullable = true)



##### Encoding Problem

Some features in the I94 Immigration Data are encoded, therefore we must figure out the description of the code     
    * usa-port description: "data/sas_data_description/i94port.csv"  
    * travel-way description: "data/sas_data_description/i94mode.csv"  
    * traveler-nationality description: "data/sas_data_description/i94res.csv"  
    * traveler-purpose description: "data/sas_data_description/i94visa.csv" 
    * state-state-code: data/us-cities-demographics.csv

##### i94port - USA PORT Description

In [52]:
# read i94port.csv => USA port description
usa_port = pd.read_csv("data/sas_data_description/i94port.csv", sep = "=", header = None, 
                           names=["code", "port"])

# Remove whitespaces and single quotes
usa_port['code']=usa_port['code'].str.strip().str.replace("'",'')

# Create two columns from i94port string: port_city and port_addr
# also remove whitespaces and single quotes
usa_port['city'], usa_port['state_code'] = usa_port['port'].str.strip().str.replace("'",'').str.strip().str.split(',',1).str

# Remove more whitespace from port_state
usa_port['state_code']=usa_port['state_code'].str.strip()

# Drop port column and keep the two new columns: port_city and port_state
usa_port.drop(columns =['port'], inplace = True)

# convert city to lower case 
usa_port['city'] = usa_port['city'].str.lower()

In [53]:
usa_demographics = pd.read_csv("data/us-cities-demographics.csv", sep = ";")
# convert column name to lower and replace whitespace or "-" by "_"
usa_demographics.columns = map(str.lower, usa_demographics.columns)
usa_demographics.columns = usa_demographics.columns.str.replace(" ",'_')
usa_demographics.columns = usa_demographics.columns.str.replace("-",'_')

In [54]:
state = usa_demographics[[ "state", "state_code"]].drop_duplicates()

In [55]:
usa_port = pd.merge(usa_port, state, how = "left", on = ["state_code"])

In [57]:
usa_port.dtypes

code          object
city          object
state_code    object
state         object
dtype: object

##### i94mode - Travel Way Description

In [58]:
# read i94mode.csv => Travel Way Description
travel_way = pd.read_csv("data/sas_data_description/i94mode.csv", sep = "=", header = None, 
                           names=["code", "description"])

# Remove whitespaces and single quotes
travel_way['description']=travel_way['description'].str.strip().str.replace("'",'')

In [59]:
travel_way.dtypes

code            int64
description    object
dtype: object

##### i94res - Traveler Nationality

In [60]:
# read i94res.csv => Traveler Nationality
country_code = pd.read_csv("data/sas_data_description/i94res.csv", sep = "=", header = None,
                          names = ["code", "country"])

# Remove whitespaces and single quotes
country_code['country']=country_code['country'].str.strip().str.replace("'",'')

In [61]:
country_code.dtypes

code        int64
country    object
dtype: object

###### i94visa - Purpose of Travel

In [63]:
# read i94visa.csv => Purpose of TravelÂ¶
visa_code = pd.read_csv("data/sas_data_description/i94visa.csv", sep = "=", header = None,
                          names = ["code", "visa"])

# Remove whitespaces and single quotes
visa_code['visa']=visa_code['visa'].str.strip().str.replace("'",'')

In [64]:
visa_code.dtypes

code     int64
visa    object
dtype: object

##### Save

In [22]:
immigration_dir = os.path.join("data", "raw", "i94_immigration_data")

In [23]:
df_immigration.write.partitionBy("arrival_year", "arrival_month", "arrival_day").\
                parquet(immigration_dir, "overwrite")

In [67]:
immigration_describe_dir = os.path.join("data", "raw", "i94_immigration_labels_description")

In [69]:
usa_port.to_csv(os.path.join(immigration_describe_dir, "usa_port.csv"), index = False)

In [71]:
if not os.path.exists(immigration_describe_dir):
    os.makedirs(immigration_des)

In [74]:
usa_port.to_csv(os.path.join(immigration_describe_dir, "usa_port.csv"), index = False)
travel_way.to_csv(os.path.join(immigration_describe_dir, "travel_way.csv"), index = False)
visa_code.to_csv(os.path.join(immigration_describe_dir, "visa_code.csv"), index = False)
country_code.to_csv(os.path.join(immigration_describe_dir, "country_code.csv"), index = False)