In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, col, sin, cos, acos, length, radians, lit, when, first
from pyspark.sql.types import DoubleType
import os
import math

spark = SparkSession.builder.appName("ShipDataProcessing").getOrCreate()

file_directory = "C:/Users/User/Desktop/data_folder/raw_data/"
file_names = [f"aisdk-2021-12-{str(day).zfill(2)}.csv" for day in range(1, 32)]  # For files from Dec 1 to Dec 31

for file_name in file_names:
    file_path = os.path.join(file_directory, file_name)
    
    df = spark.read.csv(file_path, header=True)
    
    df = df.withColumnRenamed('# Timestamp', 'timestamp')\
           .withColumnRenamed('Latitude', 'lat')\
           .withColumnRenamed('Longitude', 'long')\
           .withColumnRenamed('Navigational status', 'nav_st')\
           .withColumnRenamed('Type of mobile', 'type_of_mobile')\
           .withColumnRenamed('Ship type', 'ship_type')\
           .withColumn('timestamp', to_timestamp(col('timestamp'), 'dd/MM/yyyy HH:mm:ss'))\
           .withColumn('lat', col('lat').cast(DoubleType()))\
           .withColumn('long', col('long').cast(DoubleType()))\
           .withColumn('SOG', col('SOG').cast(DoubleType()))
    
    df = df.filter(~df['nav_st'].isin(['At anchor', 'Moored', 'Engaged in fishing', 'Unknown value']))\
        .filter(col('SOG').isNotNull())\
        .filter(col('SOG') != 0)\
        .filter(col('SOG') < 58)\
        .filter(col('ship_type') != 'Undefined')\
        .filter(col('ship_type') != 'Law enforcement')\
        .filter(col('type_of_mobile') != 'SAR airborne')\
        .filter(col('MMSI') != 111219507)
    
    df = df.filter(
        (
            (col('type_of_mobile') == 'Class A') & (length(col('MMSI')) == 9)
        ) | (
            (col('type_of_mobile') == 'Class B') & (length(col('MMSI')) == 9)
        ) | (
            (col('type_of_mobile') == 'Base Station') & (length(col('MMSI')) == 7)
        )
    ).filter(
        (length(col('IMO')) <= 7) | col('IMO').isNull()
    )

    reduced_df = df.select("timestamp", "MMSI", "lat", "long", "IMO", "type_of_mobile")
    
    latitude_center = 55.225000
    longitude_center = 14.245000
    radius_km = 25

    lat_center_rad = math.radians(latitude_center)
    long_center_rad = math.radians(longitude_center)

    filtered_df = reduced_df.withColumn(
    "distance_km",
    acos(
        sin(radians(col("lat"))) * sin(lit(lat_center_rad)) +
        cos(radians(col("lat"))) * cos(lit(lat_center_rad)) *
        cos(lit(long_center_rad) - radians(col("long")))
    ) * 6371  # Earth's radius in km
    ).filter(col("distance_km") <= radius_km)

    filtered_df = filtered_df.drop("distance_km")

    filtered_df = filtered_df.dropDuplicates(['timestamp', 'MMSI'])

    output_path = f"C:/Users/User/Desktop/data_folder/manipulated_data/{file_name.replace('.csv', '')}_processed"
    filtered_df.write.csv(output_path, mode="overwrite", header=True)

    print(f"Processed and saved data for {file_name}  | rows count = {filtered_df.count()}")


Processed and saved data for aisdk-2021-12-01.csv  | rows count = 114294
Processed and saved data for aisdk-2021-12-02.csv  | rows count = 118868
Processed and saved data for aisdk-2021-12-03.csv  | rows count = 153660
Processed and saved data for aisdk-2021-12-04.csv  | rows count = 164851


In [None]:
import pandas as pd
import numpy as np
import glob
from itertools import combinations
from pathlib import Path
from haversine import haversine  
import logging

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Constants
BASE_DIRECTORY = Path("C:/Users/User/Desktop/data_folder/manipulated_data/")
WINDOW_SIZE_MINUTES = 20  # 20 minutes window
MIN_REPORTS_PER_MINUTE = 1  # Minimum required frequency of reports

def generate_directories(base_directory, year, month):
    """Generate directory names for each day of a given month."""
    days_in_month = (Path(f"{base_directory}/aisdk-{year}-{month:02d}-{day:02d}_processed") 
                     for day in range(1, 32))
    return [d for d in days_in_month if d.exists()]

def read_and_process_csvs(directory):
    """Read and process CSV files in the given directory."""
    csv_files = glob.glob(str(directory / "*.csv"))
    
    if not csv_files:
        logging.warning(f"No CSV files found in {directory}")
        return None
    
    df = pd.concat((pd.read_csv(f, usecols=['timestamp', 'MMSI', 'lat', 'long']) for f in csv_files), ignore_index=True)
    
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    df.set_index('timestamp', inplace=True)
    df.sort_index(inplace=True)
    
    numeric_cols = df.select_dtypes(include=[np.number])
    resampled = numeric_cols.groupby('MMSI').resample('1T').mean()

    resampled.dropna(inplace=True)
    
    if 'MMSI' in resampled.columns:
            resampled.drop(columns=['MMSI'], inplace=True)

    resampled.reset_index(inplace=True)
    return resampled

