# Libraries

In [1]:
import sys

# Make modules in the parent directory and the current directory importable.
# The membership check keeps the cell idempotent: re-running it does not pile
# up duplicate entries in sys.path.
for _extra_path in ('..', '.'):
    if _extra_path not in sys.path:
        sys.path.append(_extra_path)

In [2]:
%load_ext autoreload
%autoreload 2

In [16]:
import os
import json
import tarfile
from tqdm import tqdm

from ultralytics import YOLOWorld

import cv2
from cv2 import dnn_superres

import pandas as pd
import numpy as np

import tensorflow as tf

from IPython.display import display, HTML
import matplotlib.pyplot as plt

from adjustText import adjust_text

from pathlib import Path

from matplotlib import rcParams


In [4]:
from functions.cardetection import detect_cars
from functions.imgprocessing import visualize_random_batch, crop_detected_vehicles, load_image_as_bytes
from functions.superresolution import superresolve_images

In [None]:
# Figure styling. LaTeX text rendering needs a TeX distribution; drop the
# 'text.usetex' entry (or set it to False) if none is available.
rcParams.update({
    'text.usetex': True,
    'font.family': 'serif',
    'text.latex.preamble': r'\usepackage{mathpazo}',
    'axes.axisbelow': True,
})

# One colour triple shared by bounding boxes and their text labels
box_color = text_color = (86, 0, 50)

# Global variables

In [5]:
# File suffixes accepted as images, grouped by format family.
# Kept as a tuple so it can be passed straight to str.endswith().
IMAGE_EXTENSIONS = (
    ('.bmp', '.dib')                              # Windows bitmaps
    + ('.jpeg', '.jpg', '.jpe')                   # JPEG files
    + ('.jp2',)                                   # JPEG 2000 files
    + ('.png',)                                   # Portable Network Graphics
    + ('.webp',)                                  # WebP files
    + ('.avif',)                                  # AVIF files
    + ('.pbm', '.pgm', '.ppm', '.pxm', '.pnm')    # Portable image formats
    + ('.pfm',)                                   # PFM files
    + ('.sr', '.ras')                             # Sun rasters
    + ('.tiff', '.tif')                           # TIFF files
    + ('.exr',)                                   # OpenEXR Image files
    + ('.hdr', '.pic')                            # Radiance HDR
)

In [None]:
ORIGINAL_IMAGE_DIR = "sample"

# os.listdir() returns entries in arbitrary order, so sort them to make the
# processing order reproducible between runs. Sub-directories whose name
# happens to end in an image suffix are skipped.
ORIGINAL_IMAGE_PATHS = sorted(
    os.path.join(ORIGINAL_IMAGE_DIR, f)
    for f in os.listdir(ORIGINAL_IMAGE_DIR)
    if f.lower().endswith(IMAGE_EXTENSIONS)
    and os.path.isfile(os.path.join(ORIGINAL_IMAGE_DIR, f))
)
print(f"{len(ORIGINAL_IMAGE_PATHS)} image(s) found in {ORIGINAL_IMAGE_DIR} folder.")

In [None]:
# Root folder for everything this notebook writes
OUTPUT_DIR = "outputs/exp"

# Stage 0/1: detection labels and cropped vehicle regions
DETECTION_DIR = f"{OUTPUT_DIR}/0_detection"
LABEL_DIR = f"{OUTPUT_DIR}/0_detection/labels"
CROPPED_DIR = f"{OUTPUT_DIR}/1_cropped_vehicles"

# Stage 2: super-resolved crops and the CSV written alongside them
SUPERRESOLVED_DIR = f"{OUTPUT_DIR}/2_super_resolution"
sr_csv_path = f"{OUTPUT_DIR}/2_super_resolution/rescaling_stats.csv"

# Stage 3: classification results. The folder is created below as well;
# otherwise writing the CSV fails on a fresh output directory.
CLASSIFICATION_DIR = f"{OUTPUT_DIR}/3_classification"
ic_csv_path= f"{OUTPUT_DIR}/3_classification/classification_results.csv"

