# Space Debris Detection Pipeline (PySpark + TF/KerasCV + HDFS)

This notebook implements an end-to-end pipeline for detecting space debris in images using:
1.  **HDFS:** For storing image data and annotations.
2.  **PySpark:** For reading and processing annotation metadata from HDFS.
3.  **TensorFlow & KerasCV:** For building, training (on GPU), and evaluating a YOLOv8 object detection model using data read directly from HDFS.

In [2]:
# Cell 1: Imports and Configuration

# --- Protobuf Workaround ---
# Set environment variable BEFORE importing tensorflow
# to potentially avoid descriptor errors with certain protobuf/TF versions
import os
os.environ['PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION'] = 'python'
print(f"Set PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python")

# Core libraries
%pip install findspark
%pip install pandas
%pip install matplotlib
%pip install pyspark
import findspark
import json
import pandas as pd
import numpy as np
# Ensure TensorFlow, Keras, and KerasCV are compatible
%pip install tensorflow
import tensorflow as tf # Now import tensorflow
import subprocess # To run hdfs commands
import matplotlib.pyplot as plt

# PySpark for data processing
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, size
from pyspark.sql.types import StringType, ArrayType, StructType, StructField, IntegerType, FloatType
from pyspark import StorageLevel # Corrected Import! For persisting DataFrames if needed

# Keras / KerasCV for the model
# Ensure Keras 3 is used if possible, otherwise adjust imports
%pip install keras-cv
import keras_cv
from keras_cv import bounding_box
# from keras_cv import visualization # Removed/Commented out: May not exist in keras-cv 0.3.5 top-level
# from tensorflow import keras # Use tf.keras usually
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.optimizers import Adam # Or other optimizers like AdamW

# Report the library versions in use. keras_cv is imported unconditionally
# above (a failed import would already have aborted this cell), so no
# "was it imported?" guard is needed here; only a missing __version__
# attribute has to be tolerated.
print(f"TensorFlow Version: {tf.__version__}")
print(f"KerasCV Version: {getattr(keras_cv, '__version__', 'Not available (likely older version)')}")


# --- Configuration ---
# Machine-specific values can be overridden through environment variables so
# the notebook runs on another machine without editing this cell; every
# fallback below is the original hard-coded default.

# HDFS Configuration (!! MODIFY IF NEEDED !!)
HDFS_NAMENODE = os.getenv("HDFS_NAMENODE", "hdfs://localhost:9000")
HDFS_USER = os.getenv("HADOOP_USER_NAME", "dhanu") # Use system env var or default to 'dhanu'
HDFS_BASE_DIR_NAME = "debris_detection"
HDFS_BASE_PATH = f"{HDFS_NAMENODE}/user/{HDFS_USER}/{HDFS_BASE_DIR_NAME}"

# Local Paths (Ensure these are correct based on your 'debris-detection' folder location)
# Assuming your notebook is running from a directory *outside* 'debris-detection'
LOCAL_BASE_DIR = os.getenv("DEBRIS_LOCAL_BASE_DIR", r"C:\college\CV\COSMOS\debris-detection") # Main project folder
LOCAL_TRAIN_IMG_DIR = os.path.join(LOCAL_BASE_DIR, "train")
LOCAL_VAL_IMG_DIR = os.path.join(LOCAL_BASE_DIR, "val")
LOCAL_TEST_IMG_DIR = os.path.join(LOCAL_BASE_DIR, "test")
LOCAL_TRAIN_CSV = os.path.join(LOCAL_BASE_DIR, "train.csv")
LOCAL_VAL_CSV = os.path.join(LOCAL_BASE_DIR, "val.csv")

# Define HDFS target paths
HDFS_TRAIN_IMG_DIR = f"{HDFS_BASE_PATH}/train_images"
HDFS_VAL_IMG_DIR = f"{HDFS_BASE_PATH}/val_images"
HDFS_TEST_IMG_DIR = f"{HDFS_BASE_PATH}/test_images"
HDFS_ANNOTATIONS_DIR = f"{HDFS_BASE_PATH}/annotations"
HDFS_TRAIN_CSV_PATH = f"{HDFS_ANNOTATIONS_DIR}/train.csv"
HDFS_VAL_CSV_PATH = f"{HDFS_ANNOTATIONS_DIR}/val.csv"

# Define Model Save Path (Local directory for checkpoints)
MODEL_SAVE_DIR = os.path.join(LOCAL_BASE_DIR, "debris_yolov8_checkpoints")
os.makedirs(MODEL_SAVE_DIR, exist_ok=True) # Ensure the directory exists

# TF Data Pipeline & Model Parameters
IMG_HEIGHT = 640 # YOLOv8 often uses 640x640
IMG_WIDTH = 640
BATCH_SIZE = 4   # Start small with YOLOv8, increase if GPU memory allows (e.g., 8, 16)
BUFFER_SIZE = 1000 # For shuffling tf.data dataset
# Define class mapping - adjust if you have more classes later
CLASS_MAPPING = {0: "debris"} # Class ID 0 maps to 'debris'
NUM_CLASSES = len(CLASS_MAPPING)
# Bounding box format handed to KerasCV.
# The source annotations are [xmin, xmax, ymin, ymax] in pixels; the data
# pipeline reorders them and divides by the image size, producing
# [xmin, ymin, xmax, ymax] in the [0, 1] range. KerasCV names that layout
# "rel_xyxy" -- plain "xyxy" means absolute pixel coordinates, which would
# make the model interpret every box as a sub-pixel speck.
TARGET_BBOX_FORMAT = "rel_xyxy" # [xmin, ymin, xmax, ymax] relative coordinates

# --- Spark Initialization ---
# Locate the Spark installation, then build (or reuse) the session that the
# annotation-processing cells rely on. Any failure is reported and re-raised
# so the notebook stops here instead of failing later with a NameError.
print("Initializing Spark Session...")
try:
    findspark.init() # Finds Spark installation
    spark = (
        SparkSession.builder
        .appName("DebrisDetectionHDFS_YOLO")
        .config("spark.executor.memory", "4g")
        .config("spark.driver.memory", "2g")
        .config("spark.sql.execution.arrow.pyspark.enabled", "true")
        .getOrCreate()
    ) # Add other necessary Spark/Hadoop configs if needed

    sc = spark.sparkContext
    print("SparkSession Initialized Successfully.")
    # The UI address is handy for monitoring jobs; tolerate contexts that
    # do not expose it.
    try:
        print(f"Spark UI: {sc.uiWebUrl}")
    except AttributeError:
        print("Spark UI URL not available.")
except Exception as e:
    print(f"Error initializing Spark: {e}")
    print("Please ensure Spark is correctly installed and configured.")
    raise

