# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import numpy as np
import configparser
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,isnan,when,count,avg
from pyspark.sql.types import TimestampType
from pyspark.sql import functions as F
import datetime

def sas_date_to_datetime(sas_date):
    '''
    Converts given SAS numeric date to datetime
    '''
    if sas_date is None:
        return None
    return str(datetime.date(1960, 1, 1) + datetime.timedelta(days=sas_date))

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

def generate_time_df(immigration_data):
    '''
    Returns a df with (timestamp,year,month,day,week,weekday) columns
    From given immigrations df with (SAS date column(s))
    '''
    # get date columns
    time_df = immigration_data.select(['arrival_date', 'departure_date'])

    # Start creating unified date time df
    arrival_df = time_df.select('arrival_date').dropDuplicates()
    departure_df = time_df.select('departure_date').dropDuplicates()
    unified_df = arrival_df.union(departure_df).dropDuplicates()

    reg_convert_sas_date = F.udf(lambda date: sas_date_to_datetime(date))

    # Apply sas date conversion function
    unified_df = unified_df.withColumn('arrivalDateAsDATE', reg_convert_sas_date(unified_df.arrival_date))

    # Add other columns
    unified_df = unified_df.withColumn('year', F.year('arrivalDateAsDATE'))
    unified_df = unified_df.withColumn('month', F.month('arrivalDateAsDATE'))
    unified_df = unified_df.withColumn('day', F.dayofmonth('arrivalDateAsDATE'))
    unified_df = unified_df.withColumn('week', F.weekofyear('arrivalDateAsDATE'))
    unified_df = unified_df.withColumn('weekday', F.dayofweek('arrivalDateAsDATE'))

    # Drop date string column since we no longer need it
    unified_df = unified_df.drop('arrivalDateAsDATE')

    # Rename 
    unified_df = unified_df.withColumnRenamed('arrival_date','timestamp')
    unified_df = unified_df.withColumn('timestamp',unified_df['timestamp'].cast('int'))
    
    return unified_df

# For immigration data
# These columns are either meaningless for the scope of the project or almost empty, so we are removing them
drop_cols = ("i94yr","i94mon","i94res","count","visapost","occup","entdepa","entdepd","entdepu","matflag" \
             ,"biryear","insnum","fltno","dtadfile","dtaddto","airline","admnum")

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use? etc>

## Purpose
This project aims to create a single source of truth database regarding I-94 information gathered throughout the year 2016.

### Considerations
* Unnamed column in immigration data will be dropped since there is no information about it

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

# 1) Clean and explore cities, ports and immigrations

## 1.1) Read Cities

In [2]:
# Explore us-cities-demographics.csv
# read data into frame read csv
city_df = spark.read.options(header="true",inferSchema="true",nullValue = "NULL",delimiter=";").csv('us-cities-demographics.csv')

# drop race and count columns
city_df = city_df.drop('Race','Count')

# Get non-null state code and city
city_df = city_df.filter(city_df['State Code'].isNotNull() & city_df['City'].isNotNull())

city_df.printSchema()
city_df.limit(10).toPandas()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)



Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ
5,Peoria,Illinois,33.1,56229,62432,118661,6634,7517,2.4,IL
6,Avondale,Arizona,29.1,38712,41971,80683,4815,8355,3.18,AZ
7,West Covina,California,39.8,51629,56860,108489,3800,37038,3.56,CA
8,O'Fallon,Missouri,36.0,41762,43270,85032,5783,3269,2.77,MO
9,High Point,North Carolina,35.5,51751,58077,109828,5204,16315,2.65,NC


## 1.2) Read Ports

In [3]:
# read
ports_df = spark.read.options(header="true",inferSchema="true",nullValue = "NULL").csv('airport-codes_csv.csv')

# Record count is 55075 initally
# Lets take iso_country US, non-null iata code and non-closed records
ports_df = ports_df.filter((ports_df.iata_code.isNotNull()) \
                            & (ports_df.iso_country == 'US') \
                            & (ports_df.type != 'closed') )

drop_cols = ('iso_country','gps_code','elevation_ft','local_code','coordinates')

ports_df = ports_df.drop(*drop_cols)

# Show some info
ports_df.printSchema()
ports_df.limit(10).toPandas()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- iata_code: string (nullable = true)



