In [1]:
import json
import copy 

def group_overlapping(keysteps):
    """
    Given a list of keystep dicts (each with 'start_t' and 'end_t'),
    group overlapping keysteps into clusters. For each cluster with more than one
    keystep, keep the keystep with the longest duration as primary and return the rest as secondary.
    Keysteps that do not overlap with any others are returned as primary.
    """
    if len(keysteps) <= 1:
        return keysteps, []

    # Sort keysteps by start time
    sorted_keysteps = sorted(keysteps, key=lambda k: k['start_t'])
    clusters = []
    current_cluster = [sorted_keysteps[0]]
    current_cluster_end = sorted_keysteps[0]['end_t']

    for step in sorted_keysteps[1:]:
        # Check if current keystep overlaps with current cluster.
        # Overlap if the start time is less than current cluster's max end time.
        if step['start_t'] < current_cluster_end:
            current_cluster.append(step)
            current_cluster_end = max(current_cluster_end, step['end_t'])
        else:
            clusters.append(current_cluster)
            current_cluster = [step]
            current_cluster_end = step['end_t']
    if current_cluster:
        clusters.append(current_cluster)

    primary = []
    secondary = []
    for cluster in clusters:
        if len(cluster) == 1:
            primary.extend(cluster)
        else:
            # In a cluster of overlapping keysteps, choose the one with maximum duration as primary.
            durations = [k['end_t'] - k['start_t'] for k in cluster]
            max_index = durations.index(max(durations))
            for i, k in enumerate(cluster):
                if i == max_index:
                    primary.append(k)
                else:
                    secondary.append(k)
    return primary, secondary

def process_annotation(input_file, output_file):
    with open(input_file, 'r') as f:
        data = json.load(f)

    # Dictionary to hold new secondary subjects (keyed by new subject id)
    secondary_subjects = {}

    # Process each subject in the annotation
    for subject in data.get("subjects", []):
        subject_id = subject["subject_id"]
        # Iterate over a copy of trials list so we can modify in place
        for trial in subject.get("trials", []):
            keysteps = trial.get("keysteps", [])
            # Only process trials with more than one keystep (possible overlap)
            if len(keysteps) > 1:
                primary_keysteps, secondary_keysteps = group_overlapping(keysteps)
                # Update the current trial with primary keysteps only
                trial["keysteps"] = primary_keysteps

                if secondary_keysteps:
                    # Create or update the secondary subject for this original subject
                    secondary_subject_id = f"{subject_id}-secondary"
                    if secondary_subject_id not in secondary_subjects:
                        secondary_subjects[secondary_subject_id] = {
                            "subject_id": secondary_subject_id,
                            "trials": []
                        }
                    # Make a deep copy of the trial for the secondary subject
                    trial_secondary = copy.deepcopy(trial)
                    # Replace keysteps with the secondary ones only
                    trial_secondary["keysteps"] = secondary_keysteps
                    secondary_subjects[secondary_subject_id]["trials"].append(trial_secondary)

    # Append the secondary subjects (if any) to the subjects list
    for sec_sub in secondary_subjects.values():
        data["subjects"].append(sec_sub)

    with open(output_file, 'w') as f:
        json.dump(data, f, indent=4)
    print(f"Modified annotation saved to {output_file}")



In [1]:
# Example usage:

input_filename = "../../Annotations/main_annotation_classification.json"
output_filename = "multiperson_annotation.json"


In [None]:
process_annotation(input_filename, output_filename)


## Verify annotation multiperson

In [4]:
import cv2
import json
import sys