# --- GPU Check ---
# Enable on-demand memory growth on every visible GPU so TensorFlow does not
# reserve all device memory up front; report the CPU fallback otherwise.
print("\nChecking for GPU...")
gpus = tf.config.list_physical_devices('GPU')
if not gpus:
    print("No GPU detected by TensorFlow. Training will run on CPU.")
else:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        logical_gpus = tf.config.list_logical_devices('GPU')
        print(f"{len(gpus)} Physical GPUs, {len(logical_gpus)} Logical GPUs available. Using GPU for training.")
    except RuntimeError as e:
        # Raised by TensorFlow when the setting cannot be applied.
        print(f"GPU Error: {e}. Training might fall back to CPU.")

# Echo the effective configuration so it is recorded in the cell output.
print("\n".join([
    "-" * 30,
    "Configuration Summary:",
    f"  HDFS Base Path: {HDFS_BASE_PATH}",
    f"  Local Base Dir: {LOCAL_BASE_DIR}",
    f"  Model Save Dir: {MODEL_SAVE_DIR}",
    f"  Image Size: ({IMG_HEIGHT}, {IMG_WIDTH})",
    f"  Batch Size: {BATCH_SIZE}",
    f"  Target BBox Format: {TARGET_BBOX_FORMAT}",
    f"  Number of Classes: {NUM_CLASSES}",
    "-" * 30,
]))


Set PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Collecting keras-cv
  Using cached keras_cv-0.9.0-py3-none-any.whl.metadata (12 kB)
Collecting regex (from keras-cv)
  Using cached regex-2024.11.6-cp39-cp39-win_amd64.whl.metadata (41 kB)
Collecting tensorflow-datasets (from keras-cv)
  Using cached tensorflow_datasets-4.9.3-py3-none-any.whl.metadata (9.3 kB)
Collecting keras-core (from keras-cv)
  Using cached keras_core-0.1.7-py3-none-any.whl.metadata (4.3 kB)
Collecting kagglehub (from keras-cv)
  Using cached kagglehub-0.3.12-py3-none-any.whl.metadata (38 kB)
Collecting pyyaml (from kagglehub->keras-cv)
  Using cached PyYAML-6.0.2-cp39-cp39-win_amd64.whl.met

  You can safely remove it manually.
  from .autonotebook import tqdm as notebook_tqdm


ModuleNotFoundError: No module named 'resource'

## Cell 2: Copy Local Data to HDFS

**Run this cell only ONCE** to copy your image folders and CSV files from your local disk into HDFS.

In [None]:
# Cell 2: Copy Local Data to HDFS (Run this cell only ONCE)

# Function to run HDFS commands more robustly
def run_hdfs_command(command_parts):
    """Run ``hdfs dfs <command_parts...>`` and report the outcome.

    The ``hdfs`` launcher is resolved on PATH with ``shutil.which`` (which
    also finds ``hdfs.cmd`` on Windows) and executed directly with an
    argument list. No shell is involved, so paths containing spaces or shell
    metacharacters are passed through verbatim, and a missing launcher is
    detected up front instead of surfacing as an opaque shell error.

    Args:
        command_parts: Iterable of string arguments that follow ``hdfs dfs``,
            e.g. ``["-mkdir", "-p", "/some/dir"]``.

    Returns:
        True when the command exits with status 0; False when the launcher is
        missing, the command exits non-zero, or it cannot be started.
    """
    # Local import: only this helper needs shutil.
    import shutil

    command_parts = list(command_parts)
    print(f"Running HDFS command: {' '.join(['hdfs', 'dfs'] + command_parts)}")

    hdfs_executable = shutil.which("hdfs")
    if hdfs_executable is None:
        print("Error: 'hdfs' command not found. Make sure Hadoop bin directory is in your system PATH.")
        return False

    full_command = [hdfs_executable, "dfs"] + command_parts
    try:
        process = subprocess.run(full_command, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as e:
        print(f"Error running command: {' '.join(full_command)}")
        print("Return code:", e.returncode)
        print("STDERR:\n", e.stderr)
        print("STDOUT:\n", e.stdout)
        return False
    except FileNotFoundError:
        # The launcher vanished between the lookup and the call.
        print("Error: 'hdfs' command not found. Make sure Hadoop bin directory is in your system PATH.")
        return False
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return False

    print("STDOUT:\n", process.stdout)
    if process.stderr:
        # Shown even on success: a zero exit status can still come with
        # warnings or client-side errors worth reading.
        print("STDERR:\n", process.stderr)
    print("Command successful.")
    return True

print("Starting data copy to HDFS...")
print("This may take some time depending on data size and network speed.")

# Ordered list of (heading to print or None, arguments for `hdfs dfs`).
# '-mkdir -p' and '-put -f' are idempotent, so re-running the cell is safe.
# Putting a directory copies the directory itself *into* the destination,
# e.g. .../debris_detection/train_images/train -- the annotation-processing
# cell builds image paths on that assumption.
hdfs_copy_steps = [
    # 1. Create base directories in HDFS
    ("\nCreating HDFS directories...", ["-mkdir", "-p", HDFS_BASE_PATH]),
    (None, ["-mkdir", "-p", HDFS_TRAIN_IMG_DIR]),
    (None, ["-mkdir", "-p", HDFS_VAL_IMG_DIR]),
    (None, ["-mkdir", "-p", HDFS_TEST_IMG_DIR]),
    (None, ["-mkdir", "-p", HDFS_ANNOTATIONS_DIR]),
    # 2. Copy image directories
    ("\nCopying training images...", ["-put", "-f", LOCAL_TRAIN_IMG_DIR, HDFS_TRAIN_IMG_DIR]),
    ("\nCopying validation images...", ["-put", "-f", LOCAL_VAL_IMG_DIR, HDFS_VAL_IMG_DIR]),
    ("\nCopying test images...", ["-put", "-f", LOCAL_TEST_IMG_DIR, HDFS_TEST_IMG_DIR]),
    # 3. Copy CSV files
    ("\nCopying training CSV...", ["-put", "-f", LOCAL_TRAIN_CSV, HDFS_TRAIN_CSV_PATH]),
    ("\nCopying validation CSV...", ["-put", "-f", LOCAL_VAL_CSV, HDFS_VAL_CSV_PATH]),
]

for heading, hdfs_args in hdfs_copy_steps:
    if heading is not None:
        print(heading)
    # Stop at the first failure: every later step (and every later cell)
    # depends on the earlier ones, so carrying on would only bury the
    # real error under follow-up failures.
    if not run_hdfs_command(hdfs_args):
        raise RuntimeError(f"HDFS step failed: hdfs dfs {' '.join(hdfs_args)}")

print("\n--- Data Copy to HDFS Initiated ---")
print("Verify the contents in HDFS using commands like:")
print(f"  hdfs dfs -ls {HDFS_BASE_PATH}")
print(f"  hdfs dfs -ls {HDFS_TRAIN_IMG_DIR}") # Should show 'train' folder inside
print(f"  hdfs dfs -ls {HDFS_ANNOTATIONS_DIR}") # Should show csv files
print("------------------------------------")

Starting data copy to HDFS...
This may take some time depending on data size and network speed.

Creating HDFS directories...
Running HDFS command: hdfs dfs -mkdir -p hdfs://localhost:9000/user/dhanu/debris_detection
STDOUT:
 
STDERR:
 mkdir: RPC response has invalid length of -16777216

Command successful.
Running HDFS command: hdfs dfs -mkdir -p hdfs://localhost:9000/user/dhanu/debris_detection/train_images
STDOUT:
 
STDERR:
 mkdir: RPC response has invalid length of -16777216

Command successful.
Running HDFS command: hdfs dfs -mkdir -p hdfs://localhost:9000/user/dhanu/debris_detection/val_images
STDOUT:
 
STDERR:
 mkdir: RPC response has invalid length of -16777216

Command successful.
Running HDFS command: hdfs dfs -mkdir -p hdfs://localhost:9000/user/dhanu/debris_detection/test_images
STDOUT:
 
STDERR:
 mkdir: RPC response has invalid length of -16777216

Command successful.
Running HDFS command: hdfs dfs -mkdir -p hdfs://localhost:9000/user/dhanu/debris_detection/annotations
STD

## Cell 3: Load and Process Annotations using PySpark

Read the annotation CSV files from HDFS, parse the bounding box strings, and generate the full HDFS paths for each image.

In [None]:
# Cell 3: Load and Process Annotations using PySpark

print("Loading and processing annotations from HDFS using PySpark...")

# --- Schemas ---
# CSV layout: both columns are read as nullable strings; the bounding boxes
# stay a raw string here and are parsed by a UDF afterwards.
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ("ImageID", "bboxes")
])