Unnamed: 0,ident,type,name,continent,iso_region,municipality,iata_code
0,07FA,small_airport,Ocean Reef Club Airport,,US-FL,Key Largo,OCA
1,0AK,small_airport,Pilot Station Airport,,US-AK,Pilot Station,PQS
2,0CO2,small_airport,Crested Butte Airpark,,US-CO,Crested Butte,CSE
3,0TE7,small_airport,LBJ Ranch Airport,,US-TX,Johnson City,JCY
4,13MA,small_airport,Metropolitan Airport,,US-MA,Palmer,PMX
5,13Z,seaplane_base,Loring Seaplane Base,,US-AK,Loring,WLR
6,16A,small_airport,Nunapitchuk Airport,,US-AK,Nunapitchuk,NUP
7,16K,seaplane_base,Port Alice Seaplane Base,,US-AK,Port Alice,PTC
8,19AK,small_airport,Icy Bay Airport,,US-AK,Icy Bay,ICY
9,19P,seaplane_base,Port Protection Seaplane Base,,US-AK,Port Protection,PPV


## 1.3) Combine cities and ports

In [4]:
# Join on 'City.city == ports_df.municipality
combined_df = city_df.join(ports_df, city_df.City == ports_df.municipality).dropDuplicates()
combined_df.printSchema() 

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- iata_code: string (nullable = true)



### Now cities have been matched!

## 1.4) Finalize ports and cities

### 1.4.1) Ports

In [5]:
# Define a function to get state part from 'iso_region' column which is the second item of the list after split by '-'
def split_iso_region(iso_region):
    return iso_region.split("-")[1]

# Register split function
reg_split_iso_region = F.udf(lambda iso_reg: split_iso_region(iso_reg))


final_ports_df = combined_df.select(['ident','name','municipality','type','iata_code','iso_region']) \
                            .withColumnRenamed('ident','port_id') \
                            .withColumnRenamed('municipality','city') 

# Apply split function to iso_region
final_ports_df = final_ports_df.withColumn('iso_region',reg_split_iso_region('iso_region'))
final_ports_df.printSchema()
final_ports_df.limit(10).toPandas()

root
 |-- port_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- type: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- iso_region: string (nullable = true)



Unnamed: 0,port_id,name,city,type,iata_code,iso_region
0,KBFI,Boeing Field King County International Airport,Seattle,large_airport,BFI,WA
1,KVSF,Hartness State (Springfield) Airport,Springfield,small_airport,VSF,VT
2,KSAF,Santa Fe Municipal Airport,Santa Fe,medium_airport,SAF,NM
3,KCHA,Lovell Field,Chattanooga,large_airport,CHA,TN
4,KMBO,Bruce Campbell Field,Madison,small_airport,DXE,MS
5,KOFF,Offutt Air Force Base,Omaha,medium_airport,OFF,NE
6,KTTD,Portland Troutdale Airport,Portland,medium_airport,TTD,OR
7,KWHP,Whiteman Airport,Los Angeles,small_airport,WHP,CA
8,KLAF,Purdue University Airport,Lafayette,medium_airport,LAF,IN
9,KSHV,Shreveport Regional Airport,Shreveport,medium_airport,SHV,LA


### 1.4.2) Cities

In [6]:
final_cities_df = combined_df.select(['City','State','Median Age','Male Population','Female Population','Total Population' \
                                      ,'Number of Veterans','Foreign-born','Average Household Size']) \
                            .withColumnRenamed('City','city') \
                            .withColumnRenamed('State','state') \
                            .withColumnRenamed('Median Age','median_age') \
                            .withColumnRenamed('Male Population','male_pop') \
                            .withColumnRenamed('Female Population','female_pop') \
                            .withColumnRenamed('Total Population','total_pop') \
                            .withColumnRenamed('Number of Veterans','veterans') \
                            .withColumnRenamed('Foreign-born','foreign_born') \
                            .withColumnRenamed('Average Household Size','avg_household_size')
final_cities_df.printSchema()
final_cities_df.limit(10).toPandas()

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- male_pop: integer (nullable = true)
 |-- female_pop: integer (nullable = true)
 |-- total_pop: integer (nullable = true)
 |-- veterans: integer (nullable = true)
 |-- foreign_born: integer (nullable = true)
 |-- avg_household_size: double (nullable = true)



Unnamed: 0,city,state,median_age,male_pop,female_pop,total_pop,veterans,foreign_born,avg_household_size
0,Seattle,Washington,35.5,345659,338784,684443,29364,119840,2.13
1,Springfield,Illinois,38.8,55639,62170,117809,7525,4264,2.22
2,Santa Fe,New Mexico,44.1,40601,43511,84112,5083,13824,2.41
3,Chattanooga,Tennessee,36.6,83640,92957,176597,10001,10599,2.4
4,Madison,Wisconsin,30.7,122596,126360,248956,9707,30090,2.23
5,Omaha,Nebraska,34.2,218789,225098,443887,24503,48263,2.47
6,Portland,Oregon,36.7,313516,318671,632187,29940,86041,2.43
7,Los Angeles,California,35.0,1958998,2012898,3971896,85417,1485425,2.86
8,Lafayette,Indiana,33.5,34313,36857,71170,5045,5697,2.19
9,Shreveport,Louisiana,35.2,93138,103856,196994,14287,5658,2.53


