In [1]:
import os
from functools import reduce
import pandas as pd

In [2]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from pyspark.sql import DataFrame

In [3]:
spark

In [3]:
spark.sparkContext._jsc.sc().listJars()

JavaObject id=o38

**Reading the codes and mapping**

In [13]:
i94port_map = dict()
with open('i94port.txt') as f:
    for line in f.readlines():
        line = line.strip().split("=")
        code = eval(line[0]).strip()
        location = eval(line[1]).strip()
        location = location.split(',')
        if len(location) < 2:
            city, state = 'unknown', 'unknown'
        else:
            city, state = location[0].strip().lower(), location[1].strip().lower()
        i94port_map[code] = (city, state)
i94res_map = dict()
with open('i94res.txt') as f:
    for line in f.readlines():
        line = line.strip().split("=")
        code = eval(line[0])
        location = eval(line[1]).strip()
        i94res_map[code] = location
i94addr_map = dict()
with open('i94addr.txt') as f:
    for line in f.readlines():
        line = line.strip().split("=")
        code = eval(line[0]).strip()
        location = eval(line[1]).strip()
        i94addr_map[code] = location

In [65]:
for k,v in i94port_map.items():
    if len(v)!=2:
        print(k,v)

In [54]:
base_dir = 'data/18-83510-I94-Data-2016'
def read_sas_directory(dir_name):
    sas_filenames = []
    df_list = []
    for filename in os.listdir(dir_name):
        sas_filenames.append(os.path.join(dir_name, filename))
    for filename in sas_filenames:
        df = spark.read.format("com.github.saurfang.sas.spark").load(filename)
        df = df.select(['i94yr',
                        'i94mon', 
                        'i94res',
                        'i94port',
                        'i94bir',
                        'i94addr',
                        'i94visa',
                        'gender',
                        'visatype'])
        df_list.append(df)
    i94_df = reduce(DataFrame.unionByName, df_list)
    return i94_df.repartition(8)

i94_data = read_sas_directory(base_dir)

In [55]:
# filename = "data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat"
# i94_data_sample = spark.read.format("com.github.saurfang.sas.spark").load(*sas_filenames)

In [56]:
i94_data.printSchema()

root
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- visatype: string (nullable = true)



In [66]:
@F.udf
def state_code_to_name(state_code):
    return i94addr_map.get(state_code, "all other codes")
        
@F.udf
def source_code_to_name(source_code):
    return i94res_map.get(source_code, "invalid")

@F.udf
def destination_code_to_city(destination_code):
    if destination_code in i94port_map:
        return i94port_map[destination_code][0]
    return "unknown"

In [67]:
i94_data_transformed = i94_data.select(
    F.col('i94yr').cast(IntegerType()).alias('immigration_year'),
    F.col('i94mon').cast(IntegerType()).alias('immigration_month'),
    source_code_to_name(F.col('i94res')).alias('immigration_source'),
    destination_code_to_city(F.col('i94port')).alias('immigration_destination_port'),
    F.col('i94bir').cast(IntegerType()).alias('immigrant_age'),
    state_code_to_name(F.col('i94addr')).alias('immigration_state'),
    F.col('i94visa').alias('visa_code'),
    F.col('gender'),
    F.col('visatype').alias('visa_type')
)
i94_data_transformed.printSchema()

root
 |-- immigration_year: integer (nullable = true)
 |-- immigration_month: integer (nullable = true)
 |-- immigration_source: string (nullable = true)
 |-- immigration_destination_port: string (nullable = true)
 |-- immigrant_age: integer (nullable = true)
 |-- immigration_state: string (nullable = true)
 |-- visa_code: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- visa_type: string (nullable = true)



In [68]:
# i94_data_transformed.show(10)

In [61]:
# i94_data_transformed.groupby('immigration_month').count().show()

+-----------------+-------+
|immigration_month|  count|
+-----------------+-------+
|               12|3432990|
|                1|2847924|
|                6|3574989|
|                3|3157072|
|                5|3444249|
|                9|3733786|
|                4|3096313|
|                8|4103570|
|                7|4265031|
|               10|3649136|
|               11|2914926|
|                2|2570543|
+-----------------+-------+



In [69]:
i94_data_transformed.printSchema()

root
 |-- immigration_year: integer (nullable = true)
 |-- immigration_month: integer (nullable = true)
 |-- immigration_source: string (nullable = true)
 |-- immigration_destination_port: string (nullable = true)
 |-- immigrant_age: integer (nullable = true)
 |-- immigration_state: string (nullable = true)
 |-- visa_code: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- visa_type: string (nullable = true)



In [70]:
i94_data_transformed.repartition(8).write.mode('overwrite').parquet('data/i94_transformed/')

In [71]:
i94_data_transformed.select('immigration_destination_port').show(100)

+----------------------------+
|immigration_destination_port|
+----------------------------+
|                    new york|
|                       agana|
|                       agana|
|                     chicago|
|                      boston|
|                 los angeles|
|                     houston|
|                       miami|
|                       miami|
|                     seattle|
|                     seattle|
|                     seattle|
|                       miami|
|                      saipan|
|                     unknown|
|                    new york|
|                       miami|
|                    new york|
|               san francisco|
|               san francisco|
|                 los angeles|
|                 los angeles|
|                 los angeles|
|                 los angeles|
|                 los angeles|
|                 los angeles|
|                     chicago|
|            newark/teterboro|
|                      boston|
|       