# Parsed boxes: an array of non-nullable integer structs laid out in the
# target "xyxy" order [xmin, ymin, xmax, ymax] (the source order appears to
# be [xmin, xmax, ymin, ymax]).
bbox_struct_schema = StructType([
    StructField(coordinate, IntegerType(), False)
    for coordinate in ("xmin", "ymin", "xmax", "ymax")
])
bbox_array_schema = ArrayType(bbox_struct_schema)

# --- Read the training CSV from HDFS ---
print(f"Reading training annotations from: {HDFS_TRAIN_CSV_PATH}")
try:
    train_annotations_df = spark.read.csv(HDFS_TRAIN_CSV_PATH, header=True, schema=schema)
    # count() is an action, so read problems surface here rather than later.
    print(f"Successfully read training CSV. Count: {train_annotations_df.count()}")
except Exception as e:
    print(f"Error reading HDFS file {HDFS_TRAIN_CSV_PATH}: {e}")
    print("Please ensure the file exists, has correct permissions, schema, and Spark config is right.")
    raise

def parse_bboxes_to_xyxy(bbox_str):
    """Parse a bounding-box string into a list of xyxy dictionaries.

    The input is a JSON-like list of boxes, each ``[xmin, xmax, ymin, ymax]``
    (single quotes are tolerated). Every valid box is reordered into
    ``{"xmin", "ymin", "xmax", "ymax"}`` with integer values.

    Args:
        bbox_str: Raw string from the annotation CSV, or None.

    Returns:
        A list of dicts, one per valid box. Empty when the input is None or
        blank, cannot be parsed, or is not a list. Individual boxes that are
        not 4-element lists, or whose values cannot be converted to int
        (including NaN and Infinity), are skipped with a warning.
    """
    if bbox_str is None or not bbox_str.strip():
        return []

    try:
        raw_list = json.loads(bbox_str.replace("'", '"'))
    except (json.JSONDecodeError, TypeError) as e:
        print(f"Error parsing bbox string: '{bbox_str}', Error: {e}")
        return [] # Return empty list on error

    # A scalar, string or object at the top level is not a list of boxes.
    if not isinstance(raw_list, list):
        print(f"Error parsing bbox string: '{bbox_str}', Error: expected a list of boxes")
        return []

    parsed_list = []
    for box in raw_list:
        if not (isinstance(box, list) and len(box) == 4):
            print(f"Warning: Skipping invalid box format: {box} from string: {bbox_str}")
            continue
        try:
            # Original format: [xmin, xmax, ymin, ymax]
            xmin, xmax, ymin, ymax = (int(value) for value in box)
        except (ValueError, TypeError, OverflowError):
            # OverflowError covers Infinity, which json.loads accepts but
            # int() cannot convert.
            print(f"Warning: Skipping invalid numeric data in box: {box} from string: {bbox_str}")
            continue
        # Target format: [xmin, ymin, xmax, ymax]
        parsed_list.append({"xmin": xmin, "ymin": ymin, "xmax": xmax, "ymax": ymax})
    return parsed_list

parse_bboxes_udf = udf(parse_bboxes_to_xyxy, bbox_array_schema)


def _with_parsed_bboxes(annotations_df):
    """Replace the raw 'bboxes' string column with the parsed 'bboxes_parsed' array."""
    return (
        annotations_df
        .withColumn("bboxes_parsed", parse_bboxes_udf(col("bboxes")))
        .drop("bboxes") # Drop the original string column
    )


def _with_image_paths(parsed_df, hdfs_img_dir, img_folder_name):
    """Add the full HDFS 'image_path' column and keep only usable rows.

    Rows with a null ImageID are dropped *before* the path is built: the
    path UDF formats its argument into a string, so a null id would become
    ".../None.jpg" and a null check on 'image_path' could never catch it.
    Rows without at least one valid parsed box are dropped as well.
    """
    add_path_udf = udf(
        lambda image_id: f"{hdfs_img_dir}/{img_folder_name}/{image_id}.jpg",
        StringType()
    )
    return (
        parsed_df
        .filter(col("ImageID").isNotNull())
        .withColumn("image_path", add_path_udf(col("ImageID")))
        .filter(size(col("bboxes_parsed")) > 0)
    )


# --- Training data ---
train_annotations_df = _with_parsed_bboxes(train_annotations_df)