## 1.5) Immigrations

### Process immigrations data one by one in a loop, each iteration:
* Read data
* Repartition
* Drop duplicates, redundant columns, null records
* Finally,
    * JOIN ON conditions list = __[df.state == final_ports_df.iso_region, df.landing_port == final_ports_df.iata_code]__
    * joined_immig = df.join(final_ports_df, conditions)
    * Additionally, create time data using joined_immig (using arrdate and depdate columns)

In [7]:
# These columns are either meaningless for the scope of the project or almost empty, so we are removing them
drop_cols = ("i94yr","i94mon","i94res","count","visapost","occup","entdepa","entdepd","entdepu","matflag" \
             ,"biryear","insnum","fltno","dtadfile","dtaddto","airline","admnum")

# Month name list
months = ['jan','feb','mar','apr','may','jun','jul','aug','sep','oct','nov','dec']

#for month in months:
#immigration_data_path = f'../../data/18-83510-I94-Data-2016/i94_{month}16_sub.sas7bdat'
immigration_data_path = f'../../data/18-83510-I94-Data-2016/i94_feb16_sub.sas7bdat'

# Read data using month in input file name
df = spark.read.format('com.github.saurfang.sas.spark').load(immigration_data_path)
df.repartition(6)
# Drop duplicates, redundant columns, null records from each df and write to S3 in parquet format
df = df.dropDuplicates().drop(*drop_cols).na.drop()

# Fix column names and types
df = df.withColumnRenamed('cicid','immigration_id')
df = df.withColumn('immigration_id',df['immigration_id'].cast('int'))
df = df.withColumnRenamed('i94cit','origin')
df = df.withColumn('origin',df['origin'].cast('int'))
df = df.withColumnRenamed('i94bir','age')
df = df.withColumn('age',df['age'].cast('int'))
df = df.withColumnRenamed('i94mode','arrival_mode')
df = df.withColumn('arrival_mode',df['arrival_mode'].cast('int'))
df = df.withColumnRenamed('i94visa','visa')
df = df.withColumn('visa',df['visa'].cast('int'))
df = df.withColumnRenamed('i94port','landing_port') \
    .withColumnRenamed('arrdate','arrival_date') \
    .withColumnRenamed('i94mode','arrival_mode') \
    .withColumnRenamed('i94addr','state') \
    .withColumnRenamed('depdate','departure_date')

# Now generate time df from current immigration data using 'arrival_date' and 'departure_date' columns
#time_df = generate_time_df(df)
#time_df = time_df.na.drop()
#time_df.printSchema()
#time_df.limit(10).toPandas()

In [9]:
# Join conditions as a list
conditions = [df.state == final_ports_df.iso_region, df.landing_port == final_ports_df.iata_code]

joined_immig = df.join(final_ports_df, conditions)

In [10]:
# Final df to write to S3
fact_immig_df = joined_immig.select('immigration_id','origin','landing_port' \
                                    ,'arrival_date','departure_date','arrival_mode' \
                                    ,'city','age','visa','visatype','gender')
fact_immig_df.printSchema()
fact_immig_df.limit(10).toPandas()
#print(f'Count: {fact_immig_df.count()}') # yields 466855

root
 |-- immigration_id: integer (nullable = true)
 |-- origin: integer (nullable = true)
 |-- landing_port: string (nullable = true)
 |-- arrival_date: double (nullable = true)
 |-- departure_date: double (nullable = true)
 |-- arrival_mode: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- visa: integer (nullable = true)
 |-- visatype: string (nullable = true)
 |-- gender: string (nullable = true)



Unnamed: 0,immigration_id,origin,landing_port,arrival_date,departure_date,arrival_mode,city,age,visa,visatype,gender
0,75354,107,BOS,20485.0,20488.0,1,Boston,28,2,B2,F
1,81940,116,BOS,20485.0,20486.0,1,Boston,40,1,WB,M
2,85982,123,BOS,20485.0,20492.0,1,Boston,64,2,WT,F
3,126885,245,BOS,20485.0,20491.0,1,Boston,35,2,B2,F
4,294117,213,BOS,20486.0,20488.0,1,Boston,59,1,B1,M
5,304039,245,BOS,20486.0,20490.0,1,Boston,26,2,B2,F
6,304212,245,BOS,20486.0,20599.0,1,Boston,26,3,F1,F
7,307803,251,BOS,20486.0,20500.0,1,Boston,66,2,B2,M
8,435173,148,BOS,20487.0,20495.0,1,Boston,24,2,WT,F
9,460876,245,BOS,20487.0,20505.0,1,Boston,54,2,B2,M


