# Set the Working Directory

In [None]:
import os

# Use the process's current working directory as the base for later paths.
working_dir = os.getcwd()

# Guard clause: abort if the directory is unusable, otherwise confirm.
if not os.path.isdir(working_dir):
    raise ValueError("Working directory does not exist")
print("Working directory is ready!")

# Set the Trace Directory

In [None]:
import yaml

# Load configuration file.
# Decode explicitly as UTF-8 so parsing does not depend on the platform's
# default locale encoding.
config_path = os.path.join(working_dir, "configuration.yaml")
with open(config_path, "r", encoding="utf-8") as f:
    config = yaml.safe_load(f)

# An empty file parses to None and a missing key would otherwise surface as an
# opaque TypeError/KeyError, so report the missing setting explicitly.
if not isinstance(config, dict) or "working_trace" not in config:
    raise KeyError(f"'working_trace' is not set in {config_path}")

# Resolve trace directory: <working_dir>/data/<working_trace>
trace_dir = os.path.join(working_dir, "data", config["working_trace"])

if os.path.isdir(trace_dir):
    print(f"✅ Trace directory ready: {trace_dir}")
else:
    raise FileNotFoundError(f"❌ Trace directory not found: {trace_dir}\n")

# Load Metadata and Sampled Video Pairs
⚠️ Please ensure [user-defined metadata has been processed](user_defined_metadata.ipynb) and [video pairs have been sampled](sampling.ipynb) before running this cell.

In [None]:
import random
from utils import load_pickle_file

# Build the paths of the two pickles stored in the trace directory
metadata_path = os.path.join(trace_dir, "metadata.pickle")
pairs_path = os.path.join(trace_dir, "sampled_pairs.pickle")

# Read them in: metadata first, then the sampled pairs
metadata = load_pickle_file(metadata_path)
sampled_pairs = load_pickle_file(pairs_path)

# Randomise the order of the pairs in place
random.shuffle(sampled_pairs)

# Create or Load Annotations

In [None]:
from glob import glob
import pickle

def create_or_load_annotation(trace_dir):
    """Interactively pick an existing annotator or register a new one.

    Looks for ``annotator_*.pickle`` files in ``<trace_dir>/annotations``
    (the directory is created if missing). When such files exist, the user is
    prompted to load one of them or to create a new annotator; when none
    exist, ``annotator_1`` is created without prompting. A newly created
    annotator is written to disk immediately as an empty dict.

    Args:
        trace_dir: Directory that contains (or will contain) the
            ``annotations`` sub-directory.

    Returns:
        A ``(annotator, annotation)`` tuple: the annotator's name (the pickle
        file name without its extension) and the annotation dict, which is
        empty for a new annotator and otherwise whatever the selected pickle
        contained.
    """
    annotations_dir = os.path.join(trace_dir, "annotations")
    os.makedirs(annotations_dir, exist_ok=True)

    # List existing annotators (pickle file names without the extension)
    existing_files = sorted(glob(os.path.join(annotations_dir, "annotator_*.pickle")))
    existing_annotators = [os.path.splitext(os.path.basename(f))[0] for f in existing_files]

    is_new = True
    if existing_annotators:
        print("\n📝 Existing annotators:")
        print("  [0] Create a new annotator")
        for idx, name in enumerate(existing_annotators, start=1):
            print(f"  [{idx}] {name}")

        # Prompt user to select an annotator by number
        while True:
            choice = input("\nSelect an annotator by number (enter '0' to create new): ").strip()
            # isdecimal() only accepts characters int() can parse; isdigit()
            # also accepts e.g. "²", which would make int() raise.
            if choice.isdecimal() and 0 <= int(choice) <= len(existing_annotators):
                choice = int(choice)
                break
            print("Invalid input. Please enter a valid number.")

        if choice == 0:
            # Create a new annotator with the next free numeric id.
            existing_ids = []
            for name in existing_annotators:
                # The glob also matches names like "annotator_backup"; skip
                # suffixes that are not integers instead of crashing.
                try:
                    existing_ids.append(int(name.split("_")[1]))
                except ValueError:
                    continue
            new_id = max(existing_ids, default=0) + 1
            annotator = f"annotator_{new_id}"
            annotation = {}
            print(f"\n🆕 Created a new annotator: {annotator}")
        else:
            # Load an existing annotator
            is_new = False
            annotator = existing_annotators[choice - 1]
            # NOTE: unpickling can execute arbitrary code; only load
            # annotation files from a trusted source.
            with open(os.path.join(annotations_dir, f"{annotator}.pickle"), "rb") as file:
                annotation = pickle.load(file)
            print(f"\n✅ Loaded annotation for {annotator}")
            print(f"# Annotated pairs: {len(annotation)}")

    else:
        # Create the first annotator
        annotator = "annotator_1"
        annotation = {}
        print(f"\n🆕 No existing annotators found. Created new annotator: {annotator}")

    # Only a new annotator needs writing. Rewriting a file that was just
    # loaded adds nothing and risks truncating existing annotations if the
    # write is interrupted.
    if is_new:
        with open(os.path.join(annotations_dir, f"{annotator}.pickle"), "wb") as file:
            pickle.dump(annotation, file)

    return annotator, annotation