ANNOTATED_IMG_DIR = f"{OUTPUT_DIR}/annotated_images/"


dirs = [LABEL_DIR, CROPPED_DIR, SUPERRESOLVED_DIR, CLASSIFICATION_DIR, ANNOTATED_IMG_DIR]

# Loop variable deliberately not called `dir`, which would shadow the builtin
for output_subdir in dirs:
    os.makedirs(output_subdir, exist_ok=True)

# Vehicle Detection 
Detect vehicles in street-level images using a pre-trained YOLO-World model with SAHI.

## Initialize YOLO-World model
Ref: https://doi.org/10.48550/arXiv.2401.17270

In [14]:
# Load the YOLO-World checkpoint and set its detection classes to just "car"
YOLO_WORLD_WEIGHTS = "models/yolov8x-world.pt"
model = YOLOWorld(YOLO_WORLD_WEIGHTS)
model.set_classes(["car"])

## Use YOLO-World with SAHI for Sliced Inference

- https://github.com/obss/sahi
- https://doi.org/10.1109/ICIP46576.2022.9897990
- https://doi.org/10.5281/zenodo.5718950
      

In [None]:
# Settings shared by every detect_cars() call
detection_output_dir = Path(DETECTION_DIR)
sliced_inference_settings = dict(
    slice_height=320,  # depending on the expected minimal object size
    slice_width=320,
    overlap_height_ratio=0.1,
    overlap_width_ratio=0.1,
    confidence_threshold=0.25,
    save_labels=True,
    save_visuals=True,
    verbose=True,
)

# Run detection on each original image in turn
for image_path in tqdm(ORIGINAL_IMAGE_PATHS, desc="Processing images"):
    print(f"Processing {image_path}...")
    detect_cars(
        image_path=image_path,
        output_dir=detection_output_dir,
        model=model,
        **sliced_inference_settings,
    )
        

# TODO: handle the case of large objects that occupy a large part of the image

### Visualize car detection results

In [None]:
# Preview a batch of one image. The helper (from functions.imgprocessing) is
# given the original image folder and the detection label folder; presumably
# it overlays the saved boxes on a randomly picked image - confirm in the helper.
visualize_random_batch(ORIGINAL_IMAGE_DIR, LABEL_DIR, batch_size=1)

# Region Extraction & Contextual Expansion 
Crop detected vehicle regions with a 20% bounding box enlargement to include flood indicators and contextual cues.

In [None]:
# Crop every detected vehicle, passing scale=1.2 for the box enlargement
crop_detected_vehicles(ORIGINAL_IMAGE_DIR, LABEL_DIR, CROPPED_DIR, scale=1.2, verbose=False)

# Count the regular files now present in the crop folder
n_cropped_files = sum(
    1
    for entry in os.listdir(CROPPED_DIR)
    if os.path.isfile(os.path.join(CROPPED_DIR, entry))
)
print(f'{n_cropped_files} images saved')

# Super-Resolution Enhancement 
Apply pre-trained Enhanced Deep Super-Resolution (EDSR) networks to improve image quality.

Ref: https://doi.org/10.48550/arXiv.1707.02921

In [None]:
# Initialize and load Super-Resolution models with different scales
sr_models = {
    4: dnn_superres.DnnSuperResImpl_create(),
    3: dnn_superres.DnnSuperResImpl_create(),
    2: dnn_superres.DnnSuperResImpl_create()
}

for scale, sr in sr_models.items():
    model_path = f"./models/EDSR_x{scale}.pb"  # TODO ADD MODEL DOWNLOAD
    sr.readModel(model_path)
    sr.setModel("edsr", scale)

In [None]:
# Gather the dimensions of every readable crop, then summarise them in one go.
image_paths = [os.path.join(CROPPED_DIR, f) for f in os.listdir(CROPPED_DIR) if f.lower().endswith(IMAGE_EXTENSIONS)]

