In [1]:
import os
import findspark
# Prefer the SPARK_HOME environment variable so the notebook runs on other
# machines; fall back to the original author's local Spark 2.4.3 install.
findspark.init(os.environ.get('SPARK_HOME', '/home/jun3/Downloads/spark-2.4.3-bin-hadoop2.7/'))
import pyspark
from pyspark import SparkConf
from pyspark import SparkContext

In [2]:
import configparser
from datetime import datetime, timedelta
import os
import re
from pyspark.sql.types import BooleanType
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.functions import monotonically_increasing_id

In [3]:
from pyspark.sql.functions import sum as Fsum
from helper import counting_null_number_inColumns, change_data_type, remove_last2_from_colname, change_allowed_stay_date
#import datetime as dt

In [4]:
import pandas as pd
import numpy as np

In [5]:
# Driver/executor memory settings for the local session.
conf = (SparkConf()
        .set('spark.driver.memory', '6G')
        .set('spark.executor.memory', "4G"))
# Builder.config(key, value, conf=conf) ignores key/value whenever conf is
# passed, so the hadoop-aws package was silently never configured. Apply the
# SparkConf and the package setting as two separate config() calls.
spark = (SparkSession.builder.appName("capstone_proj")
         .config(conf=conf)
         .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
         .getOrCreate())

In [6]:
path = os.path.abspath("data/i94_apr16_sub.parquet/")

In [7]:
df_imm=spark.read.parquet(path)  ### Apr16 data

In [8]:
df_imm= df_imm.drop('validres',
 'delete_days',
 'delete_mexl',
 'delete_dup',
 'delete_visa',
 'delete_recdup')

In [9]:
# row count of the loaded extract after the initial column drop
df_imm.count()

3096313

In [10]:
# peek at one raw record to inspect field formats
df_imm.take(1)

[Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='CA', depdate=20582.0, i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1976.0, dtaddto='10292016', gender='F', insnum=None, airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1')]

In [11]:
# convert SAS date (days since 1960-01-01) to an ISO "YYYY-MM-DD" string
def _sas_date_to_iso(sas_days):
    """Return the ISO date string for a SAS day number, or None for a null.

    Day 0 (1960-01-01) is a valid date, so only None is treated as missing
    (a plain truthiness test would have turned 0 into None).
    """
    if sas_days is None:
        return None
    return (datetime(1960, 1, 1).date() + timedelta(days=sas_days)).isoformat()

convert_to_dt_date = udf(_sas_date_to_iso)

In [12]:
df_imm = df_imm.select("*").withColumn("arrival_date", convert_to_dt_date(df_imm.arrdate))

In [13]:
df_imm = df_imm.select("*").withColumn("departure_date", convert_to_dt_date(df_imm.depdate))

In [14]:
df_imm = df_imm.drop("arrdate", "depdate")

In [15]:
df_ds=df_imm.select("visatype", "dtaddto").where(df_imm.dtaddto=="D/S")

In [16]:
df_ds_gb = df_ds.groupBy("visatype").count()

In [17]:
df_ds_gb.show() 

+--------+-----+
|visatype|count|
+--------+-----+
|      F2| 2982|
|      B2|   17|
|      F1|38944|
|     CPL|    1|
|      I1|  189|
|      M1|    3|
|      B1|    3|
|      E1|   43|
|       I| 3158|
|      E2|    3|
+--------+-----+



In [18]:
df_f1=df_imm.select("visatype", "dtaddto").where(df_imm.visatype=="F1")  # "dtaddto" column contains allowed to stay till that dates

In [19]:
df_f1_gb = df_f1.select("*").groupBy("dtaddto").count()

In [20]:
df_f1_gb.sort("count", ascending = False).show(5)

+--------+-----+
| dtaddto|count|
+--------+-----+
|     D/S|38944|
|    null|   16|
|05012016|   10|
|05022016|    6|
|04302016|    4|
+--------+-----+
only showing top 5 rows



In [21]:
# Share of F1 (student) records whose allowed-to-stay value is "D/S"
# (presumably "duration of status"); counted from the data instead of
# hard-coding 38944 copied from an earlier output.
df_f1.where(col("dtaddto") == "D/S").count() / df_f1.count()

0.9981546032396965

In [22]:
df_imm = df_imm.where("dtaddto is not null") # select data with not null value.

In [23]:
df_imm.count()

3095836

In [24]:
df_imm = df_imm.where(col("dtaddto")!="")

In [25]:
df_imm.count()

3095836

In [26]:
# Loose filter for "dtaddto": keep values that look like an MMDDYYYY date or
# are "D/S". It is not a full date validator, but it is enough to remove the
# malformed values that caused errors further down.
def regex_filter(x):
    """Return True if ``x`` loosely looks like a date string or is "D/S".

    Each pattern is tried with ``re.match`` (anchored at the start only,
    case-insensitive), so anything after a matching prefix is accepted.
    Returns False for None or the empty string.
    """
    # raw strings: "\d" in a plain string literal is an invalid escape
    # sequence (DeprecationWarning; SyntaxWarning since Python 3.12)
    regexs = [r'[0-1][\d]+[0-4]+[\d]+[\d]+[\d]+[\d]+', r"D/S"]

    if not x:
        return False
    return any(re.match(r, x, re.IGNORECASE) for r in regexs)

In [27]:
# convert regex_filter function to udf function
regex_filter_udf = udf(regex_filter, BooleanType())

In [28]:
regex_filter("11122016") # testing

True

In [29]:
# flag each row by whether its dtaddto value passes regex_filter
df_imm = df_imm.withColumn("regEX_dtaddto", regex_filter_udf("dtaddto"))  

In [30]:
df_imm.take(1)

[Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', i94mode=1.0, i94addr='CA', i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1976.0, dtaddto='10292016', gender='F', insnum=None, airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1', arrival_date='2016-04-30', departure_date='2016-05-08', regEX_dtaddto=True)]

