# Preparing Parquet files for SQL loading

- For each lookup-table column, deduplicate the raw values to get the set of unique values.
- Assign each unique value an integer ID.
- These value/ID pairs become your lookup tables.

- Map the IDs back onto the main (header) files.
- In each main file, replace the raw columns with their corresponding ID columns.

- The shipments table will reference all of these generated IDs.

In [1]:
import pandas as pd
from glob import glob
import dask.dataframe as dd
chunksize = 400_000  # NOTE(review): not referenced by any cell visible here; possibly a leftover from chunked processing — confirm before removing

In [None]:
# Cleaned 2019 Parquet parts: cargo descriptions come in parts 0-4, headers in parts 0-3.
# NOTE(review): neither list is used by the visible cells, which only process 2020 data.
cleaned_fps_2019_cargodesc = [
    f"./data/cleaned/2019/cargodesc_files/ams_cargodesc_2019_part_{part}.parquet"
    for part in range(5)
]

cleaned_fps_2019_header = [
    f"./data/cleaned/2019/header_files/ams_header_2019_part_{part}.parquet"
    for part in range(4)
]

In [2]:

# Cleaned 2020 Parquet parts: cargo descriptions come in parts 0-3, headers in parts 0-2.
cleaned_fps_2020_cargodesc = [
    f"./data/cleaned/2020/cargodesc_files/ams_cargodesc_2020_part_{part}.parquet"
    for part in range(4)
]

cleaned_fps_2020_header = [
    f"./data/cleaned/2020/header_files/ams_header_2020_part_{part}.parquet"
    for part in range(3)
]

In [3]:
# Accumulators for the distinct raw values of each lookup-table column.
# The collection cells that follow .update() these sets one file at a time.

# Lookup table 1: cargo descriptions
unique_cargo_desc = set()

# Lookup table 2: ports of lading / unlading
unique_port_lading_info, unique_port_unlading_info = set(), set()

# Lookup table 3: actual / estimated arrival dates
unique_arrival_date_info, unique_estimated_date_info = set(), set()

# Lookup table 4: weights and their units
unique_weight_info, unique_weight_unit = set(), set()

# Lookup table 5: manifest quantities
unique_manifest_quant = set()

In [None]:
# Collect the distinct, non-null cargo descriptions across all 2020 cargodesc parts.
# drop_duplicates() runs inside dask, so only the distinct values are materialized
# (matching the other collection cells) instead of the entire column.
# Nulls are dropped for two reasons:
#   - a None/NaN mixed in with strings makes the later sorted() call raise TypeError;
#   - NaN != NaN, so the set could otherwise accumulate several "unique" NaN entries.
for fp in cleaned_fps_2020_cargodesc:
    ddf = dd.read_parquet(fp, columns=['description_text'])
    uniques = ddf['description_text'].dropna().drop_duplicates().compute()
    unique_cargo_desc.update(uniques)

In [4]:
# Collect the distinct, non-null actual and estimated arrival dates across the 2020 header parts.
# Nulls (None/NaN/NaT) are dropped so they do not become lookup rows of their own or
# break the later sorted() call. Rows with a missing date are left with a missing id
# by the later .map() step, because those values have no entry in the mapping dict.
for fp in cleaned_fps_2020_header:
    ddf = dd.read_parquet(fp, columns=['estimated_arrival_date', 'actual_arrival_date'])

    unique_actual = ddf['actual_arrival_date'].dropna().drop_duplicates().compute()
    unique_estimated = ddf['estimated_arrival_date'].dropna().drop_duplicates().compute()

    unique_arrival_date_info.update(unique_actual)
    unique_estimated_date_info.update(unique_estimated)

In [16]:
# Collect the distinct, non-null ports of unlading and foreign ports of lading
# across the 2020 header parts.
# Nulls are dropped: if these columns hold strings (presumably port names — confirm),
# a None/NaN among them makes the later sorted() call raise TypeError.
for fp in cleaned_fps_2020_header:
    ddf = dd.read_parquet(fp, columns=['port_of_unlading', 'foreign_port_of_lading'])

    unique_ports_u = ddf['port_of_unlading'].dropna().drop_duplicates().compute()
    unique_port_l = ddf['foreign_port_of_lading'].dropna().drop_duplicates().compute()

    unique_port_unlading_info.update(unique_ports_u)
    unique_port_lading_info.update(unique_port_l)


