In [620]:
from pyspark.sql import SparkSession, SQLContext, GroupedData, HiveContext
from pyspark.sql.functions import *
from pyspark.sql.functions import date_add as d_add
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import year, month, dayofmonth, weekofyear, date_format
from pyspark.sql import functions as F
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import lit
from pyspark.sql import Row
import os 
import pandas as pd 
import numpy as np
# Increased memory
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.config("spark.python.worker.memory", "15g") \
.enableHiveSupport().getOrCreate()
print("Setup complete")

Setup complete


In [621]:
os.listdir(os.getcwd()+'/data')

['I94_SAS_Labels_Descriptions.sas',
 'airport-codes_csv.csv',
 'arrival_port_map.csv',
 'country_details.csv',
 'flight_number.csv',
 'immigration_data_sample.csv',
 'mismatch.csv',
 'sas_data',
 'us-cities-demographics.csv',
 'valid_arrival_codes.csv']

In [622]:
# Setting up Files to read
data_dir = os.path.join(os.getcwd(), 'data')
country_detail_path = os.path.join(data_dir, r'country_details.csv')
airport_mismatch_path =  os.path.join(data_dir,r'mismatch.csv') 
airport_data = os.path.join(data_dir, 'airport-codes_csv.csv')
arrival_port_map = os.path.join(data_dir, 'arrival_port_map.csv')
demographic_data = os.path.join(data_dir, 'us-cities-demographics.csv')
valid_arrival_address_path = os.path.join(data_dir, 'valid_arrival_codes.csv')
sas_path = os.path.join(data_dir, 'sas_data')
sas_path_specific = os.path.join(sas_path, 'part-00001-b9542815-7a8d-45fc-9c67-c9c5007ad0d4-c000.snappy.parquet')
output_path = os.path.join(os.getcwd(), 'spark_output_parquet')

In [623]:
# util functions
def shape(d):
    print((d.count(), len(d.columns)))
    
def detail(d):
    print("Shape ==>", (d.count(), len(d.columns)))
    print("Schema ::")
    d.printSchema()
    d.show(2)

In [624]:
!ls /home/jovyan/work/data/sas_data

part-00000-b9542815-7a8d-45fc-9c67-c9c5007ad0d4-c000.snappy.parquet
part-00001-b9542815-7a8d-45fc-9c67-c9c5007ad0d4-c000.snappy.parquet
part-00002-b9542815-7a8d-45fc-9c67-c9c5007ad0d4-c000.snappy.parquet
part-00003-b9542815-7a8d-45fc-9c67-c9c5007ad0d4-c000.snappy.parquet
part-00004-b9542815-7a8d-45fc-9c67-c9c5007ad0d4-c000.snappy.parquet
part-00005-b9542815-7a8d-45fc-9c67-c9c5007ad0d4-c000.snappy.parquet
part-00006-b9542815-7a8d-45fc-9c67-c9c5007ad0d4-c000.snappy.parquet
part-00007-b9542815-7a8d-45fc-9c67-c9c5007ad0d4-c000.snappy.parquet
part-00008-b9542815-7a8d-45fc-9c67-c9c5007ad0d4-c000.snappy.parquet
part-00009-b9542815-7a8d-45fc-9c67-c9c5007ad0d4-c000.snappy.parquet
part-00010-b9542815-7a8d-45fc-9c67-c9c5007ad0d4-c000.snappy.parquet
part-00011-b9542815-7a8d-45fc-9c67-c9c5007ad0d4-c000.snappy.parquet
part-00012-b9542815-7a8d-45fc-9c67-c9c5007ad0d4-c000.snappy.parquet
part-00013-b9542815-7a8d-45fc-9c67-c9c5007ad0d4-c000.snappy.parquet
_SUCCESS


In [625]:
# Reading immigration data
print(sas_path)
df=spark.read.parquet(sas_path_specific)
detail(df)

/home/jovyan/work/data/sas_data
Shape ==> (220160, 28)
Schema ::
root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- a

In [626]:
import json
def json_beautifier(d):
    s = json.loads(d.schema.json())['fields']
    cols = ['Field', 'Type', 'Nullable']
    schema_df = pd.DataFrame(columns=cols, data={
        'Field': [x['name'] for x in s],
        'Type': [x['type'] for x in s],
        'Nullable': [x['nullable'] for x in s]
    })
    return schema_df

x = json_beautifier(df)
x.to_csv('abc.csv')


