In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, to_timestamp, count, radians, sin, cos, sqrt, asin, lag, sum as _sum, round as spark_round, count_distinct, count
)
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType
import os
import findspark
import folium
import pandas
from sklearn.cluster import DBSCAN
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

# Staring up pyspark, based on the individual setup
# os.environ["SPARK_LOCAL_IP"] = "IP"
# os.environ["PYSPARK_PYTHON"] = r"C:\.....\pyspark_env\python.exe"
# os.environ["PYSPARK_DRIVER_PYTHON"] = r"C:\....\pyspark_env\python.exe"

findspark.init()
spark = (
    SparkSession.builder
    .master("local[8]")
    .appName("PortDetectionTask")
    .getOrCreate()
    )

# Read ship data
df = spark.read.csv("aisdk-2025-04-20.csv",
    header=True,
    inferSchema=True
)

# Define functions used for this task
def clean_data(df):
    '''
    Takes a dataframe, returns a cleaned one.

    Only take useful columns, drop NA values, cast to appopriate data types.
    Remove ships with non-standart coordinate types, helicopters.
    '''
    # Some standard cleaning
    df_clean = (
        df.select("MMSI", "# Timestamp", "Latitude", "Longitude", "SOG", "Ship type")
        .dropna(subset=["MMSI", "# Timestamp", "Latitude", "Longitude", "SOG"])
        .withColumn("Latitude", col("Latitude").cast(DoubleType()))
        .withColumn("Longitude", col("Longitude").cast(DoubleType()))
        .withColumn("SOG", col("SOG").cast(DoubleType()))
        .withColumn("Timestamp", to_timestamp(col("# Timestamp"), "dd/MM/yyyy HH:mm:ss"))
    )

    # Cleaning up the ships with weird coordinates
    df_clean = df_clean.filter(
        (col("Latitude").between(-90, 90)) &
        (col("Longitude").between(-180, 180))
    )

    # Remove search and rescue helicopters from the analysis
    df_clean = df_clean.filter(
        col("Ship type") != "SAR"
    )

    return df_clean


def filter_moving_ships(df_clean, sog_filter=0.5, sog_number_filter=5, total_distance_filter=10.0):
    '''
    Takes a dataframe and filtering parameters, returns a filtered one of stationary vessels.

    Remove vessels based on these two conditions:
        1. Moving vessels that send speed of 0.5 or higher for more than 5 times.
        2. Ones that have traveled for more than 10 kilometers in the given day.
    '''
    # Ships that are sending SOG >= (0.5) for more than (5) time are excluded.
    low_speed_df = df_clean.filter(col("SOG") < sog_filter)

    dwelling_ships = (
        low_speed_df.groupBy("MMSI")
                    .agg(count("*").alias("low_speed_msgs"))
                    .filter(col("low_speed_msgs") >= sog_number_filter)
    )

    slow_moving_df = (
        low_speed_df.join(dwelling_ships, on="MMSI", how="inner")
    )

    # Display the slow ships
    total_slow_moving_ships = slow_moving_df.select("MMSI").distinct().count()
    print(f"Total stationary/drifting ships: {total_slow_moving_ships}")

    slow_moving_df.show(5, truncate=False)

    # Now distance filtering is performed
    low_speed_filtered = slow_moving_df

    # Window.partition function for previous coordinates
    windowSpec = Window.partitionBy("MMSI").orderBy("Timestamp")

    df_distance = (
        low_speed_filtered.withColumn("prev_lat", lag("Latitude").over(windowSpec))
                        .withColumn("prev_lon", lag("Longitude").over(windowSpec))
                        .dropna(subset=["prev_lat", "prev_lon"])
    )

    # Convert to radians for distance calculation
    df_distance = (
        df_distance.withColumn("lat1", radians(col("prev_lat")))
                .withColumn("lon1", radians(col("prev_lon")))
                .withColumn("lat2", radians(col("Latitude")))
                .withColumn("lon2", radians(col("Longitude")))
    )


    # Calculate haversine distance in pyspark SQL
    df_distance = (
        df_distance.withColumn("dlat", col("lat2") - col("lat1"))
                .withColumn("dlon", col("lon2") - col("lon1"))
                .withColumn("a", sin(col("dlat") / 2) ** 2 +
                                    cos(col("lat1")) * cos(col("lat2")) *
                                    sin(col("dlon") / 2) ** 2)
                .withColumn("c", 2 * asin(sqrt(col("a"))))
                .withColumn("segment_km", col("c") * 6371) 
    )

    # Total distance traveled by MMSI
    total_distance = (
        df_distance.groupBy("MMSI")
                .agg(_sum("segment_km").alias("total_distance_km"))
    )

    # Filter ships that traveled more than (10)km
    stationary_ships = total_distance.filter(col("total_distance_km") < total_distance_filter)

    # Join the filtered low speed ships and the filtered small distance traveled ships
    df_filtered = low_speed_filtered.join(stationary_ships, on="MMSI", how="inner")

    return df_filtered