In [5]:
# Collect the distinct, non-null weights and weight units across the 2020 header parts.
# Nulls are dropped:
#   - for a float column, NaN != NaN, so each file could add its own NaN entry to the set;
#   - for a string column, a None/NaN makes the later sorted() call raise TypeError.
for fp in cleaned_fps_2020_header:
    ddf = dd.read_parquet(fp, columns=['weight', 'weight_unit'])

    unique_weight = ddf['weight'].dropna().drop_duplicates().compute()
    unique_unit = ddf['weight_unit'].dropna().drop_duplicates().compute()

    unique_weight_info.update(unique_weight)
    unique_weight_unit.update(unique_unit)

In [None]:
# Collect the distinct, non-null manifest quantities across the 2020 header parts.
# Nulls are dropped so they neither become lookup rows nor accumulate as several
# NaN entries (NaN != NaN) in the set.
for fp in cleaned_fps_2020_header:
    ddf = dd.read_parquet(fp, columns=['manifest_quantity'])

    unique_quant = ddf['manifest_quantity'].dropna().drop_duplicates().compute()
    unique_manifest_quant.update(unique_quant)

In [7]:
# Placeholders for the sorted value lists. The next cell rebinds every one of these
# names to a sorted(...) result, so the empty lists here are only defaults.
(
    cargo_desc_sorted,
    port_lading_sorted,
    port_unlading_sorted,
    arrival_date_sorted,
    estimated_date_sorted,
    weight_sorted,
    weight_unit_sorted,
    manisfest_unit_sorted,  # sic: this misspelled name is what the later cells reference
) = ([] for _ in range(8))

In [8]:
# Turn each accumulator set into an ascending list. A deterministic order means
# the same set of values always receives the same ids in the next cell.

cargo_desc_sorted = list(unique_cargo_desc)
cargo_desc_sorted.sort()

port_lading_sorted = list(unique_port_lading_info)
port_lading_sorted.sort()

port_unlading_sorted = list(unique_port_unlading_info)
port_unlading_sorted.sort()

arrival_date_sorted = list(unique_arrival_date_info)
arrival_date_sorted.sort()

estimated_date_sorted = list(unique_estimated_date_info)
estimated_date_sorted.sort()

weight_sorted = list(unique_weight_info)
weight_sorted.sort()

weight_unit_sorted = list(unique_weight_unit)
weight_unit_sorted.sort()

manisfest_unit_sorted = list(unique_manifest_quant)
manisfest_unit_sorted.sort()

In [9]:
# Build the lookup tables: each sorted unique value is paired with a 1-based integer id.


def _make_lookup(id_col, value_col, values):
    """Return a two-column DataFrame pairing 1-based ids with *values*.

    The id range is derived from ``values`` itself, so both columns always have the
    same length. Previously the manifest table sized its ids from the source set
    rather than the list placed in the frame.
    """
    return pd.DataFrame({
        id_col: range(1, len(values) + 1),
        value_col: values,
    })


cargo_desc_lookup = _make_lookup('cargodesc_id', 'cargodesc', cargo_desc_sorted)

port_lading_lookup = _make_lookup('port_lading_id', 'port_of_lading', port_lading_sorted)

port_unlading_lookup = _make_lookup('port_unlading_id', 'port_of_unlading', port_unlading_sorted)

arrival_date_lookup = _make_lookup('arrival_id', 'arrival_date', arrival_date_sorted)

estimated_arrival_lookup = _make_lookup(
    'estimated_arrival_id', 'estimated_arrival_date', estimated_date_sorted
)

weight_lookup = _make_lookup('weight_id', 'weight', weight_sorted)

weight_unit_lookup = _make_lookup('weight_unit_id', 'weight_unit', weight_unit_sorted)

manifest_unit_lookup = _make_lookup(
    'manifest_quantity_id', 'manifest_quantity', manisfest_unit_sorted
)


