In [199]:
import findspark
findspark.init()

import pyspark.sql.functions as F
import pyspark.sql.types as T

import pandas as pd
pd.set_option('display.max_rows', 1000)

from etl import SparkETL

In [2]:
# Project ETL helper; later cells use it for the Spark session, the data
# source paths and reading/saving clean tables.
etl = SparkETL()

In [3]:
# Spark session obtained from the ETL helper; used below to read the staging data.
spark = etl.get_spark()

22/05/05 15:44:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [63]:
# Every table in the data dictionary is declared as a nullable array whose
# elements are nullable arrays of strings.
table_schema = T.ArrayType(
    elementType=T.ArrayType(elementType=T.StringType(), containsNull=True),
    containsNull=True,
)

# One nullable field per dictionary table, all sharing `table_schema`.
i94_data_dict_schema = T.StructType([
    T.StructField(table_name, table_schema, True)
    for table_name in ('I94ADDR', 'I94CIT', 'I94MODE', 'I94PORT', 'VISATYPE')
])

In [67]:
# Read the multi-line JSON data dictionary using the explicit schema
# declared in `i94_data_dict_schema`.
i94_data_dict_staging = (
    spark.read
    .format('json')
    .schema(i94_data_dict_schema)
    .option('multiline', 'true')
    .load(etl.data_sources['ports'])
)

# `merge_collapsed_ports` and the final save cell refer to this frame as
# `port_staging`, a name that was never bound in the notebook. Bind it here so
# the notebook survives Restart Kernel -> Run All.
port_staging = i94_data_dict_staging


In [71]:
def port_dictionary(df):
    """Keep only the `I94PORT` column of the data dictionary frame."""
    port_only = df.select(F.col('I94PORT'))
    return port_only

In [182]:
def explode_dictionary(df):
    """
    Turn the `I94PORT` array column into one row per entry.

    Each element of `I94PORT` is itself an array: its first item is exposed as
    `port_id` and its second as `port_desc`, both trimmed of surrounding
    whitespace.

    Expects `df` to contain the `I94PORT` column (as returned by
    `port_dictionary`).
    """
    # The previous version exploded an undefined global `port_col`, which only
    # worked with leftover kernel state; name the column explicitly instead.
    entry = F.explode(F.col('I94PORT')).alias('entry')

    return (
        df
        .select(entry)
        .select(
            # element_at is 1-based.
            F.trim(F.element_at(F.col('entry'), 1)).alias('port_id'),
            F.trim(F.element_at(F.col('entry'), 2)).alias('port_desc')
        )
    )

In [188]:
# True when `port_desc` ends with a comma, an optional space and two capital
# letters (e.g. "ANCHORAGE, AK"). Shared by the two filter helpers below.
state_suffix_expr = F.col('port_desc').rlike(', ?[A-Z]{2}$')

In [189]:
def filter_with_state_suffix(df):
    """Keep rows whose `port_desc` matches the module-level `state_suffix_expr`."""
    matching = df.filter(state_suffix_expr)
    return matching

In [190]:
def filter_without_state_suffix(df):
    """Keep rows whose `port_desc` does not match the module-level `state_suffix_expr`."""
    lacks_suffix = ~state_suffix_expr
    return df.filter(lacks_suffix)

In [193]:
def drop_mx_ports(df):
    """
    Remove rows whose `port_desc` ends in "MX".

    Mexican immigration ports are out of scope.
    """
    ends_in_mx = F.col('port_desc').rlike('MX$')
    return df.filter(~ends_in_mx)

In [354]:
def drop_unrecoverable_ports(df):
    """
    Remove rows whose description is a placeholder rather than a real port:
    - we can't expect to extract information from them, like state and city
    - we can't expect to use them to join to other tables
    """
    placeholder_patterns = (
        '%No PORT Code%',
        '%UNKNOWN%',
        # NOTE(review): spelling presumably mirrors the source data - confirm
        # before "correcting" it.
        '%UNIDENTIFED%',
    )

    # Filtering out each pattern in turn keeps exactly the rows that match
    # none of them, same as negating the OR of all three.
    remaining = df
    for pattern in placeholder_patterns:
        remaining = remaining.where(~F.col('port_desc').like(pattern))
    return remaining

In [355]:
def drop_individual_ports(df):
    """Remove the rows whose `port_id` is `INT` or `ZZZ`."""
    port_id = F.col('port_id')

    # The patterns contain no wildcards, so each LIKE is an exact match.
    not_int = ~port_id.like('INT')
    not_zzz = ~port_id.like('ZZZ')

    return df.where(not_int & not_zzz)

