Data Cleansing


*   **Dropped rows** with null values in the key columns (`MMSI`, `Latitude`, `Longitude`, `timestamp`).
*   **Removed duplicates** based on `MMSI`, `Latitude`, `Longitude`, and `timestamp`.
*   **Filtered out** rows whose coordinates fall outside the valid ranges (latitude: -90 to 90, longitude: -180 to 180).
*   **Excluded rows** with unrealistic speeds (greater than 107 km/h), where speed is calculated from the distance and time between consecutive points. The threshold is based on the top speed of the fastest ship currently in service, HSC Francisco.
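
The first three rules can be illustrated on a handful of in-memory records. This is a minimal plain-Python sketch rather than the notebook's PySpark pipeline; the `clean_records` helper and the sample rows are hypothetical and exist only for illustration.

```python
KEY_COLUMNS = ("MMSI", "Latitude", "Longitude", "timestamp")

def clean_records(records):
    """Apply the null, duplicate, and coordinate-range rules to a list of dicts."""
    seen = set()
    cleaned = []
    for rec in records:
        key = tuple(rec.get(c) for c in KEY_COLUMNS)
        # Rule 1: drop rows with a missing key column
        if any(value is None for value in key):
            continue
        # Rule 2: drop exact duplicates on the key columns
        if key in seen:
            continue
        seen.add(key)
        # Rule 3: keep only coordinates inside the valid ranges
        if not (-90 <= rec["Latitude"] <= 90 and -180 <= rec["Longitude"] <= 180):
            continue
        cleaned.append(rec)
    return cleaned

# Made-up sample: one valid row, its duplicate, an out-of-range row, a row with a null
sample = [
    {"MMSI": 1, "Latitude": 55.0, "Longitude": 10.0, "timestamp": "04/05/2024 00:00:00"},
    {"MMSI": 1, "Latitude": 55.0, "Longitude": 10.0, "timestamp": "04/05/2024 00:00:00"},
    {"MMSI": 1, "Latitude": 91.0, "Longitude": 10.0, "timestamp": "04/05/2024 00:00:10"},
    {"MMSI": 2, "Latitude": None, "Longitude": 10.0, "timestamp": "04/05/2024 00:00:00"},
]
cleaned = clean_records(sample)
print(f"Kept {len(cleaned)} of {len(sample)} rows")
```

Only the first row survives: the second is a duplicate, the third has an invalid latitude, and the fourth has a null key column.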


Longest Distance Traveled

*   Using the `geopy` module, we calculated the distance traveled between consecutive points for each vessel. This step required handling missing values carefully to avoid calculation errors.
*   The vessel with the longest route is MMSI `219133000`, which traveled about `790 km`.
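
The per-vessel accumulation can be sketched without Spark. Note that the example below swaps in the haversine great-circle formula for `geopy`'s geodesic distance, so its results would differ slightly from the notebook's figures. The helper names, the mean Earth radius constant, and the sample positions are assumptions made for illustration.

```python
import math
from collections import defaultdict

EARTH_RADIUS_KM = 6371.0088  # mean Earth radius, an assumption of the spherical model

def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two (lat, lon) points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))

def total_distance_by_vessel(points):
    """points: iterable of (mmsi, timestamp, lat, lon) with sortable timestamps."""
    tracks = defaultdict(list)
    for mmsi, ts, lat, lon in points:
        tracks[mmsi].append((ts, lat, lon))
    totals = {}
    for mmsi, track in tracks.items():
        track.sort()  # order each vessel's positions by timestamp
        # Sum the distances between consecutive positions
        totals[mmsi] = sum(
            haversine_km(a[1], a[2], b[1], b[2]) for a, b in zip(track, track[1:])
        )
    return totals

# Made-up positions for two hypothetical vessels
points = [
    (111, 0, 0.0, 0.0), (111, 1, 0.0, 1.0), (111, 2, 0.0, 2.0),
    (222, 0, 10.0, 10.0), (222, 1, 10.0, 10.5),
]
totals = total_distance_by_vessel(points)
longest = max(totals, key=totals.get)
print(f"Vessel {longest} traveled {totals[longest]:.1f} km")
```

Sorting each track before pairing neighbours plays the same role as the window ordered by `timestamp` in the Spark version.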

# Data retrieval

In [None]:
# Download the zipped dataset
!wget http://web.ais.dk/aisdata/aisdk-2024-05-04.zip -O aisdk-2024-05-04.zip

In [None]:
import zipfile
import os
import shutil

zip_file_path = 'aisdk-2024-05-04.zip'
extract_to = 'ais_data'

with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall(extract_to)

extracted_files = os.listdir(extract_to)
print(f"Extracted files: {extracted_files}")

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
source_file_path = os.path.join(extract_to, extracted_files[0])  # Assuming there's only one file in the ZIP
destination_dir = "dest/dir"

os.makedirs(destination_dir, exist_ok=True)

destination_file_path = os.path.join(destination_dir, extracted_files[0])
shutil.copy(source_file_path, destination_file_path)

print(f"File copied to: {destination_file_path}")
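
If the archive ever contained more than one entry, relying on `extracted_files[0]` could pick the wrong file. One possible safeguard, sketched here with a hypothetical `find_csv_member` helper, is to select the member by extension using `zipfile.ZipFile.namelist()`. The in-memory archive below is made up for the demonstration.

```python
import io
import zipfile

def find_csv_member(zf):
    """Return the name of the single CSV member in an open ZipFile."""
    csv_names = [name for name in zf.namelist() if name.lower().endswith(".csv")]
    if len(csv_names) != 1:
        raise ValueError(f"Expected exactly one CSV file, found: {csv_names}")
    return csv_names[0]

# Build a small in-memory archive with a CSV and an unrelated file
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as zf:
    zf.writestr("README.txt", "notes")
    zf.writestr("aisdk-2024-05-04.csv", "MMSI,Latitude,Longitude\n")

with zipfile.ZipFile(buffer) as zf:
    csv_name = find_csv_member(zf)

print(f"CSV member: {csv_name}")
```

Raising an error when zero or several CSV files are present makes the single-file assumption explicit instead of silent.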

# PySpark in Google Colab

In [None]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Check this site for the latest download link https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

import os
import sys
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"


import findspark
findspark.init()
findspark.find()

import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark = SparkSession \
       .builder \
       .appName("Our First Spark Example") \
       .getOrCreate()

spark

# Script

In [None]:
!pip install geopy



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, lag, unix_timestamp, udf, sum as spark_sum
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.sql import Window
from geopy.distance import geodesic

In [None]:
def process_ais_data(file_path):
    '''The script identifies the vessel that traveled the longest distance based on AIS data.
    The distance calculations use the geodesic formula, ensuring accurate measurements over the Earth's surface.
    Data is cleaned from unrealistic speeds, missing and incorrect values.'''

    # Read and preprocess data
    df = spark.read.csv(file_path, header=True, inferSchema=True).select(
        col("MMSI").cast(IntegerType()),
        col("Latitude").cast(DoubleType()),
        col("Longitude").cast(DoubleType()),
        to_timestamp(col("# Timestamp"), "dd/MM/yyyy HH:mm:ss").alias("timestamp")
    ).dropna(how='any', subset=["MMSI", "Latitude", "Longitude", "timestamp"]).dropDuplicates()  # drop rows missing any key column

    # Filter out invalid latitude and longitude values
    df = df.filter(
        (col("Latitude") >= -90) & (col("Latitude") <= 90) &
        (col("Longitude") >= -180) & (col("Longitude") <= 180)
    )

    # Define UDF to calculate distance using geodesic formula
    def calculate_distance(lat1, lon1, lat2, lon2):
        if None in (lat1, lon1, lat2, lon2):
            return None
        return geodesic((lat1, lon1), (lat2, lon2)).km

    distance_udf = udf(calculate_distance, DoubleType())

    # Create window specification to partition by MMSI and order by timestamp
    window_spec = Window.partitionBy("MMSI").orderBy("timestamp")

    # Calculate previous latitude, longitude, and timestamp
    df = df.withColumn("prev_latitude", lag(col("Latitude")).over(window_spec)) \
           .withColumn("prev_longitude", lag(col("Longitude")).over(window_spec)) \
           .withColumn("prev_timestamp", lag(col("timestamp")).over(window_spec))

    # Filter rows with complete previous coordinates and timestamps
    df = df.filter(
        col("prev_latitude").isNotNull() & col("prev_longitude").isNotNull() &
        col("prev_timestamp").isNotNull()
    )

    # Calculate distance and speed
    df = df.withColumn("distance", distance_udf(col("prev_latitude"), col("prev_longitude"), col("Latitude"), col("Longitude"))) \
           .withColumn("time_diff_hours", (unix_timestamp(col("timestamp")) - unix_timestamp(col("prev_timestamp"))) / 3600) \
           .withColumn("speed_kmh", col("distance") / col("time_diff_hours"))

    # Filter out rows with unrealistic speed
    max_realistic_speed_kmh = 107
    df = df.filter(col("speed_kmh") <= max_realistic_speed_kmh)

    # Aggregate distances by MMSI to get the total distance traveled by each vessel and identify the vessel with the longest distance
    longest_route_vessel = df.groupBy("MMSI").agg(spark_sum("distance").alias("total_distance")) \
                              .orderBy(col("total_distance").desc()).first()

    if longest_route_vessel:
        print(f"Vessel with the longest route: MMSI {longest_route_vessel['MMSI']} with distance {longest_route_vessel['total_distance']} km")
    else:
        print("No valid data to process.")

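    # Stop the shared session to free resources; create a new SparkSession before calling this function again
    spark.stop()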

In [None]:
#"/path/to/your/aisdk-2024-05-04.csv"
file_path = "/content/drive/MyDrive/Colab Notebooks/BD/Task 4/aisdk-2024-05-04.csv"  # Update this path to your file location
process_ais_data(file_path)

Vessel with the longest route: MMSI 219133000 with distance 790.4958713586379 km
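
To make the speed filter concrete, the sketch below reproduces the calculation for a single pair of positions in plain Python. The Spark pattern `dd/MM/yyyy HH:mm:ss` corresponds to `%d/%m/%Y %H:%M:%S` in `datetime.strptime`. The `speed_kmh` and `is_realistic` helpers, the sample values, and the choice to return `None` for a non-positive time gap are assumptions made for illustration, not part of the script above.

```python
from datetime import datetime

TIMESTAMP_FORMAT = "%d/%m/%Y %H:%M:%S"  # Python equivalent of "dd/MM/yyyy HH:mm:ss"
MAX_REALISTIC_SPEED_KMH = 107           # threshold used in the script

def speed_kmh(distance_km, prev_ts, ts):
    """Speed between two reports, or None when no time has elapsed."""
    t0 = datetime.strptime(prev_ts, TIMESTAMP_FORMAT)
    t1 = datetime.strptime(ts, TIMESTAMP_FORMAT)
    hours = (t1 - t0).total_seconds() / 3600
    if hours <= 0:
        return None  # avoid dividing by zero for repeated timestamps
    return distance_km / hours

def is_realistic(speed):
    """Keep a row only if its speed is defined and within the threshold."""
    return speed is not None and speed <= MAX_REALISTIC_SPEED_KMH

# 0.5 km in one minute is plausible; 5 km in one minute is a position jump
slow = speed_kmh(0.5, "04/05/2024 12:00:00", "04/05/2024 12:01:00")
jump = speed_kmh(5.0, "04/05/2024 12:00:00", "04/05/2024 12:01:00")
same = speed_kmh(0.2, "04/05/2024 12:00:00", "04/05/2024 12:00:00")

for label, value in (("slow", slow), ("jump", jump), ("same", same)):
    print(label, value, is_realistic(value))
```

Treating a zero time gap as "no speed" mirrors the effect of filtering on `speed_kmh <= 107`: rows without a usable speed do not contribute to a vessel's total distance.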
