In [48]:
# use pathlib to scan for files
import pathlib

# use polars to read csv files faster
import polars as pl 
import numpy as np
# progress bar
import tqdm.auto as tqdm
# geopandas for coordinates
import geopandas as gpd
import pandas as pd
# dask for reading multiple files in parallel
import dask.dataframe as dd
import dask.distributed 
import dask_geopandas
import shapely

In [9]:
# Start a local Dask cluster and connect a client to it.
# The bare `client` expression at the end renders the cluster summary as cell output.
cluster = dask.distributed.LocalCluster()          # Fully-featured local Dask cluster
client = cluster.get_client()
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 49245 instead


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:49245/status,

0,1
Dashboard: http://127.0.0.1:49245/status,Workers: 5
Total threads: 10,Total memory: 64.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:49246,Workers: 5
Dashboard: http://127.0.0.1:49245/status,Total threads: 10
Started: Just now,Total memory: 64.00 GiB

0,1
Comm: tcp://127.0.0.1:49259,Total threads: 2
Dashboard: http://127.0.0.1:49263/status,Memory: 12.80 GiB
Nanny: tcp://127.0.0.1:49249,
Local directory: /var/folders/fh/tcbrjg6n28b0lzjzh07b5t6m0000gn/T/dask-scratch-space/worker-71vtx7ax,Local directory: /var/folders/fh/tcbrjg6n28b0lzjzh07b5t6m0000gn/T/dask-scratch-space/worker-71vtx7ax

0,1
Comm: tcp://127.0.0.1:49260,Total threads: 2
Dashboard: http://127.0.0.1:49265/status,Memory: 12.80 GiB
Nanny: tcp://127.0.0.1:49251,
Local directory: /var/folders/fh/tcbrjg6n28b0lzjzh07b5t6m0000gn/T/dask-scratch-space/worker-gy3wx7_p,Local directory: /var/folders/fh/tcbrjg6n28b0lzjzh07b5t6m0000gn/T/dask-scratch-space/worker-gy3wx7_p

0,1
Comm: tcp://127.0.0.1:49261,Total threads: 2
Dashboard: http://127.0.0.1:49268/status,Memory: 12.80 GiB
Nanny: tcp://127.0.0.1:49253,
Local directory: /var/folders/fh/tcbrjg6n28b0lzjzh07b5t6m0000gn/T/dask-scratch-space/worker-e1hersew,Local directory: /var/folders/fh/tcbrjg6n28b0lzjzh07b5t6m0000gn/T/dask-scratch-space/worker-e1hersew

0,1
Comm: tcp://127.0.0.1:49262,Total threads: 2
Dashboard: http://127.0.0.1:49271/status,Memory: 12.80 GiB
Nanny: tcp://127.0.0.1:49255,
Local directory: /var/folders/fh/tcbrjg6n28b0lzjzh07b5t6m0000gn/T/dask-scratch-space/worker-f5bspidi,Local directory: /var/folders/fh/tcbrjg6n28b0lzjzh07b5t6m0000gn/T/dask-scratch-space/worker-f5bspidi

0,1
Comm: tcp://127.0.0.1:49270,Total threads: 2
Dashboard: http://127.0.0.1:49276/status,Memory: 12.80 GiB
Nanny: tcp://127.0.0.1:49257,
Local directory: /var/folders/fh/tcbrjg6n28b0lzjzh07b5t6m0000gn/T/dask-scratch-space/worker-0qnb1lpx,Local directory: /var/folders/fh/tcbrjg6n28b0lzjzh07b5t6m0000gn/T/dask-scratch-space/worker-0qnb1lpx