# Path structure after copy: HDFS_TRAIN_IMG_DIR/train/<ImageID>.jpg
train_img_folder_name = os.path.basename(LOCAL_TRAIN_IMG_DIR) # e.g., 'train'
print(f"Constructing HDFS paths assuming images are in folder: {train_img_folder_name}")
train_df = _with_image_paths(train_annotations_df, HDFS_TRAIN_IMG_DIR, train_img_folder_name)

# --- Show results ---
print("\nProcessed Training Annotations Schema:")
train_df.printSchema()
print("\nSample Processed Training Data (showing 5 rows):")
train_df.show(5, truncate=50, vertical=True) # Use vertical for better display of nested data

# --- Validation data ---
# Validation is optional: on any failure val_df is set to None and the
# notebook carries on with training data only.
print(f"\nReading validation annotations from: {HDFS_VAL_CSV_PATH}")
try:
    val_annotations_df = spark.read.csv(HDFS_VAL_CSV_PATH, header=True, schema=schema)
    print(f"Successfully read validation CSV. Count: {val_annotations_df.count()}")

    val_annotations_df = _with_parsed_bboxes(val_annotations_df)

    val_img_folder_name = os.path.basename(LOCAL_VAL_IMG_DIR) # e.g., 'val'
    print(f"Constructing HDFS paths assuming validation images are in folder: {val_img_folder_name}")
    val_df = _with_image_paths(val_annotations_df, HDFS_VAL_IMG_DIR, val_img_folder_name)

    print("\nProcessed Validation Annotations Schema:")
    val_df.printSchema()
    print("\nSample Processed Validation Data (showing 5 rows):")
    val_df.show(5, truncate=50, vertical=True)

except Exception as e:
    print(f"Error processing validation data from HDFS: {e}")
    val_df = None # Set to None if validation data fails to load

print("\nSpark processing finished.")

Loading and processing annotations from HDFS using PySpark...
Reading training annotations from: hdfs://localhost:9000/user/dhanu/debris_detection/annotations/train.csv
Error reading HDFS file hdfs://localhost:9000/user/dhanu/debris_detection/annotations/train.csv: An error occurred while calling o51.csv.
: java.io.IOException: Failed on local exception: com.google.protobuf.InvalidProtocolBufferException: Message missing required fields: callId, status; Host Details : local host is: "Stabl3/192.168.1.3"; destination host is: "localhost":9000; 
	at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:776)
	at org.apache.hadoop.ipc.Client.call(Client.java:1480)
	at org.apache.hadoop.ipc.Client.call(Client.java:1413)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
	at com.sun.proxy.$Proxy20.getFileInfo(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:776

Py4JJavaError: An error occurred while calling o51.csv.
: java.io.IOException: Failed on local exception: com.google.protobuf.InvalidProtocolBufferException: Message missing required fields: callId, status; Host Details : local host is: "Stabl3/192.168.1.3"; destination host is: "localhost":9000; 
	at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:776)
	at org.apache.hadoop.ipc.Client.call(Client.java:1480)
	at org.apache.hadoop.ipc.Client.call(Client.java:1413)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
	at com.sun.proxy.$Proxy20.getFileInfo(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:776)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy21.getFileInfo(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2108)
	at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
	at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
	at org.apache.hadoop.fs.FileSystem.isDirectory(FileSystem.java:1439)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:47)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:366)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:286)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
	at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:723)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.google.protobuf.InvalidProtocolBufferException: Message missing required fields: callId, status
	at com.google.protobuf.UninitializedMessageException.asInvalidProtocolBufferException(UninitializedMessageException.java:81)
	at com.google.protobuf.AbstractParser.checkMessageInitialized(AbstractParser.java:71)
	at com.google.protobuf.AbstractParser.parseDelimitedFrom(AbstractParser.java:253)
	at com.google.protobuf.AbstractParser.parseDelimitedFrom(AbstractParser.java:259)
	at com.google.protobuf.AbstractParser.parseDelimitedFrom(AbstractParser.java:49)
	at org.apache.hadoop.ipc.protobuf.RpcHeaderProtos$RpcResponseHeaderProto.parseDelimitedFrom(RpcHeaderProtos.java:3167)
	at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1087)
	at org.apache.hadoop.ipc.Client$Connection.run(Client.java:980)


## Cell 4: Prepare TensorFlow Datasets (`tf.data`)

Collect the processed data from Spark DataFrames into Pandas DataFrames (on the driver node) and then create efficient `tf.data` pipelines to load images from HDFS and prepare data for the model.

In [None]:
# Cell 4: Prepare TensorFlow Datasets (tf.data)

print("Preparing TensorFlow datasets from Spark data...")

# --- Bring the annotation tables onto the driver as Pandas DataFrames ---
# Only the two columns the tf.data pipeline needs are collected. This is
# reasonable for ~1.2GB of data on a 16GB machine; keep an eye on memory.
print("Collecting Spark DataFrame data to Pandas DataFrame on the driver...")
try:
    train_data_pd = train_df.select("image_path", "bboxes_parsed").toPandas()
    print(f"Collected {len(train_data_pd)} training samples.")
    if val_df is None:
        # Validation failed to load earlier: use an empty frame with the
        # same columns so later code can simply test `.empty`.
        val_data_pd = pd.DataFrame(columns=['image_path', 'bboxes_parsed'])
        print("Validation data was not loaded successfully from Spark.")
    else:
        val_data_pd = val_df.select("image_path", "bboxes_parsed").toPandas()
        print(f"Collected {len(val_data_pd)} validation samples.")
except Exception as e:
    print(f"Error collecting data to driver: {e}")
    print("This might happen if the dataset is too large for driver memory or due to Arrow issues.")
    raise

