# PIERS Imports ETL

This notebook extracts Bill of Lading data from CSV files downloaded from S&P Global's PIERS BoL database. Transformations are limited here to stripping whitespace, setting appropriate datatypes for storage, renaming columns, adding year, month, direction, bill-of-lading ID, and lane ID columns, converting zero volume values to null, and dropping duplicated rows. The data is then saved as .parquet files, one per arrival year.

In [1]:
#import libraries
import glob
import os
import time

import polars as pl #v0.20.7

#enable the global string cache for polars categoricals; the per-file frames
#built below contain Categorical columns and are concatenated into one frame
#(NOTE(review): per polars docs the shared cache is what keeps categoricals from
#different sources compatible - confirm for the pinned version)
pl.enable_string_cache()

In [2]:
#single source of truth for the import columns, one row per column:
#(header in the raw PIERS csv, polars dtype used for storage, pythonic column name)
#row order is the column order of the processed frame
_import_column_spec = [
    ('Weight', pl.Float64, 'weight'),
    ('Weight Unit', pl.Categorical, 'weight_unit'),
    ('Quantity', pl.Float64, 'qty'),
    ('Quantity Type', pl.Categorical, 'qty_type'),
    ('TEUs', pl.Float64, 'teus'),
    ('Estimated Value', pl.Float64, 'value_est'),
    ('Arrival Date', pl.Utf8, 'date'),
    ('Container Piece Count', pl.Int32, 'container_piece_count'),
    ('Quantity of Commodity Short Description', pl.Utf8, 'commod_short_desc_qty'),
    ('Territory of Origin', pl.Categorical, 'origin_territory'),
    ('Region of Origin', pl.Categorical, 'origin_region'),
    ('Port of Arrival Code', pl.Categorical, 'arrival_port_code'),
    ('Port of Arrival', pl.Categorical, 'arrival_port_name'),
    ('Port of Departure Code', pl.Categorical, 'departure_port_code'),
    ('Port of Departure', pl.Categorical, 'departure_port_name'),
    ('Final Destination', pl.Categorical, 'dest_final'),
    ('Coastal Region', pl.Categorical, 'coast_region'),
    ('Clearing District', pl.Categorical, 'clearing_district'),
    ('Place of Receipt', pl.Categorical, 'place_receipt'),
    ('Shipper', pl.Utf8, 'shipper_name'),
    ('Shipper Address', pl.Utf8, 'shipper_address'),
    ('Consignee', pl.Utf8, 'consignee_name'),
    ('Consignee Address', pl.Utf8, 'consignee_address'),
    ('Notify Party', pl.Utf8, 'notify_party1_name'),
    ('Notify Party Address', pl.Utf8, 'notify_party1_address'),
    ('Also Notify Party', pl.Utf8, 'notify_party2_name'),
    ('Also Notify Party Address', pl.Utf8, 'notify_party2_address'),
    ('Raw Commodity Description', pl.Utf8, 'commod_desc_raw'),
    ('Marks Container Number', pl.Utf8, 'container_id_marks'),
    ('Marks Description', pl.Utf8, 'marks_desc'),
    ('HS Code', pl.Utf8, 'hs_code'),
    ('JOC Code', pl.Utf8, 'joc_code'),
    ('Commodity Short Description', pl.Utf8, 'commod_short_desc'),
    ('Container Number', pl.Utf8, 'container_ids'),
    ('Carrier', pl.Categorical, 'carrier_name'),
    ('SCAC', pl.Categorical, 'carrier_scac'),
    ('Vessel Name', pl.Utf8, 'vessel_name'),
    ('Voyage Number', pl.Utf8, 'voyage_number'),
    ('Pre Carrier', pl.Float64, 'precarrier'),
    ('IMO Number', pl.Int32, 'vessel_id'),
    ('Inbond Code', pl.Float64, 'inbond_code'),
    ('Mode of Transport', pl.Categorical, 'transport_mode'),
    ('Bill of Lading Number', pl.Utf8, 'bol_number'),
]

#schema: raw csv header -> storage dtype
imports_schema = {
    raw_name: dtype for raw_name, dtype, _ in _import_column_spec
}

#pythonic column names: raw csv header -> new column name
import_colnames_dict = {
    raw_name: new_name for raw_name, _, new_name in _import_column_spec
}

#arrival years to export, 2005 through 2024 (the upper bound is exclusive)
years = pl.arange(2005, 2025, eager=True)

## ETL

In [3]:
#set path
path = 'data/raw_csv/imports/'
#set files and order
import_files = sorted(glob.glob(path+'*'))
#fail here, with a clear message, rather than with a NameError in the next cell
if not import_files:
    raise FileNotFoundError('No import files found in '+path)

