# Importing Modules 

In [1]:
# findspark.init() must run BEFORE the first `import pyspark`: its job is to
# locate the Spark installation and put pyspark on sys.path, so importing
# pyspark first either fails or silently picks up a different installation.
import os
import re

import findspark

# Honour SPARK_HOME when it is set; otherwise fall back to the original local install path.
SPARK_HOME = os.environ.get('SPARK_HOME', 'D:\\Spark\\')
findspark.init(SPARK_HOME)

import numpy as np
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import col, udf
from pyspark.sql.functions import initcap
from pyspark.sql.functions import split
from pyspark.sql.functions import trim
from pyspark.sql.types import StringType

# Creating SparkSession

In [2]:
# One local Spark session for the whole notebook, named 'Udacity'.
# getOrCreate() returns the already-running session if one exists.
spark = (
    SparkSession.builder
    .master('local')
    .appName('Udacity')
    .getOrCreate()
)

In [3]:
# Display the active SparkSession as this cell's output
spark

# Loading and Inspecting the DataFrame

In [4]:
# Load every parquet file under data/ into one DataFrame of raw I94 records
df = spark.read.format('parquet').load('data/*.parquet')

In [5]:
# Print the column names and types read from the parquet files
df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [6]:
# Preview the first rows of the raw I94 data
df.show(n=20)

+--------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|   cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|
+--------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|459651.0|2016.0|   4.0| 135.0| 135.0|    ATL|20547.0|    1.0|     FL|20559.0|  54.0|    2.0|  1.0|20160403|    null| null|      O|      R|   null|      M| 1962.0|07012016|  null|  null|     VS|5.5556253633E10|00115|      WT|
|459652.0|2016.0|   4.0| 135.0| 135.0|    ATL|20547.0|    1.0|     FL|20555.0|  74.0|    2.0|  1

## Cleaning I94 Dataset 

+ ~~Dropping Columns~~ 
+ ~~Invalid States~~ 
+ ~~Invalid Ports~~ 
+ ~~Dropping Duplicates~~

In [7]:
# Read Port lookup: JSON keys become the `i94port` column, values the `Port` label.
# Forward slashes work on both Windows and POSIX, and avoid the invalid '\i'
# escape sequence that the backslash path produced.
df_port = spark.createDataFrame(
    pd.read_json('data/i94prtl.json', orient='index')
    .reset_index()
    .rename(columns={'index': 'i94port', 0: 'Port'})
)

In [8]:
# Read Origin lookup: JSON keys become the `i94cit` column, values `Country of Origin`.
# Forward slashes work on both Windows and POSIX, and avoid the invalid '\i'
# escape sequence that the backslash path produced.
df_country = spark.createDataFrame(
    pd.read_json('data/i94cntyl.json', orient='index')
    .reset_index()
    .rename(columns={'index': 'i94cit', 0: 'Country of Origin'})
)

In [9]:
# Read Travelmode lookup: JSON keys become the `i94mode` column, values `Travelmode`.
# Forward slashes work on both Windows and POSIX, and avoid the invalid '\i'
# escape sequence that the backslash path produced.
df_mode = spark.createDataFrame(
    pd.read_json('data/i94model.json', orient='index')
    .reset_index()
    .rename(columns={'index': 'i94mode', 0: 'Travelmode'})
)

In [10]:
# Read Destination lookup: JSON keys become the `i94addr` column, values `Destination`.
# Forward slashes work on both Windows and POSIX, and avoid the invalid '\i'
# escape sequence that the backslash path produced.
df_dest = spark.createDataFrame(
    pd.read_json('data/i94addrl.json', orient='index')
    .reset_index()
    .rename(columns={'index': 'i94addr', 0: 'Destination'})
)

In [11]:
# Read Visa lookup: JSON keys become the `i94visa` column, values `Visa`.
# Forward slashes work on both Windows and POSIX, and avoid the invalid '\i'
# escape sequence that the backslash path produced.
df_visa = spark.createDataFrame(
    pd.read_json('data/i94visa.json', orient='index')
    .reset_index()
    .rename(columns={'index': 'i94visa', 0: 'Visa'})
)

## Join Tables on DF 

In [12]:
# Left-join each lookup table onto the I94 records by its code column, in a
# fixed order. After each join, the lookup's copy of the key is dropped so only
# the original (left-side) code column remains alongside the new label column.
_lookups = [
    (df_port, 'i94port'),    # adds Port
    (df_country, 'i94cit'),  # adds Country of Origin
    (df_mode, 'i94mode'),    # adds Travelmode
    (df_dest, 'i94addr'),    # adds Destination
    (df_visa, 'i94visa'),    # adds Visa
]
for _lookup_df, _key in _lookups:
    df = df.join(_lookup_df, df[_key] == _lookup_df[_key], how='left').drop(_lookup_df[_key])

In [13]:
# Preview the joined table (first 20 rows, long values truncated)
df.show(n=20, truncate=True)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+--------------------+--------------------+----------+-----------+---------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|                Port|   Country of Origin|Travelmode|Destination|     Visa|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+--------------------+--------------------+----------+-----------+---------+
|2093085.0|2016.0|   4.0| 582.0| 582.0|    HOU|20555.0|    1.0|     .N|20559.0|  46.0|    1.0|