def load_annotations(annotation_file, primary_subject_id, trial_id):
    """
    Loads the keystep annotations for a given trial.
    Returns a tuple (primary_annotations, secondary_annotations) where:
      - primary_annotations come from the subject with subject_id == primary_subject_id
      - secondary_annotations come from the subject with subject_id == f"{primary_subject_id}-secondary"
    """
    with open(annotation_file, 'r') as f:
        data = json.load(f)
    
    primary_annotations = []
    secondary_annotations = []
    
    for subject in data.get("subjects", []):
        sid = subject.get("subject_id", "")
        for trial in subject.get("trials", []):
            if trial.get("trial_id") == trial_id:
                if sid == primary_subject_id:
                    primary_annotations = trial.get("keysteps", [])
                elif sid == f"{primary_subject_id}-secondary":
                    secondary_annotations = trial.get("keysteps", [])
    return primary_annotations, secondary_annotations

def find_trial(data, subject_id, trial_id):
    """
    Returns the trial dict for a given subject_id and trial_id.
    """
    for subject in data.get("subjects", []):
        if subject.get("subject_id") == subject_id:
            for trial in subject.get("trials", []):
                if trial.get("trial_id") == trial_id:
                    return trial
    return None

def get_video_file_from_trial(trial, preferred_extensions=['.mp4']):
    """
    Iterates over the streams in a trial and returns the file_path of the first one
    whose filename ends with one of the preferred extensions.
    """
    clip_ego_stream = trial.get("streams", {}).get("egocam_rgb_audio", {})
    file_path = clip_ego_stream.get("file_path", "")
    if any(file_path.lower().endswith(ext) for ext in preferred_extensions):
        return file_path
    
    return None

