In [1]:
from utils.util import *
from ultralytics import RTDETR
from PIL import Image, ImageDraw, ImageFont
from pathlib import Path

In [19]:
# Part detector (RT-DETR) that localises power-line components in an image.
detector = RTDETR("../models/object_detection.pt")

# Generic binary defect classifier (single logit output).
classifier = get_pretrained_resnet(num_classes=1, pretrained=False)
# map_location lets a checkpoint that was saved on another device load here.
classifier.load_state_dict(
    torch.load("../models/defect_detection_model.pth", map_location=DEVICE)
)
classifier.to(DEVICE)
# Inference only: switch BatchNorm/Dropout layers out of training mode, otherwise
# single-crop predictions are normalised with per-batch statistics.
classifier.eval()

# Detector class index -> internal part id. Per the class list in data.yaml:
# 7 = glass insulator, 5 = lightning rod suspension,
# 11 = polymer insulator upper shackle, 1 = yoke suspension.
detectable_classes = {7:0, 5:1, 11:2, 1:4}
# Internal part id -> checkpoint file of that part's defect classifier.
class_to_model = {0 : "defect_detection_glass_model.pth", 1 : "defect_detection_lighting_model.pth", 2: "defect_detection_polymer_model.pth", 4: "defect_detection_yoke_model.pth"}
# Internal part id -> human-readable name of the defect that classifier reports.
class_to_problem = {0 : "Missing cap", 1 : "Rust", 2: "Rust", 4: "Rust"}

ResNet18 loaded. Final layer replaced for 1 output features.
Only the final layer will be trained initially.


In [23]:
# Per-part defect classifiers, keyed by the internal part id used in class_to_model.
models = {}

def load_models(models_dir="../models"):
    """Load every checkpoint listed in ``class_to_model`` into ``models``.

    Args:
        models_dir: Directory holding the checkpoint files. Defaults to the
            relative ``../models`` directory that the detector and the generic
            classifier are loaded from, so the notebook is not tied to one
            user's absolute path.

    Each model is built with ``get_pretrained_resnet``, loaded onto ``DEVICE``
    and put in eval mode. Existing entries in ``models`` are overwritten.
    """
    models_dir = Path(models_dir)
    for part_id, filename in class_to_model.items():
        model = get_pretrained_resnet(num_classes=1, pretrained=False)
        # map_location lets a checkpoint saved on another device load here.
        state_dict = torch.load(models_dir / filename, map_location=DEVICE)
        model.load_state_dict(state_dict)
        model.to(DEVICE)
        model.eval()
        models[part_id] = model

In [24]:
def get_model(class_id):
    """Return the defect classifier stored in ``models`` under ``class_id``.

    Raises:
        KeyError: if nothing is stored under ``class_id`` (for example when
            ``load_models()`` has not been run yet).
    """
    selected_model = models[class_id]
    return selected_model

In [45]:
def run_full_pipeline(img, image_path:str|Path, save_dir:Path|None=None, pad:int=0):
    """
    Detect parts, classify the supported ones for defects, optionally save results.

    • Detect parts with RT-DETR
    • Crop each part whose class is listed in ``detectable_classes``
    • Run that part's defect classifier on the crop
    • Draw a red box and label for every crop classified as defective
    • If ``save_dir`` is given, save the annotated images there

    Args:
        img: Image to analyse. When ``None`` it is opened from ``image_path``.
        image_path: Path of the source image; its file name is also used to
            name the saved images.
        save_dir: Output directory (created if missing). ``None`` disables saving.
        pad: Currently unused; kept so existing callers keep working.

    Returns:
        ``(result, detections)`` for the first detector result that contains
        boxes. ``detections`` is a list of dicts with keys ``id``, ``prob``,
        ``label``, ``type``, ``box`` and ``confidence``, one per part classified
        as defective. Returns ``None`` when no result contains any box.
    """
    original_img = img if img is not None else Image.open(image_path).convert("RGB")

    img_defections = original_img.copy()          # red defect boxes are drawn on this copy
    draw = ImageDraw.Draw(img_defections)

    results = detector.predict(original_img)
    detections = []
    for result in results:
        boxes = result.boxes
        if boxes is None or len(boxes) == 0:
            print("  No objects detected in this image.")
            continue

        for box, confidence, class_id in zip(boxes.xyxy, boxes.conf, boxes.cls):
            x1, y1, x2, y2 = box.tolist()
            confidence_value = confidence.item()

            c_id = int(class_id)
            class_name = result.names[c_id]

            print(f"Object Detected {class_name}-{c_id} with confidence: {confidence_value:.2f} at ({x1:.0f}, {y1:.0f}), ({x2:.0f}, {y2:.0f})")
            part_id = detectable_classes.get(c_id)
            print(f"Class id is now: {part_id}")
            if part_id is None:
                continue  # no defect classifier exists for this part type

            crop = crop_object(original_img, box)
            model = get_model(part_id)
            prob, label = predict_single(crop, model, DEVICE)
            print(f"    Detected defect {label} with confidence: {prob:.2f}")
            if label == 0:
                continue  # label 0 is treated as "no defect"

            problem = class_to_problem[part_id]
            print(f"    Detected defect: {problem} with confidence: {prob:.2f}")
            draw.rectangle([(x1, y1), (x2, y2)], outline="red", width=1)
            # Clamp so the label stays on the canvas for boxes touching the top edge.
            draw.text((x1, max(y1 - 12, 0)), problem, fill="red")
            detections.append({"id": part_id, "prob": prob, "label": label, "type": problem, "box": box, "confidence": confidence})

        if save_dir is not None:
            out_dir = Path(save_dir)
            out_dir.mkdir(parents=True, exist_ok=True)
            image_name = Path(image_path).name

            # result.plot() returns a BGR array; flip the channel axis to get RGB for PIL.
            img_object_detection = Image.fromarray(result.plot()[..., ::-1].astype('uint8'))
            save_path = out_dir / f"object_{image_name}"
            img_object_detection.save(save_path)
            print(f"  Saved annotated image to: {save_path}")

            if detections:
                save_path_d = out_dir / f"detected_{image_name}"
                img_defections.save(save_path_d)
                print(f"  Saved defected image to: {save_path_d}")

        if not detections:
            print("  No defects detected")
        return result, detections

