Objective: convert the monthly i94 SAS7BDAT files into parquet files.

Approach: during development we run this locally (within the Udacity virtual environment) to avoid AWS compute costs. Once the process is ready, we may migrate it to AWS completely.

This notebook is designed to be run in the Udacity virtual environment only.

Read the 12 monthly SAS7BDAT i94 files from 2016 into a Spark DataFrame. Write the Spark DataFrame to parquet files in 3 ways:

1. No partitioning  <-- turned out to be the most useful for initial EDA.
2. Partition by month  <-- experiment only, to prove we can do it.
3. Partition by year and month  <-- experiment only, to prove we can do it.

Lesson learnt: when we use a Spark DataFrame column as a partition column, Spark does not store that column inside the parquet data files. Instead, its values are encoded in the directory names (e.g. i94yr=2016.0/i94mon=1.0/). If we use i94yr and i94mon as partition keys, these two columns are missing whenever we read an individual partition directory or data file directly. Reading from the root of the partitioned dataset lets Spark recover them through partition discovery, so they remain available for filtering (Spark SQL WHERE) and for GROUP BY, although their data types are then inferred from the directory names.
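
To make the directory layout concrete, here is a minimal sketch in plain Python (no Spark needed) of how partition values can be recovered from a Hive-style `key=value` path. The example path and the helper name `partition_values` are hypothetical; Spark performs the equivalent recovery itself when a partitioned dataset is read from its root directory.

```python
from pathlib import PurePosixPath


def partition_values(path: str) -> dict:
    """Extract key=value partition pairs from a Hive-style parquet file path."""
    values = {}
    for part in PurePosixPath(path).parts:
        # Partition directories look like "i94mon=1.0"; other path parts have no "=".
        if "=" in part:
            key, _, value = part.partition("=")
            values[key] = value
    return values


# Hypothetical file path, as Spark might lay it out under the year/month output directory.
example = "by_i94yr_i94mon/i94yr=2016.0/i94mon=1.0/part-00000.snappy.parquet"
print(partition_values(example))  # {'i94yr': '2016.0', 'i94mon': '1.0'}
```

The values come back as strings here. When a reader infers types from such directory names, they may not match the types of the original columns, which is worth checking after reading a partitioned dataset.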

In [1]:
import os
import pyspark
import configparser
from pyspark.sql import SparkSession

In [2]:
# Dev config from the non-secret configuration file
config_dev = configparser.ConfigParser()
with open('aws_dev.cfg') as cfg_file:
    config_dev.read_file(cfg_file)

In [3]:
RAW_I94_MTHLY_SAS7BDAT_DIR = config_dev.get('DATA_PATHS_UDACITY', 'RAW_I94_MTHLY_SAS7BDAT_DIR')

PAR_I94_DIR = config_dev.get('DATA_PATHS_LOCAL', 'PAR_I94_DIR')
PAR_I94_DIR_BY_NONE = config_dev.get('DATA_PATHS_LOCAL', 'PAR_I94_DIR_BY_NONE')
PAR_I94_DIR_BY_I94MON = config_dev.get('DATA_PATHS_LOCAL', 'PAR_I94_DIR_BY_I94MON')
PAR_I94_DIR_BY_I94YR_I94MON = config_dev.get('DATA_PATHS_LOCAL', 'PAR_I94_DIR_BY_I94YR_I94MON')
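
For reference, the configuration file read above might look like the sketch below. The section and option names are the ones used in this notebook and the paths match the output printed further down, but the exact file contents, and the `PAR_I94_DIR` value in particular, are assumptions. The helper `missing_options` is hypothetical; it simply reports absent entries before any long-running work starts.

```python
import configparser

# Hypothetical contents of aws_dev.cfg. The PAR_I94_DIR value is an assumption.
SAMPLE_CFG = """
[DATA_PATHS_UDACITY]
RAW_I94_MTHLY_SAS7BDAT_DIR = /data/18-83510-I94-Data-2016

[DATA_PATHS_LOCAL]
PAR_I94_DIR = par_input_data/i94/i94_parquet
PAR_I94_DIR_BY_NONE = par_input_data/i94/i94_parquet/by_none/
PAR_I94_DIR_BY_I94MON = par_input_data/i94/i94_parquet/by_i94mon/
PAR_I94_DIR_BY_I94YR_I94MON = par_input_data/i94/i94_parquet/by_i94yr_i94mon/
"""

# Options this notebook expects, grouped by section.
REQUIRED = {
    "DATA_PATHS_UDACITY": ["RAW_I94_MTHLY_SAS7BDAT_DIR"],
    "DATA_PATHS_LOCAL": [
        "PAR_I94_DIR",
        "PAR_I94_DIR_BY_NONE",
        "PAR_I94_DIR_BY_I94MON",
        "PAR_I94_DIR_BY_I94YR_I94MON",
    ],
}


def missing_options(config, required):
    """Return 'SECTION.OPTION' strings for every required entry that is absent."""
    return [
        f"{section}.{option}"
        for section, options in required.items()
        for option in options
        if not config.has_option(section, option)
    ]


config_sample = configparser.ConfigParser()
config_sample.read_string(SAMPLE_CFG)
print(missing_options(config_sample, REQUIRED))  # [] when nothing is missing
```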

In [5]:
print(f"""\
Objective:

(1) For each sas7bdat file in this directory:
    {RAW_I94_MTHLY_SAS7BDAT_DIR}
    ... read into a Spark DataFrame,

(2) and immediately write it out as parquet files to these directories:
    (2a) No partition (for overall stats summary):
        {PAR_I94_DIR_BY_NONE}
    (2b) Partitioned by i94mon (for ease of analysis for just the 2016 data):
        {PAR_I94_DIR_BY_I94MON}
    (2c) Partitioned by i94yr, i94mon (for ongoing ETL and analysis, e.g. 2017, 2018, etc.):
        {PAR_I94_DIR_BY_I94YR_I94MON}
"""
)

Objective:

(1) For each sas7bdat file in this directory:
    /data/18-83510-I94-Data-2016
    ... read into a Spark DataFrame,

(2) and immediately write it out as parquet files to these directories:
    (2a) No partition (for overall stats summary):
        par_input_data/i94/i94_parquet/by_none/
    (2b) Partitioned by i94mon (for ease of analysis for just the 2016 data):
        par_input_data/i94/i94_parquet/by_i94mon/
    (2c) Partitioned by i94yr, i94mon (for ongoing ETL and analysis, e.g. 2017, 2018, etc.):
        par_input_data/i94/i94_parquet/by_i94yr_i94mon/



In [6]:
# The "s_2.11" suffix targets Scala 2.11; it must match the local Spark build.
spark = (
    SparkSession.builder
    .config("spark.jars.repositories", "https://repos.spark-packages.org/")
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)

In [7]:
!ls -l {PAR_I94_DIR}

total 12
drwxr-xr-x 3 root root 4096 Dec 11 15:34 by_i94mon
drwxr-xr-x 3 root root 4096 Dec 11 15:36 by_i94yr_i94mon
drwxr-xr-x 2 root root 4096 Dec 11 15:35 by_none


In [8]:
!rm -r {PAR_I94_DIR}

In [9]:
!mkdir {PAR_I94_DIR}

In [10]:
!ls -l {PAR_I94_DIR}

total 0
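
The output directory is cleared here because the write loop below uses append mode, so re-running it against existing output would add the same rows again. As an alternative to the shell commands, the reset can be sketched in plain Python. The helper name `reset_dir` is hypothetical, and the demonstration uses a temporary directory rather than the real `PAR_I94_DIR`.

```python
import shutil
import tempfile
from pathlib import Path


def reset_dir(path) -> Path:
    """Delete `path` if it exists, then recreate it empty."""
    target = Path(path)
    shutil.rmtree(target, ignore_errors=True)  # no error when the directory is absent
    target.mkdir(parents=True, exist_ok=True)
    return target


# Demonstration on a throwaway directory rather than the real PAR_I94_DIR.
with tempfile.TemporaryDirectory() as tmp:
    demo = Path(tmp) / "i94_parquet"
    (demo / "by_none").mkdir(parents=True)
    reset_dir(demo)
    print(list(demo.iterdir()))  # []
```

Unlike `rm -r` followed by `mkdir`, this version does not fail on a first run when the directory does not exist yet.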


Read the 12 monthly SAS7BDAT i94 files from 2016 into a Spark DataFrame. Write the Spark DataFrame to parquet files in 3 ways:

1. no partitioning
2. partition by month
3. partition by year and month.

In [11]:
yy_list = ['16']
mmm_list = ['jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul', 'aug', 'sep', 'oct', 'nov', 'dec']

input_dir = RAW_I94_MTHLY_SAS7BDAT_DIR

for yy in yy_list:
    for mmm in mmm_list:
        input_file = f"i94_{mmm}{yy}_sub.sas7bdat"
        input_path = f"{input_dir}/{input_file}"
        
        print(f"Reading (input_path): {input_path}")
        df_spark_i94 = spark.read.format('com.github.saurfang.sas.spark').load(input_path)
        
        print(f"Writing (PAR_I94_DIR_BY_NONE): {PAR_I94_DIR_BY_NONE}")
        df_spark_i94.write.mode("append").parquet(PAR_I94_DIR_BY_NONE)
        
        print(f"Writing (PAR_I94_DIR_BY_I94MON): {PAR_I94_DIR_BY_I94MON}")
        df_spark_i94.write.mode("append").partitionBy("i94mon").parquet(PAR_I94_DIR_BY_I94MON)
        
        print(f"Writing (PAR_I94_DIR_BY_I94YR_I94MON): {PAR_I94_DIR_BY_I94YR_I94MON}")
        df_spark_i94.write.mode("append").partitionBy("i94yr", "i94mon").parquet(PAR_I94_DIR_BY_I94YR_I94MON)


Reading (input_path): /data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat
Writing (PAR_I94_DIR_BY_NONE): par_input_data/i94/i94_parquet/by_none/
Writing (PAR_I94_DIR_BY_I94MON): par_input_data/i94/i94_parquet/by_i94mon/
Writing (PAR_I94_DIR_BY_I94YR_I94MON): par_input_data/i94/i94_parquet/by_i94yr_i94mon/
Reading (input_path): /data/18-83510-I94-Data-2016/i94_feb16_sub.sas7bdat
Writing (PAR_I94_DIR_BY_NONE): par_input_data/i94/i94_parquet/by_none/
Writing (PAR_I94_DIR_BY_I94MON): par_input_data/i94/i94_parquet/by_i94mon/
Writing (PAR_I94_DIR_BY_I94YR_I94MON): par_input_data/i94/i94_parquet/by_i94yr_i94mon/
Reading (input_path): /data/18-83510-I94-Data-2016/i94_mar16_sub.sas7bdat
Writing (PAR_I94_DIR_BY_NONE): par_input_data/i94/i94_parquet/by_none/
Writing (PAR_I94_DIR_BY_I94MON): par_input_data/i94/i94_parquet/by_i94mon/
Writing (PAR_I94_DIR_BY_I94YR_I94MON): par_input_data/i94/i94_parquet/by_i94yr_i94mon/
Reading (input_path): /data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat
[output truncated]
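
Because the loop reads and writes one month at a time, a missing input file would only surface partway through a long run. A small pre-flight check, sketched below in plain Python, can list the expected paths up front. The helper names `i94_input_paths` and `missing_inputs` are hypothetical; the file naming pattern and the directory are taken from the loop and its output above.

```python
import os

MONTHS = ['jan', 'feb', 'mar', 'apr', 'may', 'jun',
          'jul', 'aug', 'sep', 'oct', 'nov', 'dec']


def i94_input_paths(input_dir, yy):
    """Build the 12 monthly input paths for a two-digit year such as '16'."""
    return [f"{input_dir}/i94_{mmm}{yy}_sub.sas7bdat" for mmm in MONTHS]


def missing_inputs(paths):
    """Return the paths that do not exist as regular files."""
    return [p for p in paths if not os.path.isfile(p)]


paths = i94_input_paths("/data/18-83510-I94-Data-2016", "16")
print(len(paths))  # 12
print(paths[0])    # /data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat

# Outside the Udacity environment this list will simply contain every path.
print(missing_inputs(paths))
```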

In [12]:
df_spark_i94.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [16]:
spark.stop()