# Data Preparation for Ocean Carrier Alliances Project

This notebook loads, cleans, and prepares the data used to analyse strategic alliances among containerized maritime freight carriers, which we call Ocean Carrier Alliances (OCAs). The primary data comes from S&P's [PIERS BOL database](https://www.spglobal.com/marketintelligence/en/mi/products/piers.html), which is processed via the seperate PIERS Data Project. To the PIERS data, we add data on alliance start and end dates as well as geographic scope from the [Federal Maritime Commission Agreement Library](https://www2.fmc.gov/FMC.Agreements.Web/Public), as well as [vessel capacity data](https://ndclibrary.sec.usace.army.mil/searchResults?series=Foreign%20Traffic%20Vessel%20Entrances%20Clearances) from the US Army Corp of Engineers. 

See the github repo and the README for more detail. 

In [1]:
#preliminaries 
import numpy as np
import pandas as pd #v2.1.3
import polars as pl #v1.1.0
import plotly_express as px #v0.4.1 
import datetime as dt
from sklearn.cluster import HDBSCAN
import time
import os
import geopy.distance
from geopy.geocoders import Bing
from geopy.extra.rate_limiter import RateLimiter

#display settings
pd.set_option('display.max_columns', None)

#enable string cache for polars categoricals
pl.enable_string_cache()

## PIERS BOL Data

