# Import

In [None]:
# Mount Google Drive at /content/drive; the dataset path configured below
# lives under that mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os
import re
import sys
import json
import pickle
import numpy as np

from PIL import Image
import matplotlib.pyplot as plt
from torchvision import transforms
from torchvision.transforms import functional as F

import torch



In [None]:
# Root of the CholecT50 dataset -- update to match your own storage layout.
cholec_dir = "/content/drive/My Drive/Master Thesis/CholecT50"
# Directory that receives the generated dataset files.
save_dir = "path/to/save"

# Locations derived from the dataset root.
labels_dir = f"{cholec_dir}/labels"
video_dir = f"{cholec_dir}/videos"
# Label file whose `categories` section is used to resolve triplet/phase ids.
json_annotations = f"{labels_dir}/VID01.json"


# Frame Dataset Creation

This section covers the steps needed to build the frame datasets, which are used by the frame-level models, i.e. object detection and frame caption generation. Due to storage capacity constraints, the 50 videos are processed and stored in groups of 10, leading to 5 processed frame datasets.

## Functions

In [None]:
# Mapping dictionaries: integer class id -> class name.
instrument_mapping = dict(enumerate([
    "grasper", "bipolar", "hook", "scissors", "clipper", "irrigator",
    "specimen_bag", "no_instrument",
]))
verb_mapping = dict(enumerate([
    "grasp", "retract", "dissect", "coagulate", "clip", "cut", "aspirate",
    "irrigate", "pack", "null_verb",
]))
target_mapping = dict(enumerate([
    "gallbladder", "cystic_plate", "cystic_duct", "cystic_artery",
    "cystic_pedicle", "blood_vessel", "fluid", "abdominal_wall_cavity",
    "liver", "adhesion", "omentum", "peritoneum", "gut", "specimen_bag",
    "null_target",
]))
phase_mapping = dict(enumerate([
    "preparation", "calot-triangle-dissection", "clipping-and-cutting",
    "gallbladder-dissection", "gallbladder-packaging",
    "cleaning-and-coagulation", "gallbladder-extraction",
]))


# Combined object vocabulary: the instrument names (indices 0-6) followed by
# the target names (indices 7-21).
mapping = dict(enumerate([
    "grasper", "bipolar", "hook", "scissors", "clipper", "irrigator",
    "specimen_bag", "gallbladder", "cystic_plate", "cystic_duct",
    "cystic_artery", "cystic_pedicle", "blood_vessel", "fluid",
    "abdominal_wall_cavity", "liver", "adhesion", "omentum", "peritoneum",
    "gut", "specimen_bag", "null_target",
]))
# Name -> index lookup. "specimen_bag" appears twice in `mapping` (indices 6
# and 20); the later entry wins, so it resolves to 20 and index 6 is never
# produced by this lookup.
reverse_mapping = {name: index for index, name in mapping.items()}

In [None]:
# Turn JSON annotation vectors into a binary vector of size 21 for instrument/target presence
def decode_vector(vectors):
    """Build a 21-element presence vector from a frame's annotation vectors.

    Each vector's first element is a triplet id, resolved to an
    "instrument,verb,target" string through the `categories.triplet` table
    of the file at the module-level `json_annotations` path. The instrument
    and target names are converted to indices with the module-level
    `reverse_mapping`.

    Args:
        vectors: iterable of annotation vectors, or None.

    Returns:
        A float numpy array of length 21 with 1 at the index of every
        instrument/target encountered. If a vector with triplet id -1 is
        met, the vector built so far is returned immediately. None yields
        an all-zero vector.
    """
    result = np.zeros(21)
    if vectors is None:
        return result

    triplets = None  # loaded lazily, once, on the first real triplet id
    for vector in vectors:
        idx = int(vector[0])
        if idx == -1:
            return result
        if triplets is None:
            with open(json_annotations, 'r') as file:
                triplets = json.load(file)['categories']['triplet']

        instrument, _verb, target = triplets[str(idx)].split(',')
        for name in (instrument, target):
            position = reverse_mapping.get(name)
            # Names missing from the vocabulary give None, and indexing a
            # numpy array with None would set EVERY entry to 1. Index 21
            # ("null_target") lies outside the vector and is ignored too.
            if position is not None and position < len(result):
                result[position] = 1

    return result

