In [20]:
import configparser
import os
import datetime as dt
import pandas as pd
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, TimestampType
from pyspark.sql.functions import udf, col, to_timestamp, from_unixtime,monotonically_increasing_id, desc, when

In [3]:
def create_or_get_spark_session():
    spark = SparkSession \
    .builder \
    .config("spark.jars.repositories", "https://repos.spark-packages.org/") \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()

    return spark

spark = create_or_get_spark_session()

#Print SparkSession
spark

In [57]:
fname_state_data = './state_descriptions.csv'
us_states_ds = spark.read.csv(fname_state_data, header=True)
us_states_ds.printSchema()

root
 |-- state_code: string (nullable = true)
 |-- state_description: string (nullable = true)
 |-- country_code: string (nullable = true)



In [58]:
us_states_ds.limit(5).toPandas()

Unnamed: 0,state_code,state_description,country_code
0,AL,ALABAMA,US
1,AK,ALASKA,US
2,AZ,ARIZONA,US
3,AR,ARKANSAS,US
4,CA,CALIFORNIA,US


In [28]:
fname_visa = './visa_type_data.xlsx'
visa_type_pd = pd.read_excel(fname_visa, sheet_name='visa_type_sheet', inferSchema=True)

In [29]:
visa_type_ds = spark.createDataFrame(visa_type_pd)
visa_type_ds.printSchema()

root
 |-- Visa Category: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Initial Duration of Staya: string (nullable = true)
 |-- Annual Numeric Limit: string (nullable = true)



In [55]:
visa_type_ds.limit(10).toPandas()