In [11]:
 # get date columns
time_df = fact_immig_df.select(['arrival_date', 'departure_date'])

# Start creating unified date time df
arrival_df = time_df.select('arrival_date').dropDuplicates()
departure_df = time_df.select('departure_date').dropDuplicates()
unified_df = arrival_df.union(departure_df).dropDuplicates()

reg_convert_sas_date = F.udf(lambda date: sas_date_to_datetime(date))

unified_df.limit(20).toPandas()

# Apply sas date conversion function
unified_df = unified_df.withColumn('arrivalDateAsDATE', reg_convert_sas_date(unified_df.arrival_date))

# Add other columns
#unified_df = unified_df.withColumn('year', F.year('arrivalDateAsDATE'))
#unified_df = unified_df.withColumn('month', F.month('arrivalDateAsDATE'))
#unified_df = unified_df.withColumn('day', F.dayofmonth('arrivalDateAsDATE'))
#unified_df = unified_df.withColumn('week', F.weekofyear('arrivalDateAsDATE'))
#unified_df = unified_df.withColumn('weekday', F.dayofweek('arrivalDateAsDATE'))

# Drop date string column since we no longer need it
#unified_df = unified_df.drop('arrivalDateAsDATE')

# Rename 
#unified_df = unified_df.withColumnRenamed('arrival_date','sas_timestamp')

#unified_df.show(50)

Unnamed: 0,arrival_date
0,20503.0
1,20511.0
2,20593.0
3,20522.0
4,20550.0
5,20592.0
6,20496.0
7,20518.0
8,20556.0
9,20485.0


In [12]:
# Apply sas date conversion function
unified_df = unified_df.withColumn('arrivalDateAsDATE', reg_convert_sas_date(unified_df.arrival_date))
unified_df.limit(20).toPandas()

Unnamed: 0,arrival_date,arrivalDateAsDATE
0,20503.0,2016-02-19
1,20511.0,2016-02-27
2,20593.0,2016-05-19
3,20522.0,2016-03-09
4,20550.0,2016-04-06
5,20592.0,2016-05-18
6,20496.0,2016-02-12
7,20518.0,2016-03-05
8,20556.0,2016-04-12
9,20485.0,2016-02-01


In [12]:
time_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in time_df.columns]).show()

+---------+----+-----+---+----+-------+
|timestamp|year|month|day|week|weekday|
+---------+----+-----+---+----+-------+
|        0|   0|    0|  0|   0|      0|
+---------+----+-----+---+----+-------+



In [13]:
time_df.na.drop().count()

118

## Skewness check for some suspicious columns!

In [24]:
fact_immig_df.groupBy('visa').count().orderBy('count', ascending=False).show()
fact_immig_df.groupBy('visatype').count().orderBy('count', ascending=False).show()
# i94visa seems skewed a bit, let's repartition
#df.rdd.getNumPartitions()
#partitioned_df = df.repartition('visa')
#partitioned_df.rdd.getNumPartitions()

+----+------+
|visa| count|
+----+------+
| 2.0|396818|
| 1.0| 66865|
| 3.0|  3172|
+----+------+

+--------+------+
|visatype| count|
+--------+------+
|      B2|260941|
|      WT|135794|
|      B1| 36372|
|      WB| 26666|
|      E2|  2910|
|      F1|  2626|
|      E1|   513|
|      F2|   401|
|       I|   383|
|      M1|   144|
|      CP|    77|
|      I1|    21|
|     CPL|     6|
|      M2|     1|
+--------+------+



In [25]:
fact_immig_df.groupBy('arrival_mode').count().orderBy('count', ascending=False).show()

+------------+------+
|arrival_mode| count|
+------------+------+
|         1.0|465131|
|         2.0|  1014|
|         3.0|   650|
|         9.0|    60|
+------------+------+



In [26]:
fact_immig_df.rdd.getNumPartitions()

200

In [27]:
fact_immig_df = fact_immig_df.repartition(col('visa'))
fact_immig_df.rdd.getNumPartitions()

200

# 2) Clean and explore GlobalLandandTemperaturesByCity
* Filter by 'United States'
* Drop redundant colums and rename the remaining ones
* Drop rows with null values
* Calculate the average of temperature and temperature uncertainty and group by 'city'
* Resulting data frame can be written to S3

In [10]:
temps_df = spark.read.options(header="true",inferSchema="true",nullValue = "NULL").csv('../../data2/GlobalLandTemperaturesByCity.csv')

# Filter by united states since others are meaningless
temps_df = temps_df.filter(temps_df.Country == 'United States')

