In [68]:
# Comparison: Data Processing with Spark vs. Pandas
# -------------------------------------------------
#
# Goal of this Notebook:
# - Compare the data processing speed between Apache Spark and Pandas.
# - Analyze the data size limits where Pandas reaches its boundaries and Spark shows its advantages.
#
# Overview:
# - The first code block implements data processing with Spark.
# - The second code block implements the same logic using Pandas.
# - Both approaches are compared in terms of runtime and memory usage.
#
# Prerequisites:
# - Python version: 3.8 or higher
# - Apache Spark: 3.5.0
# - Installed libraries: pyspark, pandas, requests, folium
#
# Workflow:
# 1. Download and prepare the data.
# 2. Process the data using both approaches.
# 3. Measure and compare the runtime.
#
# Let's start by importing the necessary libraries and setting up the Spark environment.

In [69]:
from pyspark.sql import SparkSession
import requests
from io import BytesIO
import zipfile
from concurrent.futures import ThreadPoolExecutor
import tempfile
import os
from pyspark.sql.functions import col, to_timestamp, count, lit
import folium  # Import for map visualization
import time  # Import for time measurement

In [70]:
# Global configuration variables
# Path for local storage of CSV files
temp_storage_path = "./data/csv_files"  # Configurable storage path for CSV files
os.makedirs(temp_storage_path, exist_ok=True)  # Create the directory if it does not exist

In [71]:
# List of CSV (ZIP) URLs
csv_urls = [
    "https://web.ais.dk/aisdata/aisdk-2024-03-01.zip",
    "https://web.ais.dk/aisdata/aisdk-2024-03-02.zip"]
  #  "https://web.ais.dk/aisdata/aisdk-2024-03-03.zip",
  #  "https://web.ais.dk/aisdata/aisdk-2024-03-04.zip",
  # "https://web.ais.dk/aisdata/aisdk-2024-03-05.zip"
#]

In [72]:
# Step 1: Create a Spark session
spark = SparkSession.builder \
    .appName("AIS Data Processing") \
    .getOrCreate()

# Step 2: Function to download, extract, and save CSV files locally if not already present
def download_and_unzip_to_csv(url):
    # Extract ZIP filename and corresponding CSV filename
    zip_filename = url.split("/")[-1]
    csv_filename = zip_filename.replace(".zip", ".csv")
    csv_filepath = os.path.join(local_storage_path, csv_filename)
    
    # Check if the CSV file already exists
    if os.path.exists(csv_filepath):
        print(f"File already exists: {csv_filepath}, skipping download.")
        return csv_filepath  # Return the path to the existing file
    
    # Download and extract the ZIP file
    print(f"Downloading and extracting: {url}")
    response = requests.get(url)
    response.raise_for_status()
    zipfile_bytes = BytesIO(response.content)
    with zipfile.ZipFile(zipfile_bytes, 'r') as z:
        with z.open(z.namelist()[0]) as csv_file:
            # Save the extracted CSV locally
            with open(csv_filepath, "wb") as output_file:
                output_file.write(csv_file.read())
    
    return csv_filepath  # Return the path to the saved file

# Step 4: Parallel downloading and storing CSV files locally

# Start the timer
start_time = time.time()

# Count the total number of URLs
total_urls = len(csv_urls)

# Parallel downloading and storing CSV files
with ThreadPoolExecutor(max_workers=10) as executor:
    csv_file_paths = list(executor.map(download_and_unzip_to_csv, csv_urls))

# Stop the timer
end_time = time.time()

