# Universal Maritime Data Preparation Pipeline

This notebook contains a universal data preparation pipeline suitable for various ML algorithms including:
- Classification algorithms (Logistic Regression, Random Forest, SVM, etc.)
- Deep learning models (LSTM, Neural Networks, etc.)
- Anomaly detection algorithms (One-Class SVM, Isolation Forest, etc.)
- Time series models

The pipeline focuses on feature engineering and data quality without algorithm-specific assumptions.

In [19]:
import numpy as np
import pandas as pd
from tqdm.auto import tqdm
import warnings
from typing import List, Optional, Tuple

tqdm.pandas()

## Configuration

In [20]:
# ───────────────────────────── Configuration ────────────────────────────── #

# Trip IDs removed from the dataset before any feature is computed.
DROP_TRIPS: List[int] = [10257]

# Columns describing the raw vessel state, position, zone/route and trip context.
CORE_FEATURES: List[str] = [
    # Basic vessel state
    "speed_over_ground",
    "course_over_ground",
    "draught",
    # Spatial features
    "latitude",
    "longitude",
    "x_km",
    "y_km",
    # Zone and route information
    "zone",
    "route_id",
    # Trip context
    "trip_duration_hours",
    "trip_distance_km",
]

# Columns derived by the feature engineering functions of this notebook.
ENGINEERED_FEATURES: List[str] = [
    # Change-based features (deltas)
    "speed_change",
    "course_change",
    "draught_change",
    # Trajectory features
    "dist_to_route_center",
    "deviation_from_avg_route",
    # Temporal patterns
    "time_since_trip_start",
    "progress_along_trip",
    # Statistical features
    "speed_rolling_mean",
    "speed_rolling_std",
]

# Bounding boxes, one per zone, each given as [lat_max, lat_min, lon_max, lon_min].
# The position in the outer list is the zone index.
GEOGRAPHIC_ZONES: List[List[float]] = [
    [53.8, 53.5, 8.6, 8.14],    # Zone 0
    [53.66, 53.0, 11.0, 9.5],   # Zone 1
]

# Sphere radius used by the haversine distance, in kilometers.
EARTH_RADIUS_KM: float = 6371.0

# Default number of consecutive points in the rolling statistics window.
ROLLING_WINDOW: int = 5

## Utility Functions

In [21]:
def haversine_distance(lat1: np.ndarray, lon1: np.ndarray,
                       lat2: np.ndarray, lon2: np.ndarray,
                       radius_km: Optional[float] = None) -> np.ndarray:
    """
    Calculate the haversine (great-circle) distance between points.
    Vectorized implementation for efficiency; inputs are broadcast against
    each other, so scalars and arrays may be mixed.

    Args:
        lat1: Latitude(s) of the first point(s), in degrees
        lon1: Longitude(s) of the first point(s), in degrees
        lat2: Latitude(s) of the second point(s), in degrees
        lon2: Longitude(s) of the second point(s), in degrees
        radius_km: Sphere radius; the result is expressed in the same unit.
            Defaults to EARTH_RADIUS_KM (kilometers) when None.

    Returns:
        Distance(s) along the sphere surface, in the unit of the radius
    """
    if radius_km is None:
        radius_km = EARTH_RADIUS_KM

    lat1, lon1, lat2, lon2 = map(np.radians, [lat1, lon1, lat2, lon2])
    dlat, dlon = lat2 - lat1, lon2 - lon1
    a = (np.sin(dlat / 2) ** 2 +
         np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2)
    # Floating-point rounding can push `a` marginally above 1 for (nearly)
    # antipodal points, which would make arcsin return NaN. Clamp to [0, 1].
    a = np.clip(a, 0.0, 1.0)
    return 2 * radius_km * np.arcsin(np.sqrt(a))

In [22]:

