# Data Gathering

    1. Immigration data has been put into one S3 folder named - s3://capstone-project-immigration-data/source-data/
    2. Data has been taken from the Project resources only, total count is ~ 3mn rows

In [1]:
df = spark.read.parquet('s3://capstone-project-immigration-data/source-data/')

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1609493017809_0001,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

3096313

# Data Assessment

In [4]:
df.show(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

# Data Quality Issues identified:

    1. column names i94yr, i94mon are not very sound, they need to be changed to more meaningful names
    2. missing values in i94addr column - code '99' can be filled for missing values
    3. format for arrdate and depdate columns are of SAS format which is unreadable
    4. i94yr and biryear columns should be of proper int type
    5. missing values in gender and airline columns
    6. number values in mode is not very informative - based on the code, an extra column with name of mode of arrival should          be added

Define

# Data Cleaning - correcting SAS data format to normal data format

Code

In [27]:
from datetime import datetime, timedelta
from pyspark.sql import types as T
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

def convert_datetime(x):
    try:
        start = datetime(1960, 1, 1)
        return start + timedelta(days=int(x))
    except:
        return None
    
udf_datetime_from_sas = udf(lambda x: convert_datetime(x), T.DateType())

df_new = df.withColumn("arrival_date", udf_datetime_from_sas("arrdate")) \
    .withColumn("departure_date", udf_datetime_from_sas("depdate"))

# Test
df_new.show(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+------------+--------------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|arrival_date|departure_date|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+------------+--------------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|  2016-04-30|

Define

# Correcting the data type by casting the columns to appropriate types

Code

In [32]:
df_new2 = df_new.selectExpr("cast(cicid as int) cicid","cast(i94yr as int) year","cast(i94mon as int) month_num","cast(i94cit as int) city_code","cast(i94res as int) res_code","cast(i94port as int) port_id","arrdate","cast(i94mode as int) mode_of_arrival_code","i94addr","depdate","i94bir","i94visa","count","dtadfile","visapost","occup","entdepa","entdepd","entdepu","matflag","cast(biryear as int) birth_year","dtaddto","gender","insnum","airline","admnum","fltno","visatype","arrival_date","departure_date")

#Test
df_new2.show(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+----+---------+---------+--------+-------+-------+--------------------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+----------+--------+------+------+-------+--------------+-----+--------+------------+--------------+
|  cicid|year|month_num|city_code|res_code|port_id|arrdate|mode_of_arrival_code|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|birth_year| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|arrival_date|departure_date|
+-------+----+---------+---------+--------+-------+-------+--------------------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+----------+--------+------+------+-------+--------------+-----+--------+------------+--------------+
|5748517|2016|        4|      245|     438|   null|20574.0|                   1|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M|     

Define

# Generating extra column with month name on the basis of month number

Code

In [39]:
import datetime

def month_num_to_name(x):
    try:
        x = str(x)
        datetime_object = datetime.datetime.strptime(x, "%m")
        month_name = datetime_object.strftime("%B")
        return month_name
    except:
        return None
    
udf_month_name = udf(lambda x: month_num_to_name(x), T.StringType())

df_new3 = df_new2.withColumn('month_name', udf_month_name("month_num"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Test

In [41]:
df_new3.select('month_name').distinct().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+
|month_name|
+----------+
|     April|
+----------+

In [42]:
df_new3.na.fill(value='99',subset=["i94addr"]).show(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+----+---------+---------+--------+-------+-------+--------------------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+----------+--------+------+------+-------+--------------+-----+--------+------------+--------------+----------+
|  cicid|year|month_num|city_code|res_code|port_id|arrdate|mode_of_arrival_code|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|birth_year| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|arrival_date|departure_date|month_name|
+-------+----+---------+---------+--------+-------+-------+--------------------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+----------+--------+------+------+-------+--------------+-----+--------+------------+--------------+----------+
|5748517|2016|        4|      245|     438|   null|20574.0|                   1|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|    

Define

# Fill null values in i94addr column with '99' as informed in doc

Code

In [44]:
df_na = df_new3.filter(df_new3.i94addr == '99')
df_na.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+----+---------+---------+--------+-------+-------+--------------------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+----------+--------+------+------+-------+---------------+-----+--------+------------+--------------+----------+
|  cicid|year|month_num|city_code|res_code|port_id|arrdate|mode_of_arrival_code|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|birth_year| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|arrival_date|departure_date|month_name|
+-------+----+---------+---------+--------+-------+-------+--------------------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+----------+--------+------+------+-------+---------------+-----+--------+------------+--------------+----------+
|5925820|2016|        4|      129|     129|   null|20569.0|                   9|     99|   null|  70.0|    2.0|  1.0|20160520|    null| null| 

In [45]:
df_na.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

52

In [48]:
df_new3.na.fill(value='Unclear',subset=["gender"]).show(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+----+---------+---------+--------+-------+-------+--------------------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+----------+--------+------+------+-------+--------------+-----+--------+------------+--------------+----------+
|  cicid|year|month_num|city_code|res_code|port_id|arrdate|mode_of_arrival_code|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|birth_year| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|arrival_date|departure_date|month_name|
+-------+----+---------+---------+--------+-------+-------+--------------------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+----------+--------+------+------+-------+--------------+-----+--------+------------+--------------+----------+
|5748517|2016|        4|      245|     438|   null|20574.0|                   1|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|    

Define

# Filling null values in gender column with 'Unclear'

Code

In [49]:
df_na = df_new3.filter(df_new3.gender == 'Unclear')
df_na.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+----+---------+---------+--------+-------+-------+--------------------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+----------+-------+------+------+-------+------+-----+--------+------------+--------------+----------+
|cicid|year|month_num|city_code|res_code|port_id|arrdate|mode_of_arrival_code|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|birth_year|dtaddto|gender|insnum|airline|admnum|fltno|visatype|arrival_date|departure_date|month_name|
+-----+----+---------+---------+--------+-------+-------+--------------------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+----------+-------+------+------+-------+------+-----+--------+------------+--------------+----------+
+-----+----+---------+---------+--------+-------+-------+--------------------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------

In [50]:
df_new3.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

3096313

In [51]:
df_final = df_new3.drop_duplicates()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [52]:
df_final.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

3096313

# Storing clean data

Code

In [53]:
df_final.coalesce(10).write.parquet('s3://capstone-project-data-files/immigration-data/cleaned/')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…