# --- Define the function to load and preprocess images and labels ---
# This function will be wrapped in tf.py_function
def load_and_preprocess_for_tf(image_path_str, bboxes_list_of_dicts):
    """Load one image and return it with relative xyxy boxes and class ids.

    Must run eagerly (for example inside ``tf.py_function``): it mixes
    TensorFlow ops with NumPy and Python-level conversions.

    Args:
        image_path_str: Path of a JPEG readable by ``tf.io.read_file``
            (Python string or scalar string tensor).
        bboxes_list_of_dicts: Boxes in pixels of the *original* image, given
            either as a sequence of mappings with the keys ``xmin``,
            ``ymin``, ``xmax``, ``ymax``, or as a numeric array / eager
            tensor of shape (N, 4) ordered [xmin, ymin, xmax, ymax].
            None or an empty sequence means "no boxes".

    Returns:
        Tuple ``(img, boxes, classes)``:
        - img: float32 tensor [IMG_HEIGHT, IMG_WIDTH, 3], resized only
          (no normalisation).
        - boxes: float32 tensor [N, 4], coordinates divided by the original
          image size and clipped to [0, 1].
        - classes: int64 tensor [N], all 0 (single 'debris' class).
        If the image cannot be read or decoded, an all-zero image with zero
        boxes and zero classes is returned instead.
    """
    # 1. Load Image
    try:
        img_bytes = tf.io.read_file(image_path_str)
        img = tf.image.decode_jpeg(img_bytes, channels=3)
        # tf.shape only supports integer output types, so take the default
        # int32 shape and convert the two dimensions to Python floats.
        img_shape = tf.shape(img)
        original_height = float(img_shape[0])
        original_width = float(img_shape[1])
    except Exception as e:
        # Best effort: report and return dummy data with the expected
        # structure so one unreadable file does not abort the pipeline.
        tf.print(f"Error loading image {image_path_str}: {e}")
        img = tf.zeros([IMG_HEIGHT, IMG_WIDTH, 3], dtype=tf.float32)
        boxes_xyxy_rel = tf.zeros([0, 4], dtype=tf.float32) # No boxes
        class_ids = tf.zeros([0], dtype=tf.int64) # No labels
        return img, boxes_xyxy_rel, class_ids

    # 2. Preprocess Image (Resize only, normalization often done by model layers)
    img = tf.image.resize(img, [IMG_HEIGHT, IMG_WIDTH], method='bilinear')
    img = tf.cast(img, dtype=tf.float32) # Ensure float32 type

    # 3. Bring the boxes into a float32 array of shape (N, 4)
    raw_boxes = bboxes_list_of_dicts
    if hasattr(raw_boxes, "numpy"):
        # Eager tensor (this is what tf.py_function passes in).
        raw_boxes = raw_boxes.numpy()
    if raw_boxes is None:
        boxes_xyxy_abs = np.zeros((0, 4), dtype=np.float32)
    elif isinstance(raw_boxes, np.ndarray) and raw_boxes.dtype != object:
        boxes_xyxy_abs = raw_boxes.astype(np.float32).reshape(-1, 4)
    else:
        # Sequence of mappings, already in xyxy order.
        boxes_xyxy_abs = np.array(
            [[box['xmin'], box['ymin'], box['xmax'], box['ymax']] for box in raw_boxes],
            dtype=np.float32,
        ).reshape(-1, 4) # reshape keeps the (0, 4) shape for an empty list

    # Single class 'debris' with ID 0 for every box.
    class_ids = np.zeros((boxes_xyxy_abs.shape[0],), dtype=np.int64)

    # Absolute pixels -> relative [0, 1]. Relative coordinates are unaffected
    # by the resize above. The max() guards against a zero-sized image.
    divisor = np.maximum(
        np.array([original_width, original_height, original_width, original_height], dtype=np.float32),
        1.0,
    )
    # Clip to handle rounding errors or boxes slightly outside the image.
    boxes_xyxy_rel = np.clip(boxes_xyxy_abs / divisor, 0.0, 1.0)

    # 4. Return image, boxes and classes separately; the caller combines
    # them into the dictionary layout used for training.
    boxes_tensor = tf.convert_to_tensor(boxes_xyxy_rel, dtype=tf.float32)
    labels_tensor = tf.convert_to_tensor(class_ids, dtype=tf.int64)
    return img, boxes_tensor, labels_tensor


# --- Create the tf.data Datasets using tf.py_function ---
def set_shapes(img, boxes, classes):
    """Restore the static shapes that tf.py_function erases."""
    img.set_shape([IMG_HEIGHT, IMG_WIDTH, 3])
    boxes.set_shape([None, 4]) # Variable number of boxes, 4 coords each
    classes.set_shape([None]) # Variable number of boxes
    return img, boxes, classes


def format_for_keras_cv(img, boxes, classes):
    """Pack one sample into the 'images' / 'bounding_boxes' dictionary layout."""
    return {"images": img, "bounding_boxes": {"boxes": boxes, "classes": classes}}


