Using neural networks and the latency mechanism [1] to spot cracks and reduce the number of false positives


[1] Enhanced Video-Level Anomaly Feature Detection for Nuclear Power Plant
Component Inspections Using the Latency Mechanism - Zhouxiang Fei, Graeme M. West, Paul Murray, Gordon Dobie

- use a CNN (either GoogLeNet or YOLOv8) to classify patches of each video frame
- check whether a flagged patch stays flagged for a set number of consecutive frames
- report the patch as a crack only if it does


In [None]:
# Install the third-party packages imported by this notebook
# (PyTorch, OpenCV, NumPy, torchvision and Pillow).
!pip install torch opencv-python numpy torchvision pillow

In [None]:
# Activate the local "crack_detection" virtual environment (Windows-style
# Scripts\activate path), then install ultralytics, which provides the YOLO class.
# NOTE(review): each `!` line presumably runs in its own subshell, so the
# activation may not carry over to the `!pip` call below — confirm that
# ultralytics ends up in the environment the notebook kernel actually uses.
!.\crack_detection\Scripts\activate
!pip install ultralytics

In [None]:
import torch
import cv2
import numpy as np
from torchvision import transforms
from PIL import Image
from torch import nn
import torch
import torch.nn.functional as F
from ultralytics import YOLO


Test: run the model on a single crack image.

In [None]:
# Smoke test: load the trained YOLO weights on the CPU, run them on one
# still image and display the annotated result.
model_path = "best (1).pt"
model = YOLO(model_path).to("cpu")

image_path = "crack-outside-of-control-joint-300x225.jpg"  # Replace with your image path
image = cv2.imread(image_path)

# cv2.imread does not raise on a missing or unreadable file; it returns None.
# Fail loudly here instead of handing None to the model.
if image is None:
    raise FileNotFoundError(f"Could not read image file: {image_path}")

results = model(image)

# Display results: one result object per input image.
for result in results:
    result.show()  # Shows the image with the detections drawn on it


In [None]:
import cv2
from google.colab.patches import cv2_imshow  # Import this only if you're using Google Colab

# Run the YOLO model on every frame of the video and display each frame with
# the raw (unfiltered) detections drawn on it.
cap = cv2.VideoCapture("WhatsApp Video 2025-02-08 at 15.49.18.mp4")

# VideoCapture does not raise when the file is missing; report it explicitly.
# The loop below is skipped in that case because cap.isOpened() is False.
if not cap.isOpened():
    print("Error: Could not open video file.")