# Print a sentence built from the JSON annotations of one frame
def sentence(annotations):
    """Print a one-line description of a frame's annotations.

    For each vector, the first element is looked up in `categories.triplet`
    and the last element in `categories.phase` of the file at the
    module-level `json_annotations` path. Clauses are joined with ", and "
    and the phase of the last vector closes the sentence.

    Args:
        annotations: iterable of annotation vectors, or None.

    Returns:
        "Unknown" when `annotations` is None or a vector has triplet id -1
        (clauses already printed stay printed); otherwise None.
    """
    if annotations is None:
        return "Unknown"

    total = len(annotations)
    categories = None  # loaded lazily, once, instead of once per vector
    for position, vector in enumerate(annotations, start=1):
        triplet_id = vector[0]
        phase_id = vector[-1]
        if triplet_id == -1:
            return "Unknown"
        if categories is None:
            with open(json_annotations, 'r') as file:
                categories = json.load(file)['categories']

        instrument, verb, target = categories['triplet'][str(triplet_id)].split(',')
        phase = categories['phase'][str(phase_id)]

        # NOTE(review): "ing" is appended verbatim, so "coagulate" prints as
        # "coagulateing" and "null_verb" as "null_verbing".
        print(f"The {instrument} is {verb}ing the {target}", end="")

        if position < total:
            print(", and ", end="")
        else:
            print(f" during phase {phase}.", end="")
    print()


def sentence_return(annotations):
    """Return the caption describing one frame's annotations.

    Triplet and phase ids are resolved through the `categories` section of
    the file at the module-level `json_annotations` path. The caption starts
    with " During phase <phase>, " (phase taken from the first vector,
    leading space included) followed by one clause per vector joined with
    ", ".

    Args:
        annotations: iterable of annotation vectors whose first element is a
            triplet id and whose last element is a phase id, or None.

    Returns:
        The caption string; "" for an empty iterable; "Unknown" when
        `annotations` is None or any vector has triplet id -1.
    """
    if annotations is None:
        return "Unknown"

    categories = None  # loaded lazily, once, instead of once per vector
    prefix = ""
    clauses = []
    for vector in annotations:
        triplet_id = vector[0]
        if triplet_id == -1:
            return "Unknown"
        if categories is None:
            with open(json_annotations, 'r') as file:
                categories = json.load(file)['categories']

        instrument, verb, target = categories['triplet'][str(triplet_id)].split(',')
        if not clauses:
            # Only the first vector's phase is reported.
            prefix = f" During phase {categories['phase'][str(vector[-1])]}, "

        if verb == "null_verb":
            clauses.append(f"the {instrument} is present")
            continue

        # Drop a trailing "e" before adding "ing" (coagulate -> coagulating).
        # NOTE(review): final consonants are not doubled, so "clip"/"cut"
        # become "cliping"/"cuting"; left untouched to keep captions stable.
        if verb.endswith("e"):
            verb = verb[:-1]
        auxiliary = "are" if instrument == "scissors" else "is"
        clauses.append(f"the {instrument} {auxiliary} {verb}ing the {target}")

    return prefix + ", ".join(clauses)


def get_objects(annotations, json_annotations):
    """List the instrument and target names present in a frame.

    Args:
        annotations: iterable of annotation vectors whose first element is a
            triplet id, or None.
        json_annotations: path to a label JSON file whose
            `categories.triplet` table maps triplet ids to
            "instrument,verb,target" strings.

    Returns:
        A list of names in encounter order (instrument before target,
        duplicates kept), skipping "null_instrument" and "null_target".
        The string "Unknown" is returned when `annotations` is None or a
        vector has triplet id -1.
    """
    if annotations is None:
        return "Unknown"

    triplets = None  # loaded lazily, once, instead of once per vector
    objects = []
    for vector in annotations:
        triplet_id = vector[0]
        if triplet_id == -1:
            return "Unknown"
        if triplets is None:
            with open(json_annotations, 'r') as file:
                triplets = json.load(file)['categories']['triplet']

        instrument, _verb, target = triplets[str(triplet_id)].split(',')
        if instrument != "null_instrument":
            objects.append(str(instrument))
        if target != "null_target":
            objects.append(str(target))

    return objects

