In [None]:
import pandas as pd
from sentence_transformers import SentenceTransformer
import numpy as np

# Read the per-model LLM outputs (one row per video).
file_path = "../../LLM_output_generation/extracted_video_anomalies_all_models.xlsx"
df = pd.read_excel(file_path)

# Sentence encoder used for every text field below.
model = SentenceTransformer("all-MiniLM-L6-v2")

# Column-name prefixes of the LLMs whose outputs are processed.
model_names = [
    "claude3.5sonnet",
    "claude3.5",
    "claude3.7",
    "gpt4o-mini",
    "gpt4o"
]

# (source column suffix, output column suffix) of the text fields to embed.
text_fields = [("description", "desc_emb"), ("reasoning", "reas_emb")]

# The output frame starts with the video identifier only.
final_data = pd.DataFrame()
final_data["video_name"] = df["video_name"]

for model_name in model_names:
    for source_suffix, target_suffix in text_fields:
        source_col = f"{model_name}-{source_suffix}"
        if source_col not in df.columns:
            continue
        # Missing cells stay NaN; every other cell is encoded on its own.
        final_data[f"{model_name}-{target_suffix}"] = [
            np.nan if pd.isna(text) else model.encode(text)
            for text in df[source_col].tolist()
        ]

    # The anomaly label is copied through unchanged when present.
    anomaly_col = f"{model_name}-anomaly"
    if anomaly_col in df.columns:
        final_data[anomaly_col] = df[anomaly_col]

# Write the embeddings workbook (no index column).
output_path = "embedded_only_video_anomalies.xlsx"
final_data.to_excel(output_path, index=False)

print(f"✅ Done! File saved as: {output_path}")


In [None]:
import pandas as pd
import numpy as np
import itertools
from sklearn.metrics.pairwise import cosine_similarity

# Read back the workbook written by the embedding cell.
file_path = "embedded_only_video_anomalies.xlsx"
df = pd.read_excel(file_path)

# Column-name prefixes of the LLMs being compared.
model_names = ["claude3.5sonnet", "claude3.5", "claude3.7", "gpt4o-mini", "gpt4o"]

# Parse stringified NumPy arrays into Python lists
def parse_embedding(x):
    """Convert a stringified NumPy vector into a list of floats.

    Parameters
    ----------
    x : object
        A cell value. Strings are expected to look like ``"[0.1 -0.2 0.3]"``
        (whitespace-separated numbers, optionally wrapped in brackets and
        spread over several lines).

    Returns
    -------
    list of float
        When ``x`` is a string whose tokens all parse as floats.
    float
        ``np.nan`` when ``x`` is a string containing a token that is not a number.
    object
        ``x`` itself, unchanged, when it is not a string (e.g. an existing NaN).
    """
    if not isinstance(x, str):
        return x
    try:
        # Tokenise explicitly: float() raises ValueError on any malformed token,
        # so a corrupt cell becomes NaN instead of a silently truncated vector.
        return [float(token) for token in x.strip().strip("[]").split()]
    except ValueError:
        return np.nan

# Turn every embedding column that exists in the workbook back into lists of floats.
embedding_columns = [
    f"{name}-{kind}_emb"
    for name in model_names
    for kind in ["desc", "reas"]
]
for col in embedding_columns:
    if col not in df.columns:
        continue
    df[col] = df[col].apply(parse_embedding)