# Combine all IVS data files into one dataset
This notebook reads the IVS data files from [goederenvervoer](https://downloads.rijkswaterstaatdata.nl/scheepvaart/goederenvervoer/archief/) and stores them as one file for easier processing.

In [10]:
# Download / update the files with the following command
# Run this command from your ~/data/ivs folder
# wget -c -nd -r -np -l 1 -A zip 'https://downloads.rijkswaterstaatdata.nl/scheepvaart/goederenvervoer/archief/'

In [11]:
# Get the merged-unlo codes from Fedor (this is sent to UN in order to give them the opportunity to update their codes).
# Both inputs live below the `data` folder in the user's home directory.
data_root = pathlib.Path.home() / 'data'
unlo_path = data_root / 'unlo' / 'unlo-geocoded-v0.1.gpkg'
ivs_path = data_root / 'ivs'

In [70]:
# Column name -> dtype for every column we expect in an IVS export.
# The insertion order of this mapping is the column order of the output frames.
schema = {
    'Jaarmaand': int,
    'Jaar': int,
    'Maand': int,
    'Weeknr': int,
    'v05_06_begindt_evenement_iso': str,
    'v05_06_Begindt_evenement': str,
    'UNLO_herkomst': str,
    'UNLO_bestemming': str,
    'v15_1_Scheepstype_RWS': str,
    'SK_CODE': str,
    'v18_Laadvermogen': float,
    'v28_Beladingscode': int,
    'v38_Vervoerd_gewicht': float,  # check units in kg?
    'v30_4_Containers_TEU_S': float,
    # categories: see https://www.cbs.nl/en-gb/our-services/methods/definitions/commodity-nomenclature-nstr
    'nstr_nw': str,
    'nst2007_nw': str,
}

# Same mapping without the ISO timestamp column, for files that do not contain it.
old_schema = {
    column: dtype
    for column, dtype in schema.items()
    if column != 'v05_06_begindt_evenement_iso'
}


In [82]:
# create an empty (zero-row) data frame based on our preferred schema:
# it has one column per schema entry, cast to the dtype given in `schema`
empty_df = pd.DataFrame(columns=schema.keys(), dtype=object).astype(schema)


In [83]:
def ivs_csv_to_df(path):
    """Read one semicolon-separated IVS CSV file into a typed DataFrame.

    All fields are first read as text (``dtype=object``,
    ``keep_default_na=False``) and then cast to the dtypes of the module-level
    ``schema``. Files that lack a column which is in ``schema`` but not in
    ``old_schema`` are cast with ``old_schema`` instead, and that column is
    added as an all-missing object column.

    Parameters
    ----------
    path : path-like
        Location of the CSV file, passed to ``pd.read_csv``.

    Returns
    -------
    pandas.DataFrame
        The schema columns in ``schema`` order, followed by any additional
        columns that were present in the file.

    Raises
    ------
    KeyError
        If a column required by the applicable schema is not in the file.
    """
    df = pd.read_csv(path, sep=';', quotechar='"', dtype=object, keep_default_na=False)

    # An empty string cannot be cast to float, so mark empty fields as missing first.
    for column in ('v38_Vervoerd_gewicht', 'v18_Laadvermogen', 'v30_4_Containers_TEU_S'):
        df.loc[df[column] == '', column] = np.nan

    # Decide which schema applies by looking at the columns instead of waiting
    # for astype to fail with a KeyError.
    optional_columns = [name for name in schema if name not in old_schema]
    absent_columns = [name for name in optional_columns if name not in df.columns]
    df = df.astype(old_schema if absent_columns else schema)

    # Add absent columns only after the cast: casting NaN to str would turn it
    # into the literal text 'nan'.
    for name in absent_columns:
        df[name] = pd.Series(np.nan, index=df.index, dtype=object)

    # Schema columns first and in schema order; columns not in the schema are kept at the end.
    extra_columns = [name for name in df.columns if name not in schema]
    return df[list(schema) + extra_columns]

ivs_csv_to_df(ivs_path / 'IVS_weekmonitor_16FEB2021.csv')

Unnamed: 0,Jaarmaand,Jaar,Maand,Weeknr,v05_06_begindt_evenement_iso,v05_06_Begindt_evenement,UNLO_herkomst,UNLO_bestemming,v15_1_Scheepstype_RWS,SK_CODE,v18_Laadvermogen,v28_Beladingscode,v38_Vervoerd_gewicht,v30_4_Containers_TEU_S,nstr_nw,nst2007_nw
0,2012,2020,12,53,,31 december 2020 13:00:00 uur,FRXXX,NLLSH,1,M1,370.0,7,244000.0,0.0,0,1.1
1,2012,2020,12,53,,31 december 2020 13:00:00 uur,FRXXX,NLLSH,1,M1,380.0,7,265000.0,0.0,0,1.1
2,2012,2020,12,53,,31 december 2020 22:00:00 uur,BEANR,DEMAL,4,M8,1740.0,7,1493000.0,0.0,8,8.2
3,2012,2020,12,53,,31 december 2020 23:00:00 uur,DENSS,NLDZL,1,M8,3150.0,1,,0.0,,
4,2012,2020,12,53,,31 december 2020 23:00:00 uur,NLVLI,BEBRU,2,M10,4290.0,7,3004000.0,0.0,3,7.2
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
43187,2102,2021,2,6,,13 februari 2021 23:00:00 uur,NLAPN,BEANR,1,M8,1940.0,7,1295987.0,101.0,9,12.1
43188,2102,2021,2,6,,13 februari 2021 23:00:00 uur,NLRTM,DEGDO,2,M8,2780.0,3,0.0,0.0,8,8.1
43189,2102,2021,2,6,,13 februari 2021 23:00:00 uur,NLGOR,NLRTM,1,M12,6400.0,7,1266887.0,274.0,9,12.1
43190,2102,2021,2,6,,13 februari 2021 23:00:00 uur,NLAMS,FRMLH,2,M9,4520.0,7,1065000.0,0.0,3,7.2


In [84]:
# all CSV exports in the IVS folder, in sorted order (sorted() already returns a list)
paths = sorted(ivs_path.glob('*.csv'))


In [85]:
# Convert every CSV export to a parquet file with the same name next to it.
for csv_path in tqdm.tqdm(paths):
    parquet_path = csv_path.with_suffix('.parquet')
    ivs_csv_to_df(csv_path).to_parquet(parquet_path)

  0%|          | 0/1079 [00:00<?, ?it/s]

Now we can re-read all the individual files. They should now all have the same, fixed column types. We'll combine them into one file using dask. Dask can read multiple files and treat them as one dataset.

In [86]:
# Read the per-file parquet outputs as one dask dataframe.
# The paths are sorted (like `paths` for the CSV files) so that the partition
# order, and with it the record kept for each set of duplicates, is the same on every run.
parquet_paths = sorted(ivs_path.glob('IVS*.parquet'))
ddf = dd.read_parquet(parquet_paths)
# TODO: check if we now found all duplicates (maybe subset on certain columns)
ddf = ddf.drop_duplicates()
# keep the de-duplicated result on the cluster so the following cells can reuse it
ddf = ddf.persist()



In [87]:
# write the combined dask dataframe to parquet (duplicate records were dropped in the cell that created `ddf`)
# NOTE(review): this is a relative path, so it is resolved against the current working directory rather than ivs_path — confirm that is intended

ddf.to_parquet('ivs-2024.parquet', overwrite=True)



In [88]:
# convert to one file (to read into memory)
# compute() gathers the dask dataframe into a single pandas DataFrame, which is then written as one parquet file
ddf.compute().to_parquet(ivs_path / 'ivs-2024-one-file.parquet')


In [89]:
# test read performance
# the resulting `ivs_df` is also the input for the geocoding merge further down
ivs_df = pd.read_parquet(ivs_path / 'ivs-2024-one-file.parquet')

In [90]:
unlo_gdf = gpd.read_file(unlo_path)


def unlo_code(row):
    """Return the UNLO code of one record: its country code followed by its location code.

    Kept for row-wise use; the column below is built with the equivalent
    vectorised expression.
    """
    return row['country_code'] + row['location_code']


# Concatenate the two code columns in one vectorised operation instead of
# calling a Python function for every row.
unlo_gdf['unlo_code'] = unlo_gdf['country_code'] + unlo_gdf['location_code']
# Only the code (as index) and the geometry are needed for the lookups below.
unlo_gdf = unlo_gdf[['unlo_code', 'geometry']].set_index('unlo_code')
# add the final missing record in Veghel
unlo_gdf.loc['NLVEG'] = shapely.Point(5.509574, 51.619686)

In [91]:
# Look up a geometry for both UNLO code columns by left-joining on the index of unlo_gdf.
# The first join adds a column named `geometry`; the second join hits that name again,
# so pandas' default suffixes turn the pair into `geometry_x` (herkomst) and `geometry_y` (bestemming).
ivs_gdf = ivs_df
for unlo_column in ('UNLO_herkomst', 'UNLO_bestemming'):
    ivs_gdf = ivs_gdf.merge(unlo_gdf, how='left', left_on=unlo_column, right_index=True)

In [93]:
def _has_geometry(value):
    """Return True when `value` can be used as a geometry."""
    # A missing lookup can show up as None or as a float NaN. NaN is truthy,
    # so a plain truth test would wrongly treat it as a geometry.
    if value is None or isinstance(value, float):
        return False
    # Otherwise defer to the object's own truthiness, as the plain truth test did.
    return bool(value)


def points2geometry(row):
    """Combine the two looked-up points of a record into a single geometry.

    Parameters
    ----------
    row : mapping
        Must provide the keys 'geometry_x' and 'geometry_y'.

    Returns
    -------
    A ``shapely.LineString`` from 'geometry_x' to 'geometry_y' when both are
    available, the single available point when only one is, and ``None`` when
    neither is.
    """
    start = row['geometry_x']
    end = row['geometry_y']
    has_start = _has_geometry(start)
    has_end = _has_geometry(end)

    if has_start and has_end:
        return shapely.LineString([start, end])
    if has_start:
        return start
    if has_end:
        return end
    return None
    
ivs_gdf['geometry'] = ivs_gdf.apply(points2geometry, axis=1)

In [101]:
# number of records after the geometry lookup
print(ivs_gdf.shape[0])
# A notebook cell only renders its last expression, so the summary has to be
# displayed explicitly or it is computed and thrown away.
display(ivs_gdf[['geometry_x', 'geometry_y']].describe())
# records for which no geometry_x was found
ivs_gdf[~ivs_gdf['geometry_x'].astype(bool)]

2504165


Unnamed: 0,Jaarmaand,Jaar,Maand,Weeknr,v05_06_begindt_evenement_iso,v05_06_Begindt_evenement,UNLO_herkomst,UNLO_bestemming,v15_1_Scheepstype_RWS,SK_CODE,v18_Laadvermogen,v28_Beladingscode,v38_Vervoerd_gewicht,v30_4_Containers_TEU_S,nstr_nw,nst2007_nw,geometry_x,geometry_y,geometry
43576,2304,2023,4,15,2023-04-13T06:00:00+02:00,13 april 2023 06:00:00 uur,NOXXX,FRXXX,1,M8,3250.0,7,3175000.0,0.0,9,10.5,,,
37196,1802,2018,2,6,2018-02-06T10:00:00+01:00,06 februari 2018 10:00:00 uur,FRXXX,NLAAL,1,M8,3150.0,7,3152000.0,,,,,POINT (5.53333 51.95000),POINT (5.533333333333333 51.95)
365214,1811,2018,11,45,2018-11-11T12:00:00+01:00,11 november 2018 12:00:00 uur,FRXXX,NLOSS,1,M6,1540.0,7,1050000.0,,0,1.1,,POINT (5.53333 51.76667),POINT (5.533333333333333 51.766666666666666)
406078,1812,2018,12,50,2018-12-12T13:00:00+01:00,12 december 2018 13:00:00 uur,FRFRH,NLAMS,9,C3l,5240.0,7,3395000.0,,6,3.5,,POINT (4.81667 52.40000),POINT (4.816666666666666 52.4)
91493,1903,2019,3,13,2019-03-25T07:00:00+01:00,25 maart 2019 07:00:00 uur,FRXXX,BEANR,1,M6,1700.0,7,940000.0,0.0,9,16.1,,POINT (4.41667 51.21667),POINT (4.416666666666667 51.21666666666667)
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
171619,1806,2018,6,23,2018-06-05T07:00:00+02:00,05 juni 2018 07:00:00 uur,LNTLB,NLHRW,1,M3,710.0,1,,,,,,POINT (5.39257 51.81739),POINT (5.392569041 51.81739405)
286158,1809,2018,9,37,2018-09-11T14:00:00+02:00,11 september 2018 14:00:00 uur,HERKO,BEGNE,2,M0,150.0,7,38000.0,,3,7.2,,POINT (3.71667 51.05000),POINT (3.716666666666667 51.05)
94958,1903,2019,3,13,2019-03-28T05:00:00+01:00,28 maart 2019 05:00:00 uur,FRXXX,NLZOU,1,M2,870.0,7,660000.0,0.0,1,4.7,,POINT (4.50000 52.11667),POINT (4.5 52.11666666666667)
85812,2003,2020,3,13,2020-03-26T12:00:00+01:00,26 maart 2020 12:00:00 uur,FRXXX,NLWRT,1,M1,340.0,7,260000.0,0.0,1,4.6,,POINT (5.70720 51.25148),POINT (5.707201368 51.251481692)


In [102]:
# The two helper point columns are no longer needed once `geometry` has been built.
# errors='ignore' keeps this cell re-runnable after the columns have already been dropped.
ivs_gdf = ivs_gdf.drop(columns=['geometry_x', 'geometry_y'], errors='ignore')
# All geometries originate from unlo_gdf, so reuse its CRS; without it the
# GeoDataFrame (and the files written from it) would carry no CRS at all.
ivs_gdf = gpd.GeoDataFrame(ivs_gdf, geometry='geometry', crs=unlo_gdf.crs)

In [103]:
ivs_gdf.to_file(ivs_path / 'ivs-2024-geocoded.gpkg')

In [108]:
!open {ivs_path}

In [105]:
# Write a smaller random subset for quick inspection.
# Cap the size so that sampling also works when fewer than 100000 records are
# available, and fix the seed so that the same subset is written on every run.
sample_size = min(100000, len(ivs_gdf))
ivs_gdf.sample(n=sample_size, random_state=42).to_file(ivs_path / 'ivs-2024-geocoded-sample.gpkg')

In [106]:
ivs_path

PosixPath('/Users/baart_f/data/ivs')

In [None]:
ivs_gdf