def create_tf_dataset(pandas_df, shuffle=False):
    """Create an unbatched tf.data.Dataset from the collected Pandas data.

    The box annotations are Python objects that cannot be turned into
    tensors directly, so the dataset itself only carries row *indices*.
    The image path and boxes for an index are looked up in plain Python
    lists inside ``tf.py_function`` and handed to
    ``load_and_preprocess_for_tf``.

    Args:
        pandas_df: DataFrame with the columns 'image_path' and
            'bboxes_parsed'.
        shuffle: When True, the row indices are fully shuffled (and
            reshuffled on every iteration) before any image is loaded.
            Shuffling indices costs almost nothing, whereas shuffling after
            loading would keep a buffer of decoded images in memory.

    Returns:
        Dataset of dicts ``{"images": [H, W, 3] float32, "bounding_boxes":
        {"boxes": [N, 4] float32, "classes": [N] int64}}``.
    """
    image_paths = pandas_df['image_path'].tolist()
    bboxes_per_image = pandas_df['bboxes_parsed'].tolist()
    num_samples = len(image_paths)

    def _load_by_index(index):
        # Runs eagerly inside tf.py_function: `index` is a scalar int64 tensor.
        row = int(index.numpy())
        return load_and_preprocess_for_tf(image_paths[row], bboxes_per_image[row])

    def _load_sample(index):
        img, boxes, classes = tf.py_function(
            func=_load_by_index,
            inp=[index],
            Tout=[tf.float32, tf.float32, tf.int64] # img, boxes, classes
        )
        return set_shapes(img, boxes, classes)

    dataset = tf.data.Dataset.range(num_samples)
    if shuffle and num_samples > 0:
        dataset = dataset.shuffle(num_samples)
    dataset = dataset.map(_load_sample, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.map(format_for_keras_cv, num_parallel_calls=tf.data.AUTOTUNE)
    return dataset


# --- Create the Training Dataset ---
print("\nCreating training tf.data.Dataset...")
# Shuffling happens on the row indices inside create_tf_dataset, so no
# image-sized shuffle buffer (BUFFER_SIZE) is needed here.
train_tf_dataset = create_tf_dataset(train_data_pd, shuffle=True)

# Pad batches: images are fixed size, the per-image box lists are not.
# Boxes AND classes are padded with -1, the sentinel KerasCV uses to mark
# "no box"; padding boxes with 0 would look like real zero-sized boxes.
train_tf_dataset = train_tf_dataset.padded_batch(
    BATCH_SIZE,
    padded_shapes={
        "images": [IMG_HEIGHT, IMG_WIDTH, 3],
        "bounding_boxes": {
            "boxes": [None, 4],    # Pad first dimension (num_boxes) dynamically
            "classes": [None]      # Pad first dimension (num_boxes) dynamically
        }
    },
    padding_values={
        "images": tf.constant(0.0, dtype=tf.float32),
        "bounding_boxes": {
            "boxes": tf.constant(-1.0, dtype=tf.float32), # Pad boxes with -1
            "classes": tf.constant(-1, dtype=tf.int64)    # Pad classes with -1
        }
    },
    drop_remainder=True # Important for consistent batch shapes during training
)
train_tf_dataset = train_tf_dataset.prefetch(buffer_size=tf.data.AUTOTUNE)

print("Training Dataset Element Spec (after formatting and batching):")
print(train_tf_dataset.element_spec)


# --- Create the Validation Dataset ---
if not val_data_pd.empty:
    print("\nCreating validation tf.data.Dataset...")
    def val_generator():
        for _, row in val_data_pd.iterrows():
            yield (row['image_path'], row['bboxes_parsed'])

    val_tf_dataset = tf.data.Dataset.from_generator(
        val_generator,
        output_signature=(
            tf.TensorSpec(shape=(), dtype=tf.string),
            tf.TensorSpec(shape=(None,), dtype=tf.object)
        )
    )

    val_tf_dataset = val_tf_dataset.map(lambda img_path, bboxes_list: tf.py_function(
            func=load_and_preprocess_for_tf,
            inp=[img_path, bboxes_list],
            Tout=[tf.float32, tf.float32, tf.int64]
        ),
        num_parallel_calls=tf.data.AUTOTUNE
    )

    val_tf_dataset = val_tf_dataset.map(set_shapes, num_parallel_calls=tf.data.AUTOTUNE)
    val_tf_dataset = val_tf_dataset.map(format_for_keras_cv, num_parallel_calls=tf.data.AUTOTUNE)

    # No shuffling for validation
    val_tf_dataset = val_tf_dataset.padded_batch(
        BATCH_SIZE,
        padding_values={ # Match training padding values
            "images": tf.constant(0.0, dtype=tf.float32),
            "bounding_boxes": {
                "boxes": tf.constant(0.0, dtype=tf.float32),
                "classes": tf.constant(-1, dtype=tf.int64)
            }
        },
        padded_shapes={ # Match training padding shapes
            "images": [IMG_HEIGHT, IMG_WIDTH, 3],
            "bounding_boxes": {
                "boxes": [None, 4],
                "classes": [None]
            }
        },
        drop_remainder=True # Also drop remainder for validation if using metrics sensitive to batch size
    )
    val_tf_dataset = val_tf_dataset.prefetch(buffer_size=tf.data.AUTOTUNE)
    print("Validation Dataset Element Spec (after formatting and batching):")
    print(val_tf_dataset.element_spec)
else:
    val_tf_dataset = None
    print("\nValidation dataset is empty or was not created.")

print("\nTensorFlow dataset preparation finished.")

## Cell 5: Define the Object Detection Model (YOLOv8 from KerasCV)

Instantiate a pre-trained YOLOv8 model from KerasCV and compile it with appropriate object detection loss and metrics.

In [None]:
# Cell 5: Define the Object Detection Model (YOLOv8 from KerasCV)

print("Defining the YOLOv8 Object Detection Model...")

# --- Configuration for YOLOv8 ---
# Choose a preset. Smaller presets train faster but might be less accurate.
# NOTE(review): KerasCV's published YOLOv8 presets appear to be the per-size
# backbone presets ("yolo_v8_{xs,s,m,l,xl}_backbone[_coco]") plus a single full
# detector preset ("yolo_v8_m_pascalvoc"); "yolo_v8_s_pascalvoc" does not seem
# to be among them - confirm against the installed keras-cv version.
# A backbone preset gives COCO-pretrained backbone weights and a freshly
# initialised detection head sized for NUM_CLASSES.
PRESET_NAME = "yolo_v8_s_backbone_coco"  # 'small' backbone, COCO-pretrained
# PRESET_NAME = "yolo_v8_m_backbone_coco"  # Alternative: medium backbone

# The bounding box format (TARGET_BBOX_FORMAT) is defined in an earlier cell.

# --- Instantiate the YOLOv8 Detector ---
# This will download pre-trained weights the first time you run it.
print(f"Instantiating YOLOv8 Detector with preset: {PRESET_NAME}")
model = keras_cv.models.YOLOV8Detector.from_preset(
    preset=PRESET_NAME,
    bounding_box_format=TARGET_BBOX_FORMAT,
    num_classes=NUM_CLASSES # Your number of classes (1 for 'debris')
)

# --- Compile the Model ---
print("Compiling the model...")
optimizer = Adam(learning_rate=1e-4) # Adjust learning rate as needed (start low for fine-tuning)

# YOLOV8Detector.compile() takes the two loss components separately
# (`box_loss` and `classification_loss`) rather than a single `loss=` object,
# and it does not accept user-supplied `metrics`. COCO-style metrics therefore
# have to be computed outside compile(), e.g. with
# keras_cv.metrics.BoxCOCOMetrics fed from model.predict() in a callback.
model.compile(
    optimizer=optimizer,
    classification_loss="binary_crossentropy",
    box_loss="ciou",
)

# Display model summary (can be very large for complex models)
try:
    model.summary(expand_nested=True) # Expand nested layers for more detail
except ValueError as e:
    print(f"\nNote: model.summary() might fail before model is built. Error: {e}")
    print("Model structure will be fully defined after the first batch of data.")

print("\nYOLOv8 model defined and compiled.")

## Cell 6: Train the Model

Train the compiled YOLOv8 model using the prepared `tf.data` pipelines. Use callbacks to save the best model weights, stop early if performance plateaus, and reduce the learning rate.

In [None]:
# Cell 6: Train the Model

print("Starting model training...")

# --- Configuration for Training ---
NUM_EPOCHS = 25 # Increase epochs for better results, e.g., 25, 50, or more
print(f"Attempting to train for {NUM_EPOCHS} epochs.")
print(f"Checkpoints will be saved in: {MODEL_SAVE_DIR}")
# Checkpoints and the history plot are both written here, so make sure it exists.
os.makedirs(MODEL_SAVE_DIR, exist_ok=True)

# --- Check if validation data is available ---
# Without validation data Keras never logs 'val_loss'. A checkpoint callback that
# monitors it (and formats it into the file name) would fail at the end of the
# first epoch, so fall back to the training loss in that case.
has_validation = val_tf_dataset is not None
monitor_name = 'val_loss' if has_validation else 'loss'

# --- Define Callbacks ---
# 1. ModelCheckpoint: save weights whenever the monitored loss improves.
checkpoint_filename = (
    "yolov8_debris_epoch_{epoch:02d}-" + monitor_name + "_{" + monitor_name + ":.4f}.weights.h5"
)
checkpoint_path = os.path.join(MODEL_SAVE_DIR, checkpoint_filename)
model_checkpoint_callback = ModelCheckpoint(
    filepath=checkpoint_path,
    save_weights_only=True,
    monitor=monitor_name,
    mode='min',
    save_best_only=True, # Save only the best model based on monitored value
    save_freq='epoch',
    verbose=1
)

# 2. EarlyStopping: stop training if validation loss doesn't improve.
early_stopping_callback = EarlyStopping(
    monitor='val_loss',
    patience=10, # Epochs with no improvement before training is stopped
    verbose=1,
    mode='min',
    restore_best_weights=True # Restore weights from the epoch with the best monitor value
)

# 3. ReduceLROnPlateau: reduce learning rate if validation loss plateaus.
reduce_lr_callback = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.2, # new_lr = lr * factor
    patience=5, # Epochs with no improvement before the learning rate is reduced
    verbose=1,
    mode='min',
    min_lr=1e-6 # Lower bound on the learning rate
)

