# Code Implementation
This file contains the main pipeline for the project.

Additional helper functions and modules can be found under `src/`

## Download the data
The raw data is available from [aisdata.ais.dk/](http://aisdata.ais.dk/). \
For this notebook we use only a 3-day subset of the data: `2024-05-01`, `2024-05-02` and `2024-05-03`.

In [1]:
import os
import requests
import zipfile
import io
from datetime import date, timedelta

def download_ais_data(from_date: date, to_date: date, destination_path: str):
    """Download and unzip the daily AIS archives for an inclusive date range.

    One archive per day (``aisdk-YYYY-MM-DD.zip``) is requested from
    ``http://aisdata.ais.dk/<year>/`` and extracted into ``destination_path``.
    A failure for one day is printed and recorded, and the loop moves on to
    the next day; a summary is printed at the end.

    Args:
        from_date: First day to download (inclusive).
        to_date: Last day to download (inclusive). If it is earlier than
            ``from_date`` no request is made.
        destination_path: Directory the archives are extracted into. It is
            created if it does not exist.

    Returns:
        None. Progress and the final summary are printed to stdout.
    """
    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    os.makedirs(destination_path, exist_ok=True)

    print(f"Starting AIS data download from {from_date} to {to_date} into {destination_path}")

    base_url = "http://aisdata.ais.dk/"
    current_date = from_date

    errors = []
    successes = 0
    while current_date <= to_date:
        # Construct the file name and URL
        file_name = f"aisdk-{current_date:%Y-%m-%d}.zip"
        file_url = f"{base_url}{current_date:%Y}/{file_name}"

        print(f"Downloading: {file_url}")

        try:
            # (connect, read) timeouts: without them a stalled server would
            # block the whole download loop forever. Timeouts raise a
            # RequestException subclass and are handled below.
            response = requests.get(file_url, timeout=(10, 120))
            if response.status_code == 200:
                with zipfile.ZipFile(io.BytesIO(response.content), 'r') as zip_ref:
                    members = zip_ref.namelist()
                    # Validate BEFORE extracting so an unexpected archive
                    # does not leave stray files behind. An explicit check is
                    # used because `assert` is removed under `python -O`.
                    if len(members) != 1:
                        msg = "Expected exactly one file in the zip archive."
                        print(f"Unexpected archive contents for {current_date}: {msg}")
                        errors.append((current_date, msg))
                    else:
                        zip_ref.extractall(destination_path)
                        successes += 1

            elif response.status_code == 404:
                print(f"Data not found for {current_date} (404 Error). Skipping.")
                errors.append((current_date, "404 Not Found"))
            else:
                print(f"Failed to download {file_name}. Status code: {response.status_code}")
                errors.append((current_date, f"HTTP {response.status_code}"))

        except requests.exceptions.RequestException as e:
            print(f"An error occurred during download for {current_date}: {e}")
            errors.append((current_date, str(e)))
        except zipfile.BadZipFile:
            print(f"Downloaded file for {current_date} is not a valid zip file.")
            errors.append((current_date, "Bad Zip File"))

        current_date += timedelta(days=1)

    if not errors:
        print("\nAll files downloaded successfully.")
    else:
        print(f"\nDownload succeeded for {successes}/{(successes + len(errors))} days.")
        print("Errors encountered for the following dates:")
        for err_date, err_msg in errors:
            print(f" - {err_date}: {err_msg}")
    print("End of download process.")
    
# Fetch the three days used in this notebook (2024-05-01 .. 2024-05-03, inclusive) into data/ais/.
download_ais_data(date(2024, 5, 1), date(2024, 5, 3), 'data/ais/')

Starting AIS data download from 2024-05-01 to 2024-05-03 into data/ais/
Downloading: http://aisdata.ais.dk/2024/aisdk-2024-05-01.zip
Downloading: http://aisdata.ais.dk/2024/aisdk-2024-05-02.zip
Downloading: http://aisdata.ais.dk/2024/aisdk-2024-05-03.zip

All files downloaded successfully.
End of download process.


## Data preprocessing using MapReduce
The data used in this project is maritime data from automatic identification systems (AIS) obtained from the [Danish Maritime Authority](http://aisdata.ais.dk/). The data is available as one CSV file per day and contains a row for each AIS message with columns such as **Timestamp**, **MMSI**, **Latitude**, **Longitude**, and more. MMSI stands for Maritime Mobile Service Identity and is a unique identifier for a vessel.

Uncompressed, the data for a single day takes up around 3 GB, and we wish to process 3 months' worth of data, which is far too much to keep in memory at one time. However, since the data is time-series data and vessel voyages often span several days, we cannot preprocess the files in isolation. In addition, we wish to reduce the wall-clock time of preprocessing by efficiently utilizing parallel processing on multiple CPUs on DTU's High Performance Computing (HPC) cluster. This is where MapReduce comes in.

### Split

The preprocessing script is adapted from [CIA-Oceanix/GeoTrackNet](https://github.com/CIA-Oceanix/GeoTrackNet) and first converts each CSV file individually to dictionaries of arrays grouped by MMSI. The grouped dictionaries are saved as pickle files in a temporary directory. This is the split part of MapReduce. A simplified version of this code is presented below.

In [1]:
import numpy as np
import os
import pickle
import time
import polars as pl
import argparse
from tqdm import tqdm
from collections import defaultdict
import joblib
from src.preprocessing.preprocessing import LON_MIN, LON_MAX, LAT_MIN, LAT_MAX, SPEED_MAX as SOG_MAX
from src.preprocessing.csv2pkl import SHIP_TYPE_MAP, NAV_STT_MAP

# Positions of the columns within a per-message row (SHIPTYPE is the tenth column)
(LAT, LON, SOG, COG, HEADING,
 ROT, NAV_STT, TIMESTAMP, MMSI, SHIPTYPE) = range(10)

def csv2pkl(input_dir="data/files/",
            output_dir="data/pickle_files"):
    """Convert each daily AIS CSV into per-vessel track arrays (the "split" step).

    For every ``*.csv`` file in ``input_dir`` the function:

    1. Keeps only messages inside the LAT/LON bounds, with SOG in
       ``[0, SOG_MAX]``, COG in ``[0, 360]`` and a timestamp inside the
       calendar day encoded in the file name (``<prefix>-YYYY-MM-DD.csv``).
    2. Writes ``{MMSI: ship type}`` (most frequent defined type per vessel)
       to ``<output_dir>/vessel_types/vessel_types_<stem>.pkl`` with pickle.
    3. Writes ``{MMSI: ndarray}`` to ``<output_dir>/<stem>.pkl`` with
       ``joblib.dump(compress=3)``. Each array holds one float64 row per
       message, sorted by timestamp, with columns in the order
       LAT, LON, SOG, COG, HEADING, ROT, NAV_STT, TIMESTAMP, MMSI.

    An exception while processing one file is printed and that file is
    skipped (best effort); its filtered-message count then stays 0.

    Args:
        input_dir: Directory containing the daily CSV files.
        output_dir: Directory the pickle files are written to (created if
            missing).

    Returns:
        None. Summary statistics are printed to stdout.
    """
    # Local import: only needed for the UTC day bounds computed per file.
    import calendar

    l_csv_filename = [filename for filename in os.listdir(input_dir) if filename.endswith('.csv')]
    print(f"Found {len(l_csv_filename)} CSV files in {input_dir}.")

    os.makedirs(output_dir, exist_ok=True)
    # Loop-invariant, so created once up front instead of once per file.
    vessel_type_dir = os.path.join(output_dir, "vessel_types")
    os.makedirs(vessel_type_dir, exist_ok=True)

    results = {file_name: {"total_messages": 0, "filtered_messages": 0} for file_name in l_csv_filename}

    # NOTE: filled from the vessel-type table below, so vessels whose ship
    # type is undefined are not counted in the final "unique vessels" figure.
    unique_vessels = set()
    for csv_filename in tqdm(l_csv_filename, desc=f'Reading csvs'):
        try:
            # Swap only the extension; str.replace('csv', 'pkl') would also
            # rewrite a "csv" occurring elsewhere in the file name.
            file_stem = os.path.splitext(csv_filename)[0]

            t_date_str = '-'.join(csv_filename.split('.')[0].split('-')[1:4])
            # The "Timestamp" column below is a naive datetime converted with
            # dt.epoch("s"), i.e. seconds counted as if the wall-clock value
            # were UTC. The day bounds must use the same convention:
            # time.mktime() would interpret them in the machine's local time
            # zone and shift the window on any host that is not set to UTC.
            t_min = calendar.timegm(time.strptime(t_date_str + ' 00:00:00', "%Y-%m-%d %H:%M:%S"))
            t_max = calendar.timegm(time.strptime(t_date_str + ' 23:59:59', "%Y-%m-%d %H:%M:%S"))

            # Lazy load data using Polars
            lf = pl.scan_csv(os.path.join(input_dir, csv_filename),
                            schema_overrides={
                                "# Timestamp": pl.Utf8,
                                "MMSI": pl.Int64,
                                "Latitude": pl.Float64,
                                "Longitude": pl.Float64,
                                "Navigational status": pl.Utf8,
                                "ROT": pl.Float64,
                                "SOG": pl.Float64,
                                "COG": pl.Float64,
                                "Heading": pl.Int64,
                                "Ship type": pl.Utf8
                            })
            total_messages = lf.select(pl.len()).collect()[0,0]
            results[csv_filename]["total_messages"] = total_messages

            lf = (
                lf.with_columns(
                    pl.col("# Timestamp").str.to_datetime("%d/%m/%Y %H:%M:%S").dt.epoch("s").alias("Timestamp"), # Convert to UNIX timestamp
                    pl.col("Navigational status").replace_strict(NAV_STT_MAP, default=15) # Map navigational status to integers
                ).filter(
                    (pl.col("Latitude") >= LAT_MIN) & (pl.col("Latitude") <= LAT_MAX) &
                    (pl.col("Longitude") >= LON_MIN) & (pl.col("Longitude") <= LON_MAX) &
                    (pl.col("SOG") >= 0) & (pl.col("SOG") <= SOG_MAX) &
                    (pl.col("COG") >= 0) & (pl.col("COG") <= 360) &
                    (pl.col("Timestamp") >= t_min) & (pl.col("Timestamp") <= t_max)
                ).select( # Select only the 9 columns needed for the track + ship type
                    pl.col("Latitude"),
                    pl.col("Longitude"),
                    pl.col("SOG"),
                    pl.col("COG"),
                    pl.col("Heading"),
                    pl.col("ROT"),
                    pl.col("Navigational status"),
                    pl.col("Timestamp"),
                    pl.col("MMSI"),
                    pl.col("Ship type")
                )
            )

            ### Vessel Type Mapping
            vt_df = (
                lf.with_columns(
                    pl.col("Ship type").replace_strict(SHIP_TYPE_MAP, default=0)
                )
                .filter(pl.col("Ship type") != 0) # "Undefined"
                .group_by("MMSI")
                .agg(
                    pl.col("Ship type").mode().first().alias("VesselType")  # If multiple use the most frequent type
                )
                .collect()
            )

            unique_vessels.update(vt_df["MMSI"].to_list())

            # Save vessel types mapping. The "vessel_types_" prefix is what
            # the later combine step filters on when merging these files.
            VesselTypes = {row[0]: row[1] for row in vt_df.iter_rows()}
            vt_output_filename = f"vessel_types_{file_stem}.pkl"
            with open(os.path.join(vessel_type_dir, vt_output_filename), "wb") as f:
                pickle.dump(VesselTypes, f)

            df = lf.drop("Ship type").collect() # Ship type column no longer needed
            results[csv_filename]["filtered_messages"] = df.height

            # Build tracks: group the row tuples by MMSI
            Vs_list = defaultdict(list)
            for row_tuple in tqdm(df.iter_rows(named=False), total=len(df), desc="Building track lists..."):
                mmsi = row_tuple[MMSI]
                Vs_list[mmsi].append(row_tuple)

            del df # Free memory

            Vs = {} # Final dictionary
            for mmsi, track_list in tqdm(Vs_list.items(), desc="Sorting and converting to NumPy..."):
                track_list.sort(key=lambda x: x[TIMESTAMP])
                Vs[mmsi] = np.array(track_list, dtype=np.float64)

            del Vs_list # Free memory

            output_path = os.path.join(output_dir, f"{file_stem}.pkl")
            joblib.dump(Vs, output_path, compress=3)

            del Vs  # Free memory

        except Exception as e:
            # Deliberate best effort: report the failure and continue with the next file.
            print(f"Error processing file {csv_filename}: {e}")

    print("Conversion completed.")

    total_messages = sum(info["total_messages"] for info in results.values())
    total_filtered = sum(info["filtered_messages"] for info in results.values())
    print(f"Total messages processed: {total_messages}")
    print(f"Total messages after filtering: {total_filtered}")
    print(f"Total unique vessels: {len(unique_vessels)}")
    
# Run the split step on the downloaded CSVs; the pickles are written to a sub-directory of the input directory.
csv2pkl(input_dir="data/ais/", output_dir="data/ais/pickle_files/")

Found 3 CSV files in data/ais/.


Building track lists...: 100%|██████████| 15464605/15464605 [00:16<00:00, 943790.46it/s]
Sorting and converting to NumPy...:  22%|██▏       | 878/3963 [00:16<00:58, 52.62it/s]
Reading csvs:  33%|███▎      | 1/3 [00:48<01:37, 48.52s/it]

Error processing file aisdk-2024-05-03.csv: Unable to allocate 1.56 MiB for an array with shape (22770, 9) and data type float64


Reading csvs:  33%|███▎      | 1/3 [01:00<02:01, 60.60s/it]

Unexpected exception formatting exception. Falling back to standard exception



Traceback (most recent call last):
  File "/zhome/ea/6/187439/computational-tools-project/.venv/lib/python3.11/site-packages/IPython/core/interactiveshell.py", line 3699, in run_code
  File "/tmp/ipykernel_719712/705180081.py", line 136, in <module>
  File "/tmp/ipykernel_719712/705180081.py", line 50, in csv2pkl
  File "/zhome/ea/6/187439/computational-tools-project/.venv/lib/python3.11/site-packages/polars/_utils/deprecation.py", line 97, in wrapper
  File "/zhome/ea/6/187439/computational-tools-project/.venv/lib/python3.11/site-packages/polars/lazyframe/opt_flags.py", line 328, in wrapper
  File "/zhome/ea/6/187439/computational-tools-project/.venv/lib/python3.11/site-packages/polars/lazyframe/frame.py", line 2422, in collect
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/zhome/ea/6/187439/computational-tools-project/.venv/lib/python3.11/site-packages/IPython/core/interactiveshell.py", line 2194, i

In [3]:
# Guard cell: this assertion always fails, presumably to stop a "Run All" from executing the cells below.
assert 1==2 # Don't run

AssertionError: 

### Mapping and shuffling
Now that the full dataset has been chunked (split), we map each item (trajectory) by its MMSI to an MMSI directory, ready for preprocessing (reduction).

The resulting temporary directory has the structure:\
```
data/
└── temp_dir/
    ├── 123456789/                      # MMSI (unique vessel identifier)
    │   ├── chunk_0001.pkl              # Segment(s) from input_dir
    │   ├── chunk_0002.pkl
    │
    ├── 987654321/
    │   ├── chunk_0001.pkl
    │
    └── ...                             # One folder per MMSI
```

In [None]:
def map_and_shuffle(input_dir: str, temp_dir: str):
    """Regroup the per-day track dictionaries into one directory per MMSI.

    Every ``*.pkl`` file directly inside ``input_dir`` is expected to hold a
    ``{mmsi: track_segment}`` dictionary. Each segment is written with pickle
    to ``<temp_dir>/<mmsi>/<input file name>``, so the segments of one vessel
    coming from different input files end up side by side.

    Args:
        input_dir: Directory containing the files produced by the split step.
        temp_dir: Root of the per-MMSI directory tree (created on demand).

    Returns:
        None.
    """
    # Input files from chunking step
    input_files = [os.path.join(input_dir, f) for f in os.listdir(input_dir) if f.endswith(".pkl")]

    for file_path in input_files:
        # The split step writes these files with joblib.dump(..., compress=3).
        # A compressed joblib file is not a plain pickle stream, so it has to
        # be read back with joblib.load; pickle.load fails on it.
        data_dict = joblib.load(file_path)

        # Segments are named after the original file to avoid collisions
        # between input files inside one MMSI directory.
        segment_filename = os.path.basename(file_path)

        for mmsi, track_segment in data_dict.items():
            # Create a directory for this specific MMSI
            mmsi_dir = os.path.join(temp_dir, str(mmsi))
            os.makedirs(mmsi_dir, exist_ok=True)

            # Save this segment into the MMSI's folder
            output_path = os.path.join(mmsi_dir, segment_filename)
            with open(output_path, "wb") as out_f:
                pickle.dump(track_segment, out_f)
                    
# `output_dir` only exists as a parameter inside csv2pkl, so it must be
# defined here; this is the directory csv2pkl was asked to write to above.
output_dir = 'data/ais/pickle_files/'
temp_dir = 'data/temp_mapped'
map_and_shuffle(input_dir=output_dir, temp_dir=temp_dir)

### Reduce
In the final step of the MapReduce algorithm, the reduction step, we apply preprocessing of the vessel trajectories. As we consider vessels' trajectories as independent from each other, and we have split and shuffled the trajectories by MMSI in the previous step, we are able to perform this step in parallel. 

The preprocessing includes identifying a vessel's "voyages". We define a voyage as a contiguous sequence of AIS messages from the same vessel (possibly across days), where the time interval between any two consecutive messages does not exceed two hours, and the vessel is actively moving (i.e., not moored or at anchor). See [D. Nguyen, R. Fablet](https://arxiv.org/pdf/2109.03958) for the full preprocessing rules implemented.

The folder structure for the fully preprocessed files will look like:
```
final_processed/
├── 123456789_0_processed.pkl            # Processed trajectory for MMSI 123456789 (segment 0)
├── 123456789_1_processed.pkl            # (if multiple processed trajectories exist for same MMSI)
├── 987654321_0_processed.pkl
├── 987654321_1_processed.pkl
└── ...
```
where each pickle file constitutes one sample.

In [None]:
from multiprocessing import Pool, cpu_count

from src.preprocessing.preprocessing import preprocess_mmsi_track

def process_single_mmsi(mmsi_info):
    """Load, merge and preprocess all track segments of a single vessel.

    Takes one tuple argument so it can be passed directly to a
    multiprocessing pool's map functions.

    Args:
        mmsi_info: Tuple ``(mmsi, mmsi_dir_path, final_dir)``: the vessel
            identifier, the directory holding its pickled segments, and the
            (already existing) directory the results are written to.

    Returns:
        True if at least one processed trajectory was written. None if the
        directory holds no segment files, the segments cannot be
        concatenated, or preprocessing yields nothing.
    """
    mmsi, mmsi_dir_path, final_dir = mmsi_info

    # os.listdir returns entries in arbitrary order. Sorting makes the merged
    # track deterministic; when segments are named after date-stamped input
    # files it also puts them in chronological order.
    segment_files = sorted(
        f for f in os.listdir(mmsi_dir_path)
        if f.endswith(".pkl") and not f.startswith("vessel_types_")
    )
    if not segment_files:
        return None

    # Load all segments for this MMSI
    all_segments = []
    for seg_file in segment_files:
        segment_path = os.path.join(mmsi_dir_path, seg_file)
        with open(segment_path, "rb") as f:
            all_segments.append(pickle.load(f))

    # Merge into one track
    try:
        full_track = np.concatenate(all_segments, axis=0)
    except ValueError:
        print(f"    MMSI {mmsi}: Error concatenating. Skipping.")
        return None

    # Run processing for single MMSI's track.
    # NOTE(review): the result is used as a mapping {k: trajectory};
    # confirm against preprocess_mmsi_track.
    processed_data = preprocess_mmsi_track(mmsi, full_track)
    if not processed_data:
        return None

    # Save final result: one pickle file per processed trajectory
    for k, traj in processed_data.items():
        final_output_path = os.path.join(final_dir, f"{mmsi}_{k}_processed.pkl")
        data_item = {'mmsi': mmsi, 'traj': traj}
        with open(final_output_path, "wb") as f:
            pickle.dump(data_item, f)
    return True
    
def reduce(final_dir: str, temp_dir: str,  n_workers: int = None):
    """Preprocess vessel trajectories by MMSI in parallel.

    Every sub-directory of ``temp_dir`` is treated as one vessel (the
    directory name is used as the MMSI) and handed to
    ``process_single_mmsi`` in a worker process.

    Args:
        final_dir: Directory the processed trajectories are written to
            (created if missing).
        temp_dir: Root of the per-MMSI directory tree from the shuffle step.
        n_workers: Number of worker processes; ``None`` lets ``Pool`` choose.

    Returns:
        None. The function returns once every vessel has been processed.
    """
    os.makedirs(final_dir, exist_ok=True)

    # Prepare list of (mmsi, path, output_dir) tuples for parallel processing
    mmsi_tasks = []
    for mmsi in os.listdir(temp_dir):
        mmsi_dir_path = os.path.join(temp_dir, mmsi)
        if os.path.isdir(mmsi_dir_path):
            mmsi_tasks.append((mmsi, mmsi_dir_path, final_dir))

    # Process in parallel
    with Pool(processes=n_workers) as pool:
        # imap_unordered returns a lazy iterator, and leaving the `with`
        # block terminates the pool. The iterator must therefore be drained
        # here, otherwise the workers are killed before the tasks complete.
        for _ in pool.imap_unordered(process_single_mmsi, mmsi_tasks):
            pass
        
# Leave 1 core free, but never request 0 workers: on a single-core machine
# cpu_count() - 1 is 0 and Pool(processes=0) raises ValueError.
num_workers = max(1, cpu_count() - 1)
final_dir = 'data/final_processed'
reduce(final_dir=final_dir, temp_dir=temp_dir, n_workers=num_workers)

### Combine vessel_types and cleanup temporary files

In [None]:
import shutil

# `vessel_type_dir` is only a local variable inside csv2pkl, so it is defined
# here: csv2pkl writes the per-day mappings to <output_dir>/vessel_types.
vessel_type_dir = os.path.join('data/ais/pickle_files/', "vessel_types")

# The directory holds nothing but per-day vessel-type mappings, so every
# pickle in it is merged. Sorting makes the "later files win" rule below
# deterministic (os.listdir order is arbitrary).
vessel_type_files = sorted(f for f in os.listdir(vessel_type_dir) if f.endswith(".pkl"))

vessel_types_combined = dict()
vessel_type_paths = []
for vt_file in vessel_type_files:
    vt_path = os.path.join(vessel_type_dir, vt_file)
    with open(vt_path, "rb") as f:
        vt_mapping = pickle.load(f)
    vessel_types_combined.update(vt_mapping) # In case of conflicts, later files overwrite earlier ones
    vessel_type_paths.append(vt_path)

os.makedirs(final_dir, exist_ok=True)
combined_vt_path = os.path.join(final_dir, "vessel_types.pkl")
with open(combined_vt_path, "wb") as f:
    pickle.dump(vessel_types_combined, f)

# Delete the per-day files only after the combined mapping is safely on disk.
for vt_path in vessel_type_paths:
    os.remove(vt_path)

shutil.rmtree(temp_dir)

## Clustering
TODO

## Something "new"
TODO

## References
D. Nguyen, R. Fablet. "A Transformer Network With Sparse Augmented Data Representation and Cross Entropy Loss for AIS-Based Vessel Trajectory Prediction," in IEEE Access, vol. 12, pp. 21596–21609, 2024.