#identify columns from new format that are not necessary - these will be dropped
dropcolumns = [
    'Shipper City', 'Also Notify Party State', 'US Destination City', 
    'Also Notify Party City', 'US Destination State', 'Consignee State', 
    'Consignee City', 'Shipper State', 'Notify Party State', 'Notify Party City'
]

def _scan_imports_file(csv_path):
    """Build the lazy transformation pipeline for one raw PIERS imports csv.

    Nothing is read eagerly; the returned LazyFrame is only a query plan.

    Parameters
    ----------
    csv_path : str
        Path of the raw csv file to scan.

    Returns
    -------
    pl.LazyFrame
        Columns of `import_colnames_dict` (renamed, in that order, cast to the
        dtypes of `imports_schema`, with `date` parsed to datetime) plus the
        derived `direction`, `bol_id`, `year`, `month` and `lane_id` columns.
    """
    #infer_schema_length=0 reads every column as a string so values can be
    #cleaned before they are cast
    lf = pl.scan_csv(csv_path, infer_schema_length=0)
    #coerce new format to match old format: drop whichever of the new-format
    #columns this file carries (the select below keeps schema columns only)
    present = set(lf.columns)
    extra_columns = [name for name in dropcolumns if name in present]
    if extra_columns:
        lf = lf.drop(extra_columns)
    return (
        lf
        #strip whitespace and replace empty str with null
        .with_columns(pl.all().str.strip_chars().replace('',None))
        #set schema
        .cast(imports_schema)
        #rename cols
        .rename(import_colnames_dict)
        #reorder columns
        .select(list(import_colnames_dict.values()))
        #cast date col to datetime
        .with_columns(pl.col('date').str.to_datetime('%Y%m%d'))
        .with_columns(
            #create direction column
            pl.lit('import').cast(pl.Categorical).alias('direction'),
            #create bol_id
            #NOTE(review): carrier_scac is Categorical here, whereas lane_id casts
            #its categorical parts to Utf8 first - confirm the bol_id dtype is as intended
            (pl.col('carrier_scac').fill_null('')+'_'+pl.col('bol_number')).alias('bol_id'),
            #extract year
            pl.col('date').dt.year().alias('year'),
            #extract month (e.g., '202304')
            pl.col('date').dt.strftime('%Y%m').alias('month'),
            #create lane_id
            (pl.col('departure_port_code').cast(pl.Utf8)+'_'+pl.col('arrival_port_code').cast(pl.Utf8))
            .cast(pl.Categorical)
            .alias('lane_id'),
            #convert zero volume values to null
            pl.col('teus').replace(0,None),
            pl.col('weight').replace(0,None),
            pl.col('qty').replace(0,None)
        )
    )

#process data from each file in folder (lazy mode) and concat once; a single
#concat keeps the query plan flat instead of nesting one union per file
imports_lf = pl.concat(
    [_scan_imports_file(csv_path) for csv_path in import_files],
    how='diagonal'
)

In [4]:
#collect each year and write to parquet 
#create the output folder up front so a long collect cannot fail at write time
out_dir = 'data/raw_parquet/imports/'
os.makedirs(out_dir, exist_ok=True)

for year in years:
    print('Collecting observations from '+str(year)+'...')
    start = time.time()
    df = (
        imports_lf
        #filter by year; 'date' is a datetime column at this point, so the
        #string namespace cannot be used on it - use the integer 'year' column
        .filter(pl.col('year') == int(year))
        #drop duplicates
        .unique()
        #collect
        .collect()
    )
    #print status
    print(str(year)+' dataframe collected. \nWriting to parquet...')

    #write file to parquet
    df.write_parquet(file=out_dir+'piers_imports_'+str(year)+'_raw.parquet')
    #print status (elapsed time covers both collect and write)
    print('Total Time: {:.2f} minutes'.format((time.time()-start)/60))
    #release the year's frame before collecting the next one
    del df

print('\nImports ETL complete.')

Collecting observations from 2020...
2020 dataframe collected. 
Writing to parquet...
Total Time: 4.196024 minutes
Collecting observations from 2021...
2021 dataframe collected. 
Writing to parquet...
Total Time: 4.895440 minutes
Collecting observations from 2022...
2022 dataframe collected. 
Writing to parquet...
Total Time: 5.137726 minutes
Collecting observations from 2023...
2023 dataframe collected. 
Writing to parquet...
Total Time: 4.736244 minutes
Collecting observations from 2024...
2024 dataframe collected. 
Writing to parquet...
Total Time: 4.549712 minutes

Imports ETL complete.


In [5]:
#disable the global string cache that was enabled in the first code cell
pl.disable_string_cache()