# Data Annotation with Prediction Assistant based on Intelligently Selected Data (ISD)

Key commands

| Button | Description |
|--------|-------------|
| s | **Save** file w/ annotations |
| a | Move image to `to_annotate` |
| z | **Undo** most recent box |
| L click | **Draw** box |
| R click | **Highlight** boxes to delete |
| r | **Remove** highlighted boxes |
| 1 | Change light to **green** |
| 2 | Change light to **red** |
| 3 | Change light to **yellow** |
| Esc | **Exit** the annotation loop |


Required imports

In [1]:
import os
import sys
import shutil
from datetime import datetime
from dotenv import load_dotenv

import cv2
from ultralytics import YOLO

sys.path.append(os.path.abspath('../utils'))

import screen_info
import annotation_utils
import data_management
import model_training

In [2]:
# Reset the working data before starting a new annotation session.
# NOTE(review): presumably dump_data() empties model_data by returning its
# files to their original directories — confirm in utils/data_management.
data_management.dump_data()

Choose model for predictions

In [3]:
# Take the first entry listed in the batch-assistant folder and load it as the
# YOLO model used to pre-annotate frames. The file name is printed so the
# active model is visible in the notebook output.
available_models = os.listdir("../models/batch_assistant")
curr_model = available_models[0]
model_path = "../models/batch_assistant/" + curr_model
model = YOLO(model_path)
print(curr_model)

2025-03-30_19-18-53.pt


Directories for retrieving, copying, or moving frames and videos

In [4]:

# Folders used while annotating. All paths are relative to the notebook.
frame_dir = "../data/images/batch"               # unprocessed frames waiting to be annotated
processed_dir = "../data/images/processed"       # resized frames written here when saved
original_dir = "../data/images/frames_original"  # original (1920x1080) files are moved here when saved
augmenting_dir = "../data/images/for_augmenting" # images to augment later

Gather screen information so that annotating data is a smoother experience

In [5]:
# Size and position later applied to the annotation window
# (cv2.resizeWindow / cv2.moveWindow in the main loop).
# NOTE(review): how these values are derived lives in utils/screen_info — confirm there.
window_width, window_height, window_x, window_y = screen_info.get_screen_info()

Variables

In [6]:
# Exit flag for the labelling loop: the loop sets it to True when Esc is pressed.
# NOTE: this shadows the built-in exit(); the name is kept because the
# annotation loop reads and writes it under this name.
exit = False

# Standardized image width/height every frame is resized to before annotation
load_dotenv()
IMAGE_WIDTH, IMAGE_HEIGHT = 768, 448


# Color mapping based on key presses (colors are BGR tuples as used by OpenCV)
color_mapping = {
    ord("1"): (0, 255, 0),   # Green
    ord("2"): (0, 0, 255),   # Red
    ord("3"): (0, 255, 255), # Yellow
}

# Label mapping based on colors
label_mapping = {
    (0, 255, 0): "green_light",
    (0, 0, 255): "red_light",
    (0, 255, 255): "yellow_light"
}

# Class mapping based on label (for YOLO format that uses int instead of str)
class_mapping = {
    "green_light": 1,
    "red_light": 2,
    "yellow_light": 3
}

# Size of the dataset at the last recorded training run; used as the baseline
# for deciding when to retrain the assistant.
dataset_size = data_management.get_latest_dataset_size()

if dataset_size is None:
    # Nothing recorded yet: fall back to the number of processed images, with
    # a floor of 1. The original passed the 1 to len() instead of max(),
    # which raises TypeError.
    dataset_size = max(len(os.listdir(processed_dir)), 1)

print(dataset_size)

40512


In [7]:
import torch
import os
import torchvision.transforms as transforms
from torchvision import models 
from PIL import Image
import numpy as np
from dotenv import load_dotenv

#### Connect to Pinecone