# Compute pairwise cosine similarities for each (model1, model2) pair
def compute_similarities(df, prefix):
    """Cosine similarity between every pair of models' embeddings, row by row.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain ``video_name`` and, per model in the module-level
        ``model_names``, a ``"<model>-<prefix>_emb"`` column of parsed vectors.
    prefix : str
        Embedding kind used in the column names (``"desc"`` or ``"reas"`` in
        this notebook).

    Returns
    -------
    pandas.DataFrame
        ``video_name`` plus one ``"<m1>_vs_<m2>_<prefix>_sim"`` column per model
        pair. A row is NaN when either embedding is missing or unusable.
        Pairs whose embedding column is absent from ``df`` are skipped with a
        warning, because the embedding cell only writes columns that exist in
        its input.
    """
    similarities = pd.DataFrame()
    similarities["video_name"] = df["video_name"]
    for model1, model2 in itertools.combinations(model_names, 2):
        col1 = f"{model1}-{prefix}_emb"
        col2 = f"{model2}-{prefix}_emb"
        sim_col = f"{model1}_vs_{model2}_{prefix}_sim"
        if col1 not in df.columns or col2 not in df.columns:
            print(f"[Warning] Skipping {sim_col}: embedding column missing.")
            continue
        sim_values = []
        for v1, v2 in zip(df[col1], df[col2]):
            try:
                sim = cosine_similarity([v1], [v2])[0][0]
            except (ValueError, TypeError):
                # Input validation failed (NaN placeholder, empty vector or
                # mismatched lengths); other exceptions are left to surface.
                sim = np.nan
            sim_values.append(sim)
        similarities[sim_col] = sim_values
    return similarities

# Compute similarities
desc_sim = compute_similarities(df, "desc")
reas_sim = compute_similarities(df, "reas")

# Get anomaly columns (only those actually present in the workbook)
anomaly_cols = [f"{model}-anomaly" for model in model_names if f"{model}-anomaly" in df.columns]
anomaly_df = df[["video_name"] + anomaly_cols]

# Combine all results.
# All three frames share df's row index, so they are joined positionally.
# Merging on "video_name" would multiply rows whenever a name occurs more than once.
final_df = pd.concat(
    [
        desc_sim,
        reas_sim.drop(columns="video_name"),
        anomaly_df.drop(columns="video_name"),
    ],
    axis=1,
)

# Save to Excel
final_df.to_excel("video_anomaly_similarities_output.xlsx", index=False)


In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split

suffix = "desc"

# Ground-truth labels.
anno_df = pd.read_excel('../wound_groundtruth.xlsx')

# video_anomaly_similarities_output_desc.xlsx is obtained from
# video_anomaly_similarities_output.xlsx by keeping only the columns that
# concern description similarities.
desc_df = pd.read_excel(f'video_anomaly_similarities_output_{suffix}.xlsx')

# Strip the image extension so both tables use the same key.
anno_df['Image Name'] = anno_df['Image Name'].str.replace('.jpg', '', regex=False)
desc_df['video_name'] = desc_df['video_name'].str.replace('.jpg', '', regex=False)

# Inner join: only items present in both tables are kept.
merged_df = pd.merge(anno_df, desc_df, left_on='Image Name', right_on='video_name')

# Split each label separately (80/20, fixed seed) so every class keeps the same ratio.
wound_labels = ['Abrasions', 'Ingrown Nails', 'Stab Wound', 'Bruises', 'Cut', 'Laceration', 'Burns']
per_label_splits = []
for label in wound_labels:
    label_rows = merged_df[merged_df['Label'] == label]
    train_part, test_part = train_test_split(label_rows, test_size=0.2, random_state=42)
    per_label_splits.append((train_part, test_part))
    print(len(test_part))

# Stack the per-label pieces and drop the ground-truth columns.
dropped_columns = ['Label', 'Image Name']
train_df = (
    pd.concat([train_part for train_part, _ in per_label_splits])
    .reset_index(drop=True)
    .drop(columns=dropped_columns)
)
test_df = (
    pd.concat([test_part for _, test_part in per_label_splits])
    .reset_index(drop=True)
    .drop(columns=dropped_columns)
)

train_df.to_excel(f'train_dataset_{suffix}.xlsx', index=False)
test_df.to_excel(f'test_dataset_{suffix}.xlsx', index=False)


In [None]:
import pandas as pd
from sklearn.model_selection import StratifiedKFold

suffix = "desc"

# Load files. The training set is selected by `suffix`, matching the name used
# when the fold files are written below.
anno_df = pd.read_excel('../wound_groundtruth.xlsx')
desc_df = pd.read_excel(f'train_dataset_{suffix}.xlsx')

# Clean video names
anno_df['Image Name'] = anno_df['Image Name'].str.replace('.jpg', '', regex=False)
desc_df['video_name'] = desc_df['video_name'].str.replace('.jpg', '', regex=False)