def filter_by_reporting_frequency(df, window_size_minutes, min_reports_per_minute):
    """Filter out data for vessels that report less frequently than the minimum required frequency within each 20-minute window."""
    window_size_seconds = window_size_minutes * 60
    min_reports_in_window = window_size_minutes * min_reports_per_minute

    filtered_df = pd.DataFrame()

    for mmsi, group in df.groupby('MMSI'):
        group = group.sort_values('timestamp')

        group['count_in_window'] = group.rolling(f'{window_size_seconds}S', on='timestamp')['timestamp'].count()

        consistent_group = group[group['count_in_window'] >= min_reports_in_window]

        filtered_df = pd.concat([filtered_df, consistent_group])

    filtered_df = filtered_df.drop(columns=['count_in_window'])

    return filtered_df

def find_closest_vessels(resampled):
    """Find the closest pair of vessels at each timestamp."""
    min_distance = float('inf')
    closest_pair = None
    closest_time = None
    collision_coords = None  
    
    unique_times = resampled['timestamp'].unique()
    
    for time in unique_times:
        subframe = resampled[resampled['timestamp'] == time]
        for (idx1, row1), (idx2, row2) in combinations(subframe.iterrows(), 2):
            dist = haversine((row1['lat'], row1['long']), (row2['lat'], row2['long']))
            if dist < min_distance:
                min_distance = dist
                closest_pair = (row1['MMSI'], row2['MMSI'])
                closest_time = time
                collision_coords = (
                    (row1['lat'], row1['long']),
                    (row2['lat'], row2['long'])
                )

    return min_distance, closest_pair, closest_time, collision_coords

def process_directory(directory):
    """Process each directory to find the closest vessels."""
    resampled_data = read_and_process_csvs(directory)
    
    if resampled_data is not None:
        consistent_data = filter_by_reporting_frequency(resampled_data, WINDOW_SIZE_MINUTES, MIN_REPORTS_PER_MINUTE)
        
        if consistent_data.empty:
            logging.info(f"No consistent data found for directory: {directory}")
            return None
        
        min_distance, closest_pair, closest_time, collision_coords = find_closest_vessels(consistent_data)
        
        if min_distance < float('inf'):
            print(f"Closest vessels were {closest_pair} with a distance of {min_distance:.3f} km at {collision_coords} at {closest_time}")
            return min_distance, closest_pair, closest_time, collision_coords
        else:
            logging.info(f"No close encounters found for directory: {directory}")
            return None
    else:
        logging.info(f"No data processed for directory: {directory}")
        return None

In [None]:

year = 2021
month = 12

overall_min_distance = float('inf')
overall_closest_pair = None
overall_closest_time = None
overall_day = None

directories = generate_directories(BASE_DIRECTORY, year, month)

for directory in directories:
    logging.info(f"Processing directory: {directory}")
    
    result = process_directory(directory)
    if result is not None:
        min_distance, closest_pair, closest_time, collision_coords = result
        if min_distance < overall_min_distance:
            overall_min_distance = min_distance
            overall_closest_pair = closest_pair
            overall_closest_time = closest_time
            overall_day = directory.name

print('------------------------------------------------------------------------------------------------------------------------------------------')
print(f"During the whole month closest vessels were {overall_closest_pair} with a distance of {overall_min_distance:.3f} km at {collision_coords} at {overall_closest_time}")

In [None]:
from datetime import datetime, timedelta
import matplotlib.pyplot as plt

window_size_minutes = 20  # 20 minutes window
min_reports_per_minute = 1  # Minimum required frequency of reports

window_size_seconds = window_size_minutes * 60
min_reports_in_window = window_size_minutes * min_reports_per_minute

collision_time_str = "2021-12-13 06:18:00+02:00"
collision_time = pd.to_datetime(collision_time_str)

start_time = collision_time - timedelta(minutes=400)
end_time = collision_time

path = 'C:/Users/User/Desktop/data_folder/manipulated_data/aisdk-2021-12-13_processed'
all_files = glob.glob(path + "/part*.csv")
mmsi_list = [219001468, 219019015]


df = pd.concat((pd.read_csv(f) for f in all_files), ignore_index=True)
    
df['timestamp'] = pd.to_datetime(df['timestamp'])
df = df[df['MMSI'].isin(mmsi_list)]
df.set_index('timestamp', inplace=True)
df.sort_index(inplace=True)

numeric_cols = df.select_dtypes(include=[np.number])
resampled = numeric_cols.groupby('MMSI').resample('1T').mean()
resampled.dropna(inplace=True)

if 'MMSI' in resampled.columns:
        resampled.drop(columns=['MMSI'], inplace=True)

resampled.reset_index(inplace=True)

filtered_df = pd.DataFrame()

for mmsi, group in resampled.groupby('MMSI'):
    group = group.sort_values('timestamp')
    group['count_in_window'] = group.rolling(f'{window_size_seconds}S', on='timestamp')['timestamp'].count()
    consistent_group = group[group['count_in_window'] >= min_reports_in_window]
    filtered_df = pd.concat([filtered_df, consistent_group])
    filtered_df = filtered_df.drop(columns=['count_in_window'])

filtered_df = resampled[(resampled['timestamp'] >= start_time) & (resampled['timestamp'] <= end_time)]

for mmsi in mmsi_list:
    mmsi_df = filtered_df[filtered_df['MMSI'] == mmsi].sort_values(by='timestamp')
    plt.plot(mmsi_df['long'], mmsi_df['lat'], marker='o', label=f'MMSI {mmsi}')

plt.title('Ship Trajectories 20 Minutes Before Closest point')
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.legend()
plt.grid(True)

plt.show()