In [8]:
from pinecone import Pinecone, ServerlessSpec
import pinecone
# Name of the Pinecone index that stores one embedding per annotated frame
index_name = 'rlr-embeddings'

# Load .env so PC_API_KEY is available, create the client, then get a handle
# to the index by name.
load_dotenv()
pc = Pinecone(api_key=os.getenv("PC_API_KEY"))
index = pc.Index(index_name)

  from .autonotebook import tqdm as notebook_tqdm


##### View ResNet architecture
Take note of the final layer: we will remove this output layer so that the layer before it acts as our embedding layer. <br>
`(fc): Linear(in_features=2048, out_features=1000, bias=True) `

In [9]:
# Build the embedding network: a pretrained ResNet-50 with its last child
# module (the `fc` classifier) dropped, switched to inference mode.
resnet = models.resnet50(pretrained=True)
feature_layers = list(resnet.children())[:-1]
embedding_model = torch.nn.Sequential(*feature_layers)
embedding_model.eval()



Sequential(
  (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (2): ReLU(inplace=True)
  (3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (4): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)


#### Define our transformation function

In [10]:
# Preprocessing applied to each PIL image before it goes to the embedding model
_CHANNEL_MEAN = [0.485, 0.456, 0.406]  # mean for each channel (RGB)
_CHANNEL_STD = [0.229, 0.224, 0.225]   # std for each channel (RGB)

transform = transforms.Compose([
    transforms.Resize((224, 224)),  # ResNet 50 expects image sizes of 224x224
    transforms.ToTensor(),          # converts PIL image / NumPy array to tensor
    transforms.Normalize(mean=_CHANNEL_MEAN, std=_CHANNEL_STD),
])

### Main Code
1. Helper functions for drawing bboxes
2. Iterate over each frame:
    - Use current assistant to make predictions
    - Pop-up window for removing predictions / drawing new bboxes
    - When the new dataset exceeds the previous model's dataset size by the retraining threshold (the code below uses a factor of 1.2, i.e. 20% growth), train a new assistant

In [11]:
# Repaints all annotations on a clean copy of the resized frame
def redraw_bbox(annotations):
    """Redraw every annotation from scratch and refresh the displayed image.

    Rebinds the module-level ``img`` to a fresh copy of ``resized_img``, draws
    one rectangle per annotation on it, then rebinds ``img_copy`` (the image
    shown in the window) to a copy of the result.

    Args:
        annotations: iterable of dicts providing ``x1``, ``y1``, ``x2``,
            ``y2``, ``color_code`` and ``thickness``.
    """
    global img, img_copy
    img = resized_img.copy()  # start again from the unannotated frame
    for box in annotations:
        top_left = (int(box["x1"]), int(box["y1"]))
        bottom_right = (int(box["x2"]), int(box["y2"]))
        cv2.rectangle(img, top_left, bottom_right, box["color_code"], int(box['thickness']))
    img_copy = img.copy()

# Mouse callback: right click highlights boxes, left click draws a new box
def edit_bbox(event, x, y, flags, params):
    """Handle mouse events for the annotation window.

    Right click: every box strictly containing the cursor gets thickness 2
    (highlighted); if no box contains it, the current selection is reset.
    Left click: the first click anchors a corner, the second click commits the
    rectangle as a new annotation in the current color. While a rectangle is
    in progress, mouse movement previews it on ``img_copy``.

    Args:
        event: OpenCV mouse event code.
        x: cursor x coordinate of the event.
        y: cursor y coordinate of the event.
        flags: unused; part of the OpenCV callback signature.
        params: unused; part of the OpenCV callback signature.
    """
    global ix, iy, drawing, img_copy, img, current_color

    # right click to select the box(es) the cursor is inside of
    if event == cv2.EVENT_RBUTTONDOWN:
        selected_any = False
        for box in annotations:
            left, right = min(box['x1'], box['x2']), max(box['x1'], box['x2'])
            top, bottom = min(box['y1'], box['y2']), max(box['y1'], box['y2'])
            if left < x < right and top < y < bottom:
                print(box)
                box["thickness"] = 2
                redraw_bbox(annotations)
                selected_any = True

        # clicking outside every box clears the highlight
        if not selected_any:
            annotation_utils.reset_selection(annotations)
            redraw_bbox(annotations)

    if event == cv2.EVENT_LBUTTONDOWN:
        if not drawing:
            # first click: remember the anchor corner and start drawing
            drawing = True
            ix, iy = x, y
            img_copy = img.copy()  # reset copy when starting a new rectangle
        else:
            # second click: commit the rectangle at the cursor position
            drawing = False
            cv2.rectangle(img, (ix, iy), (x, y), current_color, 1)
            label = label_mapping[current_color]
            annotations.append({
                "x1": min(ix, x),
                "x2": max(ix, x),
                "y1": min(iy, y),
                "y2": max(iy, y),
                "color_code": current_color,
                "color": label,
                "class": class_mapping[label],
                "thickness": 1
            })

            redraw_bbox(annotations)  # for view consistency

    elif event == cv2.EVENT_MOUSEMOVE and drawing:
        # live preview: redraw from the committed image so previews don't stack
        img_copy = img.copy()
        cv2.rectangle(img_copy, (ix, iy), (x, y), current_color, 1)



# Iterate over each file in the frame dir: pre-annotate it with the current
# assistant model, then open an editing window so the boxes can be corrected
# and saved.
frames = os.listdir(frame_dir)
for frame in frames:
    # Esc was pressed on an earlier frame: stop before doing any more work
    if exit:
        break

    # Skip non-image files up front so they are never decoded or embedded
    if not frame.lower().endswith(('.png', '.jpg', '.jpeg')):
        continue

    image_path = os.path.join(frame_dir, frame)
    file_path = image_path

    # Decode with PIL for the embedding model. The context manager releases the
    # file handle so the file can be moved once the frame is saved.
    with Image.open(image_path) as pil_frame:
        rgb_frame = pil_frame.convert('RGB')

    img_tensor = transform(rgb_frame).unsqueeze(0)  # apply the transform defined earlier, add a leading dimension

    # torch.no_grad does not calculate the gradients to reduce memory usage / increase speed
    with torch.no_grad():
        embedding = embedding_model(img_tensor).squeeze(-1).squeeze(-1)  # remove the last two dimensions of the tensor

    embedding = embedding / torch.norm(embedding, p=2)  # L2 normalization
    embedding = embedding.numpy().tolist()[0]           # convert to 1 dimensional list

    model_name = curr_model.split('.')[0]

    # List to store dicts of annotations
    annotations = []

    # Prepare annotation file names
    img_filename = os.path.basename(file_path)
    print(img_filename)
    print(os.path.splitext(img_filename)[0])
    text_filename = os.path.splitext(img_filename)[0] + ".txt"

    # Viewing annotation file
    viewing_annotation_path = f"../data/labels/viewing/viewing_{text_filename}"
    # Formatted annotation file
    yolo_annotations_path = f'../data/labels/formatted/{text_filename}'

    # Load image
    original_img = cv2.imread(file_path)  # keep original image
    if original_img is None:
        raise FileNotFoundError("Image not found. Check the file path.")

    # Resize the image to a uniform size
    resized_img = cv2.resize(original_img, (IMAGE_WIDTH, IMAGE_HEIGHT))

    # Initialize working images
    img = resized_img.copy()   # Active drawing image
    img_copy = img.copy()      # Image copy for real-time updates
    result = model(resized_img, verbose=False)[0]

    # tracking variables: number of predicted boxes per class
    pred_red = 0
    pred_yellow = 0
    pred_green = 0

    # add predictions to annotations
    for ann in result.boxes.data.tolist():
        x1, y1, x2, y2, score, class_id = ann
        class_id = int(class_id)
        class_name = result.names[class_id]
        # The class id doubles as the digit key used in color_mapping
        box_color = color_mapping[ord(str(class_id))]
        annotations.append({
            "x1": int(min(x1, x2)),
            "x2": int(max(x1, x2)),
            "y1": int(min(y1, y2)),
            "y2": int(max(y1, y2)),
            "color_code": box_color,
            "color": class_name,
            "class": class_id,
            "thickness": 1
        })
        if class_name == 'red_light':
            pred_red += 1
        elif class_name == 'yellow_light':
            pred_yellow += 1
        elif class_name == 'green_light':
            pred_green += 1
        # draw BBox
        cv2.rectangle(img_copy, (int(x1), int(y1)), (int(x2), int(y2)), box_color, 1)

    annotation_utils.update_annotation_data(model_name, img_filename, 'pred_red_light', pred_red)
    annotation_utils.update_annotation_data(model_name, img_filename, 'pred_yellow_light', pred_yellow)
    annotation_utils.update_annotation_data(model_name, img_filename, 'pred_green_light', pred_green)

    # Anchor variables read and written by the mouse callback
    ix, iy = -1, -1
    drawing = False
    current_color = (0, 255, 0)  # Default: Green

    # Create window and set mouse callback
    window_name = f"Label Data: {frame}"
    cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)  # Allow resizing
    cv2.resizeWindow(window_name, window_width, window_height)  # Size from screen_info
    cv2.moveWindow(window_name, window_x, window_y)  # Position from screen_info
    cv2.setMouseCallback(window_name, edit_bbox)

    # Display loop
    while True:
        cv2.imshow(window_name, img_copy)  # Show dynamic updates
        key = cv2.waitKey(10) & 0xFF

        # Press 'Esc' to exit
        if key == 27:
            exit = True
            break

        # Press 's' to save annotations
        elif key == ord("s"):
            index.upsert(vectors=[(frame, embedding)])
            print(f"Uploaded embedding for {image_path} with ID: {frame}")
            # Open viewing text file, iterate over annotations and write to file
            with open(viewing_annotation_path, 'w') as viewing_file:
                for annotation in annotations:
                    viewing_file.write(f"{annotation}\n")
            print(f"Viewing annotations saved to {viewing_annotation_path}")

            # Convert annotations to yolo format
            yolo_annotations = annotation_utils.viewing_to_yolo(annotations, IMAGE_WIDTH, IMAGE_HEIGHT)

            # Open yolo text file, iterate over annotations and write to file
            with open(yolo_annotations_path, 'w') as yolo_file:
                for yolo_ann in yolo_annotations:
                    yolo_str = " ".join(map(str, yolo_ann))  # Convert each item to string and join with spaces
                    yolo_file.write(f"{yolo_str}\n")  # Write formatted string to file
            print(f"YOLO annotations saved to {yolo_annotations_path}")

            # Write the resized image into processed_dir
            os.makedirs(processed_dir, exist_ok=True)
            processed_path = os.path.join(processed_dir, os.path.basename(file_path))
            cv2.imwrite(processed_path, resized_img)
            print(f"Moved {frame} -> {processed_path}")

            # Move the untouched original out of the batch folder
            os.makedirs(original_dir, exist_ok=True)
            original_path = os.path.join(original_dir, os.path.basename(file_path))
            shutil.move(file_path, original_path)

            annotation_utils.update_annotation_data(model_name, img_filename, 'total_annotations', len(annotations))

            # Final per-class counts after the user's edits
            red_count = sum(1 for saved in annotations if saved['color'] == 'red_light')
            yellow_count = sum(1 for saved in annotations if saved['color'] == 'yellow_light')
            green_count = sum(1 for saved in annotations if saved['color'] == 'green_light')

            annotation_utils.update_annotation_data(model_name, img_filename, 'red_light', red_count)
            annotation_utils.update_annotation_data(model_name, img_filename, 'rmv_red_light', 0)
            annotation_utils.update_annotation_data(model_name, img_filename, 'yellow_light', yellow_count)
            annotation_utils.update_annotation_data(model_name, img_filename, 'rmv_yellow_light', 0)
            annotation_utils.update_annotation_data(model_name, img_filename, 'green_light', green_count)
            annotation_utils.update_annotation_data(model_name, img_filename, 'rmv_green_light', 0)
            break

        # Change rectangle color based on number key
        elif key in color_mapping:
            current_color = color_mapping[key]
            print(f"Class changed to: {label_mapping[color_mapping[key]]}")

        # Press 'r' to remove the highlighted boxes
        elif key == ord("r"):
            annotation_utils.remove_bbox(annotations, model_name, img_filename)
            redraw_bbox(annotations)

        # Press 'z' to undo last rectangle
        elif key == ord("z") and annotations:
            annotations.pop()  # remove last rectangle
            redraw_bbox(annotations)  # reset image and redraw remaining rectangles
            print("Last rectangle removed!")

    cv2.destroyAllWindows()

    # Retrain once the processed set has grown past the threshold.
    # NOTE(review): the notes describe a 10% growth trigger, but this factor is
    # 1.2 (20%) — confirm which is intended.
    if len(os.listdir(processed_dir)) >= dataset_size * 1.2:

        date_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")      # unique identifier for model
        model_training.train_assistant_model(date_time)               # splits dataset and trains new model
        data_management.dump_data()                                   # places files back in original dir
        data_management.track_dataset(date_time, processed_dir)       # updates meta data file
        dataset_size = data_management.get_latest_dataset_size()      # update dataset size

        # update new model as current assistant
        curr_model = os.listdir("../models/current_assistant")[0]
        model_path = f'../models/current_assistant/{curr_model}'
        model = YOLO(model_path)

cv2.destroyAllWindows()

20250222_161041M_001000.jpg
20250222_161041M_001000
Uploaded embedding for ../data/images/batch\20250222_161041M_001000.jpg with ID: 20250222_161041M_001000.jpg
Viewing annotations saved to ../data/labels/viewing/viewing_20250222_161041M_001000.txt
YOLO annotations saved to ../data/labels/formatted/20250222_161041M_001000.txt
Moved 20250222_161041M_001000.jpg -> ../data/images/processed\20250222_161041M_001000.jpg
20250222_161041M_001001.jpg
20250222_161041M_001001
Uploaded embedding for ../data/images/batch\20250222_161041M_001001.jpg with ID: 20250222_161041M_001001.jpg
Viewing annotations saved to ../data/labels/viewing/viewing_20250222_161041M_001001.txt
YOLO annotations saved to ../data/labels/formatted/20250222_161041M_001001.txt
Moved 20250222_161041M_001001.jpg -> ../data/images/processed\20250222_161041M_001001.jpg
20250222_161041M_001002.jpg
20250222_161041M_001002
Uploaded embedding for ../data/images/batch\20250222_161041M_001002.jpg with ID: 20250222_161041M_001002.jpg
Vie

In [3]:
import os
import shutil
import cv2
from ultralytics import YOLO
import sys
import json
sys.path.append(os.path.abspath('../utils'))
import data_management
import annotation_utils
import model_training

# Reset the working data first.
# NOTE(review): see utils/data_management.dump_data for what exactly is reset.
data_management.dump_data()

# Move everything still in the batch folder back into the frames folder
batch_dir = '../data/images/batch'
frames_dir = '../data/images/frames'

# Make sure the destination is a real directory; otherwise shutil.move would
# rename the first file to "frames" instead of moving it inside.
os.makedirs(frames_dir, exist_ok=True)

imgs = os.listdir(batch_dir)

for img in imgs:
    # The original passed `img` as a third positional argument, which
    # shutil.move treats as copy_function. Moving into the directory keeps the
    # file name and still raises if that name already exists there.
    shutil.move(os.path.join(batch_dir, img), frames_dir)

### Comparing Model Performance
The section below tests a model's performance on a `batch` of data. This shows how much the predictive assistant benefited from selective training compared with our other predictive models.<br>
Our ISTD model is `2025-03-30_19-18-53.pt` and will be compared to `2025-03-27_17-25-00.pt` on the batch **001** 

In [None]:
import os
import shutil
import cv2
from ultralytics import YOLO
import sys
import json
sys.path.append(os.path.abspath('../utils'))
import data_management
import annotation_utils
import model_training

#### Load batch dataset

In [None]:
BATCH_LOG_PATH = '../batch_log.json'

# Read the batch log (a list of entries with 'batch_id' and 'batch_images')
with open(BATCH_LOG_PATH, 'r') as log_file:
    data = json.load(log_file)

# Image list recorded for batch '002'. Stays None when no entry matches; if
# several entries match, the last one wins.
batch_files = None
for entry in data:
    if entry['batch_id'] != '002':
        continue
    batch_files = entry['batch_images']



##### Select models

In [None]:
# Weights evaluated as the ISTD model.
# NOTE(review): the notes above name 2025-03-30_19-18-53.pt as the ISTD model —
# confirm this is the intended file.
istd_model_path = "../models/batch_assistant/2025-03-31_16-23-12.pt"

# Weights evaluated as the control model for the comparison
control_model = "../models/current_assistant/2025-03-27_17-25-00.pt"

In [None]:
# Labels and images the models are scored against
label_dir = '../data/labels/formatted'
img_dir = '../data/images/processed'

# Score the ISTD model on the batch and unpack its TP / FP / FN counts
istd_results = model_training.test_model(istd_model_path, batch_files, img_dir, label_dir)
istd_tp, istd_fp, istd_fn = (istd_results[count] for count in ('TP', 'FP', 'FN'))

# Same evaluation for the control model
control_results = model_training.test_model(control_model, batch_files, img_dir, label_dir)
control_tp, control_fp, control_fn = (control_results[count] for count in ('TP', 'FP', 'FN'))

In [None]:
def _precision_recall_f1(tp, fp, fn):
    """Return (precision, recall, F1) for the given TP / FP / FN counts.

    Each metric is 0 when its denominator would be zero.
    """
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    if (precision + recall) > 0:
        f1_score = 2 * (precision * recall) / (precision + recall)
    else:
        f1_score = 0
    return precision, recall, f1_score


istd_precision, istd_recall, istd_f1_score = _precision_recall_f1(istd_tp, istd_fp, istd_fn)
control_precision, control_recall, control_f1_score = _precision_recall_f1(control_tp, control_fp, control_fn)

# Report as percentages with three decimals. The original spec `03f` is
# zero-padding to width 3 with the default six decimals, so it had no effect.
print("Precision")
print(f"ISTD: {100*istd_precision:.3f} | Control {100*control_precision:.3f}")
print("Recall")
print(f"ISTD: {100*istd_recall:.3f} | Control {100*control_recall:.3f}")
print("F1")
print(f"ISTD: {100*istd_f1_score:.3f} | Control {100*control_f1_score:.3f}")

In [None]:
import video_processing

# Run the ISTD model over the processed video.
# NOTE(review): presumably predict_video writes its result to the second path — confirm in utils/video_processing.
source_video = '../data/videos/processed/20250222_154541M.mp4'
istd_result_video = '../images/result_videos/testing_batch_002.mp4'
video_processing.predict_video(source_video, istd_result_video, istd_model_path)

In [None]:
# Run the control model over the same processed video for comparison.
# NOTE(review): presumably predict_video writes its result to the second path — confirm in utils/video_processing.
control_source_video = '../data/videos/processed/20250222_154541M.mp4'
control_result_video = '../images/result_videos/testing_batch_002_control.mp4'
video_processing.predict_video(control_source_video, control_result_video, control_model)