# Drop redundant columns
temps_df = temps_df.drop("dt","Country","Latitude","Longitude")

final_temperature_df = temps_df.select(col('City'),col('AverageTemperature'),col('AverageTemperatureUncertainty')) \
                        .withColumnRenamed('City','city').withColumnRenamed('AverageTemperature','avg_temp') \
                        .withColumnRenamed('AverageTemperatureUncertainty','avg_temp_uncertainty')

avg_df = final_temperature_df.select(col('city'),col('avg_temp')) \
            .groupBy('city').avg('avg_temp') # average_temp_df

avg_uncer_df = final_temperature_df.select(col('city'),col('avg_temp_uncertainty')) \
            .groupBy('city').avg('avg_temp_uncertainty') # average_uncertainty_df

# Now finalize...
final_uni_df = avg_df.join(avg_uncer_df, ['city']) # join column name is same('city'), so we pass it as a list
# Fix column names and order
final_uni_df = final_uni_df.select(final_uni_df.city, final_uni_df['avg(avg_temp)'] \
                                   ,final_uni_df['avg(avg_temp_uncertainty)'])
final_uni_df = final_uni_df.withColumnRenamed('avg(avg_temp)', 'avg_temp') \
                            .withColumnRenamed('avg(avg_temp_uncertainty)','avg_temp_uncertainty')
final_uni_df.printSchema()
final_uni_df.show(5)
final_uni_df.count()

root
 |-- city: string (nullable = true)
 |-- avg_temp: double (nullable = true)
 |-- avg_temp_uncertainty: double (nullable = true)

+-----------+------------------+--------------------+
|       city|          avg_temp|avg_temp_uncertainty|
+-----------+------------------+--------------------+
|  Worcester| 7.341440525809558|  1.3742648284706618|
| Charleston|18.696557871112546|  1.4356107726835539|
|     Corona| 16.12483712696008|  0.7674734446130481|
|Springfield|10.647931343609901|  1.3296092707991722|
|      Tempe|  21.0487690509584|  0.7654862085086479|
+-----------+------------------+--------------------+
only showing top 5 rows



248

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.

In [4]:
# Read cities
city_df = spark.read.options(header="true",inferSchema="true",nullValue = "NULL",delimiter=";").csv('us-cities-demographics.csv')
# drop race and count columns
city_df = city_df.drop('Race','Count')
# Get non-null state code and city
city_df = city_df.filter(city_df['State Code'].isNotNull() & city_df['City'].isNotNull())

# Read Ports
ports_df = spark.read.options(header="true",inferSchema="true",nullValue = "NULL").csv('airport-codes_csv.csv')
# Lets take iso_country US, non-null iata code and non-closed records
ports_df = ports_df.filter((ports_df.iata_code.isNotNull()) \
                            & (ports_df.iso_country == 'US') \
                            & (ports_df.type != 'closed') )
drop_cols = ('iso_country','gps_code','elevation_ft','local_code','coordinates')
ports_df = ports_df.drop(*drop_cols)


# Join on 'City.city == ports_df.municipality
combined_df = city_df.join(ports_df, city_df.City == ports_df.municipality).dropDuplicates()

# Register split function
reg_split_iso_region = F.udf(lambda iso_reg: split_iso_region(iso_reg))

final_ports_df = combined_df.select(['ident','name','municipality','type','iata_code','iso_region']) \
                        .withColumnRenamed('ident','port_id') \
                        .withColumnRenamed('municipality','city') 

# Apply split function to iso_region
final_ports_df = final_ports_df.withColumn('iso_region',reg_split_iso_region('iso_region').alias('region'))

# Cache it for optimization
final_ports_df = final_ports_df.cache()


final_cities_df = combined_df.select(['City','State','Median Age','Male Population','Female Population','Total Population' \
                                  ,'Number of Veterans','Foreign-born','Average Household Size']) \
                        .withColumnRenamed('City','city') \
                        .withColumnRenamed('State','state') \
                        .withColumnRenamed('Median Age','median_age') \
                        .withColumnRenamed('Male Population','male_pop') \
                        .withColumnRenamed('Female Population','female_pop') \
                        .withColumnRenamed('Total Population','total_pop') \
                        .withColumnRenamed('Number of Veterans','veterans') \
                        .withColumnRenamed('Foreign-born','foreign_born') \
                        .withColumnRenamed('Average Household Size','avg_household_size')

# Cache it for optimization
final_cities_df = final_ports_df.cache()

# Start working on immigrations data

# These columns are either meaningless for the scope of the project or almost empty, so we are removing them
drop_cols = ("i94yr","i94mon","i94res","count","visapost","occup","entdepa","entdepd","entdepu","matflag" \
         ,"biryear","insnum","fltno","dtadfile","dtaddto","airline","admnum")

