Import libraries and load the model

In [None]:
import numpy as np
import cv2
import torch
import os
import numpy as np
from sklearn.cluster import KMeans
from sklearn.metrics import accuracy_score, adjusted_rand_score
from collections import Counter

# Load model directly
from transformers import AutoImageProcessor, AutoModelForVideoClassification

# Single source of truth for the checkpoint so the processor and the model
# can never drift apart.
MODEL_CHECKPOINT = "facebook/timesformer-base-finetuned-k600"

# The image processor only prepares pixel inputs; `return_dict` and
# `output_hidden_states` are model options, so they are not passed here.
processor = AutoImageProcessor.from_pretrained(MODEL_CHECKPOINT)

# `output_hidden_states=True` is required because embed_video reads
# `outputs.hidden_states` from the forward pass.
model = AutoModelForVideoClassification.from_pretrained(
    MODEL_CHECKPOINT,
    return_dict=True,
    output_hidden_states=True,
)

Utilities

In [None]:
# Function to extract frames from video with dynamic large step rate
def extract_frames(video_path, small_step_rate=1, max_large_frames=8, max_small_frames=8):
    """Sample two sets of frames from a video in a single sequential pass.

    The "large step" set takes every ``large_step_rate``-th frame, where
    ``large_step_rate = max(1, total_frames // max_large_frames)`` is derived
    from the frame count reported by OpenCV. The "small step" set takes every
    ``small_step_rate``-th frame counted from the first frame.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        small_step_rate: Stride, in frames, for the small-step set. Must not
            be zero.
        max_large_frames: Maximum number of frames kept in the large-step set.
        max_small_frames: Maximum number of frames kept in the small-step set.

    Returns:
        ``(large_step_frames, small_step_frames)``: two lists holding the
        images exactly as returned by ``VideoCapture.read()`` (OpenCV decodes
        to BGR channel order by default). Both lists are empty when no frame
        could be read.

    Raises:
        ValueError: If ``small_step_rate`` is zero.
    """
    if small_step_rate == 0:
        # Fail early with a clear message instead of a ZeroDivisionError
        # from the modulo inside the read loop.
        raise ValueError("small_step_rate must be non-zero")

    large_step_frames = []
    small_step_frames = []

    vidcap = cv2.VideoCapture(video_path)
    try:
        # Calculate the total number of frames in the video
        total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Calculate dynamic large step rate. The inner max() guards against
        # max_large_frames == 0; the outer max() keeps the stride at 1 or more
        # when the video is short or the reported frame count is 0.
        large_step_rate = max(1, total_frames // max(1, max_large_frames))

        success, image = vidcap.read()
        count = 0

        # Keep reading until both sets are full or the stream is exhausted.
        while success and (
            len(large_step_frames) < max_large_frames
            or len(small_step_frames) < max_small_frames
        ):
            if count % large_step_rate == 0 and len(large_step_frames) < max_large_frames:
                large_step_frames.append(image)
            if count % small_step_rate == 0 and len(small_step_frames) < max_small_frames:
                small_step_frames.append(image)
            success, image = vidcap.read()
            count += 1
    finally:
        # Always free the decoder handle, even if reading raised.
        vidcap.release()

    return large_step_frames, small_step_frames


# Function to embed video frames into vectors with max pooling
def embed_video(frames):
    """Embed a sequence of frames by max-pooling the model's last hidden state.

    The frames are preprocessed with the module-level ``processor`` and run
    through the module-level ``model`` with gradients disabled.

    Args:
        frames: Frames accepted by ``processor`` (here: the images produced
            by ``extract_frames``).

    Returns:
        A tensor holding the element-wise maximum of the final hidden state
        taken along dimension 1 (presumably the token axis, giving one vector
        per batch item; confirm against the model's output shape).
    """
    model_inputs = processor(frames, return_tensors="pt")

    # Inference only, so gradient tracking is switched off.
    with torch.no_grad():
        # Embeddings come from the hidden states, not the classification logits.
        final_hidden_state = model(**model_inputs).hidden_states[-1]

    # Only the maxima are needed; the argmax indices are discarded.
    return final_hidden_state.max(dim=1).values

def process_videos_in_folders(folders, labels):
    """Embed every ``.mp4`` file found in the given folders.

    For each video, frames are sampled with ``extract_frames`` and each of
    the two frame sets is embedded with ``embed_video``. The two embeddings
    are then concatenated along dimension 0.

    Args:
        folders: Iterable of directory paths; only the top level of each is
            scanned.
        labels: Iterable of labels, paired positionally with ``folders``.

    Returns:
        ``(embedded_vectors, embedded_labels)`` as NumPy arrays, with one
        entry per successfully embedded video, in folder order and then
        filename order.
    """
    import warnings

    def _bgr_to_rgb(frames):
        # OpenCV decodes to BGR, while the Hugging Face image processor
        # normalises channels assuming RGB; feeding BGR swaps red and blue.
        return [cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) for frame in frames]

    embedded_vectors = []
    embedded_labels = []

    for folder, label in zip(folders, labels):
        # os.listdir returns entries in arbitrary order; sorting makes the
        # row order (and anything seeded downstream) reproducible.
        video_files = sorted(f for f in os.listdir(folder) if f.endswith(".mp4"))

        for video_file in video_files:
            video_path = os.path.join(folder, video_file)
            large_step_frames, small_step_frames = extract_frames(video_path)

            if not large_step_frames or not small_step_frames:
                # An unreadable or empty video yields no frames; skip it
                # loudly rather than aborting the whole batch with an opaque
                # preprocessing error.
                warnings.warn(f"No frames could be read from {video_path}; skipping.")
                continue

            embedding_vector_large = embed_video(_bgr_to_rgb(large_step_frames))
            embedding_vector_small = embed_video(_bgr_to_rgb(small_step_frames))
            embedding_vector = torch.cat((embedding_vector_large, embedding_vector_small), dim=0)

            # Convert to numpy array and store in list
            embedded_vectors.append(embedding_vector.numpy())
            embedded_labels.append(label)

    return np.array(embedded_vectors), np.array(embedded_labels)

