# Data Processing Pipeline

In [3]:
# Local pipeline modules are imported and then reloaded so that edits made to
# them on disk are picked up without restarting the kernel.
import importlib
import preprocess_data
import regularize_tracks
import trajectory_segmentation
importlib.reload(preprocess_data)
importlib.reload(regularize_tracks)
importlib.reload(trajectory_segmentation)
# Names are bound after the reloads so they come from the reloaded modules.
from regularize_tracks import resample_segment

from preprocess_data import process_zip, drop_duplicate_messages, save_parquet_partitioned
from trajectory_segmentation import segment_trajectories
# NOTE(review): download_data is not reloaded above, unlike the other local
# modules — confirm this is intended.
from download_data import download_ais_range

# Third-party and standard-library dependencies used by the cells below.
import matplotlib.pyplot as plt
import os
import glob
import pandas as pd
import numpy as np
from datetime import datetime, timedelta


## Input Parameters

In [4]:
# Date window for the download step. END_DATE is exclusive.
# Set either value to None to skip downloading.
START_DATE = datetime(2025, 8, 1)
END_DATE = datetime(2025, 8, 8)

# Root of the AIS data tree, relative to this notebook
AISDK_ROOT = os.path.join("..", "..", "data", "aisdk")

# Directory holding the downloaded raw ZIP archives
ZIP_DIR = os.path.join(AISDK_ROOT, "raw")

# Directory of the intermediate Parquet dataset written by preprocessing
PARQUET_DIR = os.path.join(AISDK_ROOT, "interim", "aisdk_2025")

# Directory of the final, partitioned Parquet dataset (a dataset root, not a single file)
PARQUET_FILE_FINAL = os.path.join(AISDK_ROOT, "processed", "aisdk_2025")

## Data Download

In [5]:
# Download the raw daily archives only when both ends of the date window are set;
# leaving either one as None skips this step.
download_window = (START_DATE, END_DATE)
if all(download_window):
    download_ais_range(
        start_date=START_DATE,
        end_date=END_DATE,
        output_dir=ZIP_DIR,
    )

Downloading AIS data to: ../../data/aisdk/raw
Date range: 2025-08-01 → 2025-08-08 (exclusive)

→ 2025-08-01: http://aisdata.ais.dk/aisdk-2025-08-01.zip
   Saved: ../../data/aisdk/raw/aisdk-2025-08-01.zip
→ 2025-08-02: http://aisdata.ais.dk/aisdk-2025-08-02.zip
   Saved: ../../data/aisdk/raw/aisdk-2025-08-02.zip
→ 2025-08-03: http://aisdata.ais.dk/aisdk-2025-08-03.zip
   Saved: ../../data/aisdk/raw/aisdk-2025-08-03.zip
→ 2025-08-04: http://aisdata.ais.dk/aisdk-2025-08-04.zip
   Saved: ../../data/aisdk/raw/aisdk-2025-08-04.zip
→ 2025-08-05: http://aisdata.ais.dk/aisdk-2025-08-05.zip
   Saved: ../../data/aisdk/raw/aisdk-2025-08-05.zip
→ 2025-08-06: http://aisdata.ais.dk/aisdk-2025-08-06.zip
   Saved: ../../data/aisdk/raw/aisdk-2025-08-06.zip
→ 2025-08-07: http://aisdata.ais.dk/aisdk-2025-08-07.zip
   Saved: ../../data/aisdk/raw/aisdk-2025-08-07.zip

Done!


## Data Preprocessing

In [6]:
os.makedirs(PARQUET_DIR, exist_ok=True)

# Pick the archives from the configured date window instead of a hard-coded
# month, so that changing START_DATE / END_DATE changes what gets processed.
if START_DATE and END_DATE:
    # END_DATE is exclusive, matching the download step.
    n_days = (END_DATE - START_DATE).days
    expected_files = []
    for offset in range(n_days):
        day = START_DATE + timedelta(days=offset)
        expected_files.append(os.path.join(ZIP_DIR, f"aisdk-{day:%Y-%m-%d}.zip"))

    zip_files = [path for path in expected_files if os.path.exists(path)]
    missing_files = [path for path in expected_files if not os.path.exists(path)]
    if missing_files:
        # Surface gaps instead of silently processing a partial date range.
        print(f"Warning: {len(missing_files)} expected ZIP files are missing: {missing_files}")
else:
    # No date window configured: process every archive already on disk.
    pattern = os.path.join(ZIP_DIR, "aisdk-*.zip")
    zip_files = sorted(glob.glob(pattern))