if has_validation:
    callbacks_list = [model_checkpoint_callback, early_stopping_callback, reduce_lr_callback]
    validation_args = {'validation_data': val_tf_dataset}
else:
    print("WARNING: No validation dataset available. Training without validation callbacks (EarlyStopping, ReduceLR).")
    # Only keep the callback that does not depend on validation logs.
    callbacks_list = [model_checkpoint_callback]
    validation_args = {}

# --- Start Training ---
history = model.fit(
    train_tf_dataset,
    epochs=NUM_EPOCHS,
    callbacks=callbacks_list,
    **validation_args # Pass validation_data only if available
)

print("\nTraining finished.")
# If EarlyStopping restored best weights, the current model state reflects the best epoch.

# --- Plot training history ---
if has_validation and 'val_loss' in history.history:
    print("\nPlotting training history...")
    try:
        try:
            plt.style.use('seaborn-v0_8-darkgrid') # Use a nice style
        except OSError:
            # Style name unknown to this matplotlib version; keep the default style.
            pass
        fig, axes = plt.subplots(1, 2, figsize=(16, 5)) # Create 2 subplots

        # Plot Loss
        axes[0].plot(history.history['loss'], label='Training Loss', marker='o', linestyle='--')
        axes[0].plot(history.history['val_loss'], label='Validation Loss', marker='o', linestyle='-')
        axes[0].set_title('Model Loss Over Epochs')
        axes[0].set_xlabel('Epoch')
        axes[0].set_ylabel('Loss Value')
        axes[0].legend()
        axes[0].grid(True)

        # Plot Metrics: pick the first non-loss training metric that also has a
        # matching 'val_<name>' entry, so both curves show the same quantity.
        # The learning-rate entry added by ReduceLROnPlateau is not a metric.
        non_metric_keys = {'lr', 'learning_rate'}
        paired_metric_keys = [
            key for key in history.history
            if 'loss' not in key
            and not key.startswith('val_')
            and key not in non_metric_keys
            and f'val_{key}' in history.history
        ]

        if paired_metric_keys:
            metric_name = paired_metric_keys[0]
            val_metric_name = f'val_{metric_name}'
            train_values = history.history[metric_name]
            val_values = history.history[val_metric_name]

            if train_values and isinstance(train_values[0], dict):
                # Per-epoch dict of sub-metrics: plot the 'MaP' entry.
                train_map = [epoch_metrics.get('MaP', np.nan) for epoch_metrics in train_values]
                val_map = [epoch_metrics.get('MaP', np.nan) for epoch_metrics in val_values]
                metric_label = 'MaP'
            else:
                train_map = train_values
                val_map = val_values
                metric_label = metric_name

            axes[1].plot(train_map, label=f'Training {metric_label}', marker='s', linestyle='--')
            axes[1].plot(val_map, label=f'Validation {metric_label}', marker='s', linestyle='-')
            axes[1].set_title(f'Model Metric ({metric_label}) Over Epochs')
            axes[1].set_xlabel('Epoch')
            axes[1].set_ylabel('Metric Value (e.g., mAP)')
            axes[1].legend()
            axes[1].grid(True)
        else:
            axes[1].set_title('Metrics Plot Unavailable')
            axes[1].text(0.5, 0.5, 'No suitable metrics found in history', horizontalalignment='center', verticalalignment='center')

        plt.tight_layout()

        # Save before showing so the file is written regardless of how the
        # active backend treats the figure after show().
        plot_save_path = os.path.join(MODEL_SAVE_DIR, "training_history_plot.png")
        fig.savefig(plot_save_path)
        print(f"Training history plot saved to: {plot_save_path}")

        plt.show()

    except Exception as plot_e:
        # Plotting is best-effort; a failure here must not discard the training result.
        print(f"Error plotting history: {plot_e}")
else:
    print("\nSkipping training history plot (no validation data or metrics).")

## Cell 7: Evaluate the Model

Load the best weights (restored by EarlyStopping or loaded manually) and evaluate the model's performance on the validation set using the compiled COCO metrics.

In [None]:
# Cell 7: Evaluate the Model

print("Evaluating the trained model...")

# --- Locate the Best Saved Weights ---
# Checkpoint file names embed the validation loss ("...val_loss_<value>.weights.h5"),
# so the best checkpoint is the one whose parsed value is lowest.
# NOTE(review): every '.weights.h5' file in MODEL_SAVE_DIR is considered, including
# files left over from earlier runs - clear the directory between experiments.
best_checkpoint_path = None
try:
    checkpoint_names = [f for f in os.listdir(MODEL_SAVE_DIR) if f.endswith('.weights.h5')]
    if checkpoint_names:
        best_val_loss = float('inf')
        for checkpoint_name in checkpoint_names:
            checkpoint_file = os.path.join(MODEL_SAVE_DIR, checkpoint_name)
            try:
                val_loss_str = checkpoint_name.split('val_loss_')[-1].split('.weights.h5')[0]
                current_val_loss = float(val_loss_str)
            except (IndexError, ValueError):
                print(f"Warning: Could not parse val_loss from checkpoint filename: {checkpoint_file}")
                continue # Skip file if parsing fails
            if current_val_loss < best_val_loss:
                best_val_loss = current_val_loss
                best_checkpoint_path = checkpoint_file

        if best_checkpoint_path:
            print(f"Identified best checkpoint (lowest val_loss): {best_checkpoint_path}")
        else:
            print("Warning: Could not determine the best checkpoint from filenames. Using model state from end of training.")
    else:
        print("No checkpoints found in directory. Using model state from end of training.")