In [14]:
# Print the schema after the lookup joins, which add the Port, Country of
# Origin, Travelmode, Destination and Visa label columns
df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [15]:
# Columns not carried into the cleaned table. The last four are the raw codes
# whose readable labels were already joined in (Travelmode, Destination, Visa,
# Country of Origin).
col_to_drop = [
    'visapost', 'occup', 'entdepu', 'count', 'admnum',
    'matflag', 'dtaddto', 'insnum', 'fltno',
    'i94mode', 'i94addr', 'i94visa', 'i94cit',
]
df = df.drop(*col_to_drop)

In [16]:
# Strip leading/trailing whitespace from the joined Port labels
df_clean = df.withColumn('Port', trim(df['Port']))

## Cleaning Table 

In [17]:
# Preview the table after trimming the Port labels
df_clean.show(n=20)

+---------+------+------+------+-------+-------+-------+------+--------+-------+-------+-------+------+-------+--------+-----------------+--------------------+----------+-----------+---------+
|    cicid| i94yr|i94mon|i94res|i94port|arrdate|depdate|i94bir|dtadfile|entdepa|entdepd|biryear|gender|airline|visatype|             Port|   Country of Origin|Travelmode|Destination|     Visa|
+---------+------+------+------+-------+-------+-------+------+--------+-------+-------+-------+------+-------+--------+-----------------+--------------------+----------+-----------+---------+
|2093085.0|2016.0|   4.0| 582.0|    HOU|20555.0|20559.0|  46.0|20160411|      K|      O| 1970.0|     M|     UA|      B1|      HOUSTON, TX|  MEXICO Air Sea,...|       Air|       null| Business|
|1373083.0|2016.0|   4.0| 135.0|    ATL|20552.0|20553.0|  22.0|20160408|      O|      R| 1994.0|  null|     VS|      B1|      ATLANTA, GA|      UNITED KINGDOM|       Air|       null| Business|
|2696520.0|2016.0|   4.0| 135.0|   

In [18]:
# Clean Invalid Ports: keep only rows whose Port label is present and contains
# neither 'No PORT Code' nor 'Collapsed'.
_port_col = col('Port')
df_port_clean = df_clean.filter(
    _port_col.isNotNull()
    & ~_port_col.contains('No PORT Code')
    & ~_port_col.contains('Collapsed')
)

In [19]:
# Invalid Country Codes: keep only rows whose origin-country label is present
# and contains neither 'INVALID' nor 'Collapsed'.
_country_col = col('Country of Origin')
df_country_clean = df_port_clean.filter(
    _country_col.isNotNull()
    & ~_country_col.contains('INVALID')
    & ~_country_col.contains('Collapsed')
)

In [20]:
# Remove rows without a Travelmode label (null after the left join)
df_travelmode_clean = df_country_clean.where(col('Travelmode').isNotNull())

In [21]:
# Remove rows without a Destination label (null after the left join)
df_destination_clean = df_travelmode_clean.where(col('Destination').isNotNull())

In [22]:
# Inspect the Visa categories. The recorded output shows only Pleasure,
# Student and Business, so no Visa-specific cleaning step follows.
df_destination_clean.groupby('Visa').count().show(n=20)

+---------+-------+
|     Visa|  count|
+---------+-------+
| Pleasure|1303035|
|  Student|  25492|
| Business| 293487|
+---------+-------+



In [23]:
# Rename the space-containing column to a snake_case-style name
df_destination_clean = df_destination_clean.withColumnRenamed(
    existing='Country of Origin',
    new='Origin_Country',
)

In [24]:
# Get City names: Port labels look like "HOUSTON, TX". Keep the text before the
# first comma, title-case it (e.g. "Houston"), and replace Port with City.
_city_expr = initcap(split(col("Port"), ',').getItem(0))
df_city = df_destination_clean.withColumn("City", _city_expr).drop('Port')

In [25]:
# Preview the final table; City is intended as the key field for joining the
# Demographics and Weather data
df_city.show(n=20)

+---------+------+------+------+-------+-------+-------+------+--------+-------+-------+-------+------+-------+--------+----------------+----------+-----------+---------+--------------+
|    cicid| i94yr|i94mon|i94res|i94port|arrdate|depdate|i94bir|dtadfile|entdepa|entdepd|biryear|gender|airline|visatype|  Origin_Country|Travelmode|Destination|     Visa|          City|
+---------+------+------+------+-------+-------+-------+------+--------+-------+-------+-------+------+-------+--------+----------------+----------+-----------+---------+--------------+
| 881270.0|2016.0|   4.0| 299.0|    SFR|20549.0|20553.0|  34.0|20160405|      G|      O| 1982.0|     M|     KE|      B1|        MONGOLIA|       Air|    ARIZONA| Business| San Francisco|
|1048471.0|2016.0|   4.0| 299.0|    SFR|20550.0|20557.0|  51.0|20160406|      G|      O| 1965.0|     F|     UA|      B1|        MONGOLIA|       Air|    ARIZONA| Business| San Francisco|
|1048473.0|2016.0|   4.0| 299.0|    SFR|20550.0|20557.0|  45.0|2016040

In [70]:
# Write the cleaned table as a single CSV (with header) for local inspection.
# repartition(1) produces one part file. mode='overwrite' keeps the cell
# idempotent: 'append' would add another full copy of every row to the output
# directory each time the notebook is re-run.
df_city.repartition(1).write.csv('processed/immigration_clean', mode='overwrite', header=True)