In [356]:
def clean_bps_ports(df):
    """
    Strip a trailing "(...)" or "#ARPT" marker that follows the state code.

    When `port_desc` ends with ", XX (...)" or ", XX #ARPT" (XX being two
    capital letters), it is replaced by the part up to and including the state
    code; every other description is left untouched.

    The previous version embedded the regex in a Spark SQL string literal.
    With the default parser settings the SQL parser unescapes string literals,
    so `\\(` reached the regex engine as a plain `(` and the pattern matched
    *any* text after the state code. Using the Column API with raw Python
    strings passes the pattern to the regex engine unchanged and also avoids
    Python's invalid-escape-sequence warning.
    """
    port_desc = F.col('port_desc')

    has_marker = port_desc.rlike(r', [A-Z]{2} ((\(.*\))|(#ARPT))$')
    without_marker = F.regexp_extract(
        port_desc,
        r'(.*, [A-Z]{2}) ((\(.*\))|(#ARPT))$',
        1
    )

    return df.withColumn(
        'port_desc',
        F.when(has_marker, without_marker).otherwise(port_desc)
    )

In [357]:
def manually_replace(search, replace):
    """
    Build a column expression replacing `search` with `replace` in `port_desc`.

    Parameters
    ----------
    search : str
        Literal text to look for.
    replace : str
        Literal text to substitute.

    Returns
    -------
    Column
        The result of `F.expr("replace(port_desc, ...)")`.
    """
    def _sql_literal(text):
        # Escape backslashes first so the backslash added in front of quotes
        # is not doubled. Without this, a quote in either argument would
        # terminate the SQL literal early.
        # NOTE(review): assumes Spark's default string-literal escaping
        # (spark.sql.parser.escapedStringLiterals=false) - confirm.
        return text.replace('\\', '\\\\').replace("'", "\\'")

    return F.expr(
        f"replace(port_desc, '{_sql_literal(search)}', '{_sql_literal(replace)}')"
    )

In [None]:
def manually_clean_ports(df):
    """
    Apply hand-written fixes to specific `port_desc` values, in order, so that
    they end in ", <two-letter code>".
    """
    replacements = (
        ('MARIPOSA AZ', 'MARIPOSA, AZ'),
        ('WASHINGTON DC', 'WASHINGTON, DC'),
        ('BELLINGHAM, WASHINGTON #INTL', 'BELLINGHAM, WA'),
    )

    fixed = df
    for search, replace in replacements:
        fixed = fixed.withColumn('port_desc', manually_replace(search, replace))
    return fixed

In [391]:
def split_port_desc(df):
    """
    Derive `state_id` and `name` from `port_desc`.

    - `state_id`: the trimmed text after the last comma.
    - `name`: the trimmed text before a trailing ", XX" (two capital letters);
      empty string when the description does not have that shape.
    """
    port_desc = F.col('port_desc')

    after_last_comma = F.substring_index(port_desc, ',', -1)
    before_state = F.regexp_extract(port_desc, '^(.*), ?[A-Z]{2}$', 1)

    with_state = df.withColumn('state_id', F.trim(after_last_comma))
    return with_state.withColumn('name', F.trim(before_state))

In [351]:
def clean_collapsed_into_ports(df):
    """
    Replace any `port_desc` starting with "Collapsed into" by "INT, MN";
    leave all other descriptions unchanged.
    """
    port_desc = F.col('port_desc')
    is_collapsed_into = port_desc.like('Collapsed into%')

    return df.withColumn(
        'port_desc',
        F.when(is_collapsed_into, F.lit('INT, MN')).otherwise(port_desc)
    )

In [352]:
def drop_port_desc(df):
    """Return `df` without the `port_desc` column."""
    without_desc = df.drop('port_desc')
    return without_desc

In [396]:
def non_collapsed_ports(df):
    """
    Run the cleaning steps for ports, starting from the staging data
    dictionary and ending with `port_id`, `state_id` and `name` columns.
    """
    # Order matters: the clean-up steps must run before rows are filtered on
    # the state suffix, and the split needs `port_desc` before it is dropped.
    stages = (
        port_dictionary,
        explode_dictionary,
        drop_unrecoverable_ports,
        drop_individual_ports,
        clean_bps_ports,
        manually_clean_ports,
        drop_mx_ports,
        clean_collapsed_into_ports,
        filter_with_state_suffix,
        split_port_desc,
        drop_port_desc,
    )

    result = df
    for stage in stages:
        result = result.pipe(stage)
    return result