except Exception as e:
    print(f"Error finding checkpoints: {e}. Using model state from end of training.")

# --- Load the checkpoint, or fall back to the in-memory model ---
eval_model = None
if best_checkpoint_path and os.path.exists(best_checkpoint_path):
    print(f"Loading weights from best checkpoint: {best_checkpoint_path}")
    try:
        # Re-create the architecture, then load the saved weights into it.
        eval_model = keras_cv.models.YOLOV8Detector.from_preset(
            preset=PRESET_NAME,
            bounding_box_format=TARGET_BBOX_FORMAT,
            num_classes=NUM_CLASSES
        )
        eval_model.load_weights(best_checkpoint_path)
        print("Compiling loaded model...")
        # YOLOV8Detector.compile() takes `box_loss` / `classification_loss`
        # (not a single `loss=`) and does not accept user metrics. A fresh
        # optimizer is used because an optimizer instance is tied to the
        # variables of the model it was first compiled with.
        eval_model.compile(
            optimizer=Adam(learning_rate=1e-4),
            classification_loss="binary_crossentropy",
            box_loss="ciou",
        )
        print("Model loaded with best weights and compiled.")
    except Exception as load_e:
        # E.g. a stale checkpoint from a different architecture.
        print(f"Error loading checkpoint: {load_e}. Using model state from end of training.")
        eval_model = None

if eval_model is None:
    print("Using model state directly from training (assuming EarlyStopping restored best weights or no better checkpoint found).")
    eval_model = model # Use the model object directly from training


# --- Evaluate on Validation Set ---
if eval_model is not None and val_tf_dataset is not None:
    print("\nEvaluating on Validation Set...")
    # return_dict=True yields {name: value}, which avoids having to line up
    # positional results with model.metrics_names.
    results = eval_model.evaluate(val_tf_dataset, verbose=1, return_dict=True)
    print("\nValidation Results:")
    if 'loss' in results:
        print(f"  Validation Loss    : {results['loss']:.4f}")
    for metric_name, metric_value in results.items():
        if metric_name == 'loss':
            continue
        if isinstance(metric_value, dict):
            print(f"  Validation {metric_name}:")
            for sub_metric, value in metric_value.items():
                print(f"    - {sub_metric}: {value:.4f}")
        else:
            print(f"  Validation {metric_name}: {metric_value:.4f}")

elif eval_model is None:
    print("\nSkipping validation evaluation (model not available).")
else:
    print("\nSkipping validation evaluation (validation data not available).")


# --- Conceptual: Prediction on Test Set ---
# 1. Prepare Test Dataset (Images Only)
print("\nConceptual: Preparing Test Dataset...")
try:
    test_img_folder_name = os.path.basename(LOCAL_TEST_IMG_DIR) # e.g., 'test'
    test_image_files_pattern = f"{HDFS_TEST_IMG_DIR}/{test_img_folder_name}/*.jpg" # Adjust pattern if needed
    test_image_paths = tf.data.Dataset.list_files(test_image_files_pattern, shuffle=False)
    print(f"Found test image pattern: {test_image_files_pattern}")

    # Check if any files match the pattern
    test_file_count = tf.data.experimental.cardinality(test_image_paths).numpy()
    if test_file_count == 0:
        print("Warning: No test files found matching the pattern.")
        raise FileNotFoundError # Raise error to skip prediction
    print(f"Found {test_file_count} potential test files.")

    def load_and_preprocess_test_image(image_path_tensor):
        """Read one JPEG and return it as a float32 [IMG_HEIGHT, IMG_WIDTH, 3] tensor.

        This function is traced into a graph by Dataset.map, so a Python
        try/except around these ops cannot catch per-file read/decode errors;
        such errors surface when the dataset is iterated and are reported by
        the enclosing except block.
        """
        img_bytes = tf.io.read_file(image_path_tensor)
        img = tf.image.decode_jpeg(img_bytes, channels=3)
        img = tf.image.resize(img, [IMG_HEIGHT, IMG_WIDTH], method='bilinear')
        # predict() is fed plain image batches, the input the model itself consumes.
        return tf.cast(img, dtype=tf.float32)

    test_tf_dataset = test_image_paths.map(load_and_preprocess_test_image, num_parallel_calls=tf.data.AUTOTUNE)
    test_tf_dataset = test_tf_dataset.batch(BATCH_SIZE) # Use a batch size suitable for inference
    test_tf_dataset = test_tf_dataset.prefetch(buffer_size=tf.data.AUTOTUNE)
    print("Test dataset prepared conceptually.")

    # 2. Run Prediction
    if eval_model is not None:
        print("\nRunning predictions on Test Set...")
        predictions = eval_model.predict(test_tf_dataset, verbose=1)

        # Report the structure of whatever predict() returned.
        print("\nSample Prediction Output Structure (first batch):")
        if isinstance(predictions, dict):
            for key, value in predictions.items():
                # Check if value is a tensor/numpy array before accessing shape
                if hasattr(value, 'shape'):
                    print(f"  Key: '{key}', Shape: {value.shape}, Dtype: {value.dtype}")
                else:
                    print(f"  Key: '{key}', Value Type: {type(value)}")
        else:
            print(f"  Prediction Type: {type(predictions)}")
            if hasattr(predictions, 'shape'):
                print(f"  Prediction Shape: {predictions.shape}")

        # 3. Post-process Predictions (not implemented here)
        # - Filter boxes based on confidence threshold
        # - Apply Non-Max Suppression (NMS) if not done by the model
        # - Convert relative coordinates back to absolute pixel values if needed
        # - Visualize results (draw boxes on images)
        print("\nPrediction finished. Further post-processing needed to use/visualize results.")

    else:
        print("\nSkipping test set prediction (model not available).")

except (tf.errors.NotFoundError, tf.errors.InvalidArgumentError, FileNotFoundError):
    # list_files reports "no files matched pattern" as InvalidArgumentError.
    print(f"\nError: Test image files not found at pattern: {test_image_files_pattern}")
    print("Skipping test set prediction.")
except Exception as e:
    print(f"\nAn error occurred during test set preparation or prediction: {e}")
    print("Skipping test set prediction.")


print("\nEvaluation and Prediction conceptual steps finished.")

## Cell 8: Stop SparkSession

Release the resources used by the SparkSession.

In [None]:
# Cell 8: Stop SparkSession
# Shut the session down last so its resources are released once the pipeline
# has finished; a failure is reported instead of raised.
print("\nStopping SparkSession...")
try:
    spark.stop()
except Exception as stop_error:
    print(f"Error stopping SparkSession: {stop_error}")
else:
    print("SparkSession stopped successfully.")