In [31]:
df_imm2 = df_imm.where(col("regEX_dtaddto")==True)

In [32]:
# get rid of 4 data points
df_imm2.count()

3095832

In [33]:
df_imm_f = df_imm.where(col("regEX_dtaddto")==False) # check False data

In [34]:
df_imm_f.select("dtaddto").take(4) # These data are removed in df_imm2 3 steps ago

[Row(dtaddto='06 02002'),
 Row(dtaddto='10 02003'),
 Row(dtaddto='183'),
 Row(dtaddto='/   183D')]

In [35]:
# per-column null counts via counting_null_number_inColumns (helper.py);
# the mostly-null columns found here are dropped in the next cell
counting_null_number_inColumns(df_imm2)

cicid :  0
i94yr :  0
i94mon :  0
i94cit :  0
i94res :  0
i94port :  0
i94mode :  1
i94addr :  152298
i94bir :  801
i94visa :  0
count :  0
dtadfile :  1
visapost :  1880804
occup :  3087706
entdepa :  0
entdepd :  138384
entdepu :  3095440
matflag :  138384
biryear :  801
dtaddto :  0
gender :  414031
insnum :  2982274
airline :  83375
admnum :  0
fltno :  19299
visatype :  0
arrival_date :  0
departure_date :  142408
regEX_dtaddto :  0


In [36]:
# Drop columns containing a lot of null values and regEX_dtaddto.  
df_imm2 = df_imm2.drop("visapost", "occup", "entdepd", "entdepu", "insnum", "matflag", "regEX_dtaddto", "gender")

In [37]:
df_imm2 = df_imm2.select("*").where(("dtadfile is not null"))  # "dtadfile" only has 1 null value, remove it

In [38]:
df_imm2.take(2)

[Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', i94mode=1.0, i94addr='CA', i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', entdepa='G', biryear=1976.0, dtaddto='10292016', airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1', arrival_date='2016-04-30', departure_date='2016-05-08'),
 Row(cicid=5748518.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', i94mode=1.0, i94addr='NV', i94bir=32.0, i94visa=1.0, count=1.0, dtadfile='20160430', entdepa='G', biryear=1984.0, dtaddto='10292016', airline='VA', admnum=94955622830.0, fltno='00007', visatype='B1', arrival_date='2016-04-30', departure_date='2016-05-17')]

In [39]:
# count null values again after dropping the sparse columns
counting_null_number_inColumns(df_imm2)

cicid :  0
i94yr :  0
i94mon :  0
i94cit :  0
i94res :  0
i94port :  0
i94mode :  0
i94addr :  152297
i94bir :  801
i94visa :  0
count :  0
dtadfile :  0
entdepa :  0
biryear :  801
dtaddto :  0
airline :  83374
admnum :  0
fltno :  19298
visatype :  0
arrival_date :  0
departure_date :  142407


In [40]:
add_to_file_date_udf = udf(lambda x: datetime.strptime(x, '%Y%m%d').strftime('%Y-%m-%d'))

In [41]:
df_imm2 = df_imm2.select("*").withColumn("add_to_file_date", add_to_file_date_udf(df_imm.dtadfile))

In [42]:
df_imm2 = df_imm2.drop("dtadfile")

In [43]:
df_imm2 = df_imm2.withColumnRenamed("dtaddto", 'allow_to_stay_date')

In [44]:
df_imm2.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- allow_to_stay_date: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)
 |-- arrival_date: string (nullable = true)
 |-- departure_date: string (nullable = true)
 |-- add_to_file_date: string (nullable = true)



In [45]:
# current column order; the toDF rename below maps names to it positionally
df_imm2.columns

['cicid',
 'i94yr',
 'i94mon',
 'i94cit',
 'i94res',
 'i94port',
 'i94mode',
 'i94addr',
 'i94bir',
 'i94visa',
 'count',
 'entdepa',
 'biryear',
 'allow_to_stay_date',
 'airline',
 'admnum',
 'fltno',
 'visatype',
 'arrival_date',
 'departure_date',
 'add_to_file_date']

In [46]:
# number of columns the replacement name list must match
len(df_imm2.columns)

21

In [47]:
# change column names: replacement names for df_imm2's columns, listed in the
# same order as df_imm2.columns above (toDF renames positionally)
names =['cicid',
 'entry_year',
 'entry_month',
 'citizenship_of_country',
 'resident_of_country',
 'i94port_city',
  'entry_mode_id',
 'destination_state',
  "age",
 'visa_category',
 'count',
 'departure_flag',
 'birth_year',
 'allow_to_stay_date',
 'airline_code',
 'admission_number',
 'flight_number',
 'visa_type',
 'arrival_date',
 'departure_date',
 'add_to_file_date']

In [48]:
# should equal len(df_imm2.columns)
len(names)

21

In [49]:
# positional rename: column i of df_imm2 gets the name names[i]
df_imm2 = df_imm2.toDF(*names)

In [50]:
df_imm2.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- entry_year: double (nullable = true)
 |-- entry_month: double (nullable = true)
 |-- citizenship_of_country: double (nullable = true)
 |-- resident_of_country: double (nullable = true)
 |-- i94port_city: string (nullable = true)
 |-- entry_mode_id: double (nullable = true)
 |-- destination_state: string (nullable = true)
 |-- age: double (nullable = true)
 |-- visa_category: double (nullable = true)
 |-- count: double (nullable = true)
 |-- departure_flag: string (nullable = true)
 |-- birth_year: double (nullable = true)
 |-- allow_to_stay_date: string (nullable = true)
 |-- airline_code: string (nullable = true)
 |-- admission_number: double (nullable = true)
 |-- flight_number: string (nullable = true)
 |-- visa_type: string (nullable = true)
 |-- arrival_date: string (nullable = true)
 |-- departure_date: string (nullable = true)
 |-- add_to_file_date: string (nullable = true)



In [51]:
int_lst = ["admission_number", "cicid", "entry_year", "entry_month", "citizenship_of_country", 
           "resident_of_country", "visa_category", "entry_mode_id", "age", "birth_year", "count"]

df_imm2 = change_data_type(int_lst, df_imm2, "int") 

In [52]:
# remove last letter 2 from column name
df_imm2 = remove_last2_from_colname(df_imm2)

In [53]:
df_imm2.take(1)

[Row(i94port_city='LOS', destination_state='CA', departure_flag='G', allow_to_stay_date='10292016', airline_code='QF', flight_number='00011', visa_type='B1', arrival_date='2016-04-30', departure_date='2016-05-08', add_to_file_date='2016-04-30', admission_number=2147483647, cicid=5748517, entry_year=2016, entry_month=4, citizenship_of_country=245, resident_of_country=438, visa_category=1, entry_mode_id=1, age=40, birth_year=1976, count=1)]

In [54]:
# entry-mode lookup file: each line is split on "=" into code (_c0) and label (_c1)
df_ent = spark.read.csv("./data/i94entry_mode.txt", sep="=")

In [55]:
df_ent.take(4)

[Row(_c0='1 ', _c1=" 'Air'"),
 Row(_c0='2 ', _c1=" 'Sea'"),
 Row(_c0='3 ', _c1=" 'Land'"),
 Row(_c0='9 ', _c1=" 'Not reported'")]

In [56]:
get_ent_id_udf = udf(lambda x: x[0])

In [57]:
df_ent = df_ent.withColumn("entry_mode_id", get_ent_id_udf("_c0")).drop("_c0")

In [58]:
get_entry_mode_udf = udf(lambda x: x[2:-1])

In [59]:
df_ent = df_ent.withColumn("entry_mode", get_entry_mode_udf("_c1")).drop("_c1")

In [60]:
df_ent = change_data_type(["entry_mode_id"], df_ent, "int")

In [61]:
df_ent = remove_last2_from_colname(df_ent)

In [62]:
df_ent.show()

+------------+-------------+
|  entry_mode|entry_mode_id|
+------------+-------------+
|         Air|            1|
|         Sea|            2|
|        Land|            3|
|Not reported|            9|
+------------+-------------+



In [63]:
# entry_mode_id must be integer to join with df_imm2's entry_mode_id
df_ent.printSchema()

root
 |-- entry_mode: string (nullable = true)
 |-- entry_mode_id: integer (nullable = true)



In [64]:
# check df_imm2's entry_mode_id values before joining
df_imm2.take(1)

[Row(i94port_city='LOS', destination_state='CA', departure_flag='G', allow_to_stay_date='10292016', airline_code='QF', flight_number='00011', visa_type='B1', arrival_date='2016-04-30', departure_date='2016-05-08', add_to_file_date='2016-04-30', admission_number=2147483647, cicid=5748517, entry_year=2016, entry_month=4, citizenship_of_country=245, resident_of_country=438, visa_category=1, entry_mode_id=1, age=40, birth_year=1976, count=1)]

In [65]:
# inner join (the default): rows whose entry_mode_id has no match in df_ent,
# including a null entry_mode_id, are dropped
df_imm3 = df_imm2.join(df_ent, on=['entry_mode_id'])

In [66]:
# visa category lookup (code -> category name)
visa_cat = spark.read.csv("./data/i94visa_category.csv", header=True)

In [67]:
visa_cat = change_data_type(["VISA_category"], visa_cat, "int")

In [68]:
visa_cat = remove_last2_from_colname(visa_cat)

In [69]:
visa_cat.printSchema()

root
 |--  category_name: string (nullable = true)
 |-- VISA_category: integer (nullable = true)



In [70]:
# category names carry a leading space from the CSV (fixed below)
visa_cat.show()

+--------------+-------------+
| category_name|VISA_category|
+--------------+-------------+
|      Business|            1|
|      Pleasure|            2|
|       Student|            3|
+--------------+-------------+



In [71]:
# lowercase the key to match df_imm3's visa_category column name
visa_cat = visa_cat.withColumnRenamed("VISA_category", "visa_category")

In [72]:
visa_cat.take(3)

[Row( category_name=' Business', visa_category=1),
 Row( category_name=' Pleasure', visa_category=2),
 Row( category_name=' Student', visa_category=3)]

In [73]:
remove_space_udf = udf(lambda x: x[1:])

In [74]:
visa_cat = visa_cat.withColumn(" category_name", remove_space_udf(" category_name"))

In [75]:
visa_cat = visa_cat.withColumnRenamed(" category_name", "category_name")

In [76]:
df_imm3.count()

3095831

In [77]:
# number of entries per visa category code
visa_cat_gb = df_imm3.groupBy("visa_category").count()

In [78]:
visa_cat_gb.toPandas() # most traveling are visa_category 2 (Pleasure)

Unnamed: 0,visa_category,count
0,1,522057
1,3,43348
2,2,2530426


In [79]:
# drop every row that has a null in any column
df_imm4 = df_imm3.dropna()

In [80]:
df_imm4.count()

2769642

In [81]:
# share of rows kept by dropna() (~89%), computed rather than typed in
df_imm4.count() / df_imm3.count()

0.8946360444094009

In [82]:
# verify dropna() left no nulls
counting_null_number_inColumns(df_imm4) # no null value in the data

entry_mode_id :  0
i94port_city :  0
destination_state :  0
departure_flag :  0
allow_to_stay_date :  0
airline_code :  0
flight_number :  0
visa_type :  0
arrival_date :  0
departure_date :  0
add_to_file_date :  0
admission_number :  0
cicid :  0
entry_year :  0
entry_month :  0
citizenship_of_country :  0
resident_of_country :  0
visa_category :  0
age :  0
birth_year :  0
count :  0
entry_mode :  0


In [83]:
# all columns available before the final selection
df_imm4.columns

['entry_mode_id',
 'i94port_city',
 'destination_state',
 'departure_flag',
 'allow_to_stay_date',
 'airline_code',
 'flight_number',
 'visa_type',
 'arrival_date',
 'departure_date',
 'add_to_file_date',
 'admission_number',
 'cicid',
 'entry_year',
 'entry_month',
 'citizenship_of_country',
 'resident_of_country',
 'visa_category',
 'age',
 'birth_year',
 'count',
 'entry_mode']

In [85]:
# Pre-select column names for the final immigration table (selected by name)
imm_colnames = ['admission_number',
 'i94port_city',
 'destination_state',
 'airline_code',
 'visa_type',
 'visa_category',
 'arrival_date',
 'departure_date',
 'allow_to_stay_date',
 'entry_year',
 'entry_month',
 'citizenship_of_country',
 'resident_of_country',
 'entry_mode',
 'age']

In [86]:
# select columns
df_imm4 = df_imm4.select(imm_colnames)

In [87]:
df_imm4.take(2)

[Row(admission_number=2147483647, i94port_city='LOS', destination_state='CA', airline_code='QF', visa_type='B1', visa_category=1, arrival_date='2016-04-30', departure_date='2016-05-08', allow_to_stay_date='10292016', entry_year=2016, entry_month=4, citizenship_of_country=245, resident_of_country=438, entry_mode='Air', age=40),
 Row(admission_number=2147483647, i94port_city='LOS', destination_state='NV', airline_code='VA', visa_type='B1', visa_category=1, arrival_date='2016-04-30', departure_date='2016-05-17', allow_to_stay_date='10292016', entry_year=2016, entry_month=4, citizenship_of_country=245, resident_of_country=438, entry_mode='Air', age=32)]

In [88]:
# rename columns
imm_colnames = ['admission_number',
 'i94port_city',
 'destination_state_code',
 'airline_code',
 'visa_type',
 'visa_category',
 'arrival_date',
 'departure_date',
 'allow_to_stay_date',
 'entry_year',
 'entry_month',
 'citizenship_of_country',
 'resident_of_country',
 'entry_mode',
 'age']

In [89]:
df_imm4 = df_imm4.toDF(*imm_colnames)

In [90]:
df_imm4.take(2)

[Row(admission_number=2147483647, i94port_city='LOS', destination_state_code='CA', airline_code='QF', visa_type='B1', visa_category=1, arrival_date='2016-04-30', departure_date='2016-05-08', allow_to_stay_date='10292016', entry_year=2016, entry_month=4, citizenship_of_country=245, resident_of_country=438, entry_mode='Air', age=40),
 Row(admission_number=2147483647, i94port_city='LOS', destination_state_code='NV', airline_code='VA', visa_type='B1', visa_category=1, arrival_date='2016-04-30', departure_date='2016-05-17', allow_to_stay_date='10292016', entry_year=2016, entry_month=4, citizenship_of_country=245, resident_of_country=438, entry_mode='Air', age=32)]

In [91]:
# final null check on the selected/renamed table
counting_null_number_inColumns(df_imm4)

admission_number :  0
i94port_city :  0
destination_state_code :  0
airline_code :  0
visa_type :  0
visa_category :  0
arrival_date :  0
departure_date :  0
allow_to_stay_date :  0
entry_year :  0
entry_month :  0
citizenship_of_country :  0
resident_of_country :  0
entry_mode :  0
age :  0


In [92]:
# rows to be written out
df_imm4.count()

2769642

In [93]:
# Write the cleaned immigration table partitioned by month, port and state.
# NOTE(review): the input is the April-2016 extract (i94_apr16) but the output
# is named "..._2016_09"; confirm the intended name before downstream use.
# NOTE(review): with Spark's default (static) partition-overwrite mode,
# mode="overwrite" replaces the whole directory, including other months.
df_imm4.write.parquet("./out/immigration_table_2016_09.parquet", mode ="overwrite", partitionBy=['entry_month', "i94port_city", "destination_state_code"])

In [94]:
# cleaned airport <-> i94 port lookup table (presumably produced by another notebook)
df_airport_i94port_join = spark.read.parquet("./out/airport_i94port_join_table_cleaned.parquet")

In [95]:
# limit before converting so only 5 rows are collected to the driver
df_airport_i94port_join.limit(5).toPandas()

Unnamed: 0,airport_id,airport_type,airport_name,state_code,elevation_ft,airport_longitude,airport_latitude,i94port_code
0,KBWI,large_airport,Baltimore/Washington International Thurgood Ma...,MD,146.0,-76.668297,39.1754,BAL
1,KBOS,large_airport,General Edward Lawrence Logan International Ai...,MA,20.0,-71.005203,42.3643,BOS
2,KMSY,large_airport,Louis Armstrong New Orleans International Airport,LA,4.0,-90.258003,29.993401,NOL
3,KJAN,large_airport,Jackson-Medgar Wiley Evers International Airport,MS,346.0,-90.075897,32.311199,JAN
4,KATL,large_airport,Hartsfield Jackson Atlanta International Airport,GA,1026.0,-84.428101,33.6367,ATL


In [96]:
# number of airports in the lookup table
df_airport_i94port_join.count()

77

In [97]:
# i94port_code is the join key against i94port_city
df_airport_i94port_join.take(1)

[Row(airport_id='KBWI', airport_type='large_airport', airport_name='Baltimore/Washington International Thurgood Marshall Airport', state_code='MD', elevation_ft=146.0, airport_longitude=-76.66829681396484, airport_latitude=39.17539978027344, i94port_code='BAL')]

In [99]:
# immigration side of the upcoming join
df_imm4.take(1)

[Row(admission_number=2147483647, i94port_city='LOS', destination_state_code='CA', airline_code='QF', visa_type='B1', visa_category=1, arrival_date='2016-04-30', departure_date='2016-05-08', allow_to_stay_date='10292016', entry_year=2016, entry_month=4, citizenship_of_country=245, resident_of_country=438, entry_mode='Air', age=40)]

In [100]:
# inner join on port code: immigration rows whose i94port_city has no matching
# i94port_code in the airport lookup are dropped; both key columns are kept
df_imm_airport = df_imm4.join(df_airport_i94port_join, df_imm4.i94port_city==df_airport_i94port_join.i94port_code, "inner")

In [101]:
df_imm_airport.count()

2373797

In [102]:
# joined records: immigration fields followed by airport fields
df_imm_airport.take(2)

[Row(admission_number=2147483647, i94port_city='LOS', destination_state_code='CA', airline_code='QF', visa_type='B1', visa_category=1, arrival_date='2016-04-30', departure_date='2016-05-08', allow_to_stay_date='10292016', entry_year=2016, entry_month=4, citizenship_of_country=245, resident_of_country=438, entry_mode='Air', age=40, airport_id='KLAX', airport_type='large_airport', airport_name='Los Angeles International Airport', state_code='CA', elevation_ft=125.0, airport_longitude=-118.40799713134766, airport_latitude=33.942501068115234, i94port_code='LOS'),
 Row(admission_number=2147483647, i94port_city='LOS', destination_state_code='NV', airline_code='VA', visa_type='B1', visa_category=1, arrival_date='2016-04-30', departure_date='2016-05-17', allow_to_stay_date='10292016', entry_year=2016, entry_month=4, citizenship_of_country=245, resident_of_country=438, entry_mode='Air', age=32, airport_id='KLAX', airport_type='large_airport', airport_name='Los Angeles International Airport', st

In [103]:
# columns of the joined table
df_imm_airport.columns

['admission_number',
 'i94port_city',
 'destination_state_code',
 'airline_code',
 'visa_type',
 'visa_category',
 'arrival_date',
 'departure_date',
 'allow_to_stay_date',
 'entry_year',
 'entry_month',
 'citizenship_of_country',
 'resident_of_country',
 'entry_mode',
 'age',
 'airport_id',
 'airport_type',
 'airport_name',
 'state_code',
 'elevation_ft',
 'airport_longitude',
 'airport_latitude',
 'i94port_code']

In [104]:
# column types of the joined table
df_imm_airport.printSchema()

root
 |-- admission_number: integer (nullable = true)
 |-- i94port_city: string (nullable = true)
 |-- destination_state_code: string (nullable = true)
 |-- airline_code: string (nullable = true)
 |-- visa_type: string (nullable = true)
 |-- visa_category: integer (nullable = true)
 |-- arrival_date: string (nullable = true)
 |-- departure_date: string (nullable = true)
 |-- allow_to_stay_date: string (nullable = true)
 |-- entry_year: integer (nullable = true)
 |-- entry_month: integer (nullable = true)
 |-- citizenship_of_country: integer (nullable = true)
 |-- resident_of_country: integer (nullable = true)
 |-- entry_mode: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- airport_id: string (nullable = true)
 |-- airport_type: string (nullable = true)
 |-- airport_name: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- elevation_ft: float (nullable = true)
 |-- airport_longitude: float (nullable = true)
 |-- airport_latitude: float (nullable = t

In [105]:
df_i94port_gb = df_imm_airport.select("i94port_city").groupby('i94port_city').count() # groupby cities

In [107]:
top25 = df_i94port_gb.sort("count", ascending=False).limit(25)

In [108]:
top25.printSchema()

root
 |-- i94port_city: string (nullable = true)
 |-- count: long (nullable = false)



In [109]:
# all 25 busiest ports (show() defaults to 20 rows)
top25.show(25)

+------------+------+
|i94port_city| count|
+------------+------+
|         NYC|456898|
|         MIA|317537|
|         LOS|281537|
|         SFR|142534|
|         ORL|139045|
|         HHW|131297|
|         NEW|128796|
|         CHI|119092|
|         HOU| 91120|
|         FTL| 86564|
|         ATL| 85744|
|         LVG| 83476|
|         BOS| 52642|
|         SEA| 43650|
|         PHO| 34588|
|         DET| 32194|
|         TAM| 23503|
|         PHI| 23216|
|         DEN| 16561|
|         FMY| 15288|
|         CLT| 14829|
|         SDP|  8381|
|         WPB|  6837|
|         SNA|  6046|
|         SLC|  4705|
+------------+------+



In [110]:
# total joined entries (denominator for the shares below)
df_imm_airport.count()

2373797

In [111]:
# total entries through the top-25 ports (result column is "sum(count)")
sum_number = top25.agg(Fsum("count")).collect()[0]
sum_number

Row(sum(count)=2346080)

In [112]:
# share of entries that arrived through the top-25 ports (> 98%)
sum_number["sum(count)"]/df_imm_airport.count()

0.9883237698927078

In [113]:
dest_gb = df_imm_airport.groupby('destination_state_code').count() # groupby destination states

In [114]:
top25_dest = dest_gb.sort("count", ascending=False).limit(25)

In [115]:
top25_dest.show()

+----------------------+------+
|destination_state_code| count|
+----------------------+------+
|                    FL|543817|
|                    NY|491032|
|                    CA|411991|
|                    HI|153975|
|                    NV|100852|
|                    TX| 90365|
|                    IL| 69301|
|                    NJ| 64051|
|                    MA| 56463|
|                    GA| 38403|
|                    WA| 35339|
|                    MI| 25348|
|                    NE| 23438|
|                    PA| 23417|
|                    LA| 18474|
|                    NC| 17219|
|                    AZ| 16411|
|                    CO| 12948|
|                    OH| 12663|
|                    CT| 11639|
+----------------------+------+
only showing top 20 rows



In [116]:
# total entries headed to the top-25 destination states
sum_number2 = top25_dest.agg(Fsum("count")).collect()[0]
sum_number2

Row(sum(count)=2269939)

In [117]:
# share of entries headed to the top-25 destination states (> 95%)
sum_number2["sum(count)"]/df_imm_airport.count()

0.9562481543282766