longest_sides = []   # max(height, width) per image -> min/max size
shortest_sides = []  # min(height, width) per image -> "below N" counters

for image_path in image_paths:
    image = cv2.imread(str(image_path))
    if image is None:
        # Unreadable file: leave it out of the statistics
        continue

    height, width = image.shape[:2]
    longest_sides.append(max(height, width))
    shortest_sides.append(min(height, width))


def _count_below(limit):
    """Number of images with at least one side shorter than `limit` pixels."""
    return sum(side < limit for side in shortest_sides)


folder_stats = {
    'min_size': min(longest_sides, default=float('inf')),
    'max_size': max(longest_sides, default=0),
    'below_200_count': _count_below(200),
    'below_100_count': _count_below(100),
    'below_50_count': _count_below(50)
}

separator = "-" * 50
print(separator)
print(f"Folder: {CROPPED_DIR}")
print(f"Min size: {folder_stats['min_size']} pixels")
print(f"Max size: {folder_stats['max_size']} pixels")
print(f"Images below 200 pixels: {folder_stats['below_200_count']}")
print(f"Images below 100 pixels: {folder_stats['below_100_count']}")
print(f"Images below 50 pixels: {folder_stats['below_50_count']}")
print(separator)


In [None]:
# Run the super-resolution helper (from functions.superresolution) on the
# cropped vehicles. It receives the input and output folders, the
# scale -> model dict `sr_models`, the accepted image suffixes and the CSV path
# `sr_csv_path`; presumably it writes per-image rescaling stats there - confirm.
superresolve_images(CROPPED_DIR, SUPERRESOLVED_DIR, sr_models, IMAGE_EXTENSIONS, sr_csv_path)

# Flood Depth Classification 
Classify images based on flood depth levels using the fine-tuned ResNet50 model.

In [None]:
# TODO ADD MODEL DOWNLOAD
classifier_path = "models/classifier.tar.gz"
extract_dir = "models/classifier"

os.makedirs(extract_dir, exist_ok=True)

with tarfile.open(classifier_path, "r:gz") as tar:
    tar.extractall(path=extract_dir)

print(f"Model extracted to {extract_dir}")

In [None]:
model_path = "models/classifier/1"

# Bound to its own name so the YOLO-World detector stored in `model` is not
# overwritten; otherwise re-running the detection cell after this one would
# hand the classifier to detect_cars().
classifier_model = tf.saved_model.load(model_path)


# Inspect the exported signatures and keep the default one for inference
print("Available signatures:", list(classifier_model.signatures.keys()))
inference_fn = classifier_model.signatures["serving_default"]
print("Input Signature:", inference_fn.structured_input_signature)
print("Output Signature:", inference_fn.structured_outputs)

labels_info_path = "models/classifier/labels_info.json"

# Explicit encoding so the JSON is read the same way on every platform
with open(labels_info_path, "r", encoding="utf-8") as f:
    labels_info = json.load(f)

# Class names are read from the "labels" entry; predictions are indexed into it
CLASS_LABELS = labels_info["labels"]

print("Class Labels:", CLASS_LABELS)


In [None]:
# Sorted so rows are produced in a reproducible order (os.listdir order is arbitrary)
sr_image_paths = sorted(
    os.path.join(SUPERRESOLVED_DIR, f)
    for f in os.listdir(SUPERRESOLVED_DIR)
    if f.lower().endswith(IMAGE_EXTENSIONS)
)

# Run classification
results = []
for image_path in tqdm(sr_image_paths, desc="Classifying Images", unit="image"):
    img_bytes = load_image_as_bytes(image_path)

    # First element of the "output_0" tensor: one score per class label
    predictions = inference_fn(bytes_inputs=img_bytes)["output_0"].numpy()[0]

    # Get predicted label
    predicted_label = CLASS_LABELS[predictions.argmax()]
    results.append([image_path, predicted_label, *predictions])

