# بسم الله الرحمن الرحيم

In [None]:
# Install the Hugging Face `evaluate` package, which is imported further down
# in this notebook (IPython shell escape: runs `pip` in the notebook's shell).
!pip install evaluate

In [None]:
import os
import cv2
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import zipfile
import torch
from torch.utils.data import Dataset

from transformers import (
    AutoProcessor,
    VideoMAEForVideoClassification,
    # ViViTForVideoClassification,
    TrainingArguments,
    Trainer
)

import evaluate


In [None]:
# === utils / seed / device ===
def set_seed(seed=42):
    """Seed the Python, NumPy and PyTorch (CPU and every CUDA device) RNGs.

    Args:
        seed: Integer seed handed to each random number generator.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for seed_rng in seeders:
        seed_rng(seed)

# Fix every RNG, then pick the GPU when one is visible and the CPU otherwise.
set_seed(42)
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)
print("Device:", device)


In [None]:
with zipfile.ZipFile('/content/drive/MyDrive/Data set/Shop DataSet.zip','r') as file :
  file.extractall('/content/Data set')

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Map each class name (one sub-directory of `datapath` per class) to the list
# of video file paths found inside that sub-directory.
dataset = {}
datapath = '/content/Data set/Shop DataSet'

# os.listdir returns entries in arbitrary order; sorting keeps the row order,
# and therefore every later seeded sample/split, reproducible across runs.
for filename in sorted(os.listdir(datapath)):
    class_dir = os.path.join(datapath, filename)
    # Skip stray files next to the class folders (archive metadata and the
    # like); calling os.listdir on them would raise NotADirectoryError.
    if not os.path.isdir(class_dir):
        continue
    dataset[filename] = [
        os.path.join(class_dir, video_name)
        for video_name in sorted(os.listdir(class_dir))
    ]


In [None]:
# Bar chart of how many videos each class contains.
class_names = list(dataset)
videos_per_class = [len(paths) for paths in dataset.values()]

plt.figure(figsize=(5, 5))
plt.bar(class_names, videos_per_class)
plt.show()

In [None]:
# Flatten the {class name: [video paths]} mapping into one row per video.
data_list = [
    {'labels': category, 'Video Path': video_path}
    for category, videos in dataset.items()
    for video_path in videos
]

df_videos = pd.DataFrame(data_list)
df_videos

In [None]:
# Balance the classes by random oversampling: every class is topped up with
# randomly re-drawn copies of its own rows until it matches the largest class.
class_counts = df_videos['labels'].value_counts()
minority_class = class_counts.idxmin()
majority_class = class_counts.idxmax()
minority_count = class_counts[minority_class]
majority_count = class_counts[majority_class]

print(f"Minority class: {minority_class} with {minority_count} samples")
print(f"Majority class: {majority_class} with {majority_count} samples")

# Handle each class on its own instead of selecting "the minority" and "the
# majority" by name: with tied counts idxmin() and idxmax() return the same
# class (which would drop the other one), and any third class would be lost.
balanced_parts = []
for class_name, class_rows in df_videos.groupby('labels', sort=True):
    # Always keep every original row so no real video is lost to resampling.
    balanced_parts.append(class_rows)
    shortfall = majority_count - len(class_rows)
    if shortfall > 0:
        # Draw only the missing rows, with replacement, from this class.
        balanced_parts.append(
            class_rows.sample(shortfall, replace=True, random_state=42)
        )

df_oversampled = pd.concat(balanced_parts, axis=0)

# Shuffle the oversampled dataset
df_oversampled = df_oversampled.sample(frac=1, random_state=42).reset_index(drop=True)

print("\nClass distribution after oversampling:")
print(df_oversampled['labels'].value_counts())

oversampled_class_counts = df_oversampled['labels'].value_counts()
plt.figure(figsize=(5,5))
plt.bar(oversampled_class_counts.index, oversampled_class_counts.values)
plt.title('Class Distribution After Oversampling')
plt.xlabel('Class')
plt.ylabel('Number of Samples')
plt.show()

In [None]:
from sklearn.preprocessing import LabelEncoder

# Replace the class-name strings in 'labels' with integer ids; `le` is kept so
# the ids can be mapped back to names later.
le = LabelEncoder()
encoded_labels = le.fit_transform(df_oversampled['labels'])
df_oversampled['labels'] = encoded_labels
df_oversampled

In [None]:
class VideoDataset(Dataset):
    """Map-style dataset that turns video files into fixed-length clips.

    Every row of ``df`` must provide a ``'Video Path'`` column (path of the
    video file) and a ``'labels'`` column (value convertible to ``int``). The
    first ``frames_per_video`` frames of a video are decoded, resized,
    converted to RGB and handed to ``processor``.

    Args:
        df: DataFrame with ``'Video Path'`` and ``'labels'`` columns.
        processor: Callable invoked as ``processor(frames, return_tensors="pt")``
            that returns a mapping of tensors with a leading batch dimension.
        frames_per_video: Number of frames in every returned clip.
        img_size: Target frame size as ``(height, width)``.
    """

    def __init__(self, df, processor, frames_per_video=16, img_size=(224,224)):
        self.df = df.reset_index(drop=True)
        self.processor = processor
        self.frames_per_video = frames_per_video
        self.img_size = img_size

    def __len__(self):
        """Return the number of videos (rows of the DataFrame)."""
        return len(self.df)

    def _read_frames(self, path):
        """Decode the first ``frames_per_video`` frames of the video at *path*.

        Returns:
            A list of exactly ``frames_per_video`` RGB ``uint8`` arrays of
            shape ``(height, width, 3)``. Videos that are too short are padded
            with black frames; an unreadable video yields only black frames.
        """
        height, width = self.img_size
        frames = []
        cap = cv2.VideoCapture(path)
        try:
            while len(frames) < self.frames_per_video:
                ret, frame = cap.read()
                if not ret:
                    break
                # cv2.resize expects dsize as (width, height), the reverse of
                # the (height, width) order used for array shapes.
                frame = cv2.resize(frame, (width, height))
                # Keep the raw 0-255 uint8 pixels: Hugging Face image
                # processors rescale by 1/255 themselves by default, so
                # dividing here as well would scale the input twice.
                frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        finally:
            # Release the decoder even when reading or resizing raises.
            cap.release()

        # Pad short (or unreadable) videos with black frames.
        missing = self.frames_per_video - len(frames)
        if missing > 0:
            blank = np.zeros((height, width, 3), dtype=np.uint8)
            frames.extend(blank.copy() for _ in range(missing))
        return frames

    def __getitem__(self, idx):
        """Return the processed clip and label of row *idx* as a dict of tensors."""
        row = self.df.iloc[idx]
        frames = self._read_frames(row['Video Path'])  # list of (H, W, C) arrays

        # For VideoMAE the processor returns
        # {'pixel_values': tensor(1, T, C, H, W)}; drop the batch dimension so
        # the data collator can stack individual samples.
        inputs = self.processor(frames, return_tensors="pt")
        item = {k: v.squeeze(0) for k, v in inputs.items()}
        item["labels"] = torch.tensor(int(row['labels']), dtype=torch.long)
        return item


In [None]:
# Split on *unique* videos: `df_oversampled` contains duplicated rows, and
# splitting those rows directly would place copies of the same video in both
# the training and the evaluation set (inflating the evaluation score).
unique_videos = df_oversampled.drop_duplicates(subset='Video Path')
train_videos, test_df = train_test_split(
    unique_videos,
    test_size=0.2,
    random_state=42,
    stratify=unique_videos['labels'],  # keep class proportions in both parts
)
# Training keeps the oversampled duplicates, restricted to training videos, so
# the evaluation set holds each held-out video exactly once.
train_df = df_oversampled[df_oversampled['Video Path'].isin(train_videos['Video Path'])]

In [None]:

# Hugging Face checkpoint ids for the candidate video models.
model1_id = "MCG-NJU/videomae-base"                       # original TF TimeSformer cell used this id
model2_id = "MCG-NJU/videomae-base-finetuned-kinetics"    # VideoMAE
model3_id = "google/vivit-b-16x2-kinetics400"             # ViViT

# One shared processor, loaded from the first checkpoint id, feeds both splits.
processor = AutoProcessor.from_pretrained(model1_id)

train_dataset, eval_dataset = (
    VideoDataset(split_df, processor=processor)
    for split_df in (train_df, test_df)
)


In [None]:
accuracy_metric = evaluate.load("accuracy")


def compute_metrics(eval_pred):
    """Compute accuracy for a ``(logits, labels)`` pair.

    The predicted class of each sample is the index of its largest logit along
    the last axis; the result is whatever ``accuracy_metric.compute`` returns.
    """
    scores, targets = eval_pred
    predicted_classes = np.argmax(scores, axis=-1)
    return accuracy_metric.compute(
        predictions=predicted_classes,
        references=targets,
    )


In [None]:
import inspect

def make_training_args_safe(**kwargs):
    """Build ``TrainingArguments`` robustly across transformers versions.

    * The evaluation-strategy option may be passed under either spelling
      (``eval_strategy`` or ``evaluation_strategy``); it is translated to the
      name the installed ``TrainingArguments`` accepts, preferring the newer
      ``eval_strategy`` when both exist.
    * Keyword arguments the installed version does not accept are dropped
      (with a warning) instead of raising ``TypeError``.
    * When ``load_best_model_at_end`` is requested, the evaluation and save
      strategies are forced to match (the evaluation value wins, ``"epoch"``
      is used when neither is given); if this version lacks either option the
      feature is disabled.

    Raises:
        ValueError: If ``output_dir`` is missing after filtering.
    """
    import warnings

    sig = inspect.signature(TrainingArguments.__init__)
    accepted = set(sig.parameters) - {"self", "args", "kwargs"}

    # Newer spelling first, so versions accepting both avoid the deprecated one.
    eval_aliases = ("eval_strategy", "evaluation_strategy")
    eval_key = next((name for name in eval_aliases if name in accepted), None)
    save_key = "save_strategy" if "save_strategy" in accepted else None

    # Move whichever spelling the caller used onto the accepted name; without
    # this the value would be silently discarded by the filter below.
    requested = dict(kwargs)
    alias_values = [requested.pop(name) for name in eval_aliases if name in requested]
    if alias_values and eval_key is not None:
        requested[eval_key] = alias_values[0]

    dropped = sorted(set(requested) - accepted)
    if dropped:
        warnings.warn(
            "Ignoring TrainingArguments options not supported by the installed "
            f"transformers version: {', '.join(dropped)}"
        )
    filtered = {k: v for k, v in requested.items() if k in accepted}

    if filtered.get("load_best_model_at_end", False):
        if eval_key is None or save_key is None:
            # cannot guarantee matching strategies in this version -> disable
            filtered["load_best_model_at_end"] = False
        else:
            # Evaluation value wins, then the save value, then "epoch".
            strategy = filtered.get(eval_key, filtered.get(save_key, "epoch"))
            filtered[eval_key] = strategy
            filtered[save_key] = strategy

    # output_dir is required
    if "output_dir" not in filtered:
        raise ValueError("output_dir is required for TrainingArguments.")

    return TrainingArguments(**filtered)


def fine_tune_model(model_class, model_id, out_dir,
                    num_labels=2, epochs=10, per_device_batch_size=2, lr=0.01):
    """Fine-tune a pretrained video classifier and save it to ``out_dir``.

    Uses the module-level ``train_dataset``, ``eval_dataset``, ``processor``
    and ``device`` objects.

    Args:
        model_class: Model class exposing ``from_pretrained``.
        model_id: Checkpoint id or path passed to ``from_pretrained``.
        out_dir: Directory for checkpoints, logs, the final model and processor.
        num_labels: Number of output classes of the classification head.
        epochs: Number of training epochs.
        per_device_batch_size: Batch size per device for training and evaluation.
        lr: Learning rate.

    Returns:
        Tuple ``(trainer, eval_res)``: the trainer after training and the
        result of ``trainer.evaluate(eval_dataset)``.
    """
    # load model (ignore mismatched sizes so backbone loads, head reinit)
    model = model_class.from_pretrained(
        model_id,
        num_labels=num_labels,
        ignore_mismatched_sizes=True,
    ).to(device)

    ta_kwargs = dict(
        output_dir=out_dir,
        # make_training_args_safe adapts these keys to the installed version
        evaluation_strategy="epoch",
        save_strategy="epoch",
        learning_rate=lr,
        per_device_train_batch_size=per_device_batch_size,
        per_device_eval_batch_size=per_device_batch_size,
        num_train_epochs=epochs,
        weight_decay=0.01,
        load_best_model_at_end=True,
        fp16=torch.cuda.is_available(),
        logging_dir=f"{out_dir}/logs",
        logging_steps=10,
        push_to_hub=False
    )

    training_args = make_training_args_safe(**ta_kwargs)

    # Newer transformers releases take the processor as `processing_class`
    # and deprecate `tokenizer`; choose whichever the installed Trainer accepts.
    trainer_params = inspect.signature(Trainer.__init__).parameters
    processor_kwarg = (
        "processing_class" if "processing_class" in trainer_params else "tokenizer"
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        compute_metrics=compute_metrics,
        **{processor_kwarg: processor},
    )

    trainer.train()
    eval_res = trainer.evaluate(eval_dataset)
    # Persist the model and processor together so out_dir can be reloaded on its own.
    trainer.save_model(out_dir)
    processor.save_pretrained(out_dir)
    return trainer, eval_res


In [None]:
# Fine-tune the Kinetics-pretrained VideoMAE checkpoint on the shop dataset.
# The learning rate is 5e-5 rather than 1e-2: a rate of 1e-2 is orders of
# magnitude above what is normally used to fine-tune a pretrained transformer
# and tends to destroy the pretrained weights within the first steps.
trainer2, eval2 = fine_tune_model(
    VideoMAEForVideoClassification,
    model2_id,
    out_dir="results/videomae",
    epochs=10,
    per_device_batch_size=2,
    lr=5e-5,
)
print("Model 2 eval:", eval2)