In [48]:
# Load the per-part defect classifiers before running the pipeline.
load_models()
print(f"Loaded models: {len(models)}")

# Sample image with a known rust defect on a yoke suspension. The path is relative,
# like the other ../data and ../models paths in this notebook, so it is not tied
# to one user's home directory.
test_img = "../data/InsPLAD-fault/defect_supervised/yoke-suspension/val/rust/01-06-2021_DJI_0385_114.jpg"

outputs = run_full_pipeline(img=None, image_path=test_img, save_dir="debug_crops")
print(outputs)

ResNet18 loaded. Final layer replaced for 1 output features.
Only the final layer will be trained initially.
ResNet18 loaded. Final layer replaced for 1 output features.
Only the final layer will be trained initially.
ResNet18 loaded. Final layer replaced for 1 output features.
Only the final layer will be trained initially.
ResNet18 loaded. Final layer replaced for 1 output features.
Only the final layer will be trained initially.
Loaded models: 4

0: 640x640 1 yoke, 1 yoke suspension, 797.3ms
Speed: 2.2ms preprocess, 797.3ms inference, 0.3ms postprocess per image at shape (1, 3, 640, 640)
Object Detected yoke suspension-1 with confidence: 0.90 at (43, 2), (269, 478)
Class id is now: 4
    Detected defect 0 with confidence: 0.08
Object Detected yoke-0 with confidence: 0.60 at (53, 0), (483, 485)
Class id is now: None
  Saved annotated image to: ./debug_crops/object_01-06-2021_DJI_0385_114.jpg
  No defects detected
