In [None]:
# Import necessary libraries
import os
import numpy as np
import tensorflow as tf
import cv2
import matplotlib.pyplot as plt
from PIL import Image
import time
import glob

: 

In [None]:
# Define paths for the project (everything is resolved relative to the
# directory the notebook kernel was started in).
BASE_DIR = os.getcwd()
INPUT_DIR = os.path.join(BASE_DIR, 'input')
OUTPUT_DIR = os.path.join(BASE_DIR, 'output')
MODEL_DIR = os.path.join(BASE_DIR, 'object_detection', 'output_inference_graph')
FROZEN_MODEL_PATH = os.path.join(MODEL_DIR, 'frozen_inference_graph_pothole.pb')

# Create output directory if it doesn't exist. exist_ok makes this a no-op on
# re-runs and avoids the check-then-create race of a separate exists() test.
os.makedirs(OUTPUT_DIR, exist_ok=True)

print(f"Base directory: {BASE_DIR}")
print(f"Input directory: {INPUT_DIR}")
print(f"Output directory: {OUTPUT_DIR}")
print(f"Model directory: {MODEL_DIR}")
print(f"Frozen model path: {FROZEN_MODEL_PATH}")

# Always bind `images`, so later cells iterate over an empty list instead of
# raising NameError when the input folder is missing.
images = []

# Check if the input folder exists and list images
if os.path.exists(INPUT_DIR):
    # Sorted so the processing order is the same on every run and platform.
    images = sorted(glob.glob(os.path.join(INPUT_DIR, '*.jpg')))
    print(f"Found {len(images)} images in input folder:")
    for img in images:
        print(f" - {os.path.basename(img)}")
else:
    print("Input directory does not exist. Please check the path.")

In [None]:
# Function to load the frozen TensorFlow model
def load_model(frozen_model_path):
    """Load a frozen TensorFlow inference graph from disk.

    Args:
        frozen_model_path: Path to a serialized ``GraphDef`` (``.pb``) file.

    Returns:
        A new ``tf.Graph`` holding the imported nodes. ``name=''`` is passed
        to the import so node names are not prefixed.
    """
    # GraphDef and gfile are TF1-style symbols that TensorFlow 2.x only
    # exposes under tf.compat.v1; fall back to `tf` itself on 1.x releases
    # that predate compat.v1.
    tf_v1 = getattr(tf.compat, 'v1', tf)

    detection_graph = tf.Graph()
    with detection_graph.as_default():
        od_graph_def = tf_v1.GraphDef()
        with tf_v1.gfile.GFile(frozen_model_path, 'rb') as fid:
            od_graph_def.ParseFromString(fid.read())
        # The file is no longer needed once parsed, so import after closing it.
        tf_v1.import_graph_def(od_graph_def, name='')
    return detection_graph

# Build `detection_graph` from the frozen model, or explain what is missing.
if not os.path.exists(FROZEN_MODEL_PATH):
    print(f"Error: Could not find model at {FROZEN_MODEL_PATH}")
    print("Please ensure the model file exists at this location.")
else:
    print("Loading pre-trained model...")
    detection_graph = load_model(FROZEN_MODEL_PATH)
    print("Model loaded successfully!")

In [None]:
# Helper functions for detection and visualization

def load_image_into_numpy_array(image):
    """Convert a PIL image into an ``(height, width, 3)`` uint8 array.

    Args:
        image: A PIL image in any mode. It is converted to RGB first, so
            grayscale, palette, RGBA and CMYK inputs no longer break the
            three-channel layout this function promises.

    Returns:
        A new, writable ``numpy.ndarray`` of dtype ``uint8`` with the
        channels in RGB order.
    """
    # np.array reads the pixel buffer directly instead of materializing a
    # Python sequence through getdata(), and always returns a fresh copy.
    return np.array(image.convert('RGB'), dtype=np.uint8)

