In [None]:
# Do all imports and installs here - Done
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StructType as R, StructField as Fld,\
    DoubleType as Dbl, StringType as Str, IntegerType as Int,\
    TimestampType as Timestamp, DateType as Date, LongType as Long
from pyspark.sql.types import DoubleType
from pyspark.sql.types import DateType
import pandas as pd
import re
import configparser
import os
import shutil
from pathlib import Path
from datetime import datetime

# Create environment variables
# Point this process at the local Java / Spark / Hadoop installation.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "PATH": "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin",
    "SPARK_HOME": "/opt/spark-2.4.3-bin-hadoop2.7",
    "HADOOP_HOME": "/opt/spark-2.4.3-bin-hadoop2.7",
})

# Load the AWS credentials from the [AWS] section of 'etl.cfg'
config = configparser.ConfigParser()
config.read('etl.cfg')

# Keep the credentials both as module-level names (read by
# spark_session_init) and as environment variables.
AWS_ACCESS_KEY_ID = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_ACCESS_KEY_ID"] = AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY = config['AWS']['AWS_SECRET_ACCESS_KEY']
os.environ["AWS_SECRET_ACCESS_KEY"] = AWS_SECRET_ACCESS_KEY

# Create Spark session
def spark_session_init():
    '''
    Build (or reuse) the Hive-enabled Spark session used by this ETL.

    The session is configured with the spark-packages repository, the
    hadoop-aws and spark-sas7bdat packages, and the S3A credentials held in
    the module-level AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY names.

    Returns
    -------
    spark : SparkSession
        The session returned by `getOrCreate()`.
    '''
    session_options = (
        ("spark.jars.repositories", "https://repos.spark-packages.org/"),
        ("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0,saurfang:spark-sas7bdat:2.0.0-s_2.11"),
        ("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY_ID),
        ("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY),
    )

    builder = SparkSession.builder
    for option_name, option_value in session_options:
        builder = builder.config(option_name, option_value)

    return builder.enableHiveSupport().getOrCreate()

# Read the validation (code, value) pairs of one label from the SAS Labels Description file
def get_validation_code_from_SAS_labels(sas_input_label, sas_label_file='I94_SAS_Labels_Descriptions.SAS'):
    '''
    Read the SAS Labels Description file and return the validation
    (code, value) pairs listed under one label.

    The section for a label is the text from the first occurrence of the
    label name up to the next ';' (or to the end of the file when no ';'
    follows). Every line of that section containing '=' is split on the
    FIRST '=' into a code and a value; surrounding whitespace and quote
    characters are stripped from both. Lines without '=' (comments, the
    'value ...' header, blank lines) are skipped.

    Parameters
    ----------
    sas_input_label : string
        The label name of the validation code dataset, e.g. one of
        I94RES (same as I94CIT), I94PORT, I94ADDR, I94MODE, I94VISA.
    sas_label_file : string, optional
        Path of the SAS Labels Description file to parse. Defaults to
        'I94_SAS_Labels_Descriptions.SAS' in the working directory.

    Returns
    -------
    validation_code_list : list of tuple(str_valid_code, str_valid_value)
        The validation code/value pairs in file order.

    Raises
    ------
    ValueError
        If `sas_input_label` does not occur in the file.
    OSError
        If the file cannot be opened.
    '''

    # Read input SAS Labels Descriptions
    with open(sas_label_file) as sas_validation_code:
        labels_from_sas = sas_validation_code.read()

    # Keep only the section that starts at the label and ends at the next ';'
    sas_labels = labels_from_sas[labels_from_sas.index(sas_input_label):]
    sas_labels = sas_labels.partition(';')[0]

    # Process line by line, strip separator characters and collect the pairs
    validation_code_list = []
    for line in sas_labels.splitlines():
        # Split on the first '=' only, so values that contain '=' are kept.
        raw_code, separator, raw_value = line.partition('=')
        if not separator:
            # Not a "code = value" line.
            continue
        valid_code = raw_code.strip().strip("'").strip('"')
        valid_value = raw_value.strip().strip("'").strip('"').strip()
        validation_code_list.append((valid_code, valid_value))

    return validation_code_list

# Extract one label section of the SAS Labels Description into a staged dataframe
def extract_staging_sas_label(spark, label):
    """
    Stage one label section of 'I94_SAS_Labels_Descriptions.SAS' as CSV and
    return it as a dataframe.

    Steps of dataset processing:
    - Parse the (code, value) pairs of the label with
      get_validation_code_from_SAS_labels.
    - Build a two-column string dataframe '<label>_valid_code' /
      '<label>_valid_value'.
    - Write it as CSV with a header into './<label>_sas_label_validation'
      (the directory is recreated on each call) and read it back with
      schema inference.
    - Print a preview, the row count and the distinct code count.

    Keyword arguments:
        * spark | Spark session.
        * label | label name as string datatype.

    Output:
        Return the dataframe read back from the staged CSV files.
    """
    code_column = label + "_valid_code"
    value_column = label + "_valid_value"
    csv_output = label + "_sas_label_validation"

    # Create dir for output
    staging_dir = os.path.join("./", csv_output)
    try:
        os.makedirs(staging_dir, exist_ok=True)
        print("Directory '%s' created successfully" % csv_output)
    except OSError:
        print("Directory '%s' can not be created" % csv_output)

    # Define output dataframe structure
    schema = R([Fld(code_column, Str()), Fld(value_column, Str())])

    # Create dataframe from extracted label
    label_pairs = get_validation_code_from_SAS_labels(label)
    df = spark.createDataFrame(data=label_pairs, schema=schema)

    # Remove the staging directory again so the CSV write starts from a
    # path that does not exist.
    shutil.rmtree(csv_output, ignore_errors=False, onerror=None)
    csv_writer = df.write.options(header='True', delimiter=',')
    csv_writer.csv(csv_output)

    # Read the staged files back; this is the dataframe handed to the caller
    csv_reader = spark.read.options(inferSchema="true", delimiter=",", header="true")
    df = csv_reader.csv(csv_output)

    print("Top 20 rows of {} ".format(csv_output))
    df.show()

    row_count = df.count()
    print("Count rows of {}: {} ".format(csv_output, row_count))

    distinct_code_count = df.select(code_column).distinct().count()
    print("Check unique value of {}: {} ".format(csv_output, distinct_code_count))

    print("Staging csv files in: {}".format(csv_output))

    return df

# Convert dataframe column names to snake_case
def convert_column_names(df):
    '''
    Standardize the column names of a dataframe to snake_case format.
    Format ex: customer_name, billing_address, total_price.

    Each column name is stripped of leading/trailing whitespace,
    lower-cased, and has every space and hyphen replaced by an underscore.
    No other normalisation is applied (accents and runs of repeated
    delimiters are kept as they are).

    Parameters
    ----------
    df : dataframe
        A pandas dataframe (renamed in place) or a Spark dataframe.
        Spark dataframes cannot be renamed by assigning to `columns`, so a
        renamed copy is produced with `toDF` instead.

    Returns
    -------
    Dataframe with column names changed to snake_case format: the same
    object for pandas input, a new dataframe for Spark input.
    '''
    column_name_changed = [
        name.strip().lower().replace(" ", "_").replace("-", "_")
        for name in df.columns
    ]

    # Spark dataframes expose toDF(*names); pandas dataframes do not.
    if hasattr(df, "toDF"):
        return df.toDF(*column_name_changed)

    df.columns = column_name_changed
    return df

# Recursively remove a specific directory (when needed)
def rmdir(directory):
    '''
    Recursively delete a directory and everything inside it.

    Parameters
    ----------
    directory : string or Path
        Path to the target directory. The directory and all of its child
        objects will be deleted.

        Syntax note: rmdir(Path("target_path_to_dir")) or
            rmdir("target_path_to_dir")

    Returns
    -------
    None

    Raises
    ------
    OSError
        If the directory does not exist or an entry cannot be removed.
    '''
    directory = Path(directory)
    for item in directory.iterdir():
        # is_dir() follows symlinks: only recurse into real directories so
        # that a symlinked directory is unlinked instead of having its
        # target's contents deleted.
        if item.is_dir() and not item.is_symlink():
            rmdir(item)
        else:
            item.unlink()
    directory.rmdir()

# List all data files under a directory, recursively - Done
def get_list_of_files(dir_name):
    '''
    Collect the paths of all files below a directory, recursively.

    Parameters
    ----------
    dir_name : string
        Directory to scan.

    Returns
    -------
    list of string
        One path per file, each built by joining `dir_name` with the entry
        names found on the way down. Entries are visited in `os.listdir`
        order and a sub-directory is expanded at the position where it is
        encountered.
    '''
    collected = []
    for entry_name in os.listdir(dir_name):
        entry_path = os.path.join(dir_name, entry_name)
        if not os.path.isdir(entry_path):
            collected.append(entry_path)
            continue
        # Sub-directory: splice in everything found underneath it
        collected.extend(get_list_of_files(entry_path))
    return collected


In [None]:
# Root directory that the parquet output path in a later cell is built from.
parquet_outputs = './ws_parquet_outputs'
# Spark session shared by the cells below (built by spark_session_init).
spark = spark_session_init()

In [None]:
# Extract: load the I94 Immigration records from the parquet files under './sas_data'.
print("Extracting I94 Immigration dataset (from './sas_data').")
i94immi_df = spark.read.parquet('./sas_data')

In [None]:
# Cleaning and staging I94 Immigration dataset
print("Transforming I94 Immigration dataset:")
# Register the dataframe as temp view 'i94immi_table' so the cleaning steps
# can be written as Spark SQL queries against it.
i94immi_df.createOrReplaceTempView('i94immi_table')

In [None]:
# Each step below reads the temp view 'i94immi_table' and re-registers its
# result under the same name, so the steps form a chain.

# Keep only records whose departure is on or after the arrival
# (`arrdate <= depdate`); records with `depdate < arrdate` make no sense.
# NOTE: a NULL `depdate` also fails this comparison, so those records are
# dropped here as well.
spark.sql("""
    SELECT *
    FROM i94immi_table
    WHERE arrdate <= depdate
""").createOrReplaceTempView("i94immi_table")

# Add column `arrival_date = epoch + arrdate`, with:
# - epoch = '1960-01-01' (***date*** datatype)
# - arrdate = offset in days from the epoch
# - arrival_date (***date*** datatype)
spark.sql("""
    SELECT *, date_add(to_date('1960-01-01'), arrdate) AS arrival_date 
    FROM i94immi_table
""").createOrReplaceTempView("i94immi_table")

# Add column `departure_date = epoch + depdate`, with:
# - epoch = '1960-01-01' (***date*** datatype)
# - depdate = offset in days from the epoch
# - `departure_date` (***date*** datatype)
# Every CASE branch must yield a DATE (or NULL): a string fallback such as
# 'NaN' would make Spark coerce the whole column to string. After the filter
# above only the first branch can match, so the fallback is plain NULL.
spark.sql("""SELECT *, CASE 
                        WHEN depdate >= arrdate THEN date_add(to_date('1960-01-01'), depdate)
                        ELSE NULL END AS departure_date 
                FROM i94immi_table
            """).createOrReplaceTempView("i94immi_table")

# Extracted i94mode from `I94_SAS_Labels_Descriptions_SAS`
# i94mode includes:
# {'1': 'Air', '2': 'Sea', '3': 'Land', '9': 'Not reported'}
# Keep air arrivals only, i.e. keep `i94mode = 1`
spark.sql("""
    SELECT *
    FROM i94immi_table
    WHERE i94mode == 1.0
""").createOrReplaceTempView("i94immi_table")

# Map `i94visa` numbers to a readable `visa_type` label
# (anything other than 1, 2 or 3 becomes the string 'NaN')
spark.sql("""
    SELECT *, CASE 
                WHEN i94visa = 1.0 THEN 'Business' 
                WHEN i94visa = 2.0 THEN 'Pleasure'
                WHEN i94visa = 3.0 THEN 'Student'
                ELSE 'NaN' END AS visa_type
    FROM i94immi_table
""").createOrReplaceTempView("i94immi_table")

# Keep user records of `male = 'M'` and `female = 'F'` only
spark.sql("""
    SELECT * 
    FROM i94immi_table 
    WHERE gender IN ('F', 'M')
""").createOrReplaceTempView("i94immi_table")

# Drop NULL value on arrival state
spark.sql("""
    SELECT *
    FROM i94immi_table
    WHERE i94addr IS NOT NULL
""").createOrReplaceTempView("i94immi_table")

# Keep necessary columns
# Derive year and month (i94yr, i94mon) from the computed arrival_date
spark.sql("""
    SELECT 
        cicid,
        i94cit,
        i94res,
        i94port,
        arrival_date,
        YEAR(arrival_date) as i94yr,
        MONTH(arrival_date) as i94mon,
        i94mode,
        i94addr,
        departure_date,
        i94bir,
        i94visa,
        count,
        dtadfile,
        biryear,
        dtaddto,
        gender,
        insnum,
        airline,
        admnum,
        fltno,
        visatype,
        visa_type
    FROM i94immi_table
""").createOrReplaceTempView('i94immi_table')

In [None]:
# Baseline the cleaned data: rebind i94immi_df to the full contents of the
# 'i94immi_table' temp view produced by the cleaning queries.
i94immi_df = spark.sql("""
    SELECT *
    FROM i94immi_table
""")

In [None]:
# Cleaning staging output location
path = './i94immi_df_clean' # Use local storage
# Make sure the staging directory exists (exist_ok=True tolerates one left
# over from a previous run) so the rmdir() call below has something to remove.
try:
    os.makedirs(path, exist_ok = True)
    print("Directory '%s' created successfully" % path)
except OSError as error:
    print("Directory '%s' can not be created" % path)

# Delete the staging directory and everything in it before writing.
# NOTE(review): presumably required because the CSV writer's default save
# mode refuses to write to an existing path - confirm.
rmdir(Path("i94immi_df_clean")) # use this line from 2nd running

# Staging to csv file (comma-delimited, with a header row)
i94immi_df.write.options(header='True', delimiter=',').csv("i94immi_df_clean")

In [None]:
# Read out from staging: reload the staged CSV files (header row, comma
# delimiter), letting Spark infer the column types from the text.
i94immi_df = spark.read.options(inferSchema="true", delimiter=",", header = "true").csv("i94immi_df_clean")

In [None]:
# 'fact_i94immi'
# Transform to Spark SQL TempView: expose the staged dataframe as temp view
# 'fact_i94immi' (fact_i94immi_df is another name for the same dataframe).
fact_i94immi_df = i94immi_df
fact_i94immi_df.createOrReplaceTempView('fact_i94immi')

In [None]:
# Create fact table fact_i94immi: select the fact columns from the
# 'fact_i94immi' temp view and rename them to descriptive names.
fact_i94immi = spark.sql("""
    SELECT
        cicid as travel_cicid,
        i94cit as from_country_code,
        i94res as immi_country_code,
        i94port as arrival_port_code,
        arrival_date as immi_arrival_date,
        i94yr as arrival_year,
        i94mon as arrival_month,
        i94mode as airline_mode_code,
        i94addr as immi_state_code,
        departure_date,
        i94bir as traveller_age,
        i94visa as visatype_by_number,
        biryear as traveller_birth_year,
        gender as traveller_sex,
        fltno as immi_flight_code,
        visatype as visatype_by_code,
        visa_type
    FROM fact_i94immi
""")

# Save table to parquet files under the parquet output root, replacing any
# existing output at that path (mode "overwrite").
fact_i94immi_parquet_outputs = parquet_outputs + '/fact_i94immi.parquet'
fact_i94immi.write.mode("overwrite").parquet(fact_i94immi_parquet_outputs)

# fact_i94immi parquet files inventory: print every file found under the
# output path.
print("'fact_i94immi' parquet files inventory")
list_of_files = get_list_of_files(fact_i94immi_parquet_outputs)
for item in list_of_files:
    print(item)
print ("Parquet files inventory finished")

In [None]:
def i94immi_dataset_handling(spark, input_dataset, parquet_outputs):
    """
    Entry point for the I94 Immigration ETL (dataset from './sas_data').

    In its current form the function only prints two progress banners; the
    arguments are accepted but not used.

    Intended steps of dataset processing:
    - Extract data from dataset.
    - Transform data: cleaning -> staging -> output to csv for later.
    - Load to star data modeling fact & dim tables.

    Keyword arguments:
        * spark                 | Spark session.
        * input_dataset         | name of the input dataset.
        * parquet_outputs       | path to output dir.

    Output:
        None. The fact & dim tables named as intended outputs are:
        * fact_i94immi
        * dim_visa
        * dim_immi_flight
        * dim_immi_travaller
    """
    progress_banners = (
        "===== ETL processing - I94 Immigration =====",
        # Loading to fact & dim tables
        "Loading I94 Immigration dataset to fact & dim tables:",
    )
    for banner in progress_banners:
        print(banner)

In [None]:
def main():
    """
    Run the I94 Immigration ETL and print an inventory of the outputs.

    Creates the Spark session, calls i94immi_dataset_handling for the
    'sas_data' dataset with './ws_parquet_outputs' as output dir, then
    prints every file found under that output dir.
    """
    # Spark session initialization
    session = spark_session_init()

    # Output dir and dataset definitions
    output_root = './ws_parquet_outputs'
    dataset_name = 'sas_data'

    # ETL processes
    i94immi_dataset_handling(session, dataset_name, output_root)

    # All output parquet parts and files inventory
    print("All parquet parts and files inventory")
    for file_path in get_list_of_files(output_root):
        print(file_path)
    print("All parquet parts and files inventory finished")


if __name__ == "__main__":
    main()