In [627]:
def write_to_parquet(table, file_name):
    """
    Write the table as parquet file
    Parameters:
        table
        Name of the file
    Returns:
        Outputs the table as parquet file to a folder
    """
    file_output = os.path.join(output_path, file_name)
    table.write.mode("overwrite").parquet(file_output)

In [628]:
print("Immigration_table schema:")
df.printSchema()

Immigration_table schema:
root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- a

In [629]:
# sampling down to run with a smaller dataset
df_small = df.sample(withReplacement=False, fraction=0.4, seed=3)
shape(df_small)
shape(df)

(87680, 28)
(220160, 28)


In [630]:
# Adjusting the immigration dataframe
df2 = df_small \
.withColumn("cicid", col("cicid").cast("integer")) \
.withColumn("year", col("i94yr").cast("integer")) \
.drop("i94yr") \
.withColumn("month", col("i94mon").cast("integer")) \
.drop("i94mon") \
.withColumn("bornCountry", col("i94cit").cast("integer")) \
.drop("i94cit") \
.withColumn("residentCountry", col("i94res").cast("integer")) \
.drop("i94res") \
.withColumnRenamed("i94port", "arrivalPort") \
.withColumn("mode", col("i94mode").cast("integer")) \
.drop("i94mode") \
.withColumnRenamed("i94addr", "arrivalAddress") \
.withColumn("age", col("i94bir").cast("integer")) \
.drop("i94bir") \
.withColumn("visaId", col("i94visa").cast("integer")) \
.drop("i94visa") \
.withColumnRenamed("entdepa", "arrivalFlag") \
.withColumnRenamed("entdepd", "departureFlag") \
.withColumnRenamed("entdepu", "updateFlag") \
.withColumnRenamed("matflag", "matchFlag") \
.withColumn("birthYear", col("biryear").cast("integer")) \
.drop("biryear") \
.withColumnRenamed("fltno", "flightNumber") \
.withColumnRenamed("visaType", "visaType") \
.withColumn("sasDate", to_date(lit("01/01/1960"), "MM/dd/yyyy")) \
.withColumn("arrdate", col("arrdate").cast("integer")) \
.withColumn("depdate", col("depdate").cast("integer")) \
.withColumn("arrivalDate", expr("date_add(sasDate, arrdate)")) \
.withColumn("departureDate", expr("date_add(sasDate, depdate)")) \
.drop("sasDate", "arrdate", "depdate", "count", "admnum", "dtadfile", "visapost", "occup", "dtaddto", "insnum")



In [631]:
print("New Immigration data schema:")
df2.printSchema()

New Immigration data schema:
root
 |-- cicid: integer (nullable = true)
 |-- arrivalPort: string (nullable = true)
 |-- arrivalAddress: string (nullable = true)
 |-- arrivalFlag: string (nullable = true)
 |-- departureFlag: string (nullable = true)
 |-- updateFlag: string (nullable = true)
 |-- matchFlag: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- flightNumber: string (nullable = true)
 |-- visaType: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- bornCountry: integer (nullable = true)
 |-- residentCountry: integer (nullable = true)
 |-- mode: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- visaId: integer (nullable = true)
 |-- birthYear: integer (nullable = true)
 |-- arrivalDate: date (nullable = true)
 |-- departureDate: date (nullable = true)



In [632]:
# Time table creation
time_table = df2.select(['arrivalDate'])\
                    .withColumnRenamed('arrivalDate','time')\
                    .dropna()

time_table = time_table \
             .withColumn('day', F.dayofmonth('time')) \
             .withColumn('month', F.month('time')) \
             .withColumn('year', F.year('time')) \
             .withColumn('week', F.weekofyear('time')) \
             .withColumn('weekday', F.dayofweek('time'))\
             .dropDuplicates()
write_to_parquet(time_table,"time_table.parquet" )

In [633]:
detail(time_table)