# Create or load annotation: select (or register) the annotator for this
# session and get the annotation dict that the annotation cell below fills in.
annotator, annotation = create_or_load_annotation(trace_dir)

# Annotate Sampled Video Pairs

> Re-run the cell below manually, once per video pair, until it reports that annotation is complete 🔽

In [None]:
from utils import *

# Fetch the next pair to label
id_1, id_2 = get_next_unannotated_pair(sampled_pairs, annotation)

# A (None, None) result means nothing is left: announce it and stop the cell
if (id_1, id_2) == (None, None):
    print("✅ Completed annotation!")
    raise StopIteration("No more pairs to annotate.")

# Show both videos together with their metadata
display_metadata_and_videos([id_1, id_2], metadata, trace_dir)

# Ask for a yes/no judgement and store it as a boolean keyed by the pair
response = valid_input(["yes", "no"], prompt="Are the two videos similar? [yes/no]: ")
annotation[id_1, id_2] = response == "yes"

# Persist after every answer so progress survives between runs of the cell
annotation_path = os.path.join(trace_dir, f"annotations/{annotator}.pickle")
with open(annotation_path, "wb") as file:
    pickle.dump(annotation, file)
print(f"📦 Saved annotation to: {annotation_path}")

# Report how far along the annotator is
annotated_count = len(annotation)
remaining_count = len(sampled_pairs) - annotated_count
print(f"📊 Progress: {annotated_count} annotated / {remaining_count} remaining")

# Evaluate Agreement Between Two Annotators

In [None]:
from glob import glob
import pickle
from sklearn.metrics import cohen_kappa_score

def load_annotation(name, annotations_dir):
    """Return the unpickled content of ``<annotations_dir>/<name>.pickle``.

    Args:
        name: Annotator name, i.e. the pickle file name without extension.
        annotations_dir: Directory holding the annotation pickles.

    Returns:
        The object stored in the pickle file.

    Raises:
        FileNotFoundError: If the path is not an existing regular file.
    """
    pickle_path = os.path.join(annotations_dir, f"{name}.pickle")
    if os.path.isfile(pickle_path):
        with open(pickle_path, "rb") as handle:
            return pickle.load(handle)
    raise FileNotFoundError(f"Annotation file not found: {pickle_path}")

def select_two_annotators(trace_dir):
    """Choose the two annotators whose annotations will be compared.

    Annotators are discovered from ``annotator_*.pickle`` files in
    ``<trace_dir>/annotations``. With exactly two, both are returned without
    prompting; with more, the user is asked for two distinct menu numbers
    until a valid selection is entered.

    Args:
        trace_dir: Directory containing the ``annotations`` sub-directory.

    Returns:
        A tuple with the two selected annotator names.

    Raises:
        RuntimeError: If fewer than two annotator files are found.
    """
    pattern = os.path.join(trace_dir, "annotations", "annotator_*.pickle")
    names = [os.path.splitext(os.path.basename(path))[0] for path in sorted(glob(pattern))]
    total = len(names)

    if total < 2:
        raise RuntimeError("❌ Fewer than two annotators found. Please complete at least two annotations before measuring agreement.")

    if total == 2:
        first, second = names
        print(f"\n📋 Exactly two annotators found: {first} and {second}")
        return first, second

    # Three or more annotators: show a 1-based menu and ask for two picks.
    print("\n📋 Available annotators:")
    for number, name in enumerate(names, start=1):
        print(f"  [{number}] {name}")

    while True:
        try:
            pick_1 = int(input("Select the first annotator by number: ").strip())
            pick_2 = int(input("Select the second annotator by number: ").strip())
        except ValueError:
            print("❌ Invalid input. Please enter numeric value.")
            continue

        in_range = all(1 <= pick <= total for pick in (pick_1, pick_2))
        if pick_1 != pick_2 and in_range:
            return names[pick_1 - 1], names[pick_2 - 1]
        print("❌ Invalid selection. Make sure to choose two different annotators.")