# Convert to DataFrame with tidy format: path, predicted label, one column per class
df = pd.DataFrame(results, columns=["image_path", "predicted_label", *CLASS_LABELS])

# Save to CSV; create the target folder first so a fresh output directory works
os.makedirs(os.path.dirname(ic_csv_path), exist_ok=True)
df.to_csv(ic_csv_path, index=False)
print(f"Classification results saved to {ic_csv_path}")


## Results Visualization

In [None]:
# Load classification results
df = pd.read_csv(ic_csv_path)

# Show class distribution
print("Class Distribution:\n", df["predicted_label"].value_counts().to_string())

# Display random sample
sample_df = df.sample(min(5, len(df)))

for _, row in sample_df.iterrows():
    img_path = row["image_path"]
    predicted_label = row["predicted_label"]
    
    class_probs = row.iloc[2:].to_dict()
    sorted_probs = sorted(class_probs.items(), key=lambda x: x[1], reverse=True)

    display(
        HTML(
            f'<img src="{img_path}" style="width: 250px;"/>'
            f"<figcaption>Predicted: {predicted_label} ({sorted_probs[0][1]:.2%})<br>"
            f"Probabilities: {', '.join([f'{k}: {v:.2%}' for k, v in sorted_probs])}</figcaption>"
        )
    )

## Annotations on the original image

In [None]:
LEVELS_YOLO_ANNOTATIONS_DIR = f"{OUTPUT_DIR}/levels_yolo_annotations/"
os.makedirs(LEVELS_YOLO_ANNOTATIONS_DIR, exist_ok=True)

CLASS_LABELS_TO_ID = {label: idx for idx, label in enumerate(CLASS_LABELS)}

# Characters TeX treats specially in text mode, with their escaped forms.
# File names and class labels routinely contain "_" and the confidence text
# contains "%", both of which break (or are swallowed by) usetex rendering.
_TEX_SPECIALS = {
    "\\": r"\textbackslash{}",
    "&": r"\&",
    "%": r"\%",
    "$": r"\$",
    "#": r"\#",
    "_": r"\_",
    "{": r"\{",
    "}": r"\}",
    "~": r"\textasciitilde{}",
    "^": r"\textasciicircum{}",
}


def _plot_text(text, bold=False):
    """Return `text` in a form matplotlib renders correctly right now.

    With `text.usetex` enabled, TeX special characters are escaped and `bold`
    wraps the result in \\textbf{}. With it disabled the text is returned
    unchanged, because matplotlib would print TeX commands literally.
    """
    text = str(text)
    if not rcParams["text.usetex"]:
        return text
    escaped = "".join(_TEX_SPECIALS.get(ch, ch) for ch in text)
    return rf"\textbf{{{escaped}}}" if bold else escaped


def _split_crop_name(path):
    """Split a crop path into (source image stem, detection index).

    Assumes crops are named "<source stem>_<index>.<ext>" - the same naming the
    original parsing relied on; confirm against crop_detected_vehicles().
    """
    stem = os.path.splitext(os.path.basename(path))[0]
    source_stem, _, index = stem.rpartition("_")
    return source_stem, int(index)


# Load classification results
df = pd.read_csv(ic_csv_path)

# Extract the source image stem and detection index of every crop. Matching on
# the stem (not stem + ".jpg") lets PNG/TIFF/... originals be annotated too.
crop_keys = df["image_path"].map(_split_crop_name)
df["original_stem"] = crop_keys.str[0]
df["detection_index"] = crop_keys.str[1]