(ultralytics.engine.results.Results object with attributes:

boxes: ult

In [8]:
import os, random, cv2, torch
from pathlib import Path

# ─── CONFIG ───────────────────────────────────────────────────────────────────
# Minimum defect probability for a box to be reported.
DEFECT_THR: float = 0.5
# Extra pixels kept around each detected box when cropping.
PAD: int = 4

# Minimal classifier preprocessing: BGR image -> batched RGB float tensor.
def to_tensor(img_bgr):
    """Convert an OpenCV BGR image (H, W, C) into a float tensor of shape (1, C, H, W).

    Channels are reordered from BGR to RGB and every value is divided by 255.
    No resizing or mean/std normalisation is applied.
    """
    rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
    channels_first = torch.from_numpy(rgb).permute(2, 0, 1)
    scaled = channels_first.float() / 255.
    return scaled.unsqueeze(0)

@torch.inference_mode()
def detect_and_filter(image_path: str|Path):
    """Detect parts in an image and keep only those the classifier flags as defective.

    Args:
        image_path: Path of the image to read with OpenCV.

    Returns:
        ``(im_bgr, info)``: the image as a BGR array and a list of
        ``(x1, y1, x2, y2, part_name, prob)`` tuples, one per detected box whose
        defect probability is at least ``DEFECT_THR``. Coordinates are plain
        Python ints of the unpadded box.

    Raises:
        FileNotFoundError: if OpenCV cannot read ``image_path``.
    """
    im_bgr = cv2.imread(str(image_path))
    if im_bgr is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise FileNotFoundError(f"Could not read image: {image_path}")
    h, w = im_bgr.shape[:2]

    # Make sure BatchNorm/Dropout layers are in inference mode before predicting.
    classifier.eval()

    # 1 ▸ DETECT PARTS
    results = detector.predict(im_bgr, conf=0.65, device=DEVICE, verbose=False)[0]
    if results.boxes is None or len(results.boxes) == 0:   # → nothing at all
        return im_bgr, []

    boxes_xyxy = results.boxes.xyxy.cpu().numpy().astype(int)   # [N,4]
    part_names = [results.names[int(c)] for c in results.boxes.cls]

    info = []
    # 2 ▸ CROP ▸ CLASSIFY EACH BOX
    for (x1, y1, x2, y2), part_name in zip(boxes_xyxy.tolist(), part_names):
        # Crop with a small padding. Slice ends are exclusive, so clamp to w / h
        # (not w-1 / h-1) to keep the last column and row of the image.
        x1p, y1p = max(x1 - PAD, 0), max(y1 - PAD, 0)
        x2p, y2p = min(x2 + PAD, w), min(y2 + PAD, h)
        crop = im_bgr[y1p:y2p, x1p:x2p]
        if crop.size == 0:
            continue  # degenerate box: nothing to classify

        prob = torch.sigmoid(classifier(to_tensor(crop).to(DEVICE))).item()
        if prob >= DEFECT_THR:                       # defective → keep
            info.append((x1, y1, x2, y2, part_name, prob))

    return im_bgr, info                              # original image + kept boxes

# ─── TEST LOOP ────────────────────────────────────────────────────────────────


In [10]:
num_test_images = 5
# Directory to sample test images from.
# (The detection validation set lives in "../data/InsPLAD-det/val/images".)
image_dir = "../data/InsPLAD-fault/unsupervised_anomaly_detection/glass-insulator/test/missingcap"
results_dir = "../results/defect_detection"
image_exts = (".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff")

# Keep image files only, so stray files (e.g. .DS_Store) never reach the detector.
image_files = sorted(
    os.path.join(image_dir, f)
    for f in os.listdir(image_dir)
    if f.lower().endswith(image_exts) and os.path.isfile(os.path.join(image_dir, f))
)

# cv2.imwrite does not create missing directories; it only returns False.
os.makedirs(results_dir, exist_ok=True)

# Sample without replacement so no image is processed twice; the min() also
# covers directories holding fewer than num_test_images images.
for img_path in random.sample(image_files, k=min(num_test_images, len(image_files))):
    print(f"\n--- Processing image: {os.path.basename(img_path)} ---")

    img_bgr, bad_parts = detect_and_filter(img_path)
    if not bad_parts:
        print("  No defects found.")
        continue

    # draw only “bad” boxes
    for (x1,y1,x2,y2, label, prob) in bad_parts:
        cv2.rectangle(img_bgr, (x1,y1), (x2,y2), (0,0,255), 2)          # red box
        cv2.putText(img_bgr,
                    f"{label}: {prob:.2f}",
                    (x1, max(y1-5,12)),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.5, (0,0,255), 1, cv2.LINE_AA)
        print(f"    DEFECT {label}  p={prob:.2f}  box=({x1},{y1})–({x2},{y2})")

    save_path = os.path.join(results_dir, f"defects_{os.path.basename(img_path)}")
    if cv2.imwrite(save_path, img_bgr):
        print(f"  Saved annotated defects‑only image to: {save_path}")
    else:
        print(f"  Failed to write annotated image to: {save_path}")


--- Processing image: Fotos 21-10-2020_DJI_0557_cadeia_isolador_vidro_1630.jpg ---
  No defects found.

--- Processing image: Fotos 21-10-2020_DJI_0537_cadeia_isolador_vidro_1615.jpg ---
  No defects found.

--- Processing image: Fotos 07-12-2020_DJI_0134_cadeia_isolador_vidro_1338.jpg ---
  No defects found.

--- Processing image: Fotos 03-12-2020_DJI_0361_cadeia_isolador_vidro_911.jpg ---
  No defects found.

--- Processing image: Fotos 03-12-2020_DJI_0361_cadeia_isolador_vidro_912.jpg ---
  No defects found.


In [38]:
import yaml

# Read the dataset config and pull out its "names" entry.
with open("../data/InsPLAD-det/data.yaml") as yaml_file:
    dataset_config = yaml.safe_load(yaml_file)
class_names = dataset_config["names"]

# Show the class names that were loaded.
print(class_names)

['yoke', 'yoke suspension', 'spacer', 'stockbridge damper', 'lightning rod shackle', 'lightning rod suspension', 'polymer insulator', 'glass insulator', 'tower id plate', 'vari-grip', 'polymer insulator lower shackle', 'polymer insulator upper shackle', 'polymer insulator tower shackle', 'glass insulator big shackle', 'glass insulator small shackle', 'glass insulator tower shackle', 'spiral damper', 'sphere']