# Merge datasets: re-attaches the labels needed for stratification
merged_df = pd.merge(anno_df, desc_df, left_on='Image Name', right_on='video_name')

# Initialize StratifiedKFold
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

# Generate and save the training part of each fold, stratified on 'Label'.
# The held-out indices of each split are not written anywhere.
for fold, (train_idx, _) in enumerate(skf.split(merged_df, merged_df['Label'])):
    train_fold = (
        merged_df.iloc[train_idx]
        .reset_index(drop=True)
        # Drop the ground-truth columns so only features and video_name remain
        .drop(columns=['Label', 'Image Name'])
    )

    # Save to Excel
    train_fold.to_excel(f'train_dataset_fold{fold}_{suffix}.xlsx', index=False)

print("✅ 5-fold training splits saved.")


In [None]:
import numpy as np
import pandas as pd

class PMF:
    """Low-rank matrix factorisation R ~ U @ V.T fitted by SGD with L2 regularisation.

    Attributes
    ----------
    U : numpy.ndarray, shape (num_users, num_factors)
        Row ("user") latent factors.
    V : numpy.ndarray, shape (num_items, num_factors)
        Column ("item") latent factors.
    """

    def __init__(self, num_users, num_items, num_factors, learning_rate=0.01, reg_param=0.01):
        """Store the hyper-parameters and draw random initial factors.

        The factors are sampled from a zero-mean normal with standard deviation
        ``1 / num_factors`` using NumPy's global random state.
        """
        self.num_users = num_users
        self.num_items = num_items
        self.num_factors = num_factors
        self.learning_rate = learning_rate
        self.reg_param = reg_param
        # Initialize U and V here, so they can be reset for grid search
        self.U = np.random.normal(scale=1./self.num_factors, size=(self.num_users, self.num_factors))
        self.V = np.random.normal(scale=1./self.num_factors, size=(self.num_items, self.num_factors))

    def train(self, train_ratings, val_ratings, num_epochs=100, patience=5):
        """Run SGD with early stopping on the validation RMSE.

        Parameters
        ----------
        train_ratings, val_ratings : sequence of [i, j, rating]
            Observed entries. ``train_ratings`` is shuffled IN PLACE at the
            start of every epoch.
        num_epochs : int
            Maximum number of passes over ``train_ratings``.
        patience : int
            Stop once this many consecutive epochs failed to improve the
            validation RMSE.

        Returns
        -------
        float
            Best validation RMSE seen (``inf`` if no epoch produced a value
            below ``inf``, e.g. when the RMSE is NaN throughout). On return
            ``self.U`` / ``self.V`` always hold the weights that produced this
            value, or the initial weights when no epoch improved.
        """
        best_val_rmse = float('inf')
        epochs_without_improvement = 0
        # Snapshot the starting weights so there is always something to restore,
        # even if the validation RMSE never improves (e.g. it is NaN).
        best_U, best_V = self.U.copy(), self.V.copy()

        # Training using SGD
        for epoch in range(num_epochs):
            # Shuffle training data for each epoch
            np.random.shuffle(train_ratings)

            for i, j, r_ij in train_ratings:
                i, j = int(i), int(j)
                prediction = self.predict(i, j)
                error = r_ij - prediction

                # Copy both rows first so each update uses the pre-update values
                u_i = self.U[i, :].copy()
                v_j = self.V[j, :].copy()

                self.U[i, :] += self.learning_rate * (error * v_j - self.reg_param * u_i)
                self.V[j, :] += self.learning_rate * (error * u_i - self.reg_param * v_j)

            # --- Validation Phase ---
            val_rmse = self.compute_rmse(val_ratings)
            print(f'Epoch: {epoch+1}, Validation RMSE: {val_rmse:.4f}')

            # --- Early Stopping Logic ---
            if val_rmse < best_val_rmse:
                best_val_rmse = val_rmse
                epochs_without_improvement = 0
                best_U, best_V = self.U.copy(), self.V.copy()
            else:
                epochs_without_improvement += 1

            if epochs_without_improvement >= patience:
                print(f"Stopping early after {epoch+1} epochs.")
                break

        # Restore the best weights whether or not early stopping triggered, so
        # the model state always matches the RMSE that is returned.
        self.U, self.V = best_U, best_V
        return best_val_rmse

    def predict(self, i, j):
        """Return the predicted value for row ``i`` and column ``j``."""
        return np.dot(self.U[i, :], self.V[j, :])

    def compute_rmse(self, ratings_data):
        """Root mean squared error over a sequence of [i, j, rating] entries.

        Returns 0.0 for an empty sequence.
        """
        if len(ratings_data) == 0:
            return 0.0

        error = 0
        for i, j, r_ij in ratings_data:
            i, j = int(i), int(j)
            error += (r_ij - self.predict(i, j)) ** 2

        # Return Root Mean Squared Error
        return np.sqrt(error / len(ratings_data))

    def full_matrix(self):
        """Return the dense reconstruction ``U @ V.T``."""
        return np.dot(self.U, self.V.T)

