In [None]:
# For Testing Entire Directory/Folder
import os
import cv2
import csv
import json
import numpy as np
import torch
import pandas as pd
from PIL import Image, Image as PILImage
from ultralytics import YOLO
from torchvision import transforms
from torchvision.models import resnet50

# --- Constants ---
INPUT_DIR = "image_test/Drone Cracked Images"
OUTPUT_DIR = "image_test_inference/Drone Cracked Images Annotated Resnet"
TILE_SIZE = 512
TILE_DIR = "output_tiles"
ANNOTATED_DIR = "annotated_tiles"
BOXES_DIR = "tile_boxes"
CLASSIFIER_PATH = "resnet50_pv_classifier.pth"
YOLO_MODEL_PATH = "runs/detect/train_yolo_v8_new_dataset4/weights/best.pt"
CLASS_NAMES = ["Bird-drop", "Clean", "Dusty", "Physical-Damage"]

# --- Setup ---
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(TILE_DIR, exist_ok=True)
os.makedirs(ANNOTATED_DIR, exist_ok=True)
os.makedirs(BOXES_DIR, exist_ok=True)

transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.5]*3, [0.5]*3)
])
Image.MAX_IMAGE_PIXELS = None

# --- Clear Directories ---
def clear_directories(*dirs):
    for folder in dirs:
        if not os.path.exists(folder):
            os.makedirs(folder)
            continue
        for filename in os.listdir(folder):
            file_path = os.path.join(folder, filename)
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.remove(file_path)
        print(f"🧹 Cleared: {folder}")