# Iterate through each image (sorted for a reproducible order)
for image_file in sorted(os.listdir(ORIGINAL_IMAGE_DIR)):
    if not image_file.lower().endswith(IMAGE_EXTENSIONS):
        continue  # Skip non-image files

    image_stem = os.path.splitext(image_file)[0]
    image_path = os.path.join(ORIGINAL_IMAGE_DIR, image_file)
    label_path = os.path.join(LABEL_DIR, image_stem + ".txt")
    lvl_yolo_annotation_path = os.path.join(LEVELS_YOLO_ANNOTATIONS_DIR, image_stem + ".txt")

    if not os.path.exists(label_path):
        print(f"Skipping {image_file}: No YOLO-World labels found.")
        continue

    # Load image
    image = cv2.imread(image_path)
    if image is None:
        print(f"Skipping {image_file}: Could not load image.")
        continue

    # Load YOLO labels (bounding boxes); blank lines are ignored
    with open(label_path, "r") as f:
        detections = [list(map(float, line.split())) for line in f if line.strip()]

    if not detections:
        print(f"Skipping {image_file}: No detections found.")
        continue

    # Filter classifications for this image and sort by detection index
    image_df = df[df["original_stem"] == image_stem].sort_values(by="detection_index")

    # Boxes and classifications are paired by position, so a count mismatch
    # means some labels may be attached to the wrong box - make that visible.
    if len(image_df) != len(detections):
        print(
            f"Warning: {image_file} has {len(detections)} detections but "
            f"{len(image_df)} classified crops; pairing them in order."
        )

    # Per-class counters for the legend next to the image
    class_counts = {level: 0 for level in CLASS_LABELS}

    img_height, img_width = image.shape[:2]

    fig, ax = plt.subplots(figsize=(12, 8))
    ax.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    # fontweight gives a bold title when usetex is off; \textbf does when it is on
    ax.set_title(_plot_text(image_file, bold=True), fontweight="bold")
    ax.axis("off")

    text_annotations = []  # Store text annotations for adjustment
    yolo_annotations = []  # Store YOLO-formatted annotations

    for detection, (_, row) in zip(detections, image_df.iterrows()):
        # Label fields: class id, then normalised centre x/y, width, height
        _, x_center, y_center, width, height = detection[:5]
        x1 = int((x_center - width / 2) * img_width)
        y1 = int((y_center - height / 2) * img_height)
        x2 = int((x_center + width / 2) * img_width)
        y2 = int((y_center + height / 2) * img_height)

        predicted_label = row["predicted_label"]
        probability = row[predicted_label]  # Get confidence score

        class_counts[predicted_label] += 1
        yolo_class_id = CLASS_LABELS_TO_ID[predicted_label]  # Get corresponding class ID

        ax.add_patch(plt.Rectangle((x1, y1), x2 - x1, y2 - y1, linewidth=2, edgecolor="darkred", facecolor="none"))

        # The label file already stores normalised YOLO coordinates, so they are
        # written back unchanged with the flood-level class id and its probability.
        yolo_annotations.append(
            f"{yolo_class_id} {x_center:.6f} {y_center:.6f} {width:.6f} {height:.6f} {probability:.4f}"
        )

        # The ".0%" format already appends the percent sign
        label_text = _plot_text(f"{predicted_label.replace('Level', 'L')}: {probability:.0%}")
        text_annotations.append(ax.text(x1, y1 - 10, label_text, fontsize=10, color="darkred"))

    if text_annotations:
        adjust_text(
            text_annotations,
            expand_points=(1.2, 1.4),
            arrowprops=dict(arrowstyle="-", color="black", lw=0.5),
        )

    class_text = "\n".join(f"{_plot_text(k, bold=True)}: {v}" for k, v in class_counts.items())
    ax.text(img_width + 10, img_height // 4, class_text, fontsize=12, verticalalignment="top", color="black")

    # Save YOLO-formatted labels
    with open(lvl_yolo_annotation_path, "w") as f:
        f.write("\n".join(yolo_annotations))
    print(f"YOLO annotation saved to {lvl_yolo_annotation_path}")

    output_image_path = os.path.join(ANNOTATED_IMG_DIR, image_file)
    fig.savefig(output_image_path, bbox_inches="tight", pad_inches=0.2)
    plt.show()
    # Release the figure; otherwise one stays open per processed image
    plt.close(fig)

    print(f"Annotated image saved to {output_image_path}")