In [None]:
import os
import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error

# --- Main pipeline ---
# For every training fold: grid-search a PMF model on the fold's numeric
# matrix, then fit each test row onto the learned item factors V by least
# squares and use the reconstruction MSE as that row's uncertainty score.
data_dir = "./"  # change if needed
os.makedirs("low list", exist_ok=True)

# Hyper-parameter grid; identical for every fold.
param_grid = {
    'num_factors': [5, 10, 15],
    'learning_rate': [0.005, 0.01, 0.05],
    'reg_param': [0.01, 0.1, 0.5]
}

for fold in range(5):
    suffix = f"fold{fold}_desc"
    train_file = os.path.join(data_dir, f"train_dataset_{suffix}.xlsx")
    test_file = os.path.join(data_dir, "test_dataset_desc.xlsx")

    df = pd.read_excel(train_file, sheet_name="Sheet1")
    # Missing cells are filled with 0 and nonzero() then drops every 0, so only
    # non-zero cells are used as observed ratings.
    R = df.select_dtypes(include=[np.number]).fillna(0).values
    xs, ys = R.nonzero()
    ratings = np.c_[xs, ys, R[xs, ys]]
    np.random.seed(42)
    np.random.shuffle(ratings)
    split_index = int(0.8 * len(ratings))
    train_ratings = ratings[:split_index]
    val_ratings = ratings[split_index:]
    num_users, num_items = R.shape

    best_rmse = float('inf')
    # Reset per fold so a model from a previous fold can never be reused.
    best_model = None
    for f in param_grid['num_factors']:
        for lr in param_grid['learning_rate']:
            for reg in param_grid['reg_param']:
                pmf = PMF(num_users, num_items, f, lr, reg)
                rmse = pmf.train(train_ratings, val_ratings, num_epochs=100, patience=5)
                if rmse < best_rmse:
                    best_rmse = rmse
                    best_model = pmf

    if best_model is None:
        # Every configuration returned inf/NaN (e.g. training diverged).
        raise RuntimeError(f"No PMF configuration produced a finite validation RMSE for {suffix}.")

    V = best_model.V
    V_df = pd.DataFrame(V)
    V_df.to_excel(f"best_V_matrix_{suffix}.xlsx", index=False)

    test_df = pd.read_excel(test_file)
    # Missing test values are replaced by the column mean.
    test_numeric = test_df.select_dtypes(include=[np.number]).apply(lambda x: x.fillna(x.mean()), axis=0).values
    video_names = test_df['video_name'].astype(str).values

    if test_numeric.shape[1] != V.shape[0]:
        raise ValueError(
            f"Test set has {test_numeric.shape[1]} numeric columns but V has {V.shape[0]} rows ({suffix})."
        )

    mse_list = []
    c_list = []
    for s in test_numeric:
        # c minimises ||s - V c||^2
        c, _, _, _ = np.linalg.lstsq(V, s, rcond=None)
        mse = mean_squared_error(s, V @ c)
        mse_list.append(mse)
        c_list.append(c)

    result_df = pd.DataFrame(c_list, columns=[f'c{i}' for i in range(V.shape[1])])
    result_df['MSE'] = mse_list
    result_df.insert(0, 'video_name', video_names)
    result_df.to_excel(f'optimal_c_and_mse_lstsq_with_name_{suffix}.xlsx', index=False)

    uncertainty_scores = np.array(mse_list)
    for P in [5, 10, 15, 20, 25, 30, 35, 40]:
        # Keep rows at or below the (100 - P)th percentile of the scores.
        tau = np.percentile(uncertainty_scores, 100 - P)
        mask = uncertainty_scores <= tau
        kept_df = pd.DataFrame({
            "video_name": video_names[mask],
            "uncertainty": uncertainty_scores[mask]
        })
        kept_df.to_excel(f"low list/S_low_videos_{P}_sup_{suffix}.xlsx", index=False)