Shape ==> (4, 6)
Schema ::
root
 |-- time: date (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- weekday: integer (nullable = true)

+----------+---+-----+----+----+-------+
|      time|day|month|year|week|weekday|
+----------+---+-----+----+----+-------+
|2016-04-05|  5|    4|2016|  14|      3|
|2016-04-02|  2|    4|2016|  13|      7|
+----------+---+-----+----+----+-------+
only showing top 2 rows



In [634]:
# Person table creation
person_table = df2.select(['birthYear','gender']) \
                 .dropDuplicates().dropna() \
                 .withColumn("personId", \
                        monotonically_increasing_id())\
                 .withColumnRenamed("gender", "person_gender")
write_to_parquet(person_table,"person_table.parquet" )

In [635]:
detail(person_table)

Shape ==> (194, 3)
Schema ::
root
 |-- birthYear: integer (nullable = true)
 |-- person_gender: string (nullable = true)
 |-- personId: long (nullable = false)

+---------+-------------+--------+
|birthYear|person_gender|personId|
+---------+-------------+--------+
|     1972|            M|       0|
|     2004|            F|       1|
+---------+-------------+--------+
only showing top 2 rows



In [636]:
# Country table creation
country_table = df2.select('bornCountry').distinct().withColumnRenamed("bornCountry", "countryId") \
            
country_name = spark.read.csv(country_detail_path).dropna()
country_name = country_name.withColumn("_c0", col("_c0").cast("integer"))\
.withColumnRenamed("_c0", "countryId_").withColumnRenamed("_c1", "countryName")
                
country_table = country_table.join(country_name, (country_table.countryId == country_name.countryId_))\
                             .drop('countryId_')\
                             .dropna()\
                             .dropDuplicates()
write_to_parquet(country_table,"country_table.parquet" )

In [637]:
detail(country_table)

Shape ==> (173, 2)
Schema ::
root
 |-- countryId: integer (nullable = true)
 |-- countryName: string (nullable = true)

+---------+-------------------+
|countryId|        countryName|
+---------+-------------------+
|      392|               MALI|
|      516|TRINIDAD AND TOBAGO|
+---------+-------------------+
only showing top 2 rows



In [638]:
# Arrival table creation
arrival_port_table = df2.groupby('arrivalPort') \
                     .agg(F.countDistinct("arrivalAddress"))\
                     .withColumnRenamed("count(arrivalAddress)","numArrivalAddress") \
                     .dropna() \
                     .dropDuplicates()\
                     .withColumn("arrivalPortId", \
                        monotonically_increasing_id())
arrival_data_map = spark.read.csv(arrival_port_map).dropna()\
.withColumnRenamed("_c0","arrivalPort_") \
.withColumnRenamed("_c1","stateCode")

arrival_port_table = arrival_port_table\
.join(arrival_data_map,  (arrival_port_table.arrivalPort == arrival_data_map.arrivalPort_))\
.dropna()

write_to_parquet(arrival_port_table,"arrival_port_table.parquet")

In [639]:
detail(arrival_port_table)

Shape ==> (98, 5)
Schema ::
root
 |-- arrivalPort: string (nullable = true)
 |-- numArrivalAddress: long (nullable = false)
 |-- arrivalPortId: long (nullable = false)
 |-- arrivalPort_: string (nullable = true)
 |-- stateCode: string (nullable = true)

+-----------+-----------------+-------------+------------+---------+
|arrivalPort|numArrivalAddress|arrivalPortId|arrivalPort_|stateCode|
+-----------+-----------------+-------------+------------+---------+
|        FMY|               33|            0|         FMY|       FL|
|        BGM|                2|            1|         BGM|       ME|
+-----------+-----------------+-------------+------------+---------+
only showing top 2 rows



In [640]:
# Status table creation
status_table = df2.select(['matchFlag']) \
               .withColumnRenamed('matchFlag', 'matchFlagField')\
               .dropna() \
               .dropDuplicates() \
               .withColumn("statusFlagId", monotonically_increasing_id()) 
write_to_parquet(status_table,"status_table.parquet")

In [641]:
detail(status_table)

Shape ==> (1, 2)
Schema ::
root
 |-- matchFlagField: string (nullable = true)
 |-- statusFlagId: long (nullable = false)

+--------------+------------+
|matchFlagField|statusFlagId|
+--------------+------------+
|             M|           0|
+--------------+------------+



In [688]:
# Creating airport table
airport = spark.read.format("csv").option("header", "true").option("delimiter", ",").load(airport_data)
y = json_beautifier(airport)
y.to_csv('airport_schema.csv')
airport.where(col('iata_code') == 'CLG').show()
shape(airport)
airport = airport.where(\
            (col("iso_country") == "US") &\
           (col("iata_code").isNotNull())\
          & (col("type").isin("large_airport", "medium_airport", "small_airport")) \
          & (col("type").isNotNull())) \
            .withColumn("isoRegion", substring(col("iso_region"), 4, 2)) \
            .dropna()\
            .dropDuplicates()

airport = airport\
.withColumn("lat", split(col("coordinates"), ",").getItem(0))\
.withColumn("long", split(col("coordinates"), ",").getItem(1))

airport = airport\
.withColumn("lat", col("lat").cast("float"))\
.withColumn("long", col("long").cast("float"))\
.drop('local_code', 'elevation_ft', 'continent', 'coordinates')

airport_table = airport.select(['ident', 'type', 'name', 'isoRegion', 'municipality',
                                'gps_code', 'iata_code','iso_country', 'lat', 'long']) \
               .dropDuplicates().dropna()
write_to_parquet(airport_table,"airport_table.parquet" )
shape(airport)


+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  CLG|       closed|    Coalinga Airport|        null|       NA|         US|     US-CA|        null|    null|      CLG|      null|-120.360116959, 3...|
| KC80|small_airport|New Coalinga Muni...|         622|       NA|         US|     US-CA|    Coalinga|    null|      CLG|       C80|-120.293998718261...|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+

(55075, 12)
(1835, 11)


In [685]:
airport.where(col('iata_code') == 'CHI').show()

+-----+----+----+-----------+----------+------------+--------+---------+---------+---+----+
|ident|type|name|iso_country|iso_region|municipality|gps_code|iata_code|isoRegion|lat|long|
+-----+----+----+-----------+----------+------------+--------+---------+---------+---+----+
+-----+----+----+-----------+----------+------------+--------+---------+---------+---+----+



In [666]:
detail(airport)

Shape ==> (1909, 11)
Schema ::
root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- isoRegion: string (nullable = true)
 |-- lat: float (nullable = true)
 |-- long: float (nullable = true)

+-----+--------------+--------------------+-----------+----------+--------------------+--------+---------+---------+--------+-------+
|ident|          type|                name|iso_country|iso_region|        municipality|gps_code|iata_code|isoRegion|     lat|   long|
+-----+--------------+--------------------+-----------+----------+--------------------+--------+---------+---------+--------+-------+
| KBPT|medium_airport|Southeast Texas R...|         US|     US-TX|Beaumont/Port Arthur|    KBPT|      BPT|       TX|-94.0207|29.9508

+-----+----+----+-----------+----------+------------+--------+---------+---------+---+----+
|ident|type|name|iso_country|iso_region|municipality|gps_code|iata_code|isoRegion|lat|long|
+-----+----+----+-----------+----------+------------+--------+---------+---------+---+----+
+-----+----+----+-----------+----------+------------+--------+---------+---------+---+----+



In [667]:
# Creating demographics table
dem = spark.read.format("csv").option("header", "true").option("delimiter", ";").load(demographic_data)
z= json_beautifier(dem)
z.to_csv('dem.csv')
detail(dem)
dem_table = dem \
.groupBy(col("State Code").alias("stateCode"), col("State").alias("state")).agg(
        round(mean('Median Age'), 2).alias("medianAge"),\
        sum("Total Population").alias("totalPopulation"),\
        sum("Male Population").alias("malePopulation"), \
        sum("Female Population").alias("femalePopulation"),\
        sum("Number of Veterans").alias("numberOfVeterans"),\
        sum("Foreign-born").alias("foreignBorn"), \
        round(mean("Average Household Size"),2).alias("averageHouseholdSize")
).dropna()
write_to_parquet(dem_table,"dem_table.parquet" )


Shape ==> (2891, 12)
Schema ::
root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)