# --- Tiling ---
def tile_image_with_mapping(image_path, tile_size=512, output_folder=TILE_DIR, metadata_file="tile_metadata.csv"):
    img = Image.open(image_path)
    width, height = img.size
    os.makedirs(output_folder, exist_ok=True)
    metadata_path = os.path.join(output_folder, metadata_file)

    with open(metadata_path, mode='w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['tile_name', 'x_start', 'y_start', 'width', 'height'])

        for y in range(0, height, tile_size):
            for x in range(0, width, tile_size):
                right = min(x + tile_size, width)
                lower = min(y + tile_size, height)
                tile = img.crop((x, y, right, lower)).convert("RGB")
                tile_name = f"tile_{x}_{y}.jpg"
                tile.save(os.path.join(output_folder, tile_name))
                writer.writerow([tile_name, x, y, right - x, lower - y])

# --- YOLO Filter ---
def is_likely_panel(crop):
    hsv = cv2.cvtColor(crop, cv2.COLOR_BGR2HSV)
    brightness = np.mean(hsv[:, :, 2])
    saturation = np.mean(hsv[:, :, 1])
    avg_rgb = np.mean(crop, axis=(0, 1)).mean()
    return (40 < brightness < 180) and (30 < saturation < 140) and (30 < avg_rgb < 180)

# --- YOLO Inference ---
def run_yolo_and_store_boxes():
    model = YOLO(YOLO_MODEL_PATH)
    for fname in sorted(os.listdir(TILE_DIR)):
        if not fname.lower().endswith(('.jpg', '.jpeg', '.png')):
            continue
        tile_path = os.path.join(TILE_DIR, fname)
        img = cv2.imread(tile_path)
        if img is None:
            continue

        results = model(img, conf=0.75, iou=0.84)[0]
        valid_boxes = []
        for box in results.boxes:
            x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
            crop = img[max(0, y1):min(img.shape[0], y2), max(0, x1):min(img.shape[1], x2)]
            if crop.shape[0] < 20 or crop.shape[1] < 20 or not is_likely_panel(crop):
                continue
            valid_boxes.append([x1, y1, x2, y2])

        if valid_boxes:
            with open(os.path.join(BOXES_DIR, fname.replace(".jpg", ".json")), "w") as f:
                json.dump(valid_boxes, f)

        cv2.imwrite(os.path.join(ANNOTATED_DIR, fname), img)

# --- ResNet Classification ---
def load_classifier():
    model = resnet50()
    model.fc = torch.nn.Linear(model.fc.in_features, len(CLASS_NAMES))
    model.load_state_dict(torch.load(CLASSIFIER_PATH, map_location="cpu"), strict=False)
    model.eval()
    return model


def classify_detected_panels():
    model = load_classifier()
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    for json_file in sorted(os.listdir(BOXES_DIR)):
        if not json_file.endswith(".json"):
            continue

        tile_name = json_file.replace(".json", ".jpg")
        tile_path = os.path.join(ANNOTATED_DIR, tile_name)
        with open(os.path.join(BOXES_DIR, json_file), "r") as f:
            boxes = json.load(f)

        img = cv2.imread(tile_path)
        rgb = img[:, :, ::-1]

        for box in boxes:
            x1, y1, x2, y2 = map(int, box)
            crop = rgb[y1:y2, x1:x2]
            if crop.shape[0] < 20 or crop.shape[1] < 20:
                continue
            tensor = transform(PILImage.fromarray(crop)).unsqueeze(0).to(device)
            with torch.no_grad():
                pred = model(tensor)
                label = CLASS_NAMES[torch.argmax(pred, dim=1).item()]

            color = (0, 255, 0) if label == "Clean" else (0, 0, 255)
            center_x = x1 + (x2 - x1) // 2
            center_y = y1 + (y2 - y1) // 2
            cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)
            cv2.putText(img, label, (center_x - 20, center_y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)

        cv2.imwrite(tile_path, img)

# --- Restitch ---
def restitch_tiles(metadata_csv, annotated_dir, save_path):
    df = pd.read_csv(metadata_csv)
    full_width = df['x_start'].max() + df['width'].max()
    full_height = df['y_start'].max() + df['height'].max()
    canvas = np.zeros((full_height, full_width, 3), dtype=np.uint8)

    for _, row in df.iterrows():
        tile_path = os.path.join(annotated_dir, row['tile_name'])
        tile = cv2.imread(tile_path)
        if tile is None:
            continue
        x, y = int(row['x_start']), int(row['y_start'])
        canvas[y:y+tile.shape[0], x:x+tile.shape[1]] = tile

    cv2.imwrite(save_path, canvas)

# --- Main Loop ---
for image_name in sorted(os.listdir(INPUT_DIR)):
    if not image_name.lower().endswith((".jpg", ".jpeg", ".png")):
        continue

    print(f"\n🔍 Running pipeline for: {image_name}")

    clear_directories(TILE_DIR, BOXES_DIR, ANNOTATED_DIR)

    image_path = os.path.join(INPUT_DIR, image_name)
    output_image_name = os.path.splitext(image_name)[0] + "_annotated.jpg"
    output_image_path = os.path.join(OUTPUT_DIR, output_image_name)
    metadata_csv = os.path.join(TILE_DIR, "tile_metadata.csv")

    tile_image_with_mapping(image_path, TILE_SIZE, TILE_DIR, "tile_metadata.csv")
    run_yolo_and_store_boxes()
    classify_detected_panels()
    restitch_tiles(metadata_csv, ANNOTATED_DIR, output_image_path)

    print(f"✅ Done: {output_image_name}")

print("\n🎯 All images processed successfully.")



🔍 Running pipeline for: DJI_20250508151227_0001_V.JPG
🧹 Cleared: output_tiles
🧹 Cleared: tile_boxes
🧹 Cleared: annotated_tiles

0: 640x640 (no detections), 7.5ms
Speed: 3.2ms preprocess, 7.5ms inference, 19.6ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 solar_panel, 7.3ms
Speed: 2.7ms preprocess, 7.3ms inference, 69.4ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 solar_panel, 8.0ms
Speed: 2.9ms preprocess, 8.0ms inference, 1.6ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 solar_panel, 8.5ms
Speed: 2.9ms preprocess, 8.5ms inference, 1.4ms postprocess per image at shape (1, 3, 640, 640)

0: 576x640 2 solar_panels, 33.6ms
Speed: 2.9ms preprocess, 33.6ms inference, 1.5ms postprocess per image at shape (1, 3, 576, 640)

0: 640x640 1 solar_panel, 8.0ms
Speed: 2.9ms preprocess, 8.0ms inference, 1.3ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 solar_panel, 7.6ms
Speed: 2.5ms preprocess, 7.6ms inference, 1.5ms postproces

KeyboardInterrupt: 

In [None]:
import os
import cv2
import csv
import json
import numpy as np
import torch
import pandas as pd
from PIL import Image, Image as PILImage
from ultralytics import YOLO
from torchvision import transforms
from torchvision.models import resnet50

# --- Constants ---
INPUT_IMAGE = "image_test/Drone Cracked Images/DJI_20250508151227_0001_V.jpg"
OUTPUT_IMAGE = "image_test_inference_img/DJI_20250508151227_0001_V_annotated.jpg"
TILE_SIZE = 512
TILE_DIR = "output_tiles"
ANNOTATED_DIR = "annotated_tiles"
BOXES_DIR = "tile_boxes"
CLASSIFIER_PATH = "resnet50_pv_classifier.pth"
YOLO_MODEL_PATH = "runs/detect/train_yolo_v8_new_dataset4/weights/best.pt"
CLASS_NAMES = ["Bird-drop", "Clean", "Dusty", "Physical-Damage"]

# --- Setup ---
os.makedirs("image_test_inference_img", exist_ok=True)
os.makedirs(TILE_DIR, exist_ok=True)
os.makedirs(ANNOTATED_DIR, exist_ok=True)
os.makedirs(BOXES_DIR, exist_ok=True)

transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.5]*3, [0.5]*3)
])
Image.MAX_IMAGE_PIXELS = None

