In [None]:
! pip install torchmetrics

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting torchmetrics
  Downloading torchmetrics-0.11.4-py3-none-any.whl (519 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m519.2/519.2 kB[0m [31m9.8 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: torchmetrics
Successfully installed torchmetrics-0.11.4


In [None]:
import torch
import tensorflow
import torchvision
import torchvision.transforms as transforms
import torchmetrics
from torchmetrics.detection.mean_ap import MeanAveragePrecision
from torchmetrics import StructuralSimilarityIndexMeasure
import cv2
import time
import numpy as np
from PIL import Image
from google.colab.patches import cv2_imshow
from google.colab import drive
# Mount Google Drive at /content/drive; the image and video paths used in later
# cells all live under this mount point.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Set up the model
- define classes
- load model
- define predict function


In [None]:
# COCO class names (91 entries, the same length as the COLORS table used for drawing).
# None of the visible cells read this list; it is kept only so a numeric label can be
# looked up by hand when sanity-checking predictions.
# NOTE(review): presumably coco_names[label_id] is the name for a predicted label id and
# the 'N/A' entries are placeholders for unused ids — confirm against the model's labels.
coco_names = [
    '__background__', 'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus',
    'train', 'truck', 'boat', 'traffic light', 'fire hydrant', 'N/A', 'stop sign',
    'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow',
    'elephant', 'bear', 'zebra', 'giraffe', 'N/A', 'backpack', 'umbrella', 'N/A', 'N/A',
    'handbag', 'tie', 'suitcase', 'frisbee', 'skis', 'snowboard', 'sports ball',
    'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard', 'tennis racket',
    'bottle', 'N/A', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl',
    'banana', 'apple', 'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza',
    'donut', 'cake', 'chair', 'couch', 'potted plant', 'bed', 'N/A', 'dining table',
    'N/A', 'N/A', 'toilet', 'N/A', 'tv', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone',
    'microwave', 'oven', 'toaster', 'sink', 'refrigerator', 'N/A', 'book',
    'clock', 'vase', 'scissors', 'teddy bear', 'hair drier', 'toothbrush'
]

In [None]:
# Define the torchvision image transforms. ToTensor converts a PIL image or an
# H x W x C uint8 array into a C x H x W float tensor.
transform = transforms.Compose([
    transforms.ToTensor(),
])

# Run on the GPU when the runtime has one; otherwise fall back to the CPU instead of
# failing on `.to("cuda")`.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the object detection model, SSD, and set mode to eval.
# NOTE(review): newer torchvision releases deprecate `pretrained=` in favour of
# `weights=`; switch once the runtime's torchvision version is confirmed.
model = torchvision.models.detection.ssd300_vgg16(pretrained=True)
model.eval()
model = model.to(device)

Downloading: "https://download.pytorch.org/models/ssd300_vgg16_coco-b556d3b4.pth" to /root/.cache/torch/hub/checkpoints/ssd300_vgg16_coco-b556d3b4.pth
100%|██████████| 136M/136M [00:01<00:00, 97.5MB/s]


In [None]:
# Function to run a single image through model and get boxes, labels, and scores
def predict(image, model, detection_threshold):
    """Run the detector on one image and keep only the confident detections.

    Args:
        image: Image accepted by the notebook-level ``transform`` (a PIL image or
            an H x W x C array).
        model: Detection model that returns, per input image, a dict with
            'boxes', 'labels' and 'scores' tensors.
        detection_threshold: Minimum score a detection needs in order to be kept.

    Returns:
        Tuple ``(boxes, labels, scores)`` of CPU tensors containing only the
        detections whose score is >= ``detection_threshold``.
    """
    # Follow whatever device the model lives on instead of assuming a GPU.
    try:
        device = next(model.parameters()).device
    except StopIteration:
        device = torch.device("cpu")

    # transform the image to tensor, add a batch dimension and move it next to the model
    batch = transform(image).unsqueeze(0).to(device)

    # Inference only: skip autograd bookkeeping so no graph is kept per frame.
    with torch.no_grad():
        outputs = model(batch)

    detections = outputs[0]
    pred_scores = detections['scores'].detach().to("cpu")

    # Build the score mask once and apply it to every output.
    keep = pred_scores >= detection_threshold
    boxes = detections['boxes'].detach().to("cpu")[keep]
    labels = detections['labels'].detach().to("cpu")[keep]
    scores = pred_scores[keep]

    return boxes, labels, scores

# TESTING

If you need to sanity check your predictions, you can use this section

In [None]:
# Sanity check: run the detector on one image stored in Drive and dump the raw tensors.
# NOTE(review): the file name below is just ".jpeg" — confirm it points at the intended image.
image = Image.open('/content/drive/My Drive/CS181_FINAL_PROJECT/Resources/.jpeg')
boxes, labels, scores = predict(image, model, 0.3)
# Printed in the order boxes, scores, labels, one per line.
print(boxes, scores, labels, sep="\n")

In [None]:
# One colour per class id (91 rows). Seeded so every class keeps the same colour
# across kernel restarts.
COLORS = np.random.default_rng(0).uniform(0, 255, size=(91, 3))


def draw_boxes(boxes, labels, image):
    """
    Draws the bounding box and the class id of every detected object.

    Args:
        boxes: Iterable of (x1, y1, x2, y2) boxes in pixel coordinates.
        labels: Iterable of integer class ids, aligned with ``boxes``.
        image: RGB image (PIL image or array) to annotate; it is not modified.

    Returns:
        A new BGR array (OpenCV channel order) with the boxes and ids drawn on it.
    """
    # The input is RGB while OpenCV draws and displays BGR, so swap the channels.
    image = cv2.cvtColor(np.asarray(image), cv2.COLOR_RGB2BGR)
    for box, label in zip(boxes, labels):
        # Labels may arrive as 0-dim tensors; use a plain int so the caption reads
        # "17" rather than "tensor(17)".
        label_id = int(label)
        color = tuple(float(channel) for channel in COLORS[label_id])
        cv2.rectangle(
            image,
            (int(box[0]), int(box[1])),
            (int(box[2]), int(box[3])),
            color, 2
        )
        # Caption sits 5 px above the top-left corner of the box.
        cv2.putText(image, str(label_id), (int(box[0]), int(box[1]-5)),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2,
                    lineType=cv2.LINE_AA)
    return image

# Keep the annotated result under its own name: rebinding `image` would make a re-run
# of this cell swap the colour channels again and draw on top of the old boxes.
annotated_image = draw_boxes(boxes, labels, image)
cv2_imshow(annotated_image)

# Accuracy Comparison

In [None]:
# The function below takes the "ground truth" boxes and labels and the "comparison" boxes,
# labels, and scores, and returns the mean average precision (mAP) of the comparison
# results relative to the ground truth.

In [None]:
def calculate_accuracy(ground_truth, prediction):
    """Return the bounding-box mAP of ``prediction`` measured against ``ground_truth``.

    Both arguments are handed unchanged to ``MeanAveragePrecision.update``
    (predictions first, targets second). The 'map' entry of the computed result
    is returned as a Python scalar.
    """
    map_metric = MeanAveragePrecision(iou_type="bbox")
    map_metric.update(prediction, ground_truth)
    return map_metric.compute()['map'].item()

# Read video and process frames
- Read in a video and loop through its frames using the OpenCV library
- Run the predict function on each frame

In [None]:

def _frame_mse(img1, img2):
    """Mean squared per-element difference between two frames."""
    # Work in float64: uint8 arithmetic would clip negative differences and wrap squares.
    diff = img1.astype(np.float64) - img2.astype(np.float64)
    return float(np.mean(diff ** 2))


def _frame_mae(img1, img2):
    """Mean absolute per-element difference between two frames."""
    diff = img1.astype(np.float64) - img2.astype(np.float64)
    return float(np.mean(np.abs(diff)))


def _frame_ssim(img1, img2):
    """Structural similarity of two H*W*C frames (higher means more alike)."""
    def to_batch(img):
        # Reorder H*W*C -> C*H*W, then add the batch dimension to get B*C*H*W.
        return torch.from_numpy(img).permute(2, 0, 1).unsqueeze(0).float()

    func = StructuralSimilarityIndexMeasure()
    return func(to_batch(img1), to_batch(img2)).item()


def should_process_frame(frame, prev_frame, index, policy, threshold, blurred):
    """Decide whether ``frame`` differs enough from ``prev_frame`` to be processed.

    Args:
        frame: Current frame as an H*W*C numpy array.
        prev_frame: Last processed frame, same layout; may be None before any
            frame has been processed.
        index: Position of ``frame`` in the video; frame 0 is always processed.
        policy: Difference measure: "mse", "mae" or "ssim".
        threshold: Minimum difference that triggers processing. For "ssim" the
            difference is ``1 - similarity``, so all three policies share the
            rule "larger value = more change".
        blurred: When true, both frames get a 3x3 Gaussian blur before comparison.

    Returns:
        True if the frame should be processed, False if it can be skipped.

    Raises:
        ValueError: If ``policy`` is not one of the supported names.
    """
    # Nothing to compare against yet, so the frame has to be processed.
    if index == 0 or prev_frame is None:
        return True

    if blurred:
        prev_frame = cv2.GaussianBlur(prev_frame, (3, 3), 0)
        frame = cv2.GaussianBlur(frame, (3, 3), 0)

    if policy == "mse":
        error = _frame_mse(prev_frame, frame)
    elif policy == "mae":
        error = _frame_mae(prev_frame, frame)
    elif policy == "ssim":
        # SSIM measures similarity; flip it so a large value means a large change.
        error = 1.0 - _frame_ssim(prev_frame, frame)
    else:
        raise ValueError(f"Unknown filtering policy: {policy!r}")

    # Skip the frame only when the change is strictly below the threshold.
    return not (error < threshold)

In [None]:
def process_video(video_path, model, policy=None, blurred=None, threshold=None,
                  detection_threshold=0.3):
    """Run detection on every frame of a video and simulate frame filtering.

    Every frame goes through ``predict`` to obtain the "ground truth". The
    "comparison" result for a frame is its own detections when
    ``should_process_frame`` accepts it, and otherwise the detections of the
    last accepted frame.

    Args:
        video_path: Path of the video file to read.
        model: Detection model passed to ``predict``.
        policy: "mse", "mae" or "ssim". Prompted for when None.
        blurred: Whether frames are blurred before comparison. Prompted for when None.
        threshold: Difference threshold passed to ``should_process_frame``.
            Prompted for when None; fractional values are accepted.
        detection_threshold: Minimum detection score passed to ``predict``.

    Returns:
        ``(ground_truth, comparison)``: two lists with one dict per frame
        ('boxes' and 'labels', plus 'scores' in the comparison list), or
        ``(None, None)`` when the policy is invalid.
    """
    # Ask user for filtering policy and ensure it's valid
    if policy is None:
        policy = input('Enter the filtering policy (mse, mae, ssim): ')
    if policy not in ['mse', 'mae', 'ssim']:
        print('Please enter a valid filtering policy')
        return None, None

    if blurred is None:
        blurredInput = input('Would you like the frame Blurred? (Y/N)')
        blurred = blurredInput in ("Y", "Yes", "y", "yes")

    if threshold is None:
        # float rather than int so fractional thresholds (e.g. for ssim) can be entered.
        threshold = float(input('Enter the THRESHOLD: '))

    ground_truth = []
    comparison = []

    frames_read = 0
    frames_filtered = 0
    prev_frame = None

    # Store the results of the last processed frame. If we skip a frame, fill
    # "comparison" with the last processed frame's results.
    prev_frame_results = (None, None, None)

    # Set up video capture; the finally block releases it even if a frame fails.
    cap = cv2.VideoCapture(video_path)
    start_time = time.time()
    try:
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        print(f'Frame count: {frame_count}')

        # Loop through frames until the capture has nothing left to return.
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            index = frames_read
            frames_read += 1

            if index % 100 == 0:
                print(f'Reached index {index}')

            # Run prediction on this frame. We have to run it regardless of our
            # filtering method so we can assess ground truth. OpenCV decodes frames
            # as BGR, while the detector expects RGB input.
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            boxes, labels, scores = predict(rgb_frame, model, detection_threshold)
            ground_truth.append({'boxes': boxes, 'labels': labels})

            if should_process_frame(frame, prev_frame, index, policy, threshold, blurred):
                # This becomes the last frame processed
                prev_frame_results = (boxes, labels, scores)
                prev_frame = frame.copy()
            else:
                frames_filtered += 1

            # Either this frame's own results or those of the last processed frame.
            comparison.append({
                'boxes': prev_frame_results[0],
                'labels': prev_frame_results[1],
                'scores': prev_frame_results[2],
            })
    finally:
        cap.release()

    end_time = time.time()
    print(f'Total time: {end_time - start_time}')
    print(f'Threshold: {threshold}')
    # Use the number of frames actually read: the container's frame count can be
    # zero or inaccurate, which would divide by zero or skew the ratio.
    filtered_fraction = frames_filtered / frames_read if frames_read else 0.0
    print(f'Frame Percentage Filtered Out: { filtered_fraction}')

    return ground_truth, comparison

# Run the video through the model and get the result!

This is the code block you'll want to rerun as you change configurations and videos

In [None]:
# To read the shared project folder: open CS181_FINAL_PROJECT in Google Drive, choose "Add shortcut to Drive",
# and place the shortcut in your "Colab Notebooks" folder. The path below should then resolve without error.
gt, comp = process_video('/content/drive/My Drive/Colab Notebooks/CS181_FINAL_PROJECT/Resources/old/Banff.mp4', model)

# process_video returns (None, None) when the filtering policy is rejected; skip
# scoring in that case instead of crashing inside calculate_accuracy.
if gt is not None:
    mAP = calculate_accuracy(gt, comp)

    print(f'mAP: { mAP}')