+-------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+
|         City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|              Race|Count|
+-------------+-------------+----------+---------------+-----------------+-

In [668]:
detail(dem)

Shape ==> (2891, 12)
Schema ::
root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)

+-------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+
|         City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|              Race|Count|
+-------------+-------------+----------+---------------+-----------------+-

In [669]:
valid_arrival_address = spark.read.format("csv").option("header", "true").option("delimiter", ",").load(valid_arrival_address_path)
valid_arrival_address = valid_arrival_address\
.withColumnRenamed('_c0', 'code')\
.withColumnRenamed('0', 'arrivalAddressDescription')\

valid_arrival_address.show()


+----+-------------------------+
|code|arrivalAddressDescription|
+----+-------------------------+
|  AL|                'ALABAMA'|
|  AK|                 'ALASKA'|
|  AZ|                'ARIZONA'|
|  AR|               'ARKANSAS'|
|  CA|             'CALIFORNIA'|
|  CO|               'COLORADO'|
|  CT|            'CONNECTICUT'|
|  DE|               'DELAWARE'|
|  DC|      'DIST. OF COLUMBIA'|
|  FL|                'FLORIDA'|
|  GA|                'GEORGIA'|
|  GU|                   'GUAM'|
|  HI|                 'HAWAII'|
|  ID|                  'IDAHO'|
|  IL|               'ILLINOIS'|
|  IN|                'INDIANA'|
|  IA|                   'IOWA'|
|  KS|                 'KANSAS'|
|  KY|               'KENTUCKY'|
|  LA|              'LOUISIANA'|
+----+-------------------------+
only showing top 20 rows