# Convert elapsed time into minutes and seconds
elapsed_time = end_time - start_time
minutes = int(elapsed_time // 60)
seconds = int(elapsed_time % 60)
download_time = f"The download time for {total_urls} files with Spark is {minutes} minutes and {seconds} seconds."

# Print the download time
print(download_time)

# Automatically update the README file
try:
    # Open the file and read its contents
    with open("README.md", "r") as readme:
        lines = readme.readlines()
    
    # Create a new list of lines
    updated_lines = []
    section_found = False
    for line in lines:
        if line.strip() == "### Download Time Results with Spark":
            # Replace the existing value with the new one
            updated_lines.append(line)
            updated_lines.append(f"{download_time}\n")
            section_found = True
        elif not section_found or line.strip() != f"{download_time}":
            updated_lines.append(line)

    # If the section was not found, append it
    if not section_found:
        updated_lines.append("\n### Download Time Results with Spark\n")
        updated_lines.append(f"{download_time}\n")

    # Overwrite the file
    with open("README.md", "w") as readme:
        readme.writelines(updated_lines)

    print("The download time and URL count have been successfully updated in the README.")
except FileNotFoundError:
    # If the file does not exist, create it
    with open("README.md", "w") as readme:
        readme.write("### Download Time Results with Spark\n")
        readme.write(f"{download_time}\n")
    print("The README file was created, and the download time has been added.")
except Exception as e:
    print(f"Error writing to the README file: {e}")

File already exists: ./data/temp/aisdk-2024-03-01.csv, skipping download.
File already exists: ./data/temp/aisdk-2024-03-02.csv, skipping download.
The download time for 2 files with Spark is 0 minutes and 0 seconds.
The download time and URL count have been successfully updated in the README.


In [73]:
# Step 5: Read CSV files with Spark and combine them
# Create a list of DataFrames for each CSV file
dataframes = [spark.read.csv(path, header=True, inferSchema=True) for path in csv_file_paths]

# Combine all DataFrames into a single large DataFrame
combined_df = dataframes[0]
for df in dataframes[1:]:
    combined_df = combined_df.union(df)

# Print the number of rows in the combined DataFrame
print(f"The combined dataset contains {combined_df.count()} rows.")



The combined dataset contains 31817670 rows.


                                                                                

In [74]:
# Step 6: Display some sample rows to show possible MMSI numbers
combined_df.show(10)

# Print the original number of entries
print(f"The original dataset contains {combined_df.count()} entries.")

+-------------------+--------------+---------+---------+---------+--------------------+----+----+-----+-------+-------+--------+----+---------+----------+-----+------+------------------------------+-------+-----------+----+----------------+----+----+----+----+
|        # Timestamp|Type of mobile|     MMSI| Latitude|Longitude| Navigational status| ROT| SOG|  COG|Heading|    IMO|Callsign|Name|Ship type|Cargo type|Width|Length|Type of position fixing device|Draught|Destination| ETA|Data source type|   A|   B|   C|   D|
+-------------------+--------------+---------+---------+---------+--------------------+----+----+-----+-------+-------+--------+----+---------+----------+-----+------+------------------------------+-------+-----------+----+----------------+----+----+----+----+
|01/03/2024 00:00:00|       Class A|219000873| 56.99091|10.304543|Under way using e...|NULL| 0.0| 30.2|   NULL|Unknown| Unknown|NULL|Undefined|      NULL| NULL|  NULL|                     Undefined|   NULL|    Unknown



The original dataset contains 31817670 entries.


                                                                                

In [75]:
# Step 5: Read CSV files with Spark and combine them
# Create a list of DataFrames for each CSV file
dataframes = [spark.read.csv(path, header=True, inferSchema=True) for path in csv_file_paths]

# Combine all DataFrames into one large DataFrame
combined_df = dataframes[0]
for df in dataframes[1:]:
    combined_df = combined_df.union(df)

                                                                                

In [76]:
########################################################
# 2. Filter out base stations as they do not display navigation data ("Type of mobile" != "Base Station")
########################################################

# Check if the column "Type of mobile" exists and filter
if "Type of mobile" in combined_df.columns:
    combined_df = combined_df.filter(col("Type of mobile") != "Base Station")
else:
    print("Warning: 'Type of mobile' column not found, skipping this step.")

print(f"The adjusted dataset contains {combined_df.count()} entries.")



The adjusted dataset contains 29508390 entries.


                                                                                

In [77]:
########################################################
# 3. Keep only relevant columns to reduce data size
########################################################

relevant_columns = ["MMSI", "Latitude", "Longitude", "# Timestamp"]
combined_df = combined_df.select(*relevant_columns)

########################################################
# 4. Convert Timestamp to datetime format
########################################################

combined_df = combined_df.withColumn("# Timestamp", to_timestamp(col("# Timestamp"), "dd/MM/yyyy HH:mm:ss"))

########################################################
# 5. Filter MMSI numbers with enough data points to display meaningful routes
########################################################

# Count the number of data points per MMSI
mmsi_counts = combined_df.groupBy("MMSI").agg(count("*").alias("count"))

# Define a threshold (e.g., at least 50 points)
threshold = 50
valid_mmsi = mmsi_counts.filter(col("count") >= threshold).select("MMSI").rdd.flatMap(lambda x: x).collect()

# Filtered DataFrame containing only MMSI numbers with sufficient data points
filtered_by_count_df = combined_df.filter(col("MMSI").isin(valid_mmsi))

25/01/21 10:40:28 ERROR Executor: Exception in task 0.0 in stage 140.0 (TID 1860)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1100, in main
    raise PySparkRuntimeError(
    ...<5 lines>...
    )
pyspark.errors.exceptions.base.PySparkRuntimeError: [PYTHON_VERSION_MISMATCH] Python in worker has different version (3, 13) than that in driver 3.11, PySpark cannot run with different minor versions.
Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$Rea

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 140.0 failed 1 times, most recent failure: Lost task 0.0 in stage 140.0 (TID 1860) (10.172.65.246 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1100, in main
    raise PySparkRuntimeError(
    ...<5 lines>...
    )
pyspark.errors.exceptions.base.PySparkRuntimeError: [PYTHON_VERSION_MISMATCH] Python in worker has different version (3, 13) than that in driver 3.11, PySpark cannot run with different minor versions.
Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1049)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2433)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:195)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1100, in main
    raise PySparkRuntimeError(
    ...<5 lines>...
    )