In [3]:
# Filtering results
df_clean = clean_data(df)
df_filtered = filter_moving_ships(df_clean)

total_stationary_ships = df_filtered.select("MMSI").distinct().count()
print(f"Total stationary/drifting ships: {total_stationary_ships}")

df_filtered.show(5, truncate=False)

Total stationary/drifting ships: 3078
+---------+-------------------+---------+---------+---+---------+-------------------+--------------+
|MMSI     |# Timestamp        |Latitude |Longitude|SOG|Ship type|Timestamp          |low_speed_msgs|
+---------+-------------------+---------+---------+---+---------+-------------------+--------------+
|219024194|20/04/2025 00:00:01|54.995438|11.874798|0.0|Undefined|2025-04-20 00:00:01|11417         |
|246389000|20/04/2025 00:00:02|54.994648|11.87351 |0.0|Undefined|2025-04-20 00:00:02|15552         |
|246389000|20/04/2025 00:00:02|54.994648|11.87351 |0.0|Undefined|2025-04-20 00:00:02|15552         |
|257476500|20/04/2025 00:00:03|56.701   |8.21937  |0.0|Undefined|2025-04-20 00:00:03|16294         |
|257476500|20/04/2025 00:00:03|56.701   |8.21937  |0.0|Undefined|2025-04-20 00:00:03|16294         |
+---------+-------------------+---------+---------+---+---------+-------------------+--------------+
only showing top 5 rows

Total stationary/drifting sh

In [6]:
df_filtered.groupBy("Ship type").count().orderBy("count", ascending=False).show()

+---------------+-------+
|      Ship type|  count|
+---------------+-------+
|        Fishing|2463211|
|          Other| 374638|
|          Cargo| 348770|
|       Pleasure| 346419|
|            Tug| 274958|
|          Pilot| 252789|
|        Sailing| 251809|
|      Passenger| 245374|
|       Dredging| 190984|
|      Undefined| 165755|
|         Tanker| 132113|
|       Military|  80599|
|       Reserved|  70828|
|            HSC|  56624|
|    Port tender|  33997|
|         Towing|  25141|
|Law enforcement|  17229|
|        Medical|   8499|
|         Diving|   7582|
| Anti-pollution|   2370|
+---------------+-------+
only showing top 20 rows



In [None]:
def bin_ships(df, filter_ships=5):
    '''
    Bin ships to clusters based on lattidute and longitude.
    Grid size is approximately 0.64 x 1.11km rectangles (roughly 0.7km squared), may differ depending on latidute and longitude.
    '''
    # Round of the coordinates to create bins
    binned_df = (
        df.withColumn("lat_bin", spark_round(col("Latitude"), 2))
        .withColumn("lon_bin", spark_round(col("Longitude"), 2))
    )

    # Assing vessel count to bins and filter small potential ports
    port_candidates = (
        binned_df.groupBy("lat_bin", "lon_bin")
                .agg(count_distinct("MMSI").alias("vessel_count"))
                .filter(col("vessel_count") >= filter_ships)
    )
    
    # Print out the biggest ports
    port_candidates.orderBy(col("vessel_count").desc()).show(5, truncate=False)
    
    return port_candidates

port_candidates = bin_ships(df_filtered)
port_candidates.show(5, truncate=False)

+-------+-------+------------+
|lat_bin|lon_bin|vessel_count|
+-------+-------+------------+
|57.59  |9.96   |56          |
|57.12  |8.6    |44          |
|56.7   |8.22   |43          |
|54.38  |10.98  |40          |
|57.72  |10.59  |34          |
|56.13  |12.31  |28          |
|55.06  |10.62  |22          |
|54.81  |9.45   |21          |
|56.41  |10.92  |21          |
|57.06  |9.9    |19          |
|57.49  |10.5   |18          |
|55.1   |14.69  |18          |
|54.85  |10.52  |17          |
|55.33  |11.13  |16          |
|54.67  |9.94   |16          |
|55.47  |8.42   |16          |
|55.69  |12.62  |16          |
|57.49  |10.51  |15          |
|55.19  |14.7   |15          |
|55.59  |12.68  |14          |
+-------+-------+------------+
only showing top 20 rows