In [696]:
# Merging different tables
common_cols = ['cicid', 'arrivalAddress','arrivalDate','departureDate','mode','bornCountry', 'visaId','visaType',
                         'gender','arrivalPort', 'birthYear','matchFlag']
df3 = df2.select(common_cols)\
     .where((col("age") >= 0) & (col("cicid").isNotNull())) \
     .where(col("birthYear").isNotNull() & col("gender").isNotNull())\
     .where(col("bornCountry").isNotNull())\
     .where(col("arrivalAddress").isNotNull())\
     .dropDuplicates(['cicid'])\
     .dropna()

# Joining with valid address to pick correct records

df3 = df3.join(valid_arrival_address, \
               (df3.arrivalAddress == valid_arrival_address.code), how='inner')\
         .dropna(subset=('arrivalAddress'))\
         .drop('code', 'arrivalAddressDescription')
shape(df3)




(68355, 12)


In [697]:
small_table = df3.select(common_cols)
detail(small_table)

Shape ==> (68355, 12)
Schema ::
root
 |-- cicid: integer (nullable = true)
 |-- arrivalAddress: string (nullable = true)
 |-- arrivalDate: date (nullable = true)
 |-- departureDate: date (nullable = true)
 |-- mode: integer (nullable = true)
 |-- bornCountry: integer (nullable = true)
 |-- visaId: integer (nullable = true)
 |-- visaType: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- arrivalPort: string (nullable = true)
 |-- birthYear: integer (nullable = true)
 |-- matchFlag: string (nullable = true)

+------+--------------+-----------+-------------+----+-----------+------+--------+------+-----------+---------+---------+
| cicid|arrivalAddress|arrivalDate|departureDate|mode|bornCountry|visaId|visaType|gender|arrivalPort|birthYear|matchFlag|
+------+--------------+-----------+-------------+----+-----------+------+--------+------+-----------+---------+---------+
|459674|            GA| 2016-04-03|   2016-04-07|   1|        135|     2|      WT|     M|        ATL|   

In [698]:
# merge with time, commenting the code to have arrivalDate as the column
# small_table = small_table.withColumnRenamed("arrivalDate", "time")  
shape(small_table)

(68355, 12)


In [699]:
# merge with status table
small_table = small_table.join(status_table,\
                               (small_table.matchFlag == status_table.matchFlagField) , how = 'inner') \
                         .dropna() \
                        .drop('matchFlagField', 'matchFlag') \
                       .dropDuplicates()
shape(small_table)

(68355, 12)


In [700]:
print("Missing countries wrt country table")
small_table.select('bornCountry').subtract(country_table.select("countryId")).show()

Missing countries wrt country table
+-----------+
|bornCountry|
+-----------+
|        148|
|        133|
|        756|
|        734|
|        746|
|        766|
|        267|
|        714|
|        254|
|        574|
|        718|
|        999|
|        311|
|        407|
|        727|
+-----------+



In [701]:

# merge with country
small_table = small_table.join(country_table, (small_table.bornCountry == country_table.countryId) , how = 'inner') \
                         .dropna() \
                        .drop('bornCountry') \
                       .dropDuplicates()
shape(small_table)
print('''
[Quality check]
The number of rows has reduced because of missing countries from the I94_SAS_Labels_Descriptions.sas file
'''
)

(59910, 13)

[Quality check]
The number of rows has reduced because of missing countries from the I94_SAS_Labels_Descriptions.sas file



In [702]:
# merge with person
small_table = small_table.join(person_table.select(['personId', 'person_gender', 'birthYear']),
                               ((small_table.gender == person_table.person_gender) & \
                               (small_table.birthYear == person_table.birthYear)),\
                               how = 'inner') \
                      .drop('person_gender','birthYear') \
                      .dropna() \
                      .dropDuplicates()
shape(small_table)

(59910, 13)


In [703]:
print('''
[Quality Check]
Missing countries wrt arrival table, these dont have a state code
''')
small_table.select('arrivalPort').subtract(arrival_port_table.select("arrivalPort_")).show()


[Quality Check]
Missing countries wrt arrival table, these dont have a state code