def description(video, frame):
    """Return the caption of one frame of one video.

    Args:
        video: integer video number; the label file is
            "<labels_dir>/VID<video, zero-padded to 2 digits>.json".
        frame: frame number, used (as a string) as the key into the file's
            "annotations" table.

    Returns:
        The caption built by `sentence_return`; the message
        "No annotations found for this frame." when the frame has no entry;
        None (after printing "Not a Video") when the label file is missing.
    """
    # Join with a path separator: plain concatenation produced
    # ".../labelsVID01.json", which never exists.
    json_path = os.path.join(labels_dir, f"VID{video:02d}.json")
    try:
        with open(json_path, 'r') as file:
            data = json.load(file)
    except FileNotFoundError:
        print("Not a Video")
        return None

    frame_annotations = data["annotations"].get(str(frame))
    if frame_annotations is None:
        # Return the message itself; feeding it to sentence_return would
        # iterate over its characters and fail.
        return "No annotations found for this frame."
    return sentence_return(frame_annotations)



In [None]:
def get_objects(annotations, json_annotations):
    """List the instrument and target names present in a frame.

    Args:
        annotations: iterable of annotation vectors whose first element is a
            triplet id, or None.
        json_annotations: path to a label JSON file whose
            `categories.triplet` table maps triplet ids to
            "instrument,verb,target" strings.

    Returns:
        A list of names in encounter order (instrument before target,
        duplicates kept), skipping "null_instrument" and "null_target".
        The string "Unknown" is returned when `annotations` is None or a
        vector has triplet id -1.
    """
    if annotations is None:
        return "Unknown"

    triplets = None  # loaded lazily, once, instead of once per vector
    objects = []
    for vector in annotations:
        triplet_id = vector[0]
        if triplet_id == -1:
            return "Unknown"
        if triplets is None:
            with open(json_annotations, 'r') as file:
                triplets = json.load(file)['categories']['triplet']

        instrument, _verb, target = triplets[str(triplet_id)].split(',')
        if instrument != "null_instrument":
            objects.append(str(instrument))
        if target != "null_target":
            objects.append(str(target))

    return objects


def get_frame_caption(annotations, json_annotations):
    """Return the caption describing one frame's annotations.

    The caption starts with " During phase <phase>, " (phase taken from the
    first vector, leading space included) followed by one clause per vector
    joined with ", ".

    Args:
        annotations: iterable of annotation vectors whose first element is a
            triplet id and whose last element is a phase id, or None.
        json_annotations: path to a label JSON file providing the
            `categories.triplet` and `categories.phase` lookup tables.

    Returns:
        The caption string; "" for an empty iterable; "Unknown" when
        `annotations` is None or any vector has triplet id -1.
    """
    if annotations is None:
        return "Unknown"

    categories = None  # loaded lazily, once, instead of once per vector
    prefix = ""
    clauses = []
    for vector in annotations:
        triplet_id = vector[0]
        if triplet_id == -1:
            return "Unknown"
        if categories is None:
            with open(json_annotations, 'r') as file:
                categories = json.load(file)['categories']

        instrument, verb, target = categories['triplet'][str(triplet_id)].split(',')
        if not clauses:
            # Only the first vector's phase is reported.
            prefix = f" During phase {categories['phase'][str(vector[-1])]}, "

        if verb == "null_verb":
            clauses.append(f"the {instrument} is present")
            continue

        # Drop a trailing "e" before adding "ing" (coagulate -> coagulating).
        # NOTE(review): final consonants are not doubled, so "clip"/"cut"
        # become "cliping"/"cuting"; left untouched to keep captions stable.
        if verb.endswith("e"):
            verb = verb[:-1]
        auxiliary = "are" if instrument == "scissors" else "is"
        clauses.append(f"the {instrument} {auxiliary} {verb}ing the {target}")

    return prefix + ", ".join(clauses)