def compute_annotator_agreement(trace_dir):
    """Compare two annotators and return the pairs they labelled identically.

    The annotators are chosen via ``select_two_annotators``. Each must hold
    as many annotations as there are entries in the module-level
    ``sampled_pairs``. For the pairs annotated by both, the exact-agreement
    rate and Cohen's kappa are printed.

    Args:
        trace_dir: Directory whose ``annotations`` sub-directory holds the
            annotator pickles.

    Returns:
        dict: pair -> label for every common pair on which both annotators
        gave the same label.

    Raises:
        ValueError: If either annotator has not completed all sampled pairs,
            or if the two annotators share no annotated pair.
    """
    a1_name, a2_name = select_two_annotators(trace_dir)
    print(f"\n🔍 Comparing annotations from: {a1_name} and {a2_name}")

    annotations_dir = os.path.join(trace_dir, "annotations")
    a1 = load_annotation(a1_name, annotations_dir)
    a2 = load_annotation(a2_name, annotations_dir)

    # Both annotators must have labelled the full sample.
    expected = len(sampled_pairs)
    if len(a1) != expected or len(a2) != expected:
        print(f"⚠️ {a1_name} has completed {len(a1)} out of {expected} annotations.")
        print(f"⚠️ {a2_name} has completed {len(a2)} out of {expected} annotations.")
        raise ValueError("❌ Both annotators must complete all sampled pairs before comparison.")

    shared_pairs = set(a1) & set(a2)
    if not shared_pairs:
        raise ValueError("❌ No common annotated pairs found between the selected annotators.")

    # Parallel label lists (same iteration order) plus the agreed-upon subset.
    labels_1 = [a1[pair] for pair in shared_pairs]
    labels_2 = [a2[pair] for pair in shared_pairs]
    common_annotation = {pair: a1[pair] for pair in shared_pairs if a1[pair] == a2[pair]}

    matches = sum(left == right for left, right in zip(labels_1, labels_2))
    agreement = matches / len(labels_1)
    kappa = cohen_kappa_score(labels_1, labels_2)

    print("\n✅ Completed agreement analysis.")
    print(f"  Number of common annotated pairs: {len(shared_pairs)}")
    print(f"  Exact agreement: {agreement * 100:.1f}%")
    print(f"  Cohen’s Kappa: {kappa:.2f}")

    return common_annotation

# Run agreement analysis: `final_annotation` starts out as the pairs on which
# the two selected annotators gave the same label.
final_annotation = compute_annotator_agreement(trace_dir)

# Resolve Disagreements

> Re-run the cell below manually, once per remaining pair, until it reports that all disagreements are resolved 🔽

In [None]:
from utils import *

# Destination of the consolidated annotation
annotation_path = os.path.join(trace_dir, "annotations", "final_annotation.pickle")

# Next sampled pair without an entry in `final_annotation`, i.e. a pair the
# two annotators did not agree on and that has not been resolved yet.
id_1, id_2 = get_next_unannotated_pair(sampled_pairs, final_annotation)

# Exit loop if all pairs are annotated
if (id_1, id_2) == (None, None):
    # Save here as well: if the annotators agreed on every pair, this branch
    # is reached on the first run and the file would otherwise never be
    # written.
    with open(annotation_path, "wb") as file:
        pickle.dump(final_annotation, file)
    print(f"📦 Saved common annotation to: {annotation_path}")
    print("✅ Completed resolving disagreements!")
    raise StopIteration("No more pairs to annotate.")

display_metadata_and_videos([id_1, id_2], metadata, trace_dir)

# Collect annotation input; the answer is stored as a boolean keyed by the pair
response = valid_input(["yes", "no"], prompt="Are the two videos similar? [yes/no]: ")
final_annotation[(id_1, id_2)] = (response == "yes")

# Save annotation after every decision so progress survives between runs
with open(annotation_path, "wb") as file:
    pickle.dump(final_annotation, file)
print(f"📦 Saved common annotation to: {annotation_path}")

# Display progress
print(f"📊 Progress: {len(final_annotation)} annotated / {len(sampled_pairs) - len(final_annotation)} remaining")