+-----------+
|arrivalPort|
+-----------+
|        YGF|
|        XXX|
|        WAS|
|        MAA|
|        W55|
+-----------+



In [704]:
# merge with arrival
small_table = small_table.join(arrival_port_table.select(['arrivalPortId', 'arrivalPort_', 'stateCode']),  \
                              (small_table.arrivalPort == arrival_port_table.arrivalPort_) , how = 'inner') \
                       .drop('arrivalPort_')\
                       .dropna()\
                       .dropDuplicates()
shape(small_table)

(57852, 15)


In [705]:
print('''[Quality Check]
Missing arrival ports i.e 
which do not have an IATA code or
are not based in USA
or have nulls in the airport table row''')
print(small_table.select('arrivalPort').subtract(airport.select("iata_code")).count())
print(small_table.select('arrivalPort').subtract(airport.select("iata_code")).show(truncate=False))
df3.where(col('arrivalPort') == 'AGA').take(1)




[Quality Check]
Missing arrival ports i.e 
which do not have an IATA code or
are not based in USA
or have nulls in the airport table row
39
+-----------+
|arrivalPort|
+-----------+
|AGA        |
|BAL        |
|CHI        |
|CLG        |
|DUB        |
|EDA        |
|FRB        |
|FTL        |
|HAL        |
|HAM        |
|HEF        |
|HHW        |
|INP        |
|KAN        |
|LNB        |
|LOS        |
|LVG        |
|MCA        |
|MON        |
|NAS        |
+-----------+
only showing top 20 rows

None


[Row(cicid=461933, arrivalAddress='GU', arrivalDate=datetime.date(2016, 4, 3), departureDate=datetime.date(2016, 4, 11), mode=1, bornCountry=135, visaId=2, visaType='WT', gender='M', arrivalPort='AGA', birthYear=1974, matchFlag='M')]

In [706]:
# merge airports data
small_table = small_table.join(airport.select(['ident', 'iata_code']),\
                               (small_table.arrivalPort == airport['iata_code']),\
                               how = 'inner')\
                         .drop('iata_code') \
                         .dropna() \
                         .dropDuplicates()
shape(small_table)

(24903, 16)


In [708]:
print('''[Quality Check]
Missing state code i.e 
which are present in immigration table but missing from demographics data set''')
print(small_table.select('stateCode').subtract(dem.select('State Code')).count())
print(small_table.select('stateCode').subtract(dem.select('State Code')).show(truncate=False))

[Quality Check]
Missing state code i.e 
which are present in immigration table but missing from demographics data set
3
+---------+
|stateCode|
+---------+
|CANADA   |
|VI       |
|WV       |
+---------+

None


In [709]:
# merge demographics data
small_table = small_table.join(dem.select(['State Code', 'State']),\
                               (small_table.stateCode == dem['State Code']),\
                               how = 'inner')\
                         .drop('stateCode') \
                         .dropna() \
                         .dropDuplicates()

In [710]:
detail(small_table)

Shape ==> (24210, 17)
Schema ::
root
 |-- cicid: integer (nullable = true)
 |-- arrivalAddress: string (nullable = true)
 |-- arrivalDate: date (nullable = true)
 |-- departureDate: date (nullable = true)
 |-- mode: integer (nullable = true)
 |-- visaId: integer (nullable = true)
 |-- visaType: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- arrivalPort: string (nullable = true)
 |-- statusFlagId: long (nullable = false)
 |-- countryId: integer (nullable = true)
 |-- countryName: string (nullable = true)
 |-- personId: long (nullable = false)
 |-- arrivalPortId: long (nullable = false)
 |-- ident: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- State: string (nullable = true)

+------+--------------+-----------+-------------+----+------+--------+------+-----------+------------+---------+-----------+--------+-------------+-----+----------+-------+
| cicid|arrivalAddress|arrivalDate|departureDate|mode|visaId|visaType|gender|arrivalPort|statusFl

In [711]:
list_of_tables = [country_table, time_table, arrival_port_table, person_table, status_table, airport, dem, small_table]
labels = ['country', 'time','arrival', 'person', 'status', 'airport', 'dem', 'fact']

In [712]:
for table, label in zip(list_of_tables, labels):
    f = json_beautifier(table)
    f.to_csv('{}_schema.csv'.format(label))

## In conclusion we see for the given dataset, airline mismatches reduces the maximum number of data, but in order to maintain different constraints the final fact table is populated correctly i.e. with 24210 rows