def preprocess_frame(frame_path, target_size=(224, 224)):
    """Load one image file and return it as a resized tensor.

    The image is opened with PIL, converted to RGB, resized to
    `target_size` with `transforms.Resize` and converted with
    `transforms.ToTensor`.
    """
    rgb_image = Image.open(frame_path).convert('RGB')
    pipeline = transforms.Compose([
        transforms.Resize(target_size),
        transforms.ToTensor(),
    ])
    return pipeline(rgb_image)



def annotation_to_label(annotations, json_dir):
    """Build a 21-element object-presence label vector for one frame.

    Args:
        annotations: iterable of annotation vectors whose first element is a
            triplet id, or None.
        json_dir: path to a label JSON file (despite the name, a file, not a
            directory) whose `categories.triplet` table maps triplet ids to
            "instrument,verb,target" strings.

    Returns:
        A float numpy array of length 21 with 1 at the `reverse_mapping`
        index of every instrument/target encountered. If a vector with
        triplet id -1 is met, the labels built so far are returned
        immediately. None yields an all-zero vector.
    """
    labels = np.zeros(21)
    if annotations is None:
        return labels

    with open(json_dir, 'r') as file:
        triplets = json.load(file)['categories']['triplet']

    for vector in annotations:
        idx = int(vector[0])
        if idx == -1:
            return labels
        instrument, _verb, target = triplets[str(idx)].split(',')
        for name in (instrument, target):
            position = reverse_mapping.get(name)
            # Names missing from the vocabulary give None, and indexing a
            # numpy array with None would set EVERY label to 1. Index 21
            # ("null_target") lies outside the vector and is ignored too.
            if position is not None and position < len(labels):
                labels[position] = 1
    return labels





## Create datasets

In [None]:
def create_dataset(video_dir, labels_dir, json_annotations, start_video=None, end_video=None):
    """Build the frame-level dataset for a slice of the video folders.

    Args:
        video_dir: directory containing one sub-folder of frames per video.
        labels_dir: directory containing "<video folder name>.json" files.
        json_annotations: path to the label file used to resolve triplet and
            phase ids (forwarded to the caption/object/label helpers).
        start_video, end_video: optional slice bounds applied to the sorted
            directory listing (end exclusive, like a Python slice).

    Returns:
        A list with one dict per captioned frame, holding the keys "video",
        "frame_number" (frame file name without extension), "frame",
        "object_labels", "objects" and "frame_caption". Frames without an
        annotation entry, or whose caption is "Unknown", are skipped.
    """
    image_extensions = ('.png', '.jpg', '.jpeg')
    dataset = []

    all_video_folders = sorted(os.listdir(video_dir))
    if start_video is not None or end_video is not None:
        all_video_folders = all_video_folders[start_video:end_video]

    for video_folder in all_video_folders:
        print(video_folder)
        video_path = os.path.join(video_dir, video_folder)
        if not os.path.isdir(video_path):
            continue

        frames = sorted(
            name for name in os.listdir(video_path)
            if name.endswith(image_extensions)
        )
        if not frames:
            continue

        # One read per video instead of one per frame.
        annotation_file = os.path.join(labels_dir, f"{video_folder}.json")
        with open(annotation_file, "r") as f:
            video_annotations = json.load(f)["annotations"]

        for frame_file in frames:
            # Extension-agnostic: the stem is also stored as "frame_number".
            frame_stem = os.path.splitext(frame_file)[0]
            match = re.match(r'^(\d+)', frame_stem)
            if not match:
                print(f"Skipping invalid frame name: {frame_file}")
                continue

            frame_key = str(int(match.group(1)))
            frame_annotation = video_annotations.get(frame_key)
            if frame_annotation is None:
                # No entry for this frame: nothing to caption or label.
                continue

            # Check the caption first so unlabeled frames are never decoded.
            frame_caption = get_frame_caption(frame_annotation, json_annotations)
            if frame_caption == "Unknown":
                continue

            frame_path = os.path.join(video_path, frame_file)
            dataset.append({
                "video": video_folder,
                "frame_number": frame_stem,
                "frame": preprocess_frame(frame_path),
                "object_labels": annotation_to_label(frame_annotation, json_annotations),
                "objects": get_objects(frame_annotation, json_annotations),
                "frame_caption": frame_caption
            })

    return dataset