In [11]:
# Each lookup table is written to its own separate CSV file, without the index column.
# NOTE(review): the port-of-unlading file is named 'port_of_unlading.csv', without the
# '_lookup' suffix used by the others. It is kept as-is in case a downstream loader
# expects that name.
_lookup_outputs = {
    'cargo_desc_lookup.csv': cargo_desc_lookup,
    'port_of_lading_lookup.csv': port_lading_lookup,
    'port_of_unlading.csv': port_unlading_lookup,
    'arrival_date_lookup.csv': arrival_date_lookup,
    'estimated_arrival_lookup.csv': estimated_arrival_lookup,
    'weight_lookup.csv': weight_lookup,
    'weight_unit_lookup.csv': weight_unit_lookup,
    'manifest_units_lookup.csv': manifest_unit_lookup,
}
for out_path, lookup_df in _lookup_outputs.items():
    lookup_df.to_csv(out_path, index=False)

In [21]:
# Value -> id dictionaries built from each lookup table, used with Series.map to
# translate raw columns into ids.


def _value_to_id(lookup, value_col, id_col):
    """Return a dict mapping each ``lookup[value_col]`` entry to its ``lookup[id_col]``."""
    return dict(zip(lookup[value_col], lookup[id_col]))


# NOTE(review): cargo_mapping is not applied by the header-rewrite cell visible here.
cargo_mapping = _value_to_id(cargo_desc_lookup, 'cargodesc', 'cargodesc_id')

port_lading_mapping = _value_to_id(port_lading_lookup, 'port_of_lading', 'port_lading_id')

port_unlading_mapping = _value_to_id(port_unlading_lookup, 'port_of_unlading', 'port_unlading_id')

arrival_mapping = _value_to_id(arrival_date_lookup, 'arrival_date', 'arrival_id')

estimated_arrival_mapping = _value_to_id(
    estimated_arrival_lookup, 'estimated_arrival_date', 'estimated_arrival_id'
)

weight_mapping = _value_to_id(weight_lookup, 'weight', 'weight_id')

weight_unit_mapping = _value_to_id(weight_unit_lookup, 'weight_unit', 'weight_unit_id')

manifest_unit_mapping = _value_to_id(manifest_unit_lookup, 'manifest_quantity', 'manifest_quantity_id')

In [22]:
# For each 2020 header file:
#   - add an id column for every lookup-table column;
#   - drop the raw columns listed below;
#   - write the result to header_table{N}.csv.

# (new id column, raw source column, value -> id mapping). The ids are assigned in
# this order, which fixes the order of the id columns in the output.
# Raw values missing from a mapping come out as NaN ids (Series.map semantics).
_header_id_columns = [
    ('port_lading_id', 'foreign_port_of_lading', port_lading_mapping),
    ('port_unlading_id', 'port_of_unlading', port_unlading_mapping),
    ('arrival_id', 'actual_arrival_date', arrival_mapping),
    ('estimated_arrival_id', 'estimated_arrival_date', estimated_arrival_mapping),
    ('weight_id', 'weight', weight_mapping),
    ('manifest_quantity_id', 'manifest_quantity', manifest_unit_mapping),
    ('weight_unit_id', 'weight_unit', weight_unit_mapping),
]

# Columns removed before writing. DataFrame.drop raises KeyError if any is absent.
_header_dropped_columns = [
    'vessel_name', 'port_of_unlading', 'measurement_unit', 'foreign_port_of_destination',
    'estimated_arrival_date', 'foreign_port_of_lading', 'manifest_quantity', 'manifest_unit',
    'measurement', 'weight', 'weight_unit', 'port_of_destination', 'mode_of_transportation',
    'actual_arrival_date',
]

idx = 0
for file in cleaned_fps_2020_header:
    df = pd.read_parquet(file)
    for id_col, raw_col, mapping in _header_id_columns:
        df[id_col] = df[raw_col].map(mapping)
    df = df.drop(columns=_header_dropped_columns)
    df.to_csv(f'header_table{idx}.csv', index=False)
    idx += 1