# Month name list
months = ['jan','feb','mar','apr','may','jun','jul','aug','sep','oct','nov','dec']

#for month in months:
immigration_data_path = f'../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'

# Read data using month in input file name
# Jan
jan_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat')
jan_df = jan_df.dropDuplicates().drop(*drop_cols).na.drop()
# Feb
feb_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_feb16_sub.sas7bdat')
feb_df = feb_df.dropDuplicates().drop(*drop_cols).na.drop()
# mar
mar_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_mar16_sub.sas7bdat')
mar_df = mar_df.dropDuplicates().drop(*drop_cols).na.drop()
# apr
apr_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
apr_df = apr_df.dropDuplicates().drop(*drop_cols).na.drop()
# may
may_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_may16_sub.sas7bdat')
may_df = may_df.dropDuplicates().drop(*drop_cols).na.drop()
# jun
jun_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat')
jun_df = jun_df.dropDuplicates().drop(*drop_cols).na.drop()
# jul
jul_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_jul16_sub.sas7bdat')
jul_df = jul_df.dropDuplicates().drop(*drop_cols).na.drop()
# aug
aug_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_aug16_sub.sas7bdat')
aug_df = aug_df.dropDuplicates().drop(*drop_cols).na.drop()
# sep
sep_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_sep16_sub.sas7bdat')
sep_df = sep_df.dropDuplicates().drop(*drop_cols).na.drop()
# oct
oct_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_oct16_sub.sas7bdat')
oct_df = oct_df.dropDuplicates().drop(*drop_cols).na.drop()
# nov
nov_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_nov16_sub.sas7bdat')
nov_df = nov_df.dropDuplicates().drop(*drop_cols).na.drop()
# dec
dec_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_dec16_sub.sas7bdat')
dec_df = dec_df.dropDuplicates().drop(*drop_cols).na.drop()


# Cache before using it
df = df.cache()

df = df.dropDuplicates().drop(*drop_cols).na.drop()

# Fix column names and types
df = df.withColumnRenamed('cicid','immigration_id')
df = df.withColumn('immigration_id',df['immigration_id'].cast('int'))
df = df.withColumnRenamed('i94cit','origin')
df = df.withColumn('origin',df['origin'].cast('int'))
df = df.withColumnRenamed('i94bir','age')
df = df.withColumn('age',df['age'].cast('int'))
df = df.withColumnRenamed('i94mode','arrival_mode')
df = df.withColumn('arrival_mode',df['arrival_mode'].cast('int'))
df = df.withColumnRenamed('i94visa','visa')
df = df.withColumn('visa',df['visa'].cast('int'))
df = df.withColumnRenamed('i94port','landing_port') \
    .withColumnRenamed('arrdate','arrival_date') \
    .withColumnRenamed('i94mode','arrival_mode') \
    .withColumnRenamed('i94addr','state') \
    .withColumnRenamed('depdate','departure_date')

# Now generate time df from current immigration data using 'arrival_date' and 'departure_date' columns
time_df = generate_time_df(df)
time_df
#print(f'Month: {month}, DF row count: {time_df.count()}')

DataFrame[timestamp: double, year: int, month: int, day: int, week: int, weekday: int]

# Let's see some data distribution

## Time data

In [5]:
time_df.groupBy('year').count().orderBy('count', ascending=False).show()

+----+-----+
|year|count|
+----+-----+
|2016|  201|
|2015|    9|
|2012|    2|
|2014|    2|
|2069|    1|
|2020|    1|
|2001|    1|
|2084|    1|
+----+-----+



In [6]:
time_df.groupBy('month').count().orderBy('count', ascending=False).show()

+-----+-----+
|month|count|
+-----+-----+
|    5|   37|
|    4|   34|
|    6|   32|
|    7|   32|
|    8|   31|
|    9|   19|
|    3|   19|
|    1|    9|
|   12|    2|
|    2|    2|
|   10|    1|
+-----+-----+



In [7]:
time_df.groupBy('day').count().orderBy('count', ascending=False).show()

+---+-----+
|day|count|
+---+-----+
|  3|   10|
|  5|   10|
| 15|    9|
| 18|    9|
| 10|    8|
| 14|    8|
| 13|    8|
|  7|    8|
|  4|    8|
|  8|    8|
| 12|    8|
| 30|    7|
| 26|    7|
|  2|    7|
| 17|    7|
|  6|    7|
| 22|    7|
| 19|    7|
| 16|    7|
| 11|    7|
+---+-----+
only showing top 20 rows



## Immigrations data

In [8]:
df.groupBy('state').count().orderBy('count', ascending=False).show()