In [None]:
# Slice bounds into the sorted video folder listing (end is exclusive).
start = 40
end = 50

# Create the output directory up front so a missing folder cannot make
# torch.save fail after all videos have been processed.
frame_output_dir = f"{save_dir}/Datasets"
os.makedirs(frame_output_dir, exist_ok=True)

dataset = create_dataset(video_dir, labels_dir, json_annotations, start, end)
torch.save(dataset, f"{frame_output_dir}/frame_dataset_{start}_{end-1}.pt")

VID68
VID70
VID73
VID74
VID75
VID78
VID79
VID80
VID92
VID96


# Clip Dataset Creation

Creating and preprocessing the dataset needed for the clip caption generation model. As before, the videos are processed 10 at a time.

## Functions

In [None]:

def extract_phase_and_actions(captions):
    """Summarise a sequence of frame captions into one clip caption.

    Consecutive captions sharing the same phase are merged into one entry
    holding the phase name, the distinct actions seen (first-seen order)
    and the number of captions merged ("time"). A caption without a
    "during phase ..." part inherits the previous phase; captions seen
    before any phase is known are ignored. The entries are then rendered
    by `create_sentence`.

    Args:
        captions: iterable of frame caption strings.

    Returns:
        The clip caption produced by `create_sentence`.
    """
    results = []
    prev_phase = None

    for caption in captions:
        phase_match = re.search(r'during phase ([a-zA-Z\-]+)', caption, re.IGNORECASE)
        phase = phase_match.group(1) if phase_match else prev_phase

        if phase is None:
            continue

        actions_part = re.sub(r'during phase [a-zA-Z\-]+,? ', '', caption, flags=re.IGNORECASE)
        actions = []
        for raw_action in actions_part.split(','):
            action = raw_action.strip()
            if action and action.lower() != "unknown":
                actions.append(action)

        if phase == prev_phase:
            current = results[-1]
            # dict.fromkeys removes duplicates while keeping first-seen
            # order; a set would reorder the actions differently from one
            # run to the next.
            current["actions"] = list(dict.fromkeys(current["actions"] + actions))
            current["time"] += 1
        else:
            results.append({
                "phase": phase,
                "actions": list(dict.fromkeys(actions)),
                "time": 1,
            })

        prev_phase = phase

    return create_sentence(results)


def create_sentence(results):
    """Render grouped phase/action entries as a clip caption.

    Args:
        results: list of dicts with keys "phase", "time" and "actions"
            (a list of action strings).

    Returns:
        "" for an empty list. With a single entry:
        "During the phase of <phase> lasting <time> seconds, <actions>.".
        With several entries each sentence is prefixed by a connector
        ("First", then "Then"; "Finally" from the fifth entry onwards) and
        the sentences are separated by a space. An entry without actions
        produces the sentence without the action clause.
    """
    if not results:
        return ""

    connectors = ["First", "Then", "Then", "Then", "Finally"]
    single_phase = len(results) == 1
    sentences = []

    for i, entry in enumerate(results):
        phase = entry["phase"]
        duration = entry["time"]
        actions = entry["actions"]

        if not actions:
            # Guard: indexing actions[-1] on an empty list would raise.
            actions_text = ""
        elif len(actions) == 1:
            actions_text = actions[0]
        elif len(actions) == 2:
            actions_text = f"{actions[0]} while {actions[1]}"
        else:
            actions_text = f"{', '.join(actions[:-1])} and {actions[-1]}"

        if single_phase:
            opening = f"During the phase of {phase} lasting {duration} seconds"
        else:
            # NOTE(review): "Finally" is only reached from the fifth entry
            # on, not for the last entry of shorter clips -- confirm intent.
            connector = connectors[min(i, len(connectors) - 1)]
            opening = f"{connector}, during the phase of {phase} lasting {duration} seconds"

        if actions_text:
            sentences.append(f"{opening}, {actions_text}.")
        else:
            sentences.append(f"{opening}.")

    return " ".join(sentences).strip()