print(f"Found {len(zip_files)} ZIP files to process.")

for zp in zip_files:
    process_zip(zp, PARQUET_DIR)

print("All files processed.")

Found 7 ZIP files to process.

=== Processing ../../data/aisdk/raw/aisdk-2025-08-01.zip ===
Output path: ../../data/aisdk/interim/aisdk_2025
Reading ../../data/aisdk/raw/aisdk-2025-08-01.zip ...
Columns in raw DF: ['# Timestamp', 'Type of mobile', 'MMSI', 'Latitude', 'Longitude', 'SOG', 'COG', 'Ship type']
Filtered to Ship type == Cargo and dropped column.
Applied geographic bounding box (60, 0, 50, 20).
Filtered to Type of mobile in ['Class A', 'Class B'] and dropped column.
Applied MMSI format and MID filters.
Parsed Timestamp column.
Converted SOG from knots to m/s.
Final columns: ['Timestamp', 'MMSI', 'Latitude', 'Longitude', 'SOG', 'COG', 'UTM_x', 'UTM_y', 'UTM_zone', 'UTM_letter']
Rows after filtering: 3470997
Saving to parquet dataset at ../../data/aisdk/interim/aisdk_2025 ...
Parquet save done.
=== Done for ../../data/aisdk/raw/aisdk-2025-08-01.zip ===


=== Processing ../../data/aisdk/raw/aisdk-2025-08-02.zip ===
Output path: ../../data/aisdk/interim/aisdk_2025
Reading ../../d

In [7]:
# Load the interim Parquet dataset written by the preprocessing step into one DataFrame.
df = pd.read_parquet(PARQUET_DIR)

In [8]:
# Preview the first rows of the loaded dataset as a sanity check.
df.head()

Unnamed: 0,Timestamp,Latitude,Longitude,SOG,COG,UTM_x,UTM_y,UTM_zone,UTM_letter,MMSI
0,2025-08-04 00:02:56,57.112582,12.245685,0.0,78.1,333213.135376,6333286.0,33,V,205136000
1,2025-08-04 00:08:21,57.112583,12.245675,0.0,264.8,333212.534515,6333286.0,33,V,205136000
2,2025-08-04 00:08:22,57.112583,12.245675,0.0,245.4,333212.534515,6333286.0,33,V,205136000
3,2025-08-04 00:14:21,57.112588,12.245672,0.0,13.6,333212.375384,6333286.0,33,V,205136000
4,2025-08-04 00:14:21,57.112588,12.245672,0.0,13.6,333212.375384,6333286.0,33,V,205136000


## Create trajectories

In [None]:
# Thresholds handed to the trajectory segmentation step.
segmentation_params = {
    "sog_threshold": 0.5,       # roughly 1 knot expressed in m/s (1 kn ≈ 0.514 m/s)
    "position_threshold": 50,   # 50 meters
    "time_threshold": 30,       # 30 minutes
}

# Remove duplicate AIS messages, then split the tracks into trajectories.
# NOTE: this rebinds `df`, so re-running the cell feeds already segmented data
# back in; re-run from the load cell to start from a clean frame.
df = segment_trajectories(drop_duplicate_messages(df), **segmentation_params)

## Missing values

In [10]:
# A segment is identified by its (MMSI, Trajectory) pair.
segment_keys = ["MMSI", "Trajectory"]

# Rows where speed or course is missing, and the segments those rows belong to
mask_missing = df[["SOG", "COG"]].isna().any(axis=1)
bad_segments = df.loc[mask_missing, segment_keys].drop_duplicates()

# Every segment present in the data
all_segments = df[segment_keys].drop_duplicates()

# Anti-join: keep the segments that have no match among the bad ones
tagged_segments = all_segments.merge(
    bad_segments,
    on=segment_keys,
    how="left",
    indicator=True,
)
is_clean = tagged_segments["_merge"] == "left_only"
clean_segments = tagged_segments.loc[is_clean].drop(columns="_merge")

print(clean_segments)

# Keep only the rows that belong to a clean segment
clean_df = df.merge(clean_segments, on=segment_keys, how="inner")

           MMSI  Trajectory
0     205136000           0
1     205136000           1
2     205136000           2
3     205136000           3
4     205136000           4
...         ...         ...
2621  636093310        2621
2622  636093318        2622
2623  636093318        2623
2625  667002347        2625
2626  667002348        2626

[2482 rows x 2 columns]