+-----+------+
|state| count|
+-----+------+
|   FL|516283|
|   NY|465335|
|   CA|393687|
|   HI|138868|
|   TX|108269|
|   NV|102373|
|   GU| 86276|
|   NJ| 64903|
|   IL| 64794|
|   MA| 54230|
|   WA| 44594|
|   GA| 30901|
|   VA| 26037|
|   PA| 25736|
|   MI| 24782|
|   DC| 23671|
|   MD| 21258|
|   NC| 19471|
|   NE| 19130|
|   LA| 17705|
+-----+------+
only showing top 20 rows



In [9]:
df.groupBy('visa').count().orderBy('count', ascending=False).show()

+----+-------+
|visa|  count|
+----+-------+
|   2|2020890|
|   1| 383181|
|   3|  27632|
+----+-------+



In [10]:
df.groupBy('visatype').count().orderBy('count', ascending=False).show()

+--------+-------+
|visatype|  count|
+--------+-------+
|      WT|1027119|
|      B2| 924936|
|      WB| 182475|
|      B1| 179414|
|     GMT|  67921|
|      F1|  25285|
|      E2|  15095|
|      E1|   2979|
|       I|   2887|
|      F2|   1638|
|      CP|    904|
|      M1|    679|
|      I1|    204|
|     GMB|    127|
|      M2|     30|
|     CPL|      8|
|     SBP|      2|
+--------+-------+



In [11]:
df.groupBy('arrival_mode').count().orderBy('count', ascending=False).show()

+------------+-------+
|arrival_mode|  count|
+------------+-------+
|           1|2378319|
|           3|  43401|
|           2|   7927|
|           9|   2056|
+------------+-------+



In [12]:
df.groupBy('age').count().orderBy('count', ascending=False).show()

+---+-----+
|age|count|
+---+-----+
| 30|57816|
| 31|56313|
| 33|56100|
| 34|55999|
| 32|55629|
| 35|55205|
| 29|54002|
| 36|53915|
| 40|52969|
| 37|52878|
| 28|52087|
| 38|50706|
| 39|49727|
| 41|49429|
| 42|48813|
| 44|48488|
| 45|48374|
| 43|48064|
| 27|47868|
| 46|46334|
+---+-----+
only showing top 20 rows



## Age seems distributed 'well' :)

In [13]:
df.groupBy('origin').count().orderBy('count', ascending=False).show()

+------+------+
|origin| count|
+------+------+
|   135|270066|
|   209|159489|
|   582|151434|
|   111|149476|
|   245|144744|
|   148|113654|
|   689|110979|
|   254|110165|
|   213| 81499|
|   438| 76587|
|   687| 63052|
|   123| 59081|
|   117| 56265|
|   129| 47507|
|   691| 45869|
|   692| 39018|
|   130| 36283|
|   696| 35434|
|   252| 33742|
|   251| 31547|
+------+------+
only showing top 20 rows



In [8]:
# Join conditions as a list
conditions = [df.state == final_ports_df.iso_region, df.landing_port == final_ports_df.iata_code]
joined_immig = df.join(final_ports_df, conditions)

NameError: name 'df' is not defined

In [7]:
# Read data using month in input file name
# Jan
jan_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat')
jan_df = jan_df.dropDuplicates().drop(*drop_cols).na.drop()
# Feb
feb_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_feb16_sub.sas7bdat')
feb_df = feb_df.dropDuplicates().drop(*drop_cols).na.drop()
# mar
mar_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_mar16_sub.sas7bdat')
mar_df = mar_df.dropDuplicates().drop(*drop_cols).na.drop()
# apr
apr_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
apr_df = apr_df.dropDuplicates().drop(*drop_cols).na.drop()
# may
may_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_may16_sub.sas7bdat')
may_df = may_df.dropDuplicates().drop(*drop_cols).na.drop()
# jun
jun_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat')
jun_df = jun_df.dropDuplicates().drop(*drop_cols).na.drop()
# jul
jul_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_jul16_sub.sas7bdat')
jul_df = jul_df.dropDuplicates().drop(*drop_cols).na.drop()
# aug
aug_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_aug16_sub.sas7bdat')
aug_df = aug_df.dropDuplicates().drop(*drop_cols).na.drop()
# sep
sep_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_sep16_sub.sas7bdat')
sep_df = sep_df.dropDuplicates().drop(*drop_cols).na.drop()
# oct
oct_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_oct16_sub.sas7bdat')
oct_df = oct_df.dropDuplicates().drop(*drop_cols).na.drop()
# nov
nov_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_nov16_sub.sas7bdat')
nov_df = nov_df.dropDuplicates().drop(*drop_cols).na.drop()
# dec
dec_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_dec16_sub.sas7bdat')
dec_df = dec_df.dropDuplicates().drop(*drop_cols).na.drop()