def detect_objects(image_np, detection_graph, sess):
    """Run the detector on a single image.

    Args:
        image_np: Image array for one picture; a leading batch axis is added
            here before it is fed to ``image_tensor:0``.
        detection_graph: Graph that owns the input and output tensors. It is
            used for the tensor lookups directly, so the call does not depend
            on which graph happens to be the default one.
        sess: Session used to execute the graph.

    Returns:
        dict with the batch axis stripped from every entry:
            ``num_detections`` (int), ``detection_boxes``,
            ``detection_scores`` and ``detection_classes`` (int64 array).

    Raises:
        KeyError: If the graph lacks one of the expected tensors.
    """
    output_keys = (
        'num_detections', 'detection_boxes', 'detection_scores',
        'detection_classes',
    )
    # Look the tensors up by name rather than scanning every op in the graph
    # on each call; a missing tensor surfaces as a KeyError right here.
    fetches = {
        key: detection_graph.get_tensor_by_name(key + ':0')
        for key in output_keys
    }
    image_tensor = detection_graph.get_tensor_by_name('image_tensor:0')

    # Run inference on a batch of one.
    raw = sess.run(fetches,
                   feed_dict={image_tensor: np.expand_dims(image_np, 0)})

    # Drop the batch axis. Class ids are cast to int64 because uint8 would
    # silently wrap ids above 255.
    return {
        'num_detections': int(raw['num_detections'][0]),
        'detection_boxes': raw['detection_boxes'][0],
        'detection_scores': raw['detection_scores'][0],
        'detection_classes': raw['detection_classes'][0].astype(np.int64),
    }

def visualize_detection_results(image_np, output_dict, category_index, threshold=0.5):
    """Overlay detection boxes and labels on an image array.

    Args:
        image_np: Image array handed to the drawing utility; presumably it is
            annotated in place (the utility's return value is not used).
        output_dict: Mapping with 'detection_boxes', 'detection_classes' and
            'detection_scores' entries.
        category_index: Mapping from class id to category info.
        threshold: Passed on as ``min_score_thresh``.

    Returns:
        The same ``image_np`` object that was passed in.
    """
    # Imported at call time, so the import only runs when drawing is requested.
    from object_detection.utils import visualization_utils as drawing

    render_options = dict(
        use_normalized_coordinates=True,
        line_thickness=8,
        min_score_thresh=threshold,
    )
    boxes = output_dict['detection_boxes']
    class_ids = output_dict['detection_classes']
    confidences = output_dict['detection_scores']

    drawing.visualize_boxes_and_labels_on_image_array(
        image_np, boxes, class_ids, confidences, category_index,
        **render_options)

    return image_np

In [None]:
# Categories the detector can report, keyed by their class id.
# Replace these with the actual categories from your model
category_index = {
    entry['id']: entry
    for entry in (
        {'id': 1, 'name': 'pothole'},
        # Add more categories if needed
    )
}