In [11]:
# Check how many segments were removed.
# Report row counts with len(); .shape would print a (rows, cols) tuple
# under a label that promises a number.
n_all_segments = len(all_segments)
n_clean_segments = len(clean_segments)
n_removed_segments = len(bad_segments)

print("Number of segments before cleaning:", n_all_segments)
print("Number of segments after cleaning:", n_clean_segments)
print("Number of removed segments:", n_removed_segments)

# Every segment must be either kept or removed, never both and never neither.
assert n_clean_segments + n_removed_segments == n_all_segments, (
    "clean and removed segment counts do not add up to the total"
)

Number of segments before cleaning: (2627, 2)
Number of segments after cleaning: (2482, 2)
Number of removed segments: (145, 2)


## Resampling and Imputation

In [12]:
# Resample every (MMSI, Trajectory) segment independently, then turn the
# resulting index levels back into regular columns.
# NOTE(review): the saved output shows a warning raised on the groupby/apply
# line; its text is not preserved here — presumably the pandas deprecation about
# apply operating on the grouping columns. Confirm before changing the call.
df_resampled = (
    clean_df
    .groupby(["MMSI", "Trajectory"])
    .apply(resample_segment)
    .reset_index(names=["MMSI", "Trajectory", "Timestamp"])
)

  df_resampled = clean_df.groupby(["MMSI", "Trajectory"]).apply(resample_segment)
  df_resampled = clean_df.groupby(["MMSI", "Trajectory"]).apply(resample_segment)


In [13]:
# Preview the first rows of the resampled tracks.
df_resampled.head()

Unnamed: 0,MMSI,Trajectory,Timestamp,COG,UTM_x,UTM_y,SOG
0,205136000,0,2025-08-05 17:50:21,4.0,333256.30374,6333506.0,0.0
1,205136000,0,2025-08-05 17:51:21,3.118897,333256.303469,6333506.0,0.000412
2,205136000,0,2025-08-05 17:52:21,352.187557,333256.266741,6333507.0,0.025815
3,205136000,0,2025-08-05 17:53:21,319.46877,333256.111054,6333507.0,0.077486
4,205136000,0,2025-08-05 17:54:21,254.94889,333255.75521,6333508.0,0.136992


In [14]:
# Count the remaining missing values per column after resampling
df_resampled.isna().sum()

MMSI          0
Trajectory    0
Timestamp     0
COG           0
UTM_x         0
UTM_y         0
SOG           0
dtype: int64

## Course decomposition (COG to velocity components)

In [15]:
# Decompose speed (SOG) and course (COG, in degrees) into velocity components:
# the sine of the course gives the eastward part, the cosine the northward part.
# The guard makes the cell safe to re-run: once COG has been replaced by the
# components, running it again is a no-op instead of a KeyError.
if "COG" in df_resampled.columns:
    cog_radians = np.radians(df_resampled["COG"])  # convert to radians
    df_resampled = df_resampled.assign(
        v_east=df_resampled["SOG"] * np.sin(cog_radians),   # eastward component
        v_north=df_resampled["SOG"] * np.cos(cog_radians),  # northward component
    ).drop(columns=["COG"])

In [16]:
# Preview the frame after COG has been replaced by v_east / v_north.
df_resampled.head()

Unnamed: 0,MMSI,Trajectory,Timestamp,UTM_x,UTM_y,SOG,v_east,v_north
0,205136000,0,2025-08-05 17:50:21,333256.30374,6333506.0,0.0,0.0,0.0
1,205136000,0,2025-08-05 17:51:21,333256.303469,6333506.0,0.000412,2.2e-05,0.000411
2,205136000,0,2025-08-05 17:52:21,333256.266741,6333507.0,0.025815,-0.003509,0.025575
3,205136000,0,2025-08-05 17:53:21,333256.111054,6333507.0,0.077486,-0.050355,0.058894
4,205136000,0,2025-08-05 17:54:21,333255.75521,6333508.0,0.136992,-0.132292,-0.035574


In [17]:
# (rows, columns) of the final resampled frame.
df_resampled.shape

(1620712, 8)

## Write the final partitioned Parquet dataset

In [18]:
# Persist the resampled tracks as a Parquet dataset partitioned by vessel and trajectory.
save_parquet_partitioned(
    df_resampled,
    out_path=PARQUET_FILE_FINAL,
    partition_cols=["MMSI", "Trajectory"],
)

Saving to parquet dataset at ../../data/aisdk/processed/aisdk_2025 ...
Parquet save done.