In [None]:
import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error


for fold in range(5):
    suffix = f"fold{fold}_desc"

    # Item factors saved for this fold and the fold's own training rows.
    V = pd.read_excel(f'best_V_matrix_{suffix}.xlsx').values
    fold_df = pd.read_excel(f'train_dataset_{suffix}.xlsx')

    # Numeric feature matrix, with missing values replaced by the column mean.
    S_numeric = (
        fold_df.select_dtypes(include=[np.number])
        .apply(lambda column: column.fillna(column.mean()), axis=0)
        .values
    )

    # Identifier of each row (the 'video_name' column must exist).
    video_names = fold_df['video_name'].values

    # Per row: coefficients c minimising ||s - Vc||^2 and the resulting MSE.
    c_list = [np.linalg.lstsq(V, row, rcond=None)[0] for row in S_numeric]
    mse_list = [mean_squared_error(row, V @ coeffs) for row, coeffs in zip(S_numeric, c_list)]

    # One output row per video: name, coefficients c0..c{k-1}, MSE.
    coefficient_names = [f'c{i}' for i in range(V.shape[1])]
    result_df = pd.DataFrame(c_list, columns=coefficient_names)
    result_df['MSE'] = mse_list
    result_df.insert(0, 'video_name', video_names)
    result_df.to_excel(f'optimal_c_and_mse_lstsq_with_name_{suffix}_train.xlsx', index=False)

    # For each P, keep the rows at or below the (100 - P)th percentile of the MSE.
    uncertainty_scores = np.array(mse_list)
    for P in [5, 10, 15, 20, 25, 30, 35, 40]:
        keep = uncertainty_scores <= np.percentile(uncertainty_scores, 100 - P)
        kept_df = pd.DataFrame({
            "video_name": video_names[keep],
            "uncertainty": uncertainty_scores[keep]
        })
        kept_df.to_excel(f"low list/S_low_videos_{P}_sup_{suffix}_train.xlsx", index=False)

 


In [None]:
import pandas as pd
import os

# Input workbooks.
# NOTE(review): other cells in this notebook read '../wound_groundtruth.xlsx';
# confirm which location is the intended one.
groundtruth_path = "wound_groundtruth.xlsx"
predictions_path = "../../LLM_output_generation/extracted_video_anomalies_all_models.xlsx"

groundtruth_df = pd.read_excel(groundtruth_path)
predictions_df = pd.read_excel(predictions_path, sheet_name="Sheet1")

# Normalise the join key (trim, drop '.jpg') and the label text.
groundtruth_df["filename"] = (
    groundtruth_df["Image Name"].str.strip().str.replace('.jpg', '', regex=False).astype(str)
)
groundtruth_df["true_label"] = groundtruth_df["Label"].str.strip()
predictions_df["filename"] = predictions_df["video_name"].str.strip().str.replace('.jpg', '', regex=False)

# Model name -> name of its prediction column in the predictions workbook.
model_suffixes = {
    name: f"{name}-anomaly"
    for name in ["claude3.5sonnet", "claude3.5", "claude3.7", "gpt4o-mini", "gpt4o"]
}

# Output directory
output_dir = "../model_predictions"
os.makedirs(output_dir, exist_ok=True)

# The ground-truth side of the join is the same for every model.
groundtruth_subset = groundtruth_df[["filename", "true_label"]].rename(columns={
    "filename": "Video Name",
    "true_label": "True Label"
})