def lat_lon_to_km(df: pd.DataFrame,
                  reference_lat: Optional[float] = None,
                  reference_lon: Optional[float] = None) -> pd.DataFrame:
    """
    Project latitude/longitude onto a local flat grid measured in kilometers.

    Args:
        df: Frame with 'latitude' and 'longitude' columns in degrees
        reference_lat: Latitude of the grid origin; mean latitude of df if None
        reference_lon: Longitude of the grid origin; mean longitude of df if None

    Returns:
        Copy of df with 'x_km' (east-west offset) and 'y_km' (north-south
        offset) columns added; the input frame is left untouched
    """
    projected = df.copy()

    origin_lat = projected['latitude'].mean() if reference_lat is None else reference_lat
    origin_lon = projected['longitude'].mean() if reference_lon is None else reference_lon

    # Approximate degree-to-kilometer scales, valid for small areas only.
    # A degree of longitude shrinks with the cosine of the origin latitude.
    lat_scale = 110.574
    lon_scale = 111.320 * np.cos(np.radians(origin_lat))

    projected['x_km'] = (projected['longitude'] - origin_lon) * lon_scale
    projected['y_km'] = (projected['latitude'] - origin_lat) * lat_scale

    return projected

In [23]:
def assign_geographic_zone(lat: float, lon: float,
                           zones: Optional[List[List[float]]] = None) -> int:
    """
    Assign geographic zone based on lat/lon coordinates.

    Args:
        lat: Latitude in degrees
        lon: Longitude in degrees
        zones: Bounding boxes, each as [lat_max, lat_min, lon_max, lon_min].
            Defaults to GEOGRAPHIC_ZONES when None.

    Returns:
        Index of the first zone whose box contains the point (bounds are
        inclusive), or -1 if the point is outside all zones. A NaN coordinate
        fails every comparison and therefore also yields -1.
    """
    if zones is None:
        zones = GEOGRAPHIC_ZONES

    for i, (lat_max, lat_min, lon_max, lon_min) in enumerate(zones):
        if lat_min <= lat <= lat_max and lon_min <= lon <= lon_max:
            return i
    return -1  # Outside all defined zones

## Core Data Loading and Cleaning

In [29]:
def load_and_clean_data(file_path: str) -> pd.DataFrame:
    """
    Read the maritime parquet file and apply the basic cleaning steps.

    Steps, in order: drop the trips listed in DROP_TRIPS, parse the datetime
    columns that are present, derive 'target' from 'is_anomaly' and
    'route_id' from 'start_port' when those columns exist, and sort the rows
    by trip and timestamp.

    Args:
        file_path: Path to the parquet file

    Returns:
        Cleaned DataFrame, sorted by 'trip_id' and 'time_stamp', with a fresh
        0..n-1 index
    """
    print("Loading data...")
    frame = pd.read_parquet(file_path)

    print(f"Loaded {len(frame):,} rows")

    # Exclude known-problematic trips (skipped entirely when the list is empty)
    if DROP_TRIPS:
        excluded = frame.trip_id.isin(DROP_TRIPS)
        n_dropped = int(excluded.sum())
        frame = frame[~excluded].reset_index(drop=True)
        print(f"Dropped {n_dropped:,} rows from trips {DROP_TRIPS}")

    # Convert whichever of the known timestamp columns exist to datetime
    for col in ("start_time", "end_time", "time_stamp"):
        if col not in frame.columns:
            continue
        frame[col] = pd.to_datetime(frame[col])

    # Standardized target: nullable boolean copy of the anomaly label
    if 'is_anomaly' in frame.columns:
        frame['target'] = frame['is_anomaly'].astype('boolean')

    # The departure port doubles as the route identifier
    if 'start_port' in frame.columns:
        frame['route_id'] = frame['start_port']

    # Chronological order within each trip, needed by the per-trip features
    frame = frame.sort_values(['trip_id', 'time_stamp']).reset_index(drop=True)

    print(f"Cleaned data: {len(frame):,} rows, {frame.trip_id.nunique()} trips")
    return frame

## Feature Engineering Functions