# Constructs the frame caption from json annotations
def sentence_return(annotations):
    """Return the caption describing one frame's annotations.

    Triplet and phase ids are resolved through the `categories` section of
    the file at the module-level `json_annotations` path. The caption starts
    with " During phase <phase>, " (phase taken from the first vector,
    leading space included) followed by one clause per vector joined with
    ", ".

    Args:
        annotations: iterable of annotation vectors whose first element is a
            triplet id and whose last element is a phase id, or None.

    Returns:
        The caption string; "" for an empty iterable; "Unknown" when
        `annotations` is None or any vector has triplet id -1.
    """
    if annotations is None:
        return "Unknown"

    categories = None  # loaded lazily, once, instead of once per vector
    prefix = ""
    clauses = []
    for vector in annotations:
        triplet_id = vector[0]
        if triplet_id == -1:
            return "Unknown"
        if categories is None:
            with open(json_annotations, 'r') as file:
                categories = json.load(file)['categories']

        instrument, verb, target = categories['triplet'][str(triplet_id)].split(',')
        if not clauses:
            # Only the first vector's phase is reported.
            prefix = f" During phase {categories['phase'][str(vector[-1])]}, "

        if verb == "null_verb":
            clauses.append(f"the {instrument} is present")
            continue

        # Drop a trailing "e" before adding "ing" (coagulate -> coagulating).
        # NOTE(review): final consonants are not doubled, so "clip"/"cut"
        # become "cliping"/"cuting"; left untouched to keep captions stable.
        if verb.endswith("e"):
            verb = verb[:-1]
        auxiliary = "are" if instrument == "scissors" else "is"
        clauses.append(f"the {instrument} {auxiliary} {verb}ing the {target}")

    return prefix + ", ".join(clauses)




def get_frame_captions(frame_paths):
    """Return one caption per frame path.

    For each path, the parent folder name selects the label file
    "<labels_dir>/<folder>.json" and the first six characters of the file
    name (parsed as an integer) select the entry in its "annotations"
    table. The caption is built by `sentence_return`.

    Args:
        frame_paths: iterable of frame image paths.

    Returns:
        A list of caption strings in input order; "Unknown" is used for
        frames without an annotation entry.
    """
    captions = []
    annotations_by_file = {}  # each label file is parsed once per call

    for frame_path in frame_paths:
        video_id = os.path.basename(os.path.dirname(frame_path))
        frame_stem = os.path.splitext(os.path.basename(frame_path))[0]
        frame_number = int(frame_stem[:6])

        annotation_file = os.path.join(labels_dir, f"{video_id}.json")
        if annotation_file not in annotations_by_file:
            with open(annotation_file, "r") as f:
                annotations_by_file[annotation_file] = json.load(f)["annotations"]

        frame_annotation = annotations_by_file[annotation_file].get(str(frame_number))
        if frame_annotation is None:
            # No entry for this frame: same placeholder as an unlabeled one.
            captions.append("Unknown")
        else:
            captions.append(sentence_return(frame_annotation))
    return captions