# One CSV per model: Video Name, Predicted Label, True Label.
for model_name, col_name in model_suffixes.items():
    model_df = predictions_df[["filename", col_name]].rename(columns={
        "filename": "Video Name",
        col_name: "Predicted Label"
    })
    # Left join keeps every prediction row, even without a ground-truth match.
    merged = model_df.merge(groundtruth_subset, on="Video Name", how="left")
    merged.to_csv(os.path.join(output_dir, f"vad_results_{model_name}.csv"), index=False)


In [None]:
import pandas as pd
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Model names
model_names = [
    "claude3.5sonnet",
    "claude3.5",
    "claude3.7",
    "gpt4o-mini",
    "gpt4o"
]
# P values representing percent of videos excluded as high-uncertainty
P_set = [5, 10, 15, 20, 25, 30, 35, 40]

# For every fold and every P: majority-vote the models' predicted labels over
# the retained (low-uncertainty) test videos and score the vote against the
# ground truth.
for fold in range(5):
    suffix = f"fold{fold}_desc"

    # Store results for each P
    all_results = []

    for P in P_set:
        low_path = f'low list/S_low_videos_{P}_sup_{suffix}.xlsx'
        try:
            low = pd.read_excel(low_path)
        except Exception as e:
            # Report the file that was actually requested.
            print(f"[Error] Failed to load {low_path}: {e}")
            continue

        low_list = low['video_name'].str.replace('.mp4', '', regex=False)

        # Store predictions per model
        model_preds = {}

        for model in model_names:
            try:
                df = pd.read_csv(f'../model_predictions/vad_results_{model}.csv')
                df['Video Name'] = df['Video Name'].str.replace('.jpg', '', regex=False)
                df = df[df['Video Name'].isin(low_list)]
                model_preds[model] = df[['Video Name', 'Predicted Label']].set_index('Video Name')
            except Exception as e:
                print(f"[Error] Failed to process {model}: {e}")

        # Merge all predictions
        merged = pd.DataFrame(index=low_list)
        for model in model_names:
            if model in model_preds:
                merged[model] = model_preds[model].reindex(low_list)['Predicted Label']
            else:
                # The model failed to load: an all-NaN column makes dropna()
                # below discard every row, so this P is skipped with a warning
                # instead of crashing on a missing 'Predicted Label' column.
                merged[model] = float('nan')

        # Drop rows with missing predictions
        merged = merged.dropna()

        if merged.empty:
            print(f"[Warning] No valid data for P={P}, skipping.")
            continue

        # Load ground truth from the claude3.5sonnet results file. The names are
        # normalised exactly like the predictions read from the same file above.
        gt_df = pd.read_csv('../model_predictions/vad_results_claude3.5sonnet.csv')
        gt_df['Video Name'] = gt_df['Video Name'].str.replace('.jpg', '', regex=False)
        gt_df = gt_df[gt_df['Video Name'].isin(merged.index)]
        ground_truth = gt_df.set_index('Video Name')['True Label']

        # Majority voting: first mode of each row
        majority_vote = merged.mode(axis=1)[0]

        # Align ground truth
        y_true = ground_truth.loc[merged.index]

        # Compute metrics
        acc = accuracy_score(y_true, majority_vote)
        prec = precision_score(y_true, majority_vote,  average='macro',zero_division=0)
        rec = recall_score(y_true, majority_vote, average='macro', zero_division=0)
        f1 = f1_score(y_true, majority_vote, average='macro', zero_division=0)

        all_results.append({
            'P': P,
            'Num Videos': len(merged),
            'Accuracy': acc,
            'Precision': prec,
            'Recall': rec,
            'F1-Score': f1
        })

        print(f"[P={P}] Processed {len(merged)} videos | Accuracy: {acc:.4f}, F1-Score: {f1:.4f}")

    # Save final results to one Excel file
    summary_path = f'low list/VAD_Majority_Voting_Summary_sup_{suffix}.xlsx'
    result_df = pd.DataFrame(all_results)
    result_df.to_excel(summary_path, index=False)
    print(f"[Saved] {summary_path}")


In [None]:
import pandas as pd
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Model names
model_names = [
    "claude3.5sonnet",
    "claude3.5",
    "claude3.7",
    "gpt4o-mini",
    "gpt4o"
]
# P values representing percent of videos excluded as high-uncertainty
P_set = [5, 10, 15, 20, 25, 30, 35, 40]