+-------+-------+------------+
|lat_bin|lon_bin|vessel_count|
+-------+-------+------------+
|54.47  |9.04   |9           |
|54.91  |9.6    |6           |
|57.59  |9.95   |8           |
|54.91  |9.89   |8           |
|54.18  |12.1

In [7]:
port_candidates.count()

144

In [28]:

# Simple scikit-learn clustering is used, as we have only 144 port candidates and parallel processing would not improve computing speed
def cluster_dbscan(df, eps=0.1, min_samples=1):
    '''
    Create dbscan clusters based on coordinate bins, eps of 0.1 ~ 11km
    '''

    pandas_df = df.toPandas()
    coordinates = pandas_df[['lat_bin', 'lon_bin']].values

    dbscan = DBSCAN(eps=eps, min_samples=min_samples)
    cluster_labels = dbscan.fit_predict(coordinates)

    pandas_df['cluster'] = cluster_labels
    ports_clusters = pandas_df.groupby('cluster').agg(
        lat_bin = ('lat_bin', 'mean'),
        lon_bin = ('lon_bin', 'mean'),
        vessel_count = ('vessel_count', 'sum')
    )

    print(ports_clusters.sort_values('vessel_count', ascending=False).head())

    return ports_clusters

ports_clusters = cluster_dbscan(port_candidates)

           lat_bin    lon_bin  vessel_count
cluster                                    
8        55.734615  12.600000           131
2        57.590000   9.965000            76
5        57.462857  10.528571            68
19       57.717500  10.587500            63
21       54.367500  11.007500            62


In [4]:
def bin_ports(df):
    '''
    Bin ports to clusters based on lattidute and longitude.
    Grid cell size is approximately 6.4 × 11.1km rectangles (roughly 71km squared), may differ depending on latidute and longitude.
    '''

    binned_df = (
        df.withColumn("lat_bin_ports", spark_round(col("lat_bin"), 1))
        .withColumn("lon_bin_ports", spark_round(col("lon_bin"), 1))
    )
    
    port_candidates_agg = (
        binned_df.groupBy("lat_bin_ports", "lon_bin_ports")
                .agg(_sum("vessel_count").alias("vessel_total_count"))
    )
    
    port_candidates_agg.orderBy(col("vessel_total_count").desc()).show(truncate=False)
    
    return port_candidates_agg

port_candidates_agg = bin_ports(port_candidates)
port_candidates_agg.show(5)

+-------------+-------------+------------------+
|lat_bin_ports|lon_bin_ports|vessel_total_count|
+-------------+-------------+------------------+
|57.6         |10.0         |76                |
|55.7         |12.6         |73                |
|57.7         |10.6         |63                |
|57.1         |8.6          |55                |
|54.4         |11.0         |54                |
|57.5         |10.5         |44                |
|55.5         |8.4          |44                |
|56.7         |8.2          |43                |
|55.6         |12.4         |31                |
|57.1         |9.9          |31                |
|56.1         |12.3         |28                |
|55.1         |14.7         |28                |
|54.6         |11.4         |26                |
|55.6         |12.9         |23                |
|55.6         |12.7         |23                |
|55.1         |10.6         |22                |
|56.4         |10.9         |21                |
|54.8         |9.5  

In [None]:
def visualize_ports(port_candidates, map_name='port_map', to_pandas=True):
    # Transfer pyspark to pandas dataframe
    if to_pandas == True:
        port_candidates_pd = port_candidates.toPandas()
    else:
        port_candidates_pd = port_candidates

    # Zoom into Denmark
    mean_lat = port_candidates_pd["lat_bin"].mean()
    mean_lon = port_candidates_pd["lon_bin"].mean()
    m = folium.Map(location=[mean_lat, mean_lon], zoom_start=7)
    
    # Visualize the ports as circles
    for _, row in port_candidates_pd.iterrows():
        folium.CircleMarker(
            location=[row["lat_bin"], row["lon_bin"]],
            radius=min(row["vessel_count"], 20), 
            color="blue",
            fill=True,
            fill_opacity=0.6,
            popup=f"Vessels: {row['vessel_count']}"
        ).add_to(m)

    m.save(f"{map_name}.html")

visualize_ports(port_candidates, "binning_ports")
visualize_ports(port_candidates_agg, "binning_ports_agg")

In [29]:
visualize_ports(ports_clusters, map_name='dbscan_ports', to_pandas=False)