In [30]:
def add_spatial_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Add spatial features including zone assignment and coordinate conversion.

    Args:
        df: Frame with 'latitude' and 'longitude' columns in degrees

    Returns:
        Copy of df with an integer 'zone' column (index into GEOGRAPHIC_ZONES,
        or -1 when the point lies in no zone) plus the 'x_km' / 'y_km'
        columns produced by lat_lon_to_km
    """
    df = df.copy()

    # Assign geographic zones with one boolean mask per zone instead of a
    # Python-level call per row. Same rule as assign_geographic_zone:
    # inclusive bounds, first matching zone wins, NaN coordinates get -1.
    print("Assigning geographic zones...")
    lat = df['latitude'].to_numpy(dtype=float, na_value=np.nan)
    lon = df['longitude'].to_numpy(dtype=float, na_value=np.nan)
    zone = np.full(len(df), -1, dtype=np.int64)
    for zone_idx, (lat_max, lat_min, lon_max, lon_min) in enumerate(GEOGRAPHIC_ZONES):
        inside = (
            (lat >= lat_min) & (lat <= lat_max) &
            (lon >= lon_min) & (lon <= lon_max)
        )
        # Only fill rows that no earlier zone has claimed
        zone[inside & (zone == -1)] = zone_idx
    df['zone'] = zone

    # Convert to local Cartesian coordinates
    print("Converting to Cartesian coordinates...")
    df = lat_lon_to_km(df)

    return df


def add_change_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Add absolute point-to-point deltas of the vessel state within each trip.

    For every source column that is present, the absolute difference to the
    previous row of the same trip is stored; the first row of each trip gets 0.
    Course deltas are folded so that a step across the 0/360 boundary counts
    as the short way round.

    Args:
        df: Frame with a 'trip_id' column, expected in chronological order
            within each trip

    Returns:
        Copy of df with 'speed_change', 'course_change' and 'draught_change'
        added for the source columns that exist
    """
    result = df.copy()

    print("Computing change-based features...")

    delta_columns = {
        'speed_over_ground': 'speed_change',
        'course_over_ground': 'course_change',
        'draught': 'draught_change',
    }
    for source_col, delta_col in delta_columns.items():
        if source_col not in result.columns:
            continue
        per_trip = result.groupby('trip_id')[source_col]
        result[delta_col] = per_trip.diff().abs().fillna(0)

    # Course is circular: a raw delta of e.g. 340 degrees is really a 20 degree turn
    if 'course_change' in result.columns:
        turn = result['course_change']
        result['course_change'] = np.minimum(turn, 360 - turn)

    return result