# For every fold and every P: majority-vote the models' predicted labels over
# the retained (low-uncertainty) training videos and score the vote against the
# ground truth.
for fold in range(5):
    suffix = f"fold{fold}_desc"

    # Store results for each P
    all_results = []

    for P in P_set:
        low_path = f'low list/S_low_videos_{P}_sup_{suffix}_train.xlsx'
        try:
            low = pd.read_excel(low_path)
        except Exception as e:
            # Report the file that was actually requested.
            print(f"[Error] Failed to load {low_path}: {e}")
            continue

        low_list = low['video_name'].str.replace('.mp4', '', regex=False)

        # Store predictions per model
        model_preds = {}

        for model in model_names:
            try:
                df = pd.read_csv(f'../model_predictions/vad_results_{model}.csv')
                df['Video Name'] = df['Video Name'].str.replace('.jpg', '', regex=False)
                df = df[df['Video Name'].isin(low_list)]
                model_preds[model] = df[['Video Name', 'Predicted Label']].set_index('Video Name')
            except Exception as e:
                print(f"[Error] Failed to process {model}: {e}")

        # Merge all predictions
        merged = pd.DataFrame(index=low_list)
        for model in model_names:
            if model in model_preds:
                merged[model] = model_preds[model].reindex(low_list)['Predicted Label']
            else:
                # The model failed to load: an all-NaN column makes dropna()
                # below discard every row, so this P is skipped with a warning
                # instead of crashing on a missing 'Predicted Label' column.
                merged[model] = float('nan')

        # Drop rows with missing predictions
        merged = merged.dropna()

        if merged.empty:
            print(f"[Warning] No valid data for P={P}, skipping.")
            continue

        # Load ground truth from the claude3.5sonnet results file. The names are
        # normalised exactly like the predictions read from the same file above.
        gt_df = pd.read_csv('../model_predictions/vad_results_claude3.5sonnet.csv')
        gt_df['Video Name'] = gt_df['Video Name'].str.replace('.jpg', '', regex=False)
        gt_df = gt_df[gt_df['Video Name'].isin(merged.index)]
        ground_truth = gt_df.set_index('Video Name')['True Label']

        # Majority voting: first mode of each row
        majority_vote = merged.mode(axis=1)[0]

        # Align ground truth
        y_true = ground_truth.loc[merged.index]

        # Compute metrics
        acc = accuracy_score(y_true, majority_vote)
        prec = precision_score(y_true, majority_vote,  average='macro',zero_division=0)
        rec = recall_score(y_true, majority_vote, average='macro', zero_division=0)
        f1 = f1_score(y_true, majority_vote, average='macro', zero_division=0)

        all_results.append({
            'P': P,
            'Num Videos': len(merged),
            'Accuracy': acc,
            'Precision': prec,
            'Recall': rec,
            'F1-Score': f1
        })

        print(f"[P={P}] Processed {len(merged)} videos | Accuracy: {acc:.4f}, F1-Score: {f1:.4f}")

    # Save final results to one Excel file
    summary_path = f'low list/VAD_Majority_Voting_Summary_sup_{suffix}_train.xlsx'
    result_df = pd.DataFrame(all_results)
    result_df.to_excel(summary_path, index=False)
    print(f"[Saved] {summary_path}")


In [None]:
import pandas as pd

#suffix = "desc"


for fold in range(5):
    suffix = f"fold{fold}_desc"

    # Majority-voting summary written for this fold.
    majority_df = pd.read_excel(f'low list/VAD_Majority_Voting_Summary_sup_{suffix}.xlsx')

    # Keep P and the accuracy, exposing the latter as 'Overall Accuracy'.
    # NOTE(review): the earlier version carried a "Remove final two rows"
    # comment above a slice that selected everything; all rows are kept here as
    # before. Confirm whether trimming the last two rows was intended.
    out_df = majority_df[['P', 'Accuracy']].rename(columns={'Accuracy': 'Overall Accuracy'})

    # Save to xlsx
    out_df.to_excel(f'low list/results_P_sup_{suffix}.xlsx', index=False)