Unnamed: 0,visa_type,description,initial_duration_of_staya,annual_numeric_limit
0,A1,"Ambassador, public minister, career diplomat, ...",Duration of assignment,
1,A2,"Other foreign government official or employee,...",Duration of assignment,
2,A3,"Attendant or personal employee of A-1/A-2, and...",Up to three years,
3,B1,Visitor for business,Six months to one year,
4,B2,Visitor for pleasure,Six months to one year,
5,B1/B2,Visitor for business and pleasure,Six months to one year,
6,B1/B2/\nBCC,Border crossing cards for Mexicans,Up to 30 days (or longer if coupled with B-1 o...,
7,B1/B2/\nBCV,Mexican Lincoln Border Crossing Visa,Up to 30 days (or longer if coupled with B-1 o...,
8,C1,Alien in transit,Up to 29 days,
9,C1/D,Alien in transit/crew member,Up to 29 days,


In [None]:
visa_type_ds = visa_type_ds.select(
    col('Visa Category').alias('visa_type'),
    col('Description').alias('description'),
    col('Initial Duration of Staya').alias('initial_duration_of_staya'),
    col('Annual Numeric Limit').alias('annual_numeric_limit')
)

In [33]:
visa_type_ds.printSchema()

root
 |-- visa_type: string (nullable = true)
 |-- description: string (nullable = true)
 |-- initial_duration_of_staya: string (nullable = true)
 |-- annual_numeric_limit: string (nullable = true)



In [36]:
fname = './airport-codes_csv.csv'
airport_ds = spark.read.csv(fname,header=True)
airport_ds.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [37]:
airport_ds.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [38]:
f_i94_name = 'immigration_data_sample.csv'
imm_ds = spark.read.csv(f_i94_name, header=True)
imm_ds.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- cicid: string (nullable = true)
 |-- i94yr: string (nullable = true)
 |-- i94mon: string (nullable = true)
 |-- i94cit: string (nullable = true)
 |-- i94res: string (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94mode: string (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: string (nullable = true)
 |-- i94bir: string (nullable = true)
 |-- i94visa: string (nullable = true)
 |-- count: string (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: string (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = tru

In [39]:
imm_ds.select(
    col('cicid').alias('imm_id'),
    col('i94yr').alias('year'),
    col('i94mon').alias('month'),
    col('i94port').alias('port'),
    col('i94res').alias('country_origin'),
    col('arrdate').alias('arrival_date'),
    col('depdate').alias('departure_date'),
    col('i94mode').alias('imm_type'),
    col('i94addr').alias('temp_state_residence'),
    col('gender').alias('gender'),
    col('i94visa').alias('visa_code'),
    col('visatype').alias('visa_type')
).limit(5).toPandas()

Unnamed: 0,imm_id,year,month,port,country_origin,arrival_date,departure_date,imm_type,temp_state_residence,gender,visa_code,visa_type
0,4084316.0,2016.0,4.0,HHW,209.0,20566.0,20573.0,1.0,HI,F,2.0,WT
1,4422636.0,2016.0,4.0,MCA,582.0,20567.0,20568.0,1.0,TX,M,2.0,B2
2,1195600.0,2016.0,4.0,OGG,112.0,20551.0,20571.0,1.0,FL,M,2.0,WT
3,5291768.0,2016.0,4.0,LOS,297.0,20572.0,20581.0,1.0,CA,M,2.0,B2
4,985523.0,2016.0,4.0,CHM,111.0,20550.0,20553.0,3.0,NY,F,2.0,WT


In [40]:
imm_cleaned = imm_ds.select(
    col('cicid').alias('imm_id'),
    col('i94yr').alias('year'),
    col('i94mon').alias('month'),
    col('i94port').alias('port'),
    col('i94res').alias('country_origin'),
    col('arrdate').alias('arrival_date'),
    col('depdate').alias('departure_date'),
    col('i94mode').alias('imm_type'),
    col('i94addr').alias('temp_state_residence'),
    col('gender').alias('gender'),
    col('i94visa').alias('visa_code'),
    col('visatype').alias('visa_type')) \
.where(imm_ds['i94mode'] == 1)

In [41]:
imm_cleaned.select(col('visa_type')).distinct().toPandas()

Unnamed: 0,visa_type
0,F2
1,B2
2,F1
3,WB
4,M1
5,B1
6,WT
7,CP
8,GMT
9,E2


In [42]:
airport_ds_us = airport_ds.where( (airport_ds['type'].like('%airport%')) & (airport_ds['iso_country'] == 'US') )

In [43]:
imm_ds.select('cicid', 'i94yr', 'i94mon', 'i94port', 'gender', 'biryear') \
        .where(imm_ds['i94mode'] == 1) \
        .groupby(imm_ds['i94port']) \
        .count() \
        .orderBy(desc('count')) \
        .limit(10).toPandas()

Unnamed: 0,i94port,count
0,NYC,155
1,MIA,110
2,LOS,104
3,SFR,54
4,CHI,45
5,NEW,45
6,HHW,39
7,ORL,39
8,ATL,37
9,HOU,30


In [60]:
imm_cleaned.join(airport_ds_us, imm_cleaned.port == airport_ds_us.local_code,'left') \
            .join(us_states_ds, imm_cleaned.temp_state_residence == us_states_ds.state_code, 'inner').limit(5).toPandas()

Unnamed: 0,imm_id,year,month,port,country_origin,arrival_date,departure_date,imm_type,temp_state_residence,gender,...,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates,state_code,state_description,country_code
0,4084316.0,2016.0,4.0,HHW,209.0,20566,20573,1.0,HI,F,...,US,US-OK,Hugo,KHHW,HUJ,HHW,"-95.54190063, 34.03480148",HI,HAWAII,US
1,4422636.0,2016.0,4.0,MCA,582.0,20567,20568,1.0,TX,M,...,,,,,,,,TX,TEXAS,US
2,1195600.0,2016.0,4.0,OGG,112.0,20551,20571,1.0,FL,M,...,US,US-HI,Kahului,PHOG,OGG,OGG,"-156.429993, 20.8986",FL,FLORIDA,US
3,5291768.0,2016.0,4.0,LOS,297.0,20572,20581,1.0,CA,M,...,,,,,,,,CA,CALIFORNIA,US
4,1481650.0,2016.0,4.0,ATL,577.0,20552,20606,1.0,GA,M,...,US,US-GA,Atlanta,KATL,ATL,ATL,"-84.428101, 33.6367",GA,GEORGIA,US


In [46]:
imm_cleaned = imm_cleaned.withColumn('arrival_date', imm_cleaned['arrival_date'].cast(IntegerType())) \
                        .withColumn('departure_date',imm_cleaned['departure_date'].cast(IntegerType()))
imm_cleaned.limit(5).toPandas()
imm_cleaned.printSchema()

root
 |-- imm_id: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- port: string (nullable = true)
 |-- country_origin: string (nullable = true)
 |-- arrival_date: integer (nullable = true)
 |-- departure_date: integer (nullable = true)
 |-- imm_type: string (nullable = true)
 |-- temp_state_residence: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- visa_code: string (nullable = true)
 |-- visa_type: string (nullable = true)



In [47]:
get_date = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(float(x))).isoformat() if x else None)

In [53]:
ds_joined = imm_cleaned.join(airport_ds_us, imm_cleaned.port == airport_ds_us.local_code,'left') \
            .withColumn('arrival_date_sas', imm_cleaned['arrival_date']) \
            .withColumn('departure_date_sas', imm_cleaned['departure_date']) \
            .withColumn('arrival_date_f', get_date(imm_cleaned['arrival_date'])) \
            .withColumn('departure_date_f', get_date(imm_cleaned['departure_date'])) \
            .drop(col('arrival_date')) \
            .drop(col('departure_date'))

In [54]:
ds_joined.join(visa_type_ds, (ds_joined['visa_type'] == visa_type_ds['visa_type']), 'inner').limit(5).toPandas()

Unnamed: 0,imm_id,year,month,port,country_origin,imm_type,temp_state_residence,gender,visa_code,visa_type,...,local_code,coordinates,arrival_date_sas,departure_date_sas,arrival_date_f,departure_date_f,visa_type.1,description,initial_duration_of_staya,annual_numeric_limit
0,5514177.0,2016.0,4.0,HOU,343.0,1.0,TX,M,3.0,F2,...,HOU,"-95.27890015, 29.64539909",20573,,2016-04-29,,F2,Spouse or child of F-1,Duration of study,
1,1400880.0,2016.0,4.0,NEW,213.0,1.0,NJ,M,3.0,F2,...,NEW,"-90.028297424316, 30.042400360107",20552,,2016-04-08,,F2,Spouse or child of F-1,Duration of study,
2,4106787.0,2016.0,4.0,WAS,258.0,1.0,MA,F,3.0,F2,...,,,20566,,2016-04-22,,F2,Spouse or child of F-1,Duration of study,
3,4422636.0,2016.0,4.0,MCA,582.0,1.0,TX,M,2.0,B2,...,,,20567,20568.0,2016-04-23,2016-04-24,B2,Visitor for pleasure,Six months to one year,
4,5291768.0,2016.0,4.0,LOS,297.0,1.0,CA,M,2.0,B2,...,,,20572,20581.0,2016-04-28,2016-05-07,B2,Visitor for pleasure,Six months to one year,