def visualize_annotations(video_path, primary_annotations, secondary_annotations, output_video_path="annotated_video.mp4"):
    """
    Opens the video file, reads it frame by frame, overlays text annotations based on keystep intervals,
    and saves the annotated frames into an output video file.
    Primary annotations are drawn in green and secondary annotations in red.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Error opening video file:", video_path)
        sys.exit(1)
    
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # Define the codec and create VideoWriter object
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
    
    font = cv2.FONT_HERSHEY_SIMPLEX

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        # Calculate current time (in seconds) based on frame index and FPS
        current_frame = cap.get(cv2.CAP_PROP_POS_FRAMES)
        current_time = current_frame / fps

        # Build overlay texts for active keysteps at current time
        primary_text = ""
        for ann in primary_annotations:
            if ann["start_t"] <= current_time <= ann["end_t"]:
                primary_text += f"{ann['label']} "

        secondary_text = ""
        for ann in secondary_annotations:
            if ann["start_t"] <= current_time <= ann["end_t"]:
                secondary_text += f"{ann['label']} "

        # Overlay texts on the frame:
        if primary_text:
            cv2.putText(frame, "Primary: " + primary_text, (50, 50), font, 1, (0, 255, 0), 2, cv2.LINE_AA)
        if secondary_text:
            cv2.putText(frame, "Secondary: " + secondary_text, (50, 100), font, 1, (0, 0, 255), 2, cv2.LINE_AA)

        # Write the annotated frame to the output video
        out.write(frame)

    cap.release()
    out.release()
    print(f"Annotated video saved to {output_video_path}")



In [5]:
# Specify your annotation file and trial details here.
annotation_file = output_filename  # modified annotation file from previous processing
primary_subject_id = "ms1"  # e.g., original subject id
trial_id = "0"            # trial you wish to verify

# Load the full annotation JSON
with open(annotation_file, 'r') as f:
    data = json.load(f)

# Get the primary trial details to obtain the video file path.
primary_trial = find_trial(data, primary_subject_id, trial_id)
if primary_trial is None:
    print(f"Trial {trial_id} not found for subject {primary_subject_id}.")
    sys.exit(1)

video_path = get_video_file_from_trial(primary_trial, preferred_extensions=[".mp4"])
if video_path is None:
    print("No video file (with .mp4 extension) found in the trial streams.")
    sys.exit(1)

# Load keystep annotations from both primary and secondary subjects.
primary_annotations, secondary_annotations = load_annotations(annotation_file, primary_subject_id, trial_id)

print("Primary Annotations:", primary_annotations)
print("Secondary Annotations:", secondary_annotations)
print("Using video file:", video_path)

visualize_annotations(video_path, primary_annotations, secondary_annotations)

Primary Annotations: [{'keystep_id': '5_FuqYvMnm', 'start_t': 0, 'end_t': 6.60863, 'label': 'approach_patient', 'class_id': 0}, {'keystep_id': '5_1JZ1Tkf5', 'start_t': 6.609, 'end_t': 11.41892, 'label': 'check_responsiveness', 'class_id': 1}, {'keystep_id': '5_2FerCyqc', 'start_t': 15.773, 'end_t': 83.70723, 'label': 'chest_compressions', 'class_id': 4}]
Secondary Annotations: [{'keystep_id': '5_u2JcKI3V', 'start_t': 11.419, 'end_t': 15.77309, 'label': 'check_pulse', 'class_id': 2}, {'keystep_id': '5_fRl1lC6p', 'start_t': 13.395, 'end_t': 15.77309, 'label': 'check_breathing', 'class_id': 3}, {'keystep_id': '5_dfb1x5md', 'start_t': 19.50903, 'end_t': 22.50903, 'label': 'request_assistance', 'class_id': 6}, {'keystep_id': '5_wli4oXCE', 'start_t': 22.509, 'end_t': 25.11386, 'label': 'request_aed', 'class_id': 5}, {'keystep_id': '5_e4hWg5c0', 'start_t': 30.50903, 'end_t': 33.59237, 'label': 'turn_on_aed', 'class_id': 7}, {'keystep_id': '5_xXQP77f7', 'start_t': 33.884, 'end_t': 44.13068, 'l

## Annotation Corrector

In [7]:
import tkinter as tk
from tkinter import ttk, messagebox
import cv2
from PIL import Image, ImageTk
import json

class AnnotationEditor:
    def __init__(self, master):
        self.master = master
        self.master.title("Annotation Editor")
        
        # Load annotation file
        self.annotation_file = "./multiperson_annotation.json"
        try:
            with open(self.annotation_file, 'r') as f:
                self.annotation_data = json.load(f)
        except Exception as e:
            messagebox.showerror("Error", f"Failed to load annotation file: {e}")
            master.destroy()
            return
        
        # Build a list of trial options that have a matching secondary subject.
        self.trial_options = self.get_trial_options()
        if not self.trial_options:
            messagebox.showerror("Error", "No trials with secondary subjects found.")
            master.destroy()
            return
        
        # Create UI elements
        top_frame = tk.Frame(master)
        top_frame.pack(pady=5)
        
        tk.Label(top_frame, text="Select Trial (format: PrimarySubjectID:TrialID)").pack(side=tk.LEFT)
        self.selected_trial_var = tk.StringVar(value=self.trial_options[0])
        self.trial_dropdown = ttk.Combobox(top_frame, textvariable=self.selected_trial_var, values=self.trial_options, state="readonly")
        self.trial_dropdown.pack(side=tk.LEFT, padx=5)
        self.trial_dropdown.bind("<<ComboboxSelected>>", self.on_trial_selected)
        
        # Video display area
        self.video_label = tk.Label(master)
        self.video_label.pack()
        
        # Active annotations listbox
        listbox_frame = tk.Frame(master)
        listbox_frame.pack(pady=5)
        tk.Label(listbox_frame, text="Active Annotations (click to select then switch):").pack()
        self.annotation_listbox = tk.Listbox(listbox_frame, width=60, height=6)
        self.annotation_listbox.pack()
        
        # Control buttons
        control_frame = tk.Frame(master)
        control_frame.pack(pady=5)
        self.switch_button = tk.Button(control_frame, text="Switch Annotation", command=self.switch_annotation)
        self.switch_button.pack(side=tk.LEFT, padx=5)
        self.save_button = tk.Button(control_frame, text="Save Annotations", command=self.save_annotations)
        self.save_button.pack(side=tk.LEFT, padx=5)
        
        # Video playback variables
        self.cap = None
        self.fps = 0
        self.video_running = False
        
        # Load the first trial by default
        self.load_selected_trial()
    
    def get_trial_options(self):
        """
        Returns a list of strings "PrimarySubjectID:TrialID" for which there exists
        both a primary subject (without "-secondary") and its corresponding secondary subject.
        """
        options = []
        primary_ids = {subj["subject_id"] for subj in self.annotation_data.get("subjects", []) if not subj["subject_id"].endswith("-secondary")}
        for pid in primary_ids:
            secondary_id = pid + "-secondary"
            primary_trials = {trial["trial_id"] for trial in self.get_trials_for_subject(pid)}
            secondary_trials = {trial["trial_id"] for trial in self.get_trials_for_subject(secondary_id)}
            common = primary_trials.intersection(secondary_trials)
            for trial_id in common:
                options.append(f"{pid}:{trial_id}")
        return options
    
    def get_trials_for_subject(self, subject_id):
        for subj in self.annotation_data.get("subjects", []):
            if subj.get("subject_id") == subject_id:
                return subj.get("trials", [])
        return []
    
    def on_trial_selected(self, event=None):
        self.load_selected_trial()
    
    def load_selected_trial(self):
        # Stop any running video playback.
        self.video_running = False
        if self.cap is not None:
            self.cap.release()
            self.cap = None
        
        selection = self.selected_trial_var.get()
        if not selection:
            return
        # Expect selection in format "PrimarySubjectID:TrialID"
        try:
            primary_id, trial_id = selection.split(":")
        except Exception as e:
            messagebox.showerror("Error", f"Invalid trial selection format: {e}")
            return
        
        self.primary_subject_id = primary_id
        self.trial_id = trial_id
        
        # Retrieve primary and secondary trial objects.
        self.primary_trial = self.find_trial(self.primary_subject_id, self.trial_id)
        self.secondary_trial = self.find_trial(self.primary_subject_id + "-secondary", self.trial_id)
        if self.primary_trial is None or self.secondary_trial is None:
            messagebox.showerror("Error", "Selected trial not found in both primary and secondary subjects.")
            return
        
        # Get video file path from primary trial's 'clip_ego' stream.
        self.video_path = self.get_video_file_from_trial(self.primary_trial)
        if not self.video_path:
            messagebox.showerror("Error", "No video file found for the selected trial.")
            return
        
        # Open the video capture
        self.cap = cv2.VideoCapture(self.video_path)
        if not self.cap.isOpened():
            messagebox.showerror("Error", f"Cannot open video file: {self.video_path}")
            return
        self.fps = self.cap.get(cv2.CAP_PROP_FPS)
        self.video_running = True
        self.update_frame()
    
    def find_trial(self, subject_id, trial_id):
        for subj in self.annotation_data.get("subjects", []):
            if subj.get("subject_id") == subject_id:
                for trial in subj.get("trials", []):
                    if trial.get("trial_id") == trial_id:
                        return trial
        return None
    
    def get_video_file_from_trial(self, trial, preferred_extensions=['.mp4']):
        """
        Returns the file_path from the 'egocam_rgb_audio' stream if it matches one of the preferred extensions.
        """
        clip_ego = trial.get("streams", {}).get("egocam_rgb_audio", {})
        file_path = clip_ego.get("file_path", "")
        if any(file_path.lower().endswith(ext) for ext in preferred_extensions):
            return file_path
        return None
    
    def update_frame(self):
        if not self.video_running or self.cap is None:
            return
        
        ret, frame = self.cap.read()
        if not ret:
            # Restart video at end.
            self.cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
            ret, frame = self.cap.read()
        current_frame_num = self.cap.get(cv2.CAP_PROP_POS_FRAMES)
        current_time = current_frame_num / self.fps
        
        # Optionally overlay active annotations on the frame.
        primary_text = ""
        for ann in self.primary_trial.get("keysteps", []):
            if ann["start_t"] <= current_time <= ann["end_t"]:
                primary_text += f"{ann['label']} "
        secondary_text = ""
        for ann in self.secondary_trial.get("keysteps", []):
            if ann["start_t"] <= current_time <= ann["end_t"]:
                secondary_text += f"{ann['label']} "
        cv2.putText(frame, "Primary: " + primary_text, (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255,0), 2)
        cv2.putText(frame, "Secondary: " + secondary_text, (50, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,0,255), 2)
        
        # Convert frame to Tkinter-compatible image.
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        img = Image.fromarray(frame_rgb)
        imgtk = ImageTk.PhotoImage(image=img)
        self.video_label.imgtk = imgtk
        self.video_label.configure(image=imgtk)
        
        # Update the active annotations listbox.
        self.update_active_annotations(current_time)
        
        # Schedule the next frame update.
        self.master.after(int(1000/self.fps), self.update_frame)
    
    def update_active_annotations(self, current_time):
        # Clear the listbox.
        self.annotation_listbox.delete(0, tk.END)
        # Add primary annotations active at current time.
        for idx, ann in enumerate(self.primary_trial.get("keysteps", [])):
            if ann["start_t"] <= current_time <= ann["end_t"]:
                text = f"Primary - {idx}: {ann['label']} [{ann['start_t']} - {ann['end_t']}]"
                self.annotation_listbox.insert(tk.END, text)
        # Add secondary annotations active at current time.
        for idx, ann in enumerate(self.secondary_trial.get("keysteps", [])):
            if ann["start_t"] <= current_time <= ann["end_t"]:
                text = f"Secondary - {idx}: {ann['label']} [{ann['start_t']} - {ann['end_t']}]"
                self.annotation_listbox.insert(tk.END, text)
    
    def switch_annotation(self):
        # Get the selected annotation from the listbox.
        sel = self.annotation_listbox.curselection()
        if not sel:
            messagebox.showinfo("Info", "Please select an annotation to switch.")
            return
        index = sel[0]
        item_text = self.annotation_listbox.get(index)
        if item_text.startswith("Primary"):
            try:
                # Extract index from text: "Primary - {idx}: ..."
                ann_idx = int(item_text.split("-")[1].split(":")[0].strip())
            except Exception as e:
                messagebox.showerror("Error", f"Failed to parse annotation index: {e}")
                return
            if ann_idx < len(self.primary_trial.get("keysteps", [])):
                ann = self.primary_trial["keysteps"].pop(ann_idx)
                self.secondary_trial.setdefault("keysteps", []).append(ann)
                messagebox.showinfo("Info", "Annotation switched from primary to secondary.")
        elif item_text.startswith("Secondary"):
            try:
                ann_idx = int(item_text.split("-")[1].split(":")[0].strip())
            except Exception as e:
                messagebox.showerror("Error", f"Failed to parse annotation index: {e}")
                return
            if ann_idx < len(self.secondary_trial.get("keysteps", [])):
                ann = self.secondary_trial["keysteps"].pop(ann_idx)
                self.primary_trial.setdefault("keysteps", []).append(ann)
                messagebox.showinfo("Info", "Annotation switched from secondary to primary.")
        else:
            messagebox.showerror("Error", "Invalid annotation selection.")
        # Refresh the listbox.
        current_frame = self.cap.get(cv2.CAP_PROP_POS_FRAMES)
        self.update_active_annotations(current_frame / self.fps)
    
    def save_annotations(self):
        output_file = "modified_annotation_updated.json"
        try:
            with open(output_file, 'w') as f:
                json.dump(self.annotation_data, f, indent=4)
            messagebox.showinfo("Info", f"Annotations saved to {output_file}")
        except Exception as e:
            messagebox.showerror("Error", f"Failed to save annotations: {e}")

if __name__ == "__main__":
    root = tk.Tk()
    app = AnnotationEditor(root)
    root.mainloop()