In [462]:
def prepare_collapsed_ports(df):
    """
    Build a lookup of ports whose description is "Collapsed (<port>)...".

    Returns one row per such port with:
    - `from_port_id`: the port's own id
    - `to_port_id`: the text found inside the parentheses
    """
    # Raw string: the pattern itself is unchanged, but `\(` in a regular
    # Python string is an invalid escape sequence (deprecated, and a
    # SyntaxWarning on newer Python versions).
    target_pattern = r'^Collapsed \((.*)\)'

    return (
        df
        .pipe(port_dictionary)
        .pipe(explode_dictionary)
        .where(F.col('port_desc').like('Collapsed (%'))
        .select(
            F.col('port_id').alias('from_port_id'),
            F.regexp_extract('port_desc', target_pattern, 1).alias('to_port_id')
        )
    )

In [463]:
def merge_collapsed_ports(df):
    """
    Inner-join `df` with the collapsed-port lookup on
    `df.port_id == lookup.to_port_id`.

    NOTE: the lookup is built from the module-level `port_staging` frame, not
    from `df`, so `port_staging` must be defined before this is called.
    """
    lookup = port_staging.pipe(prepare_collapsed_ports)

    points_at_port = df['port_id'] == lookup['to_port_id']
    return df.join(lookup, on=points_at_port, how='inner')

In [464]:
def drop_after_merge(df):
    """
    Reduce the merged frame to `port_id`, `state_id` and `name`, where
    `port_id` is `from_port_id` when present and the original `port_id`
    otherwise.
    """
    # coalesce returns its first non-null argument.
    resolved_port_id = F.coalesce(F.col('from_port_id'), F.col('port_id'))

    return df.select(
        resolved_port_id.alias('port_id'),
        'state_id',
        'name'
    )

In [467]:
def collapsed_ports(df):
    """
    Produce rows for collapsed ports: clean the regular ports, join them with
    the collapsed-port lookup, then keep only the final columns.
    """
    cleaned = df.pipe(non_collapsed_ports)
    merged = cleaned.pipe(merge_collapsed_ports)
    return merged.pipe(drop_after_merge)

In [475]:
def ports_union(df):
    """Stack the regular ports on top of the collapsed ports (positional union)."""
    regular = df.pipe(non_collapsed_ports)
    collapsed = df.pipe(collapsed_ports)
    return regular.union(collapsed)

In [538]:
def remove_ports_with_non_standard_states(df):
    """
    Keep only the ports whose `state_id` appears in the clean `state` table.
    """
    # Rename the key to `id` so it can be dropped unambiguously after the join.
    known_states = (
        etl.read_clean_table('state')
        .select(F.col('state_id').alias('id'))
    )

    state_is_known = df['state_id'] == known_states['id']
    joined = df.join(known_states, on=state_is_known, how='inner')
    return joined.drop('id')

In [565]:
def clean_ports(df):
    """
    Full port cleaning: union of regular and collapsed ports, restricted to
    ports whose state is in the clean `state` table.
    """
    all_ports = df.pipe(ports_union)
    return all_ports.pipe(remove_ports_with_non_standard_states)

In [574]:
def save_clean_ports(df):
    """Clean `df` and hand it, coalesced to one partition, to `etl.save_clean_table` as 'port'."""
    single_partition = df.pipe(clean_ports).coalesce(1)
    etl.save_clean_table(single_partition, 'port')

In [575]:
# Run the whole pipeline on the staging frame and save the result as 'port'.
# NOTE(review): `port_staging` must already be bound; presumably it is the
# data dictionary staging frame read above - confirm.
save_clean_ports(port_staging)

In [576]:
# Read the saved 'port' table back and display it as a pandas DataFrame.
etl.read_clean_table('port').toPandas()

Unnamed: 0,port_id,state_id,name
0,ALC,AK,ALCAN
1,ANC,AK,ANCHORAGE
2,BAR,AK,BAKER AAF - BAKER ISLAND
3,DAC,AK,DALTONS CACHE
4,PIZ,AK,DEW STATION PT LAY DEW
5,DTH,AK,DUTCH HARBOR
6,EGL,AK,EAGLE
7,FRB,AK,FAIRBANKS
8,HOM,AK,HOMER
9,HYD,AK,HYDER