In [None]:
# Detect and visualize anomalies in test images
def process_images():
    """Run detection on every path in ``images`` and save annotated copies.

    Reads the notebook-level ``images``, ``detection_graph``,
    ``category_index``, ``FROZEN_MODEL_PATH`` and ``OUTPUT_DIR``. For each
    image it prints the inference time, displays the annotated picture, and
    writes it to ``OUTPUT_DIR`` as ``detected_<original file name>``.

    Returns:
        None. Returns early with a message when the model file is missing.
    """
    if not os.path.exists(FROZEN_MODEL_PATH):
        print("Model not found. Please check the path.")
        return

    # Session is a TF1-style symbol that TensorFlow 2.x only exposes under
    # tf.compat.v1; fall back to `tf` on 1.x releases that predate compat.v1.
    tf_v1 = getattr(tf.compat, 'v1', tf)

    with detection_graph.as_default():
        # One session is shared by the whole batch.
        with tf_v1.Session(graph=detection_graph) as sess:
            for image_path in images:
                image_name = os.path.basename(image_path)
                print(f"Processing {image_name}...")

                # Read and prepare image; the context manager releases the
                # file handle as soon as the pixels are copied out.
                with Image.open(image_path) as image:
                    image_np = load_image_into_numpy_array(image)

                # Detect objects
                start_time = time.time()
                output_dict = detect_objects(image_np, detection_graph, sess)
                elapsed_time = time.time() - start_time
                print(f"Detection took {elapsed_time:.2f} seconds")

                # Visualize results on a copy so image_np stays untouched
                vis_image = visualize_detection_results(image_np.copy(), output_dict, category_index)

                # Display the image
                fig = plt.figure(figsize=(12, 8))
                plt.imshow(vis_image)
                plt.title(f"Detected anomalies in {image_name}")
                plt.axis('off')
                plt.show()
                # Free the figure so a long batch does not accumulate them.
                plt.close(fig)

                # Save the output image (OpenCV expects BGR channel order)
                output_path = os.path.join(OUTPUT_DIR, f"detected_{image_name}")
                # imwrite reports failure through its return value, so only
                # claim success when it actually wrote the file.
                if cv2.imwrite(output_path, cv2.cvtColor(vis_image, cv2.COLOR_RGB2BGR)):
                    print(f"Saved output to {output_path}")
                else:
                    print(f"Warning: failed to save output to {output_path}")
                print("-" * 50)

# Run the batch only when the frozen model is actually on disk.
if not os.path.exists(FROZEN_MODEL_PATH):
    print("Cannot process images without a valid model.")
else:
    process_images()

In [None]:
# Function to process a single image for demonstration
def process_single_image(image_path, threshold=0.5):
    """Run detection on one image, display it, and print a short report.

    Reads the notebook-level ``detection_graph``, ``category_index`` and
    ``FROZEN_MODEL_PATH``.

    Args:
        image_path: Path of the image file to analyse.
        threshold: Minimum score for a detection to be drawn and counted.
            Defaults to 0.5, the value that was previously hard-coded.

    Returns:
        None. Returns early with a message when the model file is missing.
    """
    if not os.path.exists(FROZEN_MODEL_PATH):
        print("Model not found. Please check the path.")
        return

    # Session is a TF1-style symbol that TensorFlow 2.x only exposes under
    # tf.compat.v1; fall back to `tf` on 1.x releases that predate compat.v1.
    tf_v1 = getattr(tf.compat, 'v1', tf)

    with detection_graph.as_default():
        with tf_v1.Session(graph=detection_graph) as sess:
            image_name = os.path.basename(image_path)
            print(f"Processing {image_name}...")

            # Read and prepare image; the context manager releases the file
            # handle as soon as the pixels are copied out.
            with Image.open(image_path) as image:
                image_np = load_image_into_numpy_array(image)

            # Detect objects
            output_dict = detect_objects(image_np, detection_graph, sess)

            # Visualize results using the same threshold as the count below
            vis_image = visualize_detection_results(
                image_np.copy(), output_dict, category_index, threshold=threshold)

            # Display the image
            fig = plt.figure(figsize=(12, 8))
            plt.imshow(vis_image)
            plt.title(f"Detected anomalies in {image_name}")
            plt.axis('off')
            plt.show()
            plt.close(fig)

            # Count detections above threshold
            scores = output_dict['detection_scores']
            boxes = output_dict['detection_boxes']
            classes = output_dict['detection_classes']

            detections = int(np.count_nonzero(np.asarray(scores) > threshold))
            print(f"Found {detections} anomalies with confidence > {threshold}")

            # Print at most five detections.
            # NOTE(review): taking the first `detections` entries assumes the
            # model returns detections ordered by descending score - confirm.
            for i in range(min(5, detections)):
                class_id = int(classes[i])
                class_name = category_index.get(class_id, {}).get('name', 'Unknown')
                confidence = scores[i]
                box = boxes[i]
                print(f"Detection {i+1}: {class_name} with {confidence:.2f} confidence at position {box}")

# Example usage - uncomment to try with a specific image
# if images:
#    process_single_image(images[0])