print(f'Row count jan: {jan_df.count()}')
print(f'Row count feb: {feb_df.count()}')
print(f'Row count mar: {mar_df.count()}')
print(f'Row count apr: {apr_df.count()}')
print(f'Row count may: {may_df.count()}')
print(f'Row count jun: {jun_df.count()}')
print(f'Row count jul: {jul_df.count()}')
print(f'Row count aug: {aug_df.count()}')
print(f'Row count sep: {sep_df.count()}')
print(f'Row count oct: {oct_df.count()}')
print(f'Row count nov: {nov_df.count()}')
print(f'Row count dec: {dec_df.count()}')

Row count jan: 1985026
Row count feb: 1888754
Row count mar: 2523391
Row count apr: 2431703
Row count may: 2634982
Row count jun: 2614153
Row count jul: 3368741
Row count aug: 3081796
Row count sep: 3303289
Row count oct: 2755589
Row count nov: 2358098
Row count dec: 3004757


In [15]:
conditions = [jan_df.i94addr == final_ports_df.iso_region, jan_df.i94port == final_ports_df.iata_code]
join_jan = jan_df.join(final_ports_df, conditions)
print(f'Row count jan: {join_jan.count()}')

Row count jan: 534975


In [27]:
conditions = [feb_df.i94addr == final_ports_df.iso_region, feb_df.i94port == final_ports_df.iata_code]
join_feb = feb_df.join(final_ports_df, conditions)
print(f'Row count feb: {join_feb.count()}')

Row count feb: 466855


In [17]:
conditions = [mar_df.i94addr == final_ports_df.iso_region, mar_df.i94port == final_ports_df.iata_code]
join_mar = mar_df.join(final_ports_df, conditions)
print(f'Row count mar: {join_mar.count()}')

Row count mar: 623297


In [18]:
conditions = [apr_df.i94addr == final_ports_df.iso_region, apr_df.i94port == final_ports_df.iata_code]
join_apr = apr_df.join(final_ports_df, conditions)
print(f'Row count apr: {join_apr.count()}')

Row count apr: 481368


In [19]:
conditions = [may_df.i94addr == final_ports_df.iso_region, may_df.i94port == final_ports_df.iata_code]
join_may = may_df.join(final_ports_df, conditions)
print(f'Row count may: {join_may.count()}')

Row count may: 614628


In [20]:
conditions = [jun_df.i94addr == final_ports_df.iso_region, jun_df.i94port == final_ports_df.iata_code]
join_jun = jun_df.join(final_ports_df, conditions)
print(f'Row count jun: {join_jun.count()}')

Row count jun: 600369


In [21]:
conditions = [jul_df.i94addr == final_ports_df.iso_region, jul_df.i94port == final_ports_df.iata_code]
join_jul = jul_df.join(final_ports_df, conditions)
print(f'Row count jul: {join_jul.count()}')

Row count jul: 816280


In [22]:
conditions = [aug_df.i94addr == final_ports_df.iso_region, aug_df.i94port == final_ports_df.iata_code]
join_aug = aug_df.join(final_ports_df, conditions)
print(f'Row count aug: {join_aug.count()}')

Row count aug: 724842


In [23]:
conditions = [sep_df.i94addr == final_ports_df.iso_region, sep_df.i94port == final_ports_df.iata_code]
join_sep = sep_df.join(final_ports_df, conditions)
print(f'Row count sep: {join_sep.count()}')

Row count sep: 723015


In [24]:
conditions = [oct_df.i94addr == final_ports_df.iso_region, oct_df.i94port == final_ports_df.iata_code]
join_oct = oct_df.join(final_ports_df, conditions)
print(f'Row count oct: {join_oct.count()}')

Row count oct: 645835


In [25]:
conditions = [nov_df.i94addr == final_ports_df.iso_region, nov_df.i94port == final_ports_df.iata_code]
join_nov = nov_df.join(final_ports_df, conditions)
print(f'Row count nov: {join_nov.count()}')

Row count nov: 609114


In [26]:
conditions = [dec_df.i94addr == final_ports_df.iso_region, dec_df.i94port == final_ports_df.iata_code]
join_dec = dec_df.join(final_ports_df, conditions)
print(f'Row count dec: {join_dec.count()}')

Row count dec: 824109


In [28]:
big_df = join_jan.union(join_feb)
big_df.printSchema()
print(f'Big df count: {big_df.count()}')

root
 |-- cicid: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- visatype: string (nullable = true)
 |-- port_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- type: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- iso_region: string (nullable = true)

Big df count: 1001830


In [30]:
big_df.dropDuplicates().na.drop().count()

1001718