# --- Clear Directories ---
def clear_directories(*dirs):
    for folder in dirs:
        if not os.path.exists(folder):
            os.makedirs(folder)
            continue
        for filename in os.listdir(folder):
            file_path = os.path.join(folder, filename)
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.remove(file_path)
        print(f"🧹 Cleared: {folder}")

# --- Tiling ---
def tile_image_with_mapping(image_path, tile_size=512, output_folder=TILE_DIR, metadata_file="tile_metadata.csv"):
    img = Image.open(image_path)
    width, height = img.size
    metadata_path = os.path.join(output_folder, metadata_file)

    with open(metadata_path, mode='w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['tile_name', 'x_start', 'y_start', 'width', 'height'])

        for y in range(0, height, tile_size):
            for x in range(0, width, tile_size):
                right = min(x + tile_size, width)
                lower = min(y + tile_size, height)
                tile = img.crop((x, y, right, lower)).convert("RGB")
                tile_name = f"tile_{x}_{y}.jpg"
                tile.save(os.path.join(output_folder, tile_name))
                writer.writerow([tile_name, x, y, right - x, lower - y])

# --- YOLO Filter ---
def is_likely_panel(crop):
    hsv = cv2.cvtColor(crop, cv2.COLOR_BGR2HSV)
    brightness = np.mean(hsv[:, :, 2])
    saturation = np.mean(hsv[:, :, 1])
    avg_rgb = np.mean(crop, axis=(0, 1)).mean()
    return (40 < brightness < 180) and (30 < saturation < 140) and (30 < avg_rgb < 180)

# --- YOLO Inference ---
def run_yolo_and_store_boxes():
    model = YOLO(YOLO_MODEL_PATH)
    for fname in sorted(os.listdir(TILE_DIR)):
        if not fname.lower().endswith(('.jpg', '.jpeg', '.png')):
            continue
        tile_path = os.path.join(TILE_DIR, fname)
        img = cv2.imread(tile_path)
        if img is None:
            continue

        results = model(img, conf=0.75, iou=0.84)[0]
        valid_boxes = []
        for box in results.boxes:
            x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
            crop = img[max(0, y1):min(img.shape[0], y2), max(0, x1):min(img.shape[1], x2)]
            if crop.shape[0] < 20 or crop.shape[1] < 20 or not is_likely_panel(crop):
                continue
            valid_boxes.append([x1, y1, x2, y2])

        if valid_boxes:
            with open(os.path.join(BOXES_DIR, fname.replace(".jpg", ".json")), "w") as f:
                json.dump(valid_boxes, f)

        cv2.imwrite(os.path.join(ANNOTATED_DIR, fname), img)

# --- ResNet Classification ---
def load_classifier():
    model = resnet50()
    model.fc = torch.nn.Linear(model.fc.in_features, len(CLASS_NAMES))
    model.load_state_dict(torch.load(CLASSIFIER_PATH, map_location="cpu"), strict=False)
    model.eval()
    return model

def classify_detected_panels():
    model = load_classifier()
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    for json_file in sorted(os.listdir(BOXES_DIR)):
        if not json_file.endswith(".json"):
            continue

        tile_name = json_file.replace(".json", ".jpg")
        tile_path = os.path.join(ANNOTATED_DIR, tile_name)
        with open(os.path.join(BOXES_DIR, json_file), "r") as f:
            boxes = json.load(f)

        img = cv2.imread(tile_path)
        rgb = img[:, :, ::-1]

        for box in boxes:
            x1, y1, x2, y2 = map(int, box)
            crop = rgb[y1:y2, x1:x2]
            if crop.shape[0] < 20 or crop.shape[1] < 20:
                continue
            tensor = transform(PILImage.fromarray(crop)).unsqueeze(0).to(device)
            with torch.no_grad():
                pred = model(tensor)
                label = CLASS_NAMES[torch.argmax(pred, dim=1).item()]

            color = (0, 255, 0) if label == "Clean" else (0, 0, 255)
            center_x = x1 + (x2 - x1) // 2
            center_y = y1 + (y2 - y1) // 2
            cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)
            cv2.putText(img, label, (center_x - 20, center_y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)

        cv2.imwrite(tile_path, img)

# --- Restitch ---
def restitch_tiles(metadata_csv, annotated_dir, save_path):
    df = pd.read_csv(metadata_csv)
    full_width = df['x_start'].max() + df['width'].max()
    full_height = df['y_start'].max() + df['height'].max()
    canvas = np.zeros((full_height, full_width, 3), dtype=np.uint8)

    for _, row in df.iterrows():
        tile_path = os.path.join(annotated_dir, row['tile_name'])
        tile = cv2.imread(tile_path)
        if tile is None:
            continue
        x, y = int(row['x_start']), int(row['y_start'])
        canvas[y:y+tile.shape[0], x:x+tile.shape[1]] = tile

    cv2.imwrite(save_path, canvas)

# --- Run Pipeline for Single Image ---
print(f"\n🔍 Running pipeline for: {INPUT_IMAGE}")
clear_directories(TILE_DIR, BOXES_DIR, ANNOTATED_DIR)
tile_image_with_mapping(INPUT_IMAGE, TILE_SIZE, TILE_DIR, "tile_metadata.csv")
run_yolo_and_store_boxes()
classify_detected_panels()
restitch_tiles(os.path.join(TILE_DIR, "tile_metadata.csv"), ANNOTATED_DIR, OUTPUT_IMAGE)
print(f"✅ Done: {OUTPUT_IMAGE}")



🔍 Running pipeline for: image_test/Drone Cracked Images/DJI_20250508151227_0001_V.jpg
🧹 Cleared: output_tiles
🧹 Cleared: tile_boxes
🧹 Cleared: annotated_tiles

0: 640x640 (no detections), 6.9ms
Speed: 2.7ms preprocess, 6.9ms inference, 0.6ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 solar_panel, 6.5ms
Speed: 2.5ms preprocess, 6.5ms inference, 1.9ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 solar_panel, 7.3ms
Speed: 2.3ms preprocess, 7.3ms inference, 1.2ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 solar_panel, 6.8ms
Speed: 2.5ms preprocess, 6.8ms inference, 2.2ms postprocess per image at shape (1, 3, 640, 640)

0: 576x640 2 solar_panels, 7.6ms
Speed: 2.5ms preprocess, 7.6ms inference, 1.5ms postprocess per image at shape (1, 3, 576, 640)

0: 640x640 1 solar_panel, 7.4ms
Speed: 2.2ms preprocess, 7.4ms inference, 1.8ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 solar_panel, 7.3ms
Speed: 2.7ms preprocess, 7.3ms