# Function to perform KMeans clustering and compare with true labels
def compare_kmeans_with_labels(embedded_vectors, true_labels, n_clusters=3):
    """Cluster embeddings with KMeans and score the result against true labels.

    KMeans cluster ids are arbitrary, so each cluster is relabelled with the
    most common true label among its members before accuracy is computed.
    The Adjusted Rand Index is invariant to relabelling and is therefore
    computed on the raw cluster ids.

    Args:
        embedded_vectors: 2-D array-like of shape (n_samples, n_features).
        true_labels: Array-like of ground-truth labels, one per sample.
        n_clusters: Number of clusters to fit.

    Returns:
        ``(accuracy, ari, mapped_labels, predicted_labels)`` where
        ``accuracy`` is the fraction of samples whose majority-mapped cluster
        label equals the true label, ``ari`` is the Adjusted Rand Index,
        ``mapped_labels`` are the cluster ids translated to true labels and
        ``predicted_labels`` are the raw KMeans cluster ids.
    """
    # Accept plain lists as well as arrays; boolean-mask indexing needs an array.
    true_labels = np.asarray(true_labels)

    # Perform KMeans clustering
    kmeans = KMeans(n_clusters=n_clusters, random_state=0)
    predicted_labels = kmeans.fit_predict(embedded_vectors)

    # Since KMeans labels are arbitrary, we need to map them to the true labels
    label_map = {}
    for cluster_id in range(n_clusters):
        true_labels_in_cluster = true_labels[predicted_labels == cluster_id]
        if true_labels_in_cluster.size == 0:
            # An empty cluster is never referenced by predicted_labels, so it
            # needs no mapping (and has no majority label to pick).
            continue
        # Find the most common true label in each cluster
        label_map[cluster_id] = Counter(true_labels_in_cluster).most_common(1)[0][0]

    # Map predicted labels to the most common true label in each cluster
    mapped_labels = np.array([label_map[label] for label in predicted_labels])

    # Accuracy must use the mapped labels; raw cluster ids are an arbitrary
    # permutation and would make the score meaningless.
    accuracy = accuracy_score(true_labels, mapped_labels)

    # Calculate Adjusted Rand Index (a more robust clustering metric)
    ari = adjusted_rand_score(true_labels, predicted_labels)

    return accuracy, ari, mapped_labels, predicted_labels


In [None]:
# Example usage
# One source directory per action class.
folders = [
    "/content/drive/MyDrive/processed_glasses",
    "/content/drive/MyDrive/processed_climb",
    "/content/drive/MyDrive/processed_brush",
]

# Integer class id for each folder, in the same order as `folders`.
labels = list(range(len(folders)))

embedded_vectors, embedded_labels = process_videos_in_folders(folders, labels)

# Join each video's first two embedding rows end to end, giving one flat
# 1-D feature vector per video.
embeddings = [np.hstack((vec[0], vec[1])) for vec in embedded_vectors]

768

In [None]:
import pandas as pd

# Build a table with one row per video: the feature columns come from
# `embeddings`, followed by a trailing `label` column with the class ids.
df = pd.DataFrame(embeddings).assign(label=embedded_labels)

In [None]:
# Persist the embeddings table one directory above the working directory;
# index=False keeps the DataFrame's row index out of the file.
df.to_csv('../embeddings.csv', index=False)

In [None]:
# Example usage
# Cluster the flattened embeddings into three groups and print the two
# scores returned by compare_kmeans_with_labels.
accuracy, ari, mapped_labels, predicted_labels = compare_kmeans_with_labels(embeddings, embedded_labels, n_clusters=3)

print(f"Accuracy: {accuracy}")
print(f"Adjusted Rand Index (ARI): {ari}")