# Reshape Frames
def load_and_preprocess_frames(frame_paths, target_size=(224, 224)):
    """Load frames from disk and stack them into one clip tensor.

    Each image is opened with PIL, converted to RGB, resized to
    `target_size` with `transforms.Resize` and converted with
    `transforms.ToTensor`. The stacked tensor is moved to the module-level
    `device`, which must be defined before this function is called.

    Args:
        frame_paths: iterable of image paths (must not be empty, since
            `torch.stack` rejects an empty list).
        target_size: size passed to `transforms.Resize`.

    Returns:
        The stacked clip tensor, with the frame index as first dimension,
        located on `device`.
    """
    transform = transforms.Compose([
        transforms.Resize(target_size),
        transforms.ToTensor(),
    ])

    frames = [transform(Image.open(path).convert('RGB')) for path in frame_paths]

    # Stack first, then transfer once, instead of one transfer per frame
    # followed by a redundant transfer of the stacked result.
    return torch.stack(frames).to(device)


## Create

In [None]:




def create_dataset(device, video_dir, clip_length, overlap, start_idx, end_idx):
    """Build the clip-level dataset for a range of video folders.

    Every video is cut into windows of `clip_length` consecutive frames,
    each window starting `clip_length - overlap` frames after the previous
    one. Trailing frames that do not fill a whole window are dropped.

    Args:
        device: device the clip tensors are moved to.
        video_dir: directory containing one sub-folder of frames per video.
        clip_length: number of frames per clip (at least 1).
        overlap: number of frames shared by consecutive clips; must be
            smaller than `clip_length`.
        start_idx, end_idx: bounds into the sorted list of video folders
            (end inclusive).

    Returns:
        A list with one dict per clip, holding the keys "video",
        "frame_numbers", "clip", "frame_captions" and "clip_caption".

    Raises:
        ValueError: if `clip_length` < 1 or `overlap` >= `clip_length`
            (the window would never advance).
    """
    stride = clip_length - overlap
    if clip_length < 1 or stride < 1:
        raise ValueError(
            f"need clip_length >= 1 and overlap < clip_length, "
            f"got clip_length={clip_length}, overlap={overlap}"
        )

    dataset = []

    video_folders = sorted(
        entry for entry in os.listdir(video_dir)
        if os.path.isdir(os.path.join(video_dir, entry))
    )
    selected_videos = video_folders[start_idx:end_idx + 1]

    for video_folder in selected_videos:
        print(f"Processing: {video_folder}")
        video_path = os.path.join(video_dir, video_folder)

        frames = sorted(
            name for name in os.listdir(video_path)
            if name.endswith(('.png', '.jpg', '.jpeg'))
        )

        # The range is empty when the video has fewer frames than a clip.
        for first in range(0, len(frames) - clip_length + 1, stride):
            window = frames[first:first + clip_length]
            frame_paths = [os.path.join(video_path, name) for name in window]

            # File name up to its first dot, independent of the path separator.
            frame_numbers = [name.split('.')[0] for name in window]
            clip = load_and_preprocess_frames(frame_paths).to(device)
            frame_captions = get_frame_captions(frame_paths)
            clip_caption = extract_phase_and_actions(frame_captions)

            dataset.append({
                "video": video_folder,
                "frame_numbers": frame_numbers,
                "clip": clip,
                "frame_captions": frame_captions,
                "clip_caption": clip_caption
            })

    return dataset


In [None]:
clip_length = 32
overlap = 16

# Bounds into the sorted list of video folders (end is inclusive here).
start = 40
end = 49

# `device` is used below and inside load_and_preprocess_frames.
# NOTE(review): every clip stays on this device until the dataset is saved;
# switch to torch.device("cpu") if accelerator memory is a concern.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Create the output directory up front so a missing folder cannot make
# torch.save fail after all videos have been processed.
os.makedirs(save_dir, exist_ok=True)

dataset = create_dataset(device,video_dir, clip_length, overlap, start, end)
torch.save(dataset, f"{save_dir}/clip_dataset_{start}_{end}.pt")
print("Dataset saved!")

Processing: VID68
Processing: VID70
Processing: VID73
Processing: VID74
Processing: VID75
Processing: VID78
Processing: VID79
Processing: VID80
Processing: VID92
Processing: VID96
Dataset saved!