try:
    while cap.isOpened():
        # Capture frame-by-frame
        ret, frame = cap.read()

        # read() returns ret == False at the normal end of the video as well
        # as on a read failure, so this is not necessarily an error.
        if not ret:
            print("No more frames to read.")
            break

        # Run YOLOv8 inference on the frame
        results = model.predict(frame)

        # A single frame was passed in, so results[0] holds its detections.
        boxes = results[0].boxes
        for box in boxes:
            # Corner coordinates (x1, y1, x2, y2) of the bounding box
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            # Convert to a plain float so it formats like the other cells do
            conf = float(box.conf[0])

            # Draw a green rectangle and the confidence just above it
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(frame, f'{conf:.2f}', (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        # Show the resulting frame inline (Google Colab helper)
        cv2_imshow(frame)

        # Exit if the user presses 'q'.
        # NOTE(review): cv2_imshow renders inline rather than in an OpenCV
        # window, so this key check presumably never fires in Colab — confirm.
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Always free the capture handle, even if inference or drawing raised.
    cap.release()
    cv2.destroyAllWindows()

In [None]:
import cv2

# Sanity check: confirm the video file can be opened and count how many
# frames OpenCV is able to decode from it.
video_path = "WhatsApp Video 2025-02-08 at 15.49.18.mp4"
cap = cv2.VideoCapture(video_path)

# Report whether the capture was opened; counting proceeds either way.
print(
    "✅ Video file opened successfully!"
    if cap.isOpened()
    else "❌ Error: Could not open video file. Check if the file exists."
)

# Read frames until read() reports failure, counting each successful read.
frame_count = 0
ret, frame = cap.read()
while ret:
    frame_count += 1
    ret, frame = cap.read()
print(f"❌ No more frames to read. Total frames processed: {frame_count}")

cap.release()
print(f"✅ Successfully read {frame_count} frames from the video.")


In [None]:
import cv2
import numpy as np
from collections import deque
from ultralytics import YOLO
from google.colab.patches import cv2_imshow  # ✅ Required for Google Colab

# Crack detection with the latency mechanism (threshold = 3 frames).
#
# Every frame is passed through the YOLO model. A detection is identified by
# the integer centre of its bounding box, and it is only confirmed (drawn and
# counted "after latency") once that same centre has been detected in
# `latency_threshold` consecutive frames. Per-frame and cumulative counts are
# kept both before and after this filtering.
#
# NOTE(review): matching is by exact pixel centre, so a box that shifts by one
# pixel between frames restarts its streak — confirm this is the intended
# strictness for hand-held video.

# Load YOLO model
model_path = "best (1).pt"
model = YOLO(model_path).to("cpu")

# Video Capture
video_path = "WhatsApp Video 2025-02-08 at 15.49.18.mp4"
cap = cv2.VideoCapture(video_path)

# Stop if the video cannot be opened. SystemExit is raised directly because
# exit() is an interactive-shell helper that is not meant for use in programs.
if not cap.isOpened():
    print("❌ Error: Could not open video file. Please check the file path.")
    raise SystemExit(1)

# Parameters for the latency mechanism
latency_threshold = 3   # Consecutive frames needed to confirm a detection
detection_history = {}  # Box centre -> consecutive frames it has been seen

# Cumulative crack counters across all frames
total_cracks_before_latency = 0
total_cracks_after_latency = 0

try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            print("❌ No more frames to read.")
            break

        # Run YOLO on the frame; a single frame in gives a single result.
        results = model.predict(frame)
        boxes = results[0].boxes

        # Raw detections in this frame, before latency filtering
        cracks_before_latency = len(boxes)
        total_cracks_before_latency += cracks_before_latency

        # Index this frame's boxes by centre so each confirmed detection is
        # later drawn with its own coordinates and confidence.
        current_frame_detections = set()
        boxes_by_key = {}
        for box in boxes:
            x1, y1, x2, y2 = map(int, box.xyxy[0])  # Bounding box corners
            conf = float(box.conf[0])               # Confidence score
            key = ((x1 + x2) // 2, (y1 + y2) // 2)  # Box centre as identity

            current_frame_detections.add(key)
            # Keep the first box if several boxes share the same centre.
            boxes_by_key.setdefault(key, (x1, y1, x2, y2, conf))

        # Extend the streak of every centre seen in this frame (once per
        # frame, even if duplicated) and drop centres that are no longer
        # seen, so a vanished detection cannot be counted as confirmed.
        detection_history = {
            key: detection_history.get(key, 0) + 1
            for key in current_frame_detections
        }

        # Detections that have persisted for enough consecutive frames
        confirmed_detections = {
            key
            for key, count in detection_history.items()
            if count >= latency_threshold
        }

        cracks_after_latency = len(confirmed_detections)
        total_cracks_after_latency += cracks_after_latency

        # Draw confirmed detections with their own confidence label
        for key in confirmed_detections:
            x1, y1, x2, y2, conf = boxes_by_key[key]
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)  # Green box
            cv2.putText(frame, f'{conf:.2f}', (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX,
                        0.5, (0, 255, 0), 2)

        # Overlay the per-frame crack counts
        cv2.putText(frame, f'Frame Cracks Before Latency: {cracks_before_latency}', (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        cv2.putText(frame, f'Frame Cracks After Latency: {cracks_after_latency}', (10, 60),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

        # Show the annotated frame inline (Google Colab helper)
        cv2_imshow(frame)
finally:
    # Always free the capture handle, even if inference or drawing raised.
    cap.release()

# Print final crack counts
print(f"🔹 Total cracks detected before latency: {total_cracks_before_latency}")
print(f"🔹 Total cracks detected after latency: {total_cracks_after_latency}")


In [None]:
import cv2
import numpy as np
from collections import deque
from ultralytics import YOLO
from google.colab.patches import cv2_imshow  # ✅ Required for Google Colab

# Crack detection with the latency mechanism (threshold = 2 frames).
#
# Every frame is passed through the YOLO model. A detection is identified by
# the integer centre of its bounding box, and it is only confirmed (drawn and
# counted "after latency") once that same centre has been detected in
# `latency_threshold` consecutive frames. Per-frame and cumulative counts are
# kept both before and after this filtering.
#
# NOTE(review): matching is by exact pixel centre, so a box that shifts by one
# pixel between frames restarts its streak — confirm this is the intended
# strictness for hand-held video.

# Load YOLO model
model_path = "best (1).pt"
model = YOLO(model_path).to("cpu")

# Video Capture
video_path = "WhatsApp Video 2025-02-08 at 15.49.18.mp4"
cap = cv2.VideoCapture(video_path)

# Stop if the video cannot be opened. SystemExit is raised directly because
# exit() is an interactive-shell helper that is not meant for use in programs.
if not cap.isOpened():
    print("❌ Error: Could not open video file. Please check the file path.")
    raise SystemExit(1)

# Parameters for the latency mechanism
latency_threshold = 2   # Consecutive frames needed to confirm a detection
detection_history = {}  # Box centre -> consecutive frames it has been seen

# Cumulative crack counters across all frames
total_cracks_before_latency = 0
total_cracks_after_latency = 0

try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            print("❌ No more frames to read.")
            break

        # Run YOLO on the frame; a single frame in gives a single result.
        results = model.predict(frame)
        boxes = results[0].boxes

        # Raw detections in this frame, before latency filtering
        cracks_before_latency = len(boxes)
        total_cracks_before_latency += cracks_before_latency

        # Index this frame's boxes by centre so each confirmed detection is
        # later drawn with its own coordinates and confidence.
        current_frame_detections = set()
        boxes_by_key = {}
        for box in boxes:
            x1, y1, x2, y2 = map(int, box.xyxy[0])  # Bounding box corners
            conf = float(box.conf[0])               # Confidence score
            key = ((x1 + x2) // 2, (y1 + y2) // 2)  # Box centre as identity

            current_frame_detections.add(key)
            # Keep the first box if several boxes share the same centre.
            boxes_by_key.setdefault(key, (x1, y1, x2, y2, conf))

        # Extend the streak of every centre seen in this frame (once per
        # frame, even if duplicated) and drop centres that are no longer
        # seen, so a vanished detection cannot be counted as confirmed.
        detection_history = {
            key: detection_history.get(key, 0) + 1
            for key in current_frame_detections
        }

        # Detections that have persisted for enough consecutive frames
        confirmed_detections = {
            key
            for key, count in detection_history.items()
            if count >= latency_threshold
        }

        cracks_after_latency = len(confirmed_detections)
        total_cracks_after_latency += cracks_after_latency

        # Draw confirmed detections with their own confidence label
        for key in confirmed_detections:
            x1, y1, x2, y2, conf = boxes_by_key[key]
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)  # Green box
            cv2.putText(frame, f'{conf:.2f}', (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX,
                        0.5, (0, 255, 0), 2)

        # Overlay the per-frame crack counts
        cv2.putText(frame, f'Frame Cracks Before Latency: {cracks_before_latency}', (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        cv2.putText(frame, f'Frame Cracks After Latency: {cracks_after_latency}', (10, 60),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

        # Show the annotated frame inline (Google Colab helper)
        cv2_imshow(frame)
finally:
    # Always free the capture handle, even if inference or drawing raised.
    cap.release()

# Print final crack counts
print(f"🔹 Total cracks detected before latency: {total_cracks_before_latency}")
print(f"🔹 Total cracks detected after latency: {total_cracks_after_latency}")