def add_trip_context_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Add features that provide context about the trip.

    Everything is computed with grouped transforms rather than
    groupby().apply(), so the result does not depend on whether pandas passes
    the grouping column to an applied function.

    Added columns:
        time_since_trip_start: Hours since the earliest timestamp of the trip
        trip_duration_hours: Hours between first and last timestamp of the trip
        cumulative_distance: Haversine distance travelled so far in the trip
        progress_along_trip: cumulative_distance divided by the trip total
            (0 to 1); 0 for trips whose total distance is zero
        trip_distance_km: Total distance of the trip

    Args:
        df: Frame with 'trip_id', 'time_stamp' (datetime), 'latitude' and
            'longitude' columns

    Returns:
        New frame sorted by trip and timestamp with a fresh 0..n-1 index.
        Rows without a trip_id are dropped because they belong to no trip.
        The input frame is not modified.

    Raises:
        KeyError: If any of the required columns is missing
    """
    required = ['trip_id', 'time_stamp', 'latitude', 'longitude']
    missing = [col for col in required if col not in df.columns]
    if missing:
        raise KeyError(f"add_trip_context_features requires columns {missing}")

    print("Computing trip context features...")

    # dropna/sort_values/reset_index each return a new frame, so the caller's
    # frame stays untouched.
    df = (
        df.dropna(subset=['trip_id'])
        .sort_values(['trip_id', 'time_stamp'])
        .reset_index(drop=True)
    )
    trip_key = df['trip_id']

    # Temporal context
    stamps_by_trip = df['time_stamp'].groupby(trip_key)
    trip_start = stamps_by_trip.transform('min')
    trip_end = stamps_by_trip.transform('max')
    df['time_since_trip_start'] = (df['time_stamp'] - trip_start).dt.total_seconds() / 3600
    df['trip_duration_hours'] = (trip_end - trip_start).dt.total_seconds() / 3600

    # Distance from each point to the previous point of the same trip
    lat = df['latitude'].to_numpy(dtype=float, na_value=np.nan)
    lon = df['longitude'].to_numpy(dtype=float, na_value=np.nan)
    prev_lat = df['latitude'].groupby(trip_key).shift().to_numpy(dtype=float, na_value=np.nan)
    prev_lon = df['longitude'].groupby(trip_key).shift().to_numpy(dtype=float, na_value=np.nan)
    step_km = haversine_distance(lat, lon, prev_lat, prev_lon)

    # The first point of a trip has no predecessor; it contributes 0 km.
    # NaN steps elsewhere (missing coordinates) are deliberately left as NaN.
    first_of_trip = df.groupby('trip_id').cumcount().eq(0).to_numpy()
    step_km = np.where(first_of_trip, 0.0, step_km)

    df['cumulative_distance'] = (
        pd.Series(step_km, index=df.index).groupby(trip_key).cumsum()
    )

    # Steps are non-negative, so the per-trip maximum is the trip total
    trip_total = df['cumulative_distance'].groupby(trip_key).transform('max')

    # Guard the division: stationary or single-point trips have zero total
    df['progress_along_trip'] = np.where(
        trip_total > 0, df['cumulative_distance'] / trip_total, 0.0
    )
    df['trip_distance_km'] = trip_total

    return df


def add_rolling_features(df: pd.DataFrame, window: int = ROLLING_WINDOW) -> pd.DataFrame:
    """
    Add per-trip rolling mean and standard deviation of the speed.

    Args:
        df: Frame with a 'trip_id' column; features are only added when a
            'speed_over_ground' column is present
        window: Number of consecutive rows per rolling window

    Returns:
        Copy of df with 'speed_rolling_mean' and 'speed_rolling_std' added.
        Windows are allowed to be shorter than `window` at the start of a
        trip; an undefined standard deviation is stored as 0.
    """
    enriched = df.copy()

    print(f"Computing rolling features (window={window})...")

    if 'speed_over_ground' not in enriched.columns:
        return enriched

    # One rolling view per trip so windows never span two trips
    speed_windows = (
        enriched.groupby('trip_id')['speed_over_ground']
        .rolling(window=window, min_periods=1)
    )

    # The grouped result carries trip_id as an extra outer index level;
    # dropping it restores the row index so the assignment aligns.
    rolling_mean = speed_windows.mean().reset_index(level=0, drop=True)
    enriched['speed_rolling_mean'] = rolling_mean

    rolling_std = speed_windows.std().reset_index(level=0, drop=True)
    enriched['speed_rolling_std'] = rolling_std.fillna(0)

    return enriched

## Route-Specific Features

In [31]:
def compute_route_reference(df_route: pd.DataFrame, n_points: int = 100) -> np.ndarray:
    """
    Compute reference trajectory for a route by averaging all trips.

    Each trip is resampled to n_points positions spaced evenly by travelled
    distance, and the resampled trips are averaged point by point.

    Args:
        df_route: Rows of a single route with 'trip_id', 'time_stamp',
            'latitude' and 'longitude' columns
        n_points: Number of points in the reference trajectory

    Returns:
        Array of shape (n_points, 2) holding [latitude, longitude], or an
        empty array when no trip is usable (fewer than two valid points, or
        no distance travelled)
    """
    # Resampling positions are the same for every trip
    target_positions = np.linspace(0, 1, n_points)
    trajectories = []

    for _, trip_data in df_route.groupby('trip_id'):
        trip_data = trip_data.sort_values('time_stamp')

        lat = trip_data['latitude'].to_numpy(dtype=float, na_value=np.nan)
        lon = trip_data['longitude'].to_numpy(dtype=float, na_value=np.nan)

        # A single missing coordinate would turn the cumulative distance,
        # the interpolation and finally the averaged reference into NaN,
        # so incomplete points are left out of the reference.
        valid = np.isfinite(lat) & np.isfinite(lon)
        lat, lon = lat[valid], lon[valid]

        if len(lat) < 2:
            continue

        # Calculate cumulative distance
        distances = haversine_distance(lat[1:], lon[1:], lat[:-1], lon[:-1])
        cum_dist = np.concatenate([[0.0], np.cumsum(distances)])

        # Written as "not > 0" so a NaN total is rejected as well
        if not cum_dist[-1] > 0:
            continue

        # Normalize to 0-1
        normalized_dist = cum_dist / cum_dist[-1]

        # Interpolate to fixed number of points
        interp_lat = np.interp(target_positions, normalized_dist, lat)
        interp_lon = np.interp(target_positions, normalized_dist, lon)

        trajectories.append(np.column_stack([interp_lat, interp_lon]))

    if not trajectories:
        return np.array([])

    # Average all trajectories
    return np.mean(trajectories, axis=0)


def add_route_deviation_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Add features measuring deviation from typical route patterns.

    Added columns:
        dist_to_route_center: Haversine distance from the point to the mean
            latitude/longitude of all points of its route
        deviation_from_avg_route: Haversine distance from the point to the
            reference-trajectory point at the same relative progress along
            the trip

    Both columns are 0.0 for a route without a usable reference trajectory.
    Within a route that has one, trips with fewer than two points keep a
    deviation of 0.0.

    Args:
        df: Frame with 'route_id', 'trip_id', 'time_stamp', 'latitude' and
            'longitude' columns

    Returns:
        New frame with the two columns added and a fresh 0..n-1 index; rows
        are grouped by route in order of first appearance
    """
    # Unique row labels are needed to write per-trip results back by label
    df = df.reset_index(drop=True)

    print("Computing route deviation features...")

    # pd.concat raises on an empty list, so handle an empty frame up front
    if df.empty:
        return df.assign(dist_to_route_center=0.0, deviation_from_avg_route=0.0)

    route_features = []

    for route_id in tqdm(df['route_id'].unique(), desc="Processing routes"):
        route_data = df[df['route_id'] == route_id].copy()

        # Compute reference trajectory
        reference_traj = compute_route_reference(route_data)

        if reference_traj.size == 0:
            route_data['dist_to_route_center'] = 0.0
            route_data['deviation_from_avg_route'] = 0.0
            route_features.append(route_data)
            continue

        # Distance to route center (scalars broadcast against the arrays)
        route_data['dist_to_route_center'] = haversine_distance(
            route_data['latitude'].to_numpy(),
            route_data['longitude'].to_numpy(),
            route_data['latitude'].mean(),
            route_data['longitude'].mean()
        )

        # Deviation from average route, collected by row label so that values
        # land on the right rows regardless of the frame's row order
        last_ref = len(reference_traj) - 1
        deviation = pd.Series(0.0, index=route_data.index)

        for _, trip in route_data.groupby('trip_id'):
            if len(trip) < 2:
                continue

            trip = trip.sort_values('time_stamp')

            # Calculate progress along trip
            lat, lon = trip['latitude'].to_numpy(), trip['longitude'].to_numpy()
            distances = haversine_distance(lat[1:], lon[1:], lat[:-1], lon[:-1])
            cum_dist = np.concatenate([[0.0], np.cumsum(distances)])

            if cum_dist[-1] > 0:
                progress = cum_dist / cum_dist[-1]
            else:
                # No usable total (zero or NaN): compare against the start
                progress = np.zeros(len(trip))

            # Reference point at the same relative progress, rounded down
            ref_idx = np.minimum((progress * last_ref).astype(int), last_ref)
            deviation.loc[trip.index] = haversine_distance(
                lat, lon,
                reference_traj[ref_idx, 0], reference_traj[ref_idx, 1]
            )

        route_data['deviation_from_avg_route'] = deviation
        route_features.append(route_data)

    return pd.concat(route_features, ignore_index=True)

## Pipeline Execution

In [33]:
# Input parquet file, given relative to the notebook's working directory.
input_path = "../data/cleaned/kiel_anomalies_labeled_2_fixed.parquet"
# Load the file, drop the trips listed in DROP_TRIPS, parse the timestamp
# columns and sort the rows by trip and time.
df = load_and_clean_data(input_path)

Loading data...
Loaded 535,273 rows
Dropped 0 rows from trips [10257]
Cleaned data: 535,273 rows, 423 trips