pyspark.errors.exceptions.base.PySparkRuntimeError: [PYTHON_VERSION_MISMATCH] Python in worker has different version (3, 13) than that in driver 3.11, PySpark cannot run with different minor versions.
Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1049)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2433)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more


In [19]:
########################################################
# 6. Filter by specific MMSI and time range + plot the route
########################################################

mmsi_number = 219016832  # Replace with your desired MMSI

# Define start and end timestamps (in the format "dd/MM/yyyy HH:mm:ss")
start_str = "01/03/2024 00:00:00"  # Start time
end_str = "01/03/2024 06:59:59"    # End time

# Convert start and end times to datetime objects
start_dt = to_timestamp(lit(start_str), "dd/MM/yyyy HH:mm:ss")
end_dt = to_timestamp(lit(end_str), "dd/MM/yyyy HH:mm:ss")

# Check if the MMSI has enough data points
if mmsi_number not in valid_mmsi:
    print(f"MMSI {mmsi_number} does not have enough data points to display a meaningful route.")
else:
    # Filter by MMSI and time range
    route_df = filtered_by_count_df.filter(
        (col("MMSI") == mmsi_number) &
        (col("# Timestamp") >= start_dt) &
        (col("# Timestamp") <= end_dt)
    ).orderBy("# Timestamp")

    # Check if filtered data is available
    if route_df.count() == 0:
        print(f"No data for MMSI {mmsi_number} between {start_str} and {end_str}")
    else:
        # Convert data to Pandas DataFrame for plotting
        pandas_df = route_df.toPandas()

        # Create a map and plot the route
        mean_lat = pandas_df["Latitude"].mean()
        mean_lon = pandas_df["Longitude"].mean()
        
        route_map = folium.Map(location=[mean_lat, mean_lon], zoom_start=8)
        
        # Create a list of coordinates for the PolyLine
        coords = pandas_df[["Latitude", "Longitude"]].values.tolist()
        
        # Add the PolyLine to the map
        folium.PolyLine(coords, color="blue", weight=2.5, opacity=1).add_to(route_map)
        
        # Save the map
        route_map.save("ship_route.html")
        print("Route successfully saved as 'ship_route.html'.")

                                                                                

Route wurde erfolgreich als 'ship_route.html' gespeichert.