Our primary dataset for this study comes from the [PIERS Data Project](https://github.com/epistemetrica/PIERS-Data-Project). The initial codeblock loads the relevant columns from this database into seperate Polars LazyFrames for the import and export data, and subsequent blocks address the various issues in the data. Although the PIERS Data Project includes data from Q12005 through Q12024, we limit the data here to the beginning of 2007 through the end of 2023, as the pre-2007 has outstanding structural issues and we prefer to look at entire years. 

In [2]:
#load PIERS bol data lazyframes
imports_lf = (
    pl.scan_parquet('../data/piers_raw/imports/*.parquet', parallel='columns')
    #drop unused columns 
    .select(
        #'weight',
        #'weight_unit',
        #'qty',
        #'qty_type',
        'teus',
        #'value_est',
        'date',
        #'container_piece_count',
        #'commod_short_desc_qty',
        'origin_territory',
        'origin_region',
        'arrival_port_code',
        'arrival_port_name',
        'departure_port_code',
        'departure_port_name',
        #'dest_final',
        'coast_region',
        #'clearing_district',
        #'place_receipt',
        #'shipper_name',
        #'shipper_address',
        #'consignee_name',
        #'consignee_address',
        #'notify_party1_name',
        #'notify_party1_address',
        #'notify_party2_name',
        #'notify_party2_address',
        #'commod_desc_raw',
        #'container_id_marks',
        #'marks_desc',
        'hs_code',
        #'joc_code',
        #'commod_short_desc',
        #'container_ids',
        'carrier_name',
        'carrier_scac',
        'vessel_name',
        'voyage_number',
        #'precarrier',
        'vessel_id',
        #'inbond_code',
        #'transport_mode',
        #'bol_number',
        'direction',
        'bol_id',
        'year',
        'month',
        'lane_id'
    )
    #filter dates
    .filter(pl.col('year').is_in(range(2007, 2024)))
)

exports_lf = (
    pl.scan_parquet('../data/piers_raw/exports/piers_exports_raw.parquet', parallel='columns') 
    #drop unused columns
    .select(
        #'shipper',
        #'shipper_address',
        #'weight',
        #'weight_unit',
        #'qty',
        #'quantity_type',
        'teus',
        'carrier_name',
        'carrier_scac',
        'vessel_name',
        'voyage_number',
        #'bol_number',
        'vessel_id',
        #'value_est',
        'departure_port_code',
        'departure_port_name',
        #'container_ids',
        #'container_piece_count',
        'coast_region',
        #'commod_desc_raw',
        #'commod_short_desc',
        'hs_code',
        #'joc_code',
        #'commod_short_desc_qty',
        'date',
        #'origin',
        'dest_territory',
        'dest_region',
        'arrival_port_code',
        'arrival_port_name',
        'direction',
        'bol_id',
        'year',
        'month',
        'lane_id'
    )
    #filter dates
    .filter(pl.col('year').is_in(range(2007, 2024)))
)

#create main lazyframe
main_lf = pl.concat([imports_lf, exports_lf], how='diagonal')

In [3]:
#notebook functions

#fill nulls in volume cols with mean
def fill_volume(lf):
    '''ad hod function to fill volume columns with their means'''
    return (
        lf
        .with_columns([
            pl.col('teus').replace(0,None).fill_null(strategy='mean'),
            #pl.col('weight').replace(0,None).fill_null(strategy='mean'),
            #pl.col('qty').replace(0,None).fill_null(strategy='mean')
            ])
        )

#plotly graph inspecting nulls over time by group
def nulls_over_time_plotly(data_lf, group_var, time_var, value_var, title=False):
    '''
    Plots proportion of null values over time by group.
    INPUTS:
        data_lf - polars lazyframe containing the relevant data
        group_var - str - the name of the column by which to group
        time_var - str - the name of the time column (e.g., year, month) over which values will be counted
        value_var - str - the name of the column containing the variable in question
        title (default=False) - str - the title of the graph
    OUTPUT:
        a plotly express figure
    DEPENDS ON:
        polars
        plotly express 
    '''
    df = (
        #select relevant columns
        data_lf.select([group_var, time_var, value_var])
        #group by, creating null count and non-null count cols
        .group_by(group_var, time_var)
        .agg([pl.col(value_var).null_count().alias('null_count'),
                pl.col(value_var).count().alias('count')])
        #compute percent null and fill new column
        .with_columns((pl.col('null_count')/(pl.col('count')+pl.col('null_count'))).alias('null_percent'))
        #cast group col to string to allow sensible ordering of legend
        .cast({group_var:pl.Utf8})
        #sort by date (to allow proper visualization of lines) and group (for legend ordering) 
        .sort(time_var, group_var)
    ).collect()
    #plot
    fig = px.line(
        data_frame=df,
        x=time_var, y='null_percent',
        color=group_var,
        title= 'Count of nulls over time by source frame.' if not title else title
    )
    fig.show()
    del df

#fill nulls over groups given a single unique value per group
def fill_nulls_by_group(data_lf, group_vars, val_var):
    '''Fills null values by group if and only if the val_var for that group contains exactly one non-null unique value.
    INPUTS:
        data_lf - polars lazyframe containing the relevant data
        group_vars - iterable - the names of the columns by which groups will be created
        val_var - string - the name of the column in which nulls will be filled
    OUTPUT:
        filled_lf - the resultant lazyframe 
    DEPENDS ON:
        polars - current version written in polars 0.20.1
    '''
    filled_lf = (
        data_lf.with_columns(
            #if the group contains exactly one unique value: 
            pl.when(pl.col(val_var).drop_nulls().unique(maintain_order=True).len().over(group_vars)==1)
            #then fill the group with that value
            .then(pl.col(val_var).fill_null(pl.col(val_var).drop_nulls().unique(maintain_order=True).first().over(group_vars)))
            #otherwise do nothing
            .otherwise(pl.col(val_var))
            )
        )
    return filled_lf

#assign primary carrier
def add_primary_carrier(lf):
    '''ad hoc function to find primary carrier for each vessel and indicate cargo sharing'''
    lf = (
        #sum teus over vessel, month, and carrier
        lf.with_columns(
            pl.col('teus').sum()
            .over('vessel_id', 'month', 'unified_carrier_scac')
            .alias('sum_teus')
            )
        #select carrier that moved the most cargo on that vessel during that month
        .with_columns(
            pl.col('unified_carrier_scac')
            .sort_by('sum_teus', descending=True)
            .drop_nulls().first()
            .over('vessel_id', 'month')
            .alias('vessel_owner')
            )
        #add bool col if bol is from primary carrier
        .with_columns(
            (pl.col('unified_carrier_scac')==pl.col('vessel_owner'))
            .alias('primary_cargo')
            )
        #set related columns to missing when vessel_id is missing
        .with_columns(
            pl.when(pl.col('vessel_id').is_null()).then(pl.lit(None)).otherwise(pl.col('vessel_owner')).alias('vessel_owner'),
            pl.when(pl.col('vessel_id').is_null()).then(pl.lit(None)).otherwise(pl.col('primary_cargo')).alias('primary_cargo')
        )
        #add shared teu column for convenience 
        .with_columns(
            (pl.col('teus')*(1-pl.col('primary_cargo')))
            .alias('shared_teus')
        )
        #drop ad hoc sum_teus col
        .drop('sum_teus')
    )
    return lf

#plot proportion of shared cargo over time
def sharing_over_time_plotly(data_lf, group_var, include_missing_vessels=True, limit=10, title=False):
    '''
    Plots proportion of shared cargo over time (months) by group_var.
    INPUTS:
        data_lf - polars lazyframe containing the relevant data
        group_var - str - the name of the column by which to group
        include_missing_vessels - bool - default=True, when False, drops missing vessel_ids
        title (default=False) - str - the title of the graph
    OUTPUT:
        a plotly express figure
    DEPENDS ON:
        polars
        plotly express 
    '''
    if not include_missing_vessels:
        df = data_lf.drop_nulls('vessel_id')
    else:
        df = data_lf
    
    df = (
        #select relevant columns
        df.select([group_var, 'month', 'primary_cargo', 'teus'])
        #sum teus over each group-month-shared 
        .group_by(group_var, 'month')
        .agg(
            (pl.col('teus')*pl.col('primary_cargo')).sum().alias('total_primary'),
            pl.col('teus').sum().alias('total_teus')
        )
        #create proportion shared
        .with_columns((1-(pl.col('total_primary')/pl.col('total_teus'))).alias('prop_shared'))
        #cast group col to string to allow sensible ordering of legend
        .cast({group_var:pl.Utf8})
        #sort by date (to allow proper visualization of lines) and group (for legend ordering) 
        .sort('month')
    ).collect()

    #limit categories
    top_groups = (
        data_lf.group_by(group_var)
        .agg(pl.col('teus').sum())
        .sort('teus', descending=True)
        .select(group_var)
        .limit(limit)
        .collect()
        .to_series()
        .cast(pl.Utf8)
    )
    
    #plot
    fig = px.line(
        data_frame=df.filter(pl.col(group_var).is_in(top_groups)).with_columns(pl.col('month').str.to_datetime('%Y%m')),
        x='month', y='prop_shared',
        color=group_var,
        title= 'Proportion of shared cargo over time.' if not title else title,
        labels={
            'prop_shared':'Proportion of cargo from non-primary carrier',
            'month':'Month'
        }
    )
    fig.show()

def cluster_dates(lf, samples=None, save_parquet=None):
    '''
    Finds arrival/departure date using the following algorithm:
        1. Create 1-D dataframe of dates for each vessel-lane pair, 
            with one date occurance per TEU processed on that date
        2. Find clusers of dates using SciKitLearn's HDBSCAN
        3. Assign mode date of each cluster as the arrival/departure date
        4. Assign any bols with dates occuring between the modes as arriving/departing
            on the date of the preceeding mode.
        5. Join imputed arrival/departure dates into main lazyframe. 
    INPUTS
        lf - a polars LazyFrame containing the relevant data
        samples - int - number of random samples 
        save_parquet - path - the location where the clustering output will be saved 
    OUTPUTS
        lf - the original lazyframe with imputed dates 
    '''
    #create relevant columns in main lf
    lf = (
        lf
        #create us_port
        .with_columns(
            pl.when(pl.col('direction')=='import')
            .then(pl.col('arrival_port_code'))
            .otherwise(pl.col('departure_port_code'))
            .alias('us_port')
        )
        #create vessel_port_pair
        .with_columns(
            (pl.col('vessel_id').cast(pl.Utf8)+'_'+pl.col('us_port').cast(pl.Utf8))
            .cast(pl.Categorical)
            .alias('vessel_port_pair')
        )
    )
    #collect relevant columns from lf
    begin_collect = time.time()
    df = (
        lf.group_by('date', 'vessel_port_pair')
        #get sum of TEUs on each date 
        .agg(pl.col('teus').sum().alias('sum_teus'))
        #drop missing vessel-port pairs
        .drop_nulls(subset=['vessel_port_pair'])
        #sort by date
        .sort('date')
        .collect()
    )
    print('clustering data collected; time = {:.2f} minutes'.format((time.time() - begin_collect)/60))
    #initialize variables
    samples=samples 
    if samples:
        pairs = df.select('vessel_port_pair').unique().sample(samples).to_series()
    else:
        pairs = df.select('vessel_port_pair').unique().to_series()
    pairs_df = pl.DataFrame()
    #loop through vessel-port pairs
    print('Looping through vessel-port pairs')
    for i in range(len(pairs)):
        if i%1000 == 0:
            begin_block = time.time()
        pair = pairs[i]
        #make single-column dataframe of dates where each date corresponds to a single TEU that arrived on that day 
        pair_1d = (
            df.filter(pl.col('vessel_port_pair')==pair)
            .select('date', pl.col('sum_teus').ceil())
            #explode dates by each teu 
            .select(pl.exclude('sum_teus').repeat_by('sum_teus').explode())
        )
        #find minimum number of occurances of a single date (needed for HDBSCAN param)
        min_sample = pair_1d.group_by('date').agg(pl.col('date').count().alias('count')).min().row(0)[1]
        #skip empty pairs
        if min_sample == 0:
            continue
        #skip vessel_port pairs with less than 2 dates
        if len(pair_1d) < 2:
            continue
        #instantiate clusterer
        clusterer = HDBSCAN(min_cluster_size=50, min_samples=min_sample) #we need to find a dynamic way of seleting these parameters
        #get clusters
        clusterer.fit(pair_1d)
        #add back to pair_1d
        pair_df = (
            pair_1d
            #add cluster column
            .with_columns(
                pl.Series(name='cluster', values=clusterer.labels_)
            )
            #add imputed date column
            .with_columns(
                    #when date matches the mode of each cluster
                    pl.when(pl.col('date') == pl.col('date').mode().first().over('cluster'))
                    #fill with that date, otherwise fill with null
                    .then(pl.col('date'))
                    .otherwise(pl.lit(None))
                    #forward fill the arrival date to the mode of next cluster
                    .forward_fill()
                    #backward fill the first part of first cluster
                    .backward_fill()
                    #name column
                    .alias('date_imputed')
                )
            #groupby date to simplify
            .group_by('date')
            .agg(pl.col('date_imputed').first())
            #add pair label
            .with_columns(pl.lit(pair).alias('vessel_port_pair').cast(pl.Categorical))
        )
        #init or concat pairs_df
        if i == 0:
            pairs_df = pair_df   
        else:
            pairs_df = pl.concat([pairs_df,pair_df], how='vertical')
        #print status update
        if (i != 0) and ((i+1)%1000 == 0):
            print('{:,} pairs of {:,} clustered. The previous 1000 pairs took {:.2f} minutes.'.format(i+1, len(pairs), (time.time()-begin_block)/60))
    #join imputed dates to main lf
    lf = (
        lf.join(pairs_df.lazy(), on=['date', 'vessel_port_pair'], how='left')
    )
    print('Total time to cluster dates: {:.2f} hours'.format((time.time()-begin_collect)/3600))
    #rename date columns
    lf = lf.rename({'date':'date_raw', 'date_imputed':'date'})
    pairs_df = pairs_df.rename({'date':'date_raw', 'date_imputed':'date'})
    #save pairs data to parquet if indicated 
    if save_parquet:
        pairs_df.write_parquet(save_parquet)
    #return main lf
    return lf

def add_alliance_data(lf):
    '''ad hoc function to add alliance info into main lazyframe'''
    lf = (
        lf
         #join alliance data into main lf
        .join(alliances_df.lazy(), on=['unified_carrier_scac', 'date'], how='left')
        #create related columns
        .with_columns(
            #create boolean for alliance membership
            pl.col('alliance').is_not_null().alias('alliance_member'),
            #set missing alliance_member cells to "Non-alliance Carriers"
            pl.col('alliance').replace({None:'Non-alliance Carriers'})
        )
        #get primary carrier alliance
        .with_columns(
            pl.when(pl.col('unified_carrier_scac')==pl.col('vessel_owner'))
            .then(pl.col('alliance'))
            .otherwise(pl.lit(None))
            .alias('pc_alliance')
        )
        #fill nulls in primary carrier alliance over month and vessel
        .with_columns(
            pl.col('pc_alliance')
            .fill_null(strategy='forward')
            .over('vessel_id', 'month')
        )
        .with_columns(
            pl.col('pc_alliance')
            .fill_null(strategy='backward')
            .over('vessel_id', 'month')
        )
        #get cargo source column
        .with_columns(
            pl.when((pl.col('primary_cargo')==True)&(pl.col('alliance_member')==True))
            .then(pl.lit('ally'))
            .otherwise(
                pl.when((pl.col('alliance_member')==True)&(pl.col('alliance')==pl.col('pc_alliance')))
                .then(pl.lit('ally'))
                .otherwise(pl.lit('non-ally'))
            )
            .alias('cargo_source')
        )
    )
    return lf 

def geocoder_trg(locations, bing_rest_api_key='Am19ZYf8qoO0j2DJGJDu6oZJkhtyvG9v9-8zJ-RDowSZ8QIKLMbjDIq0w7qAzSv1', 
                 df_export=False):
    '''
    Converts location inputs to geographic coordinates (decimal degrees format, datum WGS-84) using the Bing REST Services geocoder API 
    INPUTS:
        locations - array-like - the address/es or place name/s to be geocoded.
        bing_rest_api_key - an API key issued by Bing Rest Services. Uses Adam Wilson's by default.
        df_export - boolean - default=False - when True, returns a pandas dataframe containing the 'locations' inputs in the first column, 
                    the latitude in the second column, and the longitude in the third column.  
    RETURNS:
        when df_export = False (default), returns a list of (lat, long) tuples corresponding to the 'locations' input list. Uninterpretable
                    inputs are listed as np.NaN.
        when df_export = True, returns a pandas dataframe containing the 'locations' inputs in the first column, 
                    the latitude in the second column, and the longitude in the third column.
    RELIES ON:
        pandas
        numpy
        geopy
        Bing from geopy.geocoders
        RateLimiter from geopy.extras
    '''
    #define geocoder function
    def geocoder_latlong(loc):
        '''returns latitute and longitude of given location if interpretable by Bing, else NaN'''
        #instantiate Bing client
        geocoder_bing = Bing(bing_rest_api_key)
        #rate limit
        geocoder_bing = RateLimiter(geocoder_bing.geocode, min_delay_seconds=0.5)
        #geocode location
        geoloc = geocoder_bing(loc)
        #return latitude and longitude results 
        if type(geoloc) == geopy.Location:
            return geoloc.latitude, geoloc.longitude
        else:
            return np.NaN, np.NaN
    #coerse locations input to pd.Series
    locations = pd.Series(locations)
    #init df
    df = pd.DataFrame({'locations': locations})
    #apply geocoder to each location 
    df[['lat', 'long']] = df.apply(lambda row: geocoder_latlong(row), axis=1, result_type='expand')
    #create coordinate list
    coord_list = [coord if ~np.isnan(coord[0]) else np.NaN for coord in list(zip(df.lat, df.long))]
    #return results 
    if df_export:
        return df
    elif len(df)==1:
        return coord_list[0]
    else:
        return coord_list


### Drop Duplicate BOLs

Conversations with S&P indicate that BOLs are unique, thus as an initial preparation step, we drop duplicate BOLs from the database.  

In [4]:
main_lf = main_lf.unique(subset='bol_id')

### Unify Port, Region, and Territory Names

Like PIERS, we use the US Customs Port Code system to identify ports and lanes. All major container ports in the world are assigned unique codes in this system; however, smaller ports are sometimes assigned generalized codes (e.g. 55900 - "All other Singapore Ports"). Since our analysis is concerned, at least for the time being, with the major ports and lanes, we unify the names for ports, regions, and territories across the US Customs Port Codes. 

The port, region, and territory names occaisionally include typographical errors or inconsistencies in spelling; this unification also eliminated this issue. 

In [5]:
main_lf = (
    main_lf
    #unify port, region and territory names 
    .with_columns(
            #find most commonly used departure port name for a given port code
            pl.col('departure_port_name').drop_nulls().mode().first().over('departure_port_code').alias('departure_port_name'),
            #find most commonly used arrival port name for a given port code
            pl.col('arrival_port_name').drop_nulls().mode().first().over('arrival_port_code').alias('arrival_port_name'),
            #get best region and territory names
            pl.col('origin_territory').drop_nulls().mode().first().over('lane_id').alias('origin_territory'),
            pl.col('origin_region').drop_nulls().mode().first().over('lane_id').alias('origin_region'),
            pl.col('dest_territory').drop_nulls().mode().first().over('lane_id').alias('dest_territory'),
            pl.col('dest_region').drop_nulls().mode().first().over('lane_id').alias('dest_region')
        )
        #get lane name
        .with_columns(
            (pl.col('departure_port_name').cast(pl.Utf8)+' — '+pl.col('arrival_port_name').cast(pl.Utf8))
            .str.to_titlecase()
            .cast(pl.Categorical)
            .alias('lane_name')
        )
)

### Carrier names and Standard Carrier Alpha Codes (SCAC)

Similar to port names, carrier names are often long strings of inconsistent nature (e.g. "Maersk", "MAERSK LINE", "A.P. Moller Maersk", etc.), and SCAC codes can change over time for the same carrier. To address these issues, we simply carrier names to the most commonly used name string for a given SCAC, and we simplify SCAC codes to the most recent SCAC used for a given carrier name. 

As carrier alliances apply only to containerized freight, we also drop instances of bulk cargo, which are coded in this data as "BULK" under the carrier_scac column.

In [6]:
#drop bulk carriers
main_lf = main_lf.filter(pl.col('carrier_scac')!='BULK')


In [7]:
#unify carrier names and scacs

main_lf = (
    main_lf
    #get most commonly used form of carrier name
    .with_columns(
        pl.col('carrier_name').drop_nulls().mode().first().over('carrier_scac')
        .alias('unified_carrier_name')
    )
    #get most commonly used form of SCAC
    .with_columns(
        pl.col('carrier_scac').drop_nulls().mode().first().over('unified_carrier_name')
        .alias('unified_carrier_scac')
    )
)

### Tanker Transfer Ports

Some BOLs list offshore tanker transfer ports as their origin or destination. Since these are not relevant to containerized carrier alliances, we drop them from the database. 

In [8]:
main_lf = (
    main_lf
    .filter((pl.col('departure_port_code').cast(pl.Utf8).str.starts_with('999') == False))
    .filter(pl.col('arrival_port_code').cast(pl.Utf8).str.starts_with('999') == False)
)

### Missing Data

In [9]:
nulls_over_time_plotly(main_lf, group_var='direction', time_var='month', value_var='teus', title='Missing volume data over time.')

As can be seen above, the PIERS BOL data on TEUs is incomplete prior to 2015. Since each row corresponds to a unique bill of lading, and since we are not concerned with changes in volume at the BOL level, we fill this missing data with the average TEUs. 

In [10]:
#fill missing volumes with the mean value
main_lf = fill_volume(main_lf)

#### Missing Vessel info

A substantial portion of BOLs do not include vessel names or IDs. Note there is perfect correlation between missing vessels names and missing vessel IDs. 

In [11]:
nulls_over_time_plotly(
    data_lf=main_lf,
    group_var='direction',
    time_var='month',
    value_var='vessel_name',
    title='Proportion of Missing Vessel Names over time.')

Since our analysis concerns the practice of carriers sharing cargo with other carriers on a single vessel, we drop missing vessels.

In [12]:
#drop missing vessels
main_lf = main_lf.drop_nulls(subset='vessel_id')

We also drop bols with missing port data for the same reason. 

In [13]:
#drop missing ports
main_lf = main_lf.drop_nulls(subset=['arrival_port_code', 'departure_port_code'])

## Establish Primary Carrier

We define the primary carrier of a vessel as the carrier representing the mode of cargo on that vessel during any given month. Cargoes from the primary carrier are deemed "primary cargo" and cargo from other carriers are deemed "shared cargo." 

In [14]:
#add primary carrier
main_lf = add_primary_carrier(main_lf)

## Port Visit Identification

Unfortunately, the date listed on each BOL does not necessarily correspond to the actual date of the ship's US port visit, with date data spread out over several days to a week for a single vessel and port, even when the associated routes take several weeks to complete. We suspect this is due to US Customs processing delays and inconsistency in whether the PIERS data records the actual port visit date or the date on which Customs processed the BOL (i.e., the date data are noisy). We address this problem by emplying a Hierarchical Density-Based Spatial Clustering of Applications with Noise (HDBSCAN) algorithm to cluster the dates surrounding port visits together into a single date. This allows us to analyse the data with unique combinations of vessel, port, and date, as is the case for the actual activity of ships. 

The problem of noise in the original date data is most problematic when routes are very short (e.g., between Florida and the Dominican Republic), which is a very small percentage of cargo volumes in the database. In other words, the HDBSCAN clustering successfully solves the problem for the majority of volumes represented in the data. If more accurate date data is required, future analyses may add route distances from the SeaRoute database as a parameter in the clustering algorithm. In recent years, [vessel entrances and clearances data](https://ndclibrary.sec.usace.army.mil/searchResults?series=Foreign%20Traffic%20Vessel%20Entrances%20Clearances) are available from the US Army Corp of Engineers, which could also be used if the need arises. 

### HDBSCAN Clustering 



In [15]:
%%script echo skipping

#cluser imports
main_lf = cluster_dates(
    main_lf, 
    save_parquet='../data/misc/imputed_date_pairings.parquet'
    );

skipping


In [16]:
#%%script echo skipping

#load previously clusereed data
clusters_df = pl.read_parquet('../data/misc/imputed_date_pairings.parquet')

#add vessel_port_pair for join
main_lf = (
    main_lf.with_columns(
        pl.when(pl.col('direction')=='import')
        .then(pl.col('vessel_id').cast(pl.Utf8) +'_'+pl.col('arrival_port_code').cast(pl.Utf8))
        .otherwise(pl.col('vessel_id').cast(pl.Utf8) +'_'+pl.col('departure_port_code').cast(pl.Utf8))
        .alias('vessel_port_pair')
        .cast(pl.Categorical)
    )
)
#join to main
main_lf = (
    main_lf
    .join(
        clusters_df.lazy().rename({'date':'date_imputed'}),
        left_on=['vessel_port_pair', 'date'],
        right_on=['vessel_port_pair', 'date_raw'],
        how='left'
    )
    #drop unnecessary columns and rename date
    .drop('date', 'vessel_port_pair')
    .rename({'date_imputed':'date'})
)

## Add other data

### Alliance Membership

Data on which carriers are part of which alliances was collected from alliance agreements filed with the Federal Maritime Commission.

In [17]:
#load alliance membership data from csv (NOTE polars parsing dates is apparently broken in 1.1.0, hence the use of pandas here)
alliances_df = pd.read_csv('../data/alliance agreements/masterAlliances.csv')
alliances_df.date = pd.to_datetime(alliances_df.date)
alliances_df = (
    pl.DataFrame(alliances_df)
    .cast({'unified_carrier_scac':pl.Categorical, 'date':pl.Datetime})
    .select('date', 'unified_carrier_scac', 'alliance')
)


Columns (2) have mixed types. Specify dtype option on import or set low_memory=False.


Could not infer format, so each element will be parsed individually, falling back to `dateutil`. To ensure parsing is consistent and as-expected, please specify a format.



In [18]:
main_lf = add_alliance_data(main_lf)

### Add Drewery Rate Data

In [19]:
#load drewery data
drewery_df = (
    #load CSV
    pl.read_csv('../data/rates/tidy_rates.csv')
    #filter by US ports
    .filter(pl.col('route').str.contains('US '))
    #drop lanes containing "via" - these are not coast ports
    .filter(~pl.col('route').str.contains(' via '))
)

#get piers_lanes_df (used to merge back into main_lf)
piers_lanes_df = (
    main_lf
    #select columns
    .select('lane_id', 'lane_name', 'origin_territory', 'departure_port_name', 
            'coast_region', 'dest_territory', 'arrival_port_name', 'direction')
    #group by to get modes (NOTE: territory data is uncommonly messy/incorrect; this step avoids gets around that issue)
    .group_by('direction', 'lane_id')
    .agg(
        pl.all().mode().first()
    )
    #construct origin and destination port names for geocoder
    .with_columns(
        pl.when(pl.col('direction')=='import')
        .then(pl.col('origin_territory').cast(pl.Utf8)+' '+pl.col('departure_port_name').cast(pl.Utf8))
        .otherwise('US Port of '+pl.col('departure_port_name').cast(pl.Utf8))
        .alias('piers_origin'),
        pl.when(pl.col('direction')=='import')
        .then('US Port of '+pl.col('arrival_port_name').cast(pl.Utf8))
        .otherwise(pl.col('dest_territory').cast(pl.Utf8)+' '+pl.col('arrival_port_name').cast(pl.Utf8))
        .alias('piers_dest')
    )
    #drop unnessary cols
    .drop('origin_territory', 'departure_port_name', 'coast_region', 'dest_territory', 'arrival_port_name', 'direction')
    #recast to categorical data
    .cast(pl.Categorical)
    #drop duplicates
    .unique()
    #drop nulls
    .drop_nulls()
    #collect to memory
    .collect()
)

#get piers_ports_df
#convert origin ports to series
piers_ports = (
    piers_lanes_df
    .select('piers_origin')
    .rename({'piers_origin':'piers_ports'})
    .drop_nulls()
    .unique()
    .to_series()
)
#append dest ports
piers_ports_df = (
    pl.DataFrame(
        piers_ports.append(
            piers_lanes_df
            .select('piers_dest')
            .drop_nulls()
            .unique()
            .to_series()
        )
    )
    #cast to strings
    .cast(pl.Utf8)
    #convert to pandas
    .to_pandas()
)

#get drewery_lanes_df
drewery_lanes_df = (
    drewery_df
    .select('route')
    .unique()
    #split route col
    .with_columns(
        pl.col('route').str.split_exact(by=' to ', n=1)
        .alias('split')
    )
    #unnest into separate cols
    .unnest('split')
    #rename
    .rename({
        'field_0':'drewery_origin',
        'field_1':'drewery_dest'
    })
    #drop nulls
    .drop_nulls()
)

#get drewery_ports
#convert origin col to series
drewery_ports = (
    drewery_lanes_df
    .select('drewery_origin')
    .rename({'drewery_origin':'drewery_port'})
    .drop_nulls()
    .unique()
    .to_series()
)
#append dest col
drewery_ports_df = (
    pl.DataFrame(
        drewery_ports.append(
            drewery_lanes_df
            .select('drewery_dest')
            .drop_nulls()
            .unique()
            .to_series()
        )
    )
    #drop non-coast ports
    .filter(~pl.col('drewery_port').str.contains(' via '))
    #convert to pandas
    .to_pandas()
)

#### Geocode ports

In [20]:
%%script echo skipping #api calls are limited; only execute when necessary

#geocode drewery ports
drewery_ports_df['drewery_port_loc'] = (
    drewery_ports_df.drewery_port
    .apply(lambda r: geocoder_trg(r))
    .dropna()
)

#geocode piers ports
piers_ports_df['piers_port_loc'] = (
    piers_ports_df.piers_ports
    .apply(lambda r: geocoder_trg(r))
    .dropna()
)

#save geolocations
drewery_ports_df.to_parquet('../data/misc/drewery_port_geolocations.parquet')
piers_ports_df.to_parquet('../data/misc/piers_port_geolocations.parquet')

skipping #api calls are limited; only execute when necessary


In [21]:
#%%script echo skipping
#load previously geocoded data
drewery_ports_df = pl.read_parquet('../data/misc/drewery_port_geolocations.parquet').to_pandas()
piers_ports_df = pl.read_parquet('../data/misc/piers_port_geolocations.parquet').to_pandas()

In [22]:
#merge distances back to lanes

#merge drewery origin locs
drewery_loc_lanes_df = (
    drewery_lanes_df
    .join(
        pl.DataFrame(drewery_ports_df),
        left_on='drewery_origin',
        right_on='drewery_port',
    )
    .rename({'drewery_port_loc':'drewery_origin_loc'})
)
#merge drewery dest locs
drewery_loc_lanes_df = (
    drewery_loc_lanes_df
    .join(
        pl.DataFrame(drewery_ports_df),
        left_on='drewery_dest',
        right_on='drewery_port',
    )
    .rename({'drewery_port_loc':'drewery_dest_loc'})
    .unique()
)

#merge drewery origin locs
piers_loc_lanes_df = (
    piers_lanes_df
    .join(
        pl.DataFrame(piers_ports_df).cast({'piers_ports':pl.Categorical}),
        left_on='piers_origin',
        right_on='piers_ports',
    )
    .rename({'piers_port_loc':'piers_origin_loc'})
)
#merge drewery dest locs
piers_loc_lanes_df = (
    piers_loc_lanes_df
    .join(
        pl.DataFrame(piers_ports_df).cast({'piers_ports':pl.Categorical}),
        left_on='piers_dest',
        right_on='piers_ports',
    )
    .rename({'piers_port_loc':'piers_dest_loc'})
    .unique()
)

#### Match Ports on sum of origin-origin and destination-destination Haversine distances

In [23]:
#create df for matching 
df = (
    #cross join piers and drewery tables
    piers_loc_lanes_df.join(drewery_loc_lanes_df, how='cross')
    #convert to pandas
    .to_pandas()
)

#get summed haversine distance
def haversine(row, col1, col2):
    return geopy.distance.great_circle(row[col1], row[col2]).km

#get origin pair distances
df['origin_dist'] = (
    df.apply(lambda r: haversine(row=r, col1='piers_origin_loc', col2='drewery_origin_loc'), axis=1)
)
#get dest pair distances
df['dest_dist'] = (
    df.apply(lambda r: haversine(row=r, col1='piers_dest_loc', col2='drewery_dest_loc'), axis=1)
)
#sum
df['dist'] = df.origin_dist + df.dest_dist

#match on minimum dist
matched_df = (
    pl.DataFrame(df)
    .sort(by='dist')
    .group_by('lane_id')
    .agg(
        pl.col('route').first(),
        pl.col('dist').min()
    )
)

#save matches
matched_df.write_csv('../data/misc/matched_lanes_dist.csv')

#merge back to main lf
main_lf = (
    main_lf.join(
        matched_df.lazy(),
        on='lane_id',
        how='left'
    )
    .rename({'route':'drewery_lane'})
)

#### Merge Rate info to main lf

In [24]:
#prep drewery df for merge
df = (
    #convert to polars because I apparently live here now
    pl.DataFrame(drewery_df)
    #choose cols
    .select('route', 'container_type', 'date', 'rate')
    #drop duplicates on relevant cols
    .unique(subset=['route', 'container_type', 'date'])
    #pivot container type
    .pivot('container_type', values='rate')
    #rename
    .rename({
        '40ft Dry':'rate_40',
        '20ft Dry':'rate_20'
    })
    #convert date to dt
    .with_columns(
        pl.col('date').str.to_date(format='%Y-%m')
    )
    #drop rows with missing prices
    .drop_nulls(subset='rate_20')
)

#join asof date with main lf
main_lf = (
    main_lf
    #sort by date and recast to enable join_asof
    .sort(by='date')
    .cast({'date':pl.Date, 'drewery_lane':pl.Categorical})
    #join 
    .join_asof(
        df.lazy().sort(by='date').cast({'route':pl.Categorical}),
        on='date',
        by_left='drewery_lane',
        by_right='route'
    )
)

#### Inspect

In [25]:
nulls_over_time_plotly(
    data_lf=main_lf,
    group_var='direction',
    time_var='month',
    value_var='rate_20',
    title='Proportion of Missing Rates over time.')

### Vessel Capacities

Vessel Capacity info comes from the US Army Corp of Engineers in Net Register Tonnage (NRT), which is equal to 100 cubic feet. A standard Twenty-foot Equivalent Unit (TEU) container has an external volume of 1360 cubic feet, thus we divide the NRT capacity by 13.6 in order to obtain the TEU capacity of each vessel in the database. 

In [26]:
#read in vessel data
path = '../data/vessel_e&c/'
schema = {'IMO_UPD':pl.Int32, 'NRT':pl.Int32}
vessels_df = pl.DataFrame(schema=schema)

for file in os.listdir(path):
    if file.endswith('.xlsx'):
        #load data
        file_df = pl.read_excel(path+file, columns=['IMO_UPD', 'NRT'], schema_overrides=schema)
        #concat to main df
        vessels_df = pl.concat([vessels_df, file_df])

#polish df
vessels_df = (
    #match main_lf column names
    vessels_df.rename({'IMO_UPD':'vessel_id'})
    #drop missing values
    .drop_nulls()
    #drop duplicates
    .unique()
    #convert NRT to TEU capacity
    .with_columns(
        (pl.col('NRT')/13.6)
        .alias('vessel_capacity')
    )
    #drop nrt
    .drop('NRT')
    #set 0 capacities to missing
    .with_columns(
        pl.col('vessel_capacity').replace(0, None)
    )
)

#merge vessel capacity info to main_lf
main_lf = main_lf.join(vessels_df.lazy(), on='vessel_id', how='left')

In [27]:
nulls_over_time_plotly(main_lf, 'direction', 'month', 'vessel_capacity')

As seen above, vessel capacity data is mostly comprehensive during the years available from US Corp of Engineers (2013-2022) and progressively more missing as we move outside that window.

## Write to Parquet

In [28]:
#%%script echo skipping 
#write to clean parquet

#get years
years = pl.arange(2007,2024, eager=True)

start = time.time()

for year in years:
    print('Collecting {} dataframe...'.format(year))
    df = (
        main_lf
        .filter(pl.col('year')==year)
        .collect()
    )
    print('Writing {} data to parquet...'.format(year))
    df.write_parquet(file='../data/main/main_'+str(year)+'.parquet')
print('OCA data written to parquet')
runtime = time.time() - start
print('Total time to write data: {:.2f} hours'.format(runtime/3600))

Collecting 2007 dataframe...
Writing 2007 data to parquet...
Collecting 2008 dataframe...
Writing 2008 data to parquet...
Collecting 2009 dataframe...
Writing 2009 data to parquet...
Collecting 2010 dataframe...
Writing 2010 data to parquet...
Collecting 2011 dataframe...
Writing 2011 data to parquet...
Collecting 2012 dataframe...
Writing 2012 data to parquet...
Collecting 2013 dataframe...
Writing 2013 data to parquet...
Collecting 2014 dataframe...
Writing 2014 data to parquet...
Collecting 2015 dataframe...
Writing 2015 data to parquet...
Collecting 2016 dataframe...
Writing 2016 data to parquet...
Collecting 2017 dataframe...
Writing 2017 data to parquet...
Collecting 2018 dataframe...
Writing 2018 data to parquet...
Collecting 2019 dataframe...
Writing 2019 data to parquet...
Collecting 2020 dataframe...
Writing 2020 data to parquet...
Collecting 2021 dataframe...
Writing 2021 data to parquet...
Collecting 2022 dataframe...
Writing 2022 data to parquet...
Collecting 2023 datafram