In [None]:
# 🧩 STEP 1: Install dependencies
!pip install -q torch==1.13.1+cu116 torchvision==0.14.1+cu116 --extra-index-url https://download.pytorch.org/whl/cu116
!pip install -q git+https://github.com/facebookresearch/segment-anything.git

# 🧠 STEP 2: Import packages and mount Google Drive
import os
import cv2
import numpy as np
import torch
from segment_anything import sam_model_registry, SamPredictor
from google.colab import drive

# Expose Google Drive inside the runtime at /content/drive.
drive.mount('/content/drive')

# 🗂️ STEP 3: Set paths
# Input clips (.avi) are read from INPUT_VIDEO_DIR, cropped clips are written
# to OUTPUT_VIDEO_DIR, and the SAM weights are loaded from SAM_CHECKPOINT_PATH.
INPUT_VIDEO_DIR = "/content/drive/MyDrive/Test"
OUTPUT_VIDEO_DIR = "/content/drive/MyDrive/SegmentedTest"
SAM_CHECKPOINT_PATH = "/content/drive/MyDrive/sam_vit_b.pth"

# The output folder may not exist yet; creating it twice is harmless.
os.makedirs(OUTPUT_VIDEO_DIR, exist_ok=True)

# 🧠 STEP 4: Load SAM
# Build the ViT-B variant from the checkpoint, place it on the GPU when one is
# available (CPU otherwise), and wrap it in a predictor for prompt-based masks.
sam = sam_model_registry["vit_b"](checkpoint=SAM_CHECKPOINT_PATH)
if torch.cuda.is_available():
    sam.to('cuda')
else:
    sam.to('cpu')
predictor = SamPredictor(sam)

# 🧠 STEP 5: Function to get bounding box from first frame using SAM
def get_bbox_from_first_frame(video_path):
    """Return the bounding box of the object at the centre of the first frame.

    The first frame is read with OpenCV, converted to RGB and handed to the
    module-level SAM ``predictor``. SAM is prompted with a single foreground
    point at the frame centre, and the bounding rectangle of the largest
    connected region of the resulting mask is returned.

    Args:
        video_path: Path of the video file to read.

    Returns:
        ``(x, y, width, height)`` in pixels, or ``None`` when the first frame
        cannot be read or the predicted mask contains no region.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        ret, first_frame = cap.read()
    finally:
        # Only the first frame is needed; always give the handle back.
        cap.release()

    if not ret:
        print(f"❌ Couldn't read first frame from {video_path}")
        return None

    rgb = cv2.cvtColor(first_frame, cv2.COLOR_BGR2RGB)
    predictor.set_image(rgb)

    # Single positive click (label 1) in the middle of the frame.
    h, w = rgb.shape[:2]
    input_point = np.array([[w // 2, h // 2]])
    input_label = np.array([1])

    masks, _, _ = predictor.predict(
        point_coords=input_point,
        point_labels=input_label,
        multimask_output=False
    )

    mask = masks[0].astype(np.uint8)
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if not contours:
        print(f"⚠️ No contours found in first frame of {video_path}")
        return None

    # The mask may consist of several disconnected regions, and contours[0] is
    # not guaranteed to be the main one; crop around the largest region.
    largest = max(contours, key=cv2.contourArea)
    x, y, bw, bh = cv2.boundingRect(largest)
    return (x, y, bw, bh)

# 🎞️ STEP 6: Crop video using bounding box
def crop_video(video_path, output_path, bbox, default_fps=30.0):
    """Write a copy of a video in which every frame is cropped to ``bbox``.

    Args:
        video_path: Path of the source video.
        output_path: Path of the cropped video to create (XVID-encoded).
        bbox: ``(x, y, width, height)`` crop rectangle in pixels.
        default_fps: Frame rate used when the source does not report a
            positive frame rate.

    Returns:
        ``True`` when the output was written, ``False`` when the source could
        not be opened or the writer could not be created.
    """
    # VideoWriter needs plain integer dimensions.
    x, y, w, h = (int(v) for v in bbox)

    cap = cv2.VideoCapture(video_path)
    out = None
    try:
        if not cap.isOpened():
            print(f"❌ Couldn't open {video_path}")
            return False

        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            # Covers 0, negative and NaN, none of which the writer can use.
            fps = default_fps

        fourcc = cv2.VideoWriter_fourcc(*'XVID')
        out = cv2.VideoWriter(output_path, fourcc, fps, (w, h))
        if not out.isOpened():
            print(f"❌ Couldn't create writer for {output_path}")
            return False

        while True:
            ret, frame = cap.read()
            if not ret:
                break
            out.write(frame[y:y+h, x:x+w])
        return True
    finally:
        # Release both handles even if reading or writing raised.
        cap.release()
        if out is not None:
            out.release()

# 🔁 STEP 7: Batch process all videos
for file in os.listdir(INPUT_VIDEO_DIR):
    # Only .avi inputs are processed (extension match is case-insensitive).
    if not file.lower().endswith(".avi"):
        continue

    input_path = os.path.join(INPUT_VIDEO_DIR, file)
    output_path = os.path.join(OUTPUT_VIDEO_DIR, f"cropped_{file}")
    print(f"▶️ Processing: {file}")

    # The crop rectangle is derived once, from the first frame.
    bbox = get_bbox_from_first_frame(input_path)
    if not bbox:
        print(f"❌ Skipped: {file} (no valid bbox)")
        continue

    crop_video(input_path, output_path, bbox)
    print(f"✅ Done: {output_path}")

[31mERROR: Could not find a version that satisfies the requirement torch==1.13.1+cu116 (from versions: 2.2.0, 2.2.1, 2.2.2, 2.3.0, 2.3.1, 2.4.0, 2.4.1, 2.5.0, 2.5.1, 2.6.0, 2.7.0, 2.7.1, 2.8.0)[0m[31m
[0m[31mERROR: No matching distribution found for torch==1.13.1+cu116[0m[31m
[0m  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for segment_anything (setup.py) ... [?25l[?25hdone


ValueError: mount failed

In [None]:
# Import required libraries
import torch
import torch.nn as nn
from torchvision import models, transforms
import cv2
import os
import numpy as np
from google.colab import drive

# Mount Google Drive
drive.mount('/content/drive')

# Define base directories and folder names.
# Folder names are joined relative to base_dir: when the second argument of
# os.path.join is an absolute path, base_dir is silently discarded.
base_dir = '/content/drive/MyDrive'
pure_train_folder = os.path.join(base_dir, 'SegmentedVideoPT')
adulterated_train_folder = os.path.join(base_dir, 'SegmentedVideoAT')
pure_val_folder = os.path.join(base_dir, 'SegmentedVideoPV')
adulterated_val_folder = os.path.join(base_dir, 'SegmentedVideoAV')
test_folder = os.path.join(base_dir, 'SegmentedTest')

# Create data pool for video paths (one list of file paths per split/class)
video_paths = {
    'pure_train': [],
    'adulterated_train': [],
    'pure_val': [],
    'adulterated_val': [],
    'test': []
}

# Populate video paths; a missing folder is reported and leaves its list empty
for folder, key in [
    (pure_train_folder, 'pure_train'),
    (adulterated_train_folder, 'adulterated_train'),
    (pure_val_folder, 'pure_val'),
    (adulterated_val_folder, 'adulterated_val'),
    (test_folder, 'test')
]:
    if os.path.exists(folder):
        for file in os.listdir(folder):
            if file.lower().endswith(('.mp4', '.avi', '.mov')):
                video_paths[key].append(os.path.join(folder, file))
    else:
        print(f"Folder {folder} not found")

# Load pre-trained ViT model and modify to remove classification head
model = models.vit_b_16(weights=models.ViT_B_16_Weights.IMAGENET1K_V1)
model.eval()  # inference only: the backbone is used as a fixed feature extractor

# Remove the classification head to use the transformer output
model.heads = nn.Identity()  # Replace the head with an identity function to get the [CLS] token

# Define preprocessing transformations (adjusted for ViT input)
preprocess = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),  # ViT-B/16 expects 224x224 input
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # ImageNet normalization
])

# Function to extract frames and features from a video
def extract_frames_and_features(video_path, num_frames=10):
    """Sample evenly spaced frames from a video and embed each with ``model``.

    Each sampled frame is converted to RGB, passed through the module-level
    ``preprocess`` pipeline and then through the module-level ``model``.

    Args:
        video_path: Path of the video file to read.
        num_frames: Number of evenly spaced frame positions to sample.

    Returns:
        ``(frames, features)``: the RGB frames that could be read and, in the
        same order, the feature array ``model`` produced for each of them.
        ``(None, None)`` when the video cannot be opened or reports no frames.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print(f"Error: Could not open video {video_path}")
            return None, None

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        print(f"Processing video: {video_path}, Total frames: {total_frames}")
        if total_frames <= 0:
            # A non-positive count gives no valid positions to seek to.
            print("No frames in video.")
            return None, None

        # Integer arithmetic keeps every index strictly below total_frames,
        # so no clamping of the last index is required.
        frame_indices = [i * total_frames // num_frames for i in range(num_frames)]
        if not frame_indices:
            # num_frames <= 0: fall back to the first frame only.
            frame_indices = [0]

        frames, features = [], []
        for idx in frame_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
            ret, frame = cap.read()
            if ret:
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                input_tensor = preprocess(frame_rgb).unsqueeze(0)
                with torch.no_grad():
                    feature = model(input_tensor).squeeze().numpy()  # 768-dimensional [CLS] token feature per frame
                frames.append(frame_rgb)
                features.append(feature)
            else:
                print(f"Failed to read frame {idx} from video {video_path}")
    finally:
        # Release the capture on every exit path, including exceptions
        # raised while preprocessing or running the model.
        cap.release()

    print(f"Extracted {len(features)} features from {video_path}")
    return frames, features

# Function to average features across frames
def average_features(features):
    """Collapse per-frame feature vectors into a single mean vector.

    Args:
        features: Sequence of equally shaped per-frame feature arrays.

    Returns:
        The element-wise mean over the first (frame) axis, or ``None`` when
        ``features`` is empty or ``None``.
    """
    if features:
        # Rows are frames, so reducing over axis 0 averages across frames.
        stacked = np.array(features)
        return stacked.mean(axis=0)
    return None

# Define sigmoid layer for binary classification
class SigmoidClassifier(nn.Module):
    """Single linear layer that produces one raw score per input row.

    ``forward`` returns the linear output unchanged; no sigmoid is applied
    inside the module.
    """

    def __init__(self, input_dim):
        super().__init__()
        # One output unit: a single score for the binary decision.
        self.linear = nn.Linear(input_dim, 1)

    def forward(self, x):
        """Return the linear layer's output for ``x``."""
        logits = self.linear(x)
        return logits

# Prepare training data
def _collect_averaged_features(split_name, pure_paths, adulterated_paths):
    """Build one averaged feature vector per usable video of a split.

    Args:
        split_name: Word used in the progress messages (e.g. 'training').
        pure_paths: Video paths labelled 0 (pure).
        adulterated_paths: Video paths labelled 1 (adulterated).

    Returns:
        ``(data, labels)``: the averaged feature vectors and matching 0/1
        labels. Videos that yield no features are skipped.
    """
    data, labels = [], []
    for label, paths in [('pure', pure_paths), ('adulterated', adulterated_paths)]:
        print(f"Processing {label} {split_name} videos...")
        for video_path in paths:
            _, features = extract_frames_and_features(video_path)
            if features:
                data.append(average_features(features))
                labels.append(0 if label == 'pure' else 1)
        print(f"Finished processing {label} {split_name} videos.")
    return data, labels


print("Preparing training data...")
train_data, train_labels = _collect_averaged_features(
    'training', video_paths['pure_train'], video_paths['adulterated_train']
)

print(f"Number of training data samples: {len(train_data)}")
if not train_data:
    print("No training data collected. Exiting.")
else:
    # Features are used as extracted; no extra normalisation is applied.
    train_data = np.array(train_data)
    train_labels = np.array(train_labels)

    # Seed so the classifier's initial weights are the same on every run.
    torch.manual_seed(42)

    # Initialize and train the classifier
    input_dim = train_data.shape[1]
    classifier = SigmoidClassifier(input_dim)
    criterion = nn.BCEWithLogitsLoss()
    optimizer = torch.optim.Adam(classifier.parameters(), lr=0.001)

    # Convert data to tensors; labels become a column to match the (N, 1) output
    train_data_tensor = torch.FloatTensor(train_data)
    train_labels_tensor = torch.FloatTensor(train_labels).reshape(-1, 1)

    # Training loop: full-batch gradient steps, loss reported every 100 epochs
    num_epochs = 200
    for epoch in range(num_epochs):
        optimizer.zero_grad()
        outputs = classifier(train_data_tensor)
        loss = criterion(outputs, train_labels_tensor)
        loss.backward()
        optimizer.step()
        if (epoch + 1) % 100 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

    # Validation
    print("Preparing validation data...")
    val_data, val_labels = _collect_averaged_features(
        'validation', video_paths['pure_val'], video_paths['adulterated_val']
    )

    print(f"Number of validation data samples: {len(val_data)}")
    if not val_data:
        print("No validation data collected. Skipping validation.")
    else:
        val_data = np.array(val_data)
        val_labels = np.array(val_labels)
        val_data_tensor = torch.FloatTensor(val_data)
        val_labels_tensor = torch.FloatTensor(val_labels).reshape(-1, 1)

        with torch.no_grad():
            val_outputs = classifier(val_data_tensor)
            val_loss = criterion(val_outputs, val_labels_tensor)
            # A sigmoid output rounded to 0/1 is the predicted class.
            val_accuracy = ((torch.sigmoid(val_outputs).round() == val_labels_tensor).float().mean()).item()
            print(f'Validation Loss: {val_loss.item():.4f}, Accuracy: {val_accuracy:.4f}')

    # Test prediction (first 12 test videos only)
    print("Performing test predictions...")
    for video_path in video_paths['test'][:12]:
        video_name = os.path.splitext(os.path.basename(video_path))[0]
        _, features = extract_frames_and_features(video_path)
        if not features:
            print(f'Could not extract features for test video {video_name}. Skipping prediction.')
            continue

        averaged_feature = average_features(features)
        test_data_tensor = torch.FloatTensor(averaged_feature).reshape(1, -1)
        with torch.no_grad():
            prediction = torch.sigmoid(classifier(test_data_tensor))
        probability = prediction.item()
        predicted_label = 0 if probability < 0.5 else 1
        print(f'Test video {video_name}: Predicted label {predicted_label} (0=Pure, 1=Adulterated), Probability: {probability:.4f}')

    print("Feature extraction, training, validation, and testing completed.")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Downloading: "https://download.pytorch.org/models/vit_b_16-c867db91.pth" to /root/.cache/torch/hub/checkpoints/vit_b_16-c867db91.pth
100%|██████████| 330M/330M [00:05<00:00, 61.6MB/s]


Preparing training data...
Processing pure training videos...
Processing video: /content/drive/MyDrive/SegmentedVideoPT/cropped_35-60 (24).avi, Total frames: 490
Extracted 10 features from /content/drive/MyDrive/SegmentedVideoPT/cropped_35-60 (24).avi
Processing video: /content/drive/MyDrive/SegmentedVideoPT/cropped_65-90 (24).avi, Total frames: 450
Extracted 10 features from /content/drive/MyDrive/SegmentedVideoPT/cropped_65-90 (24).avi
Processing video: /content/drive/MyDrive/SegmentedVideoPT/cropped_95-110 (23).avi, Total frames: 490
Extracted 10 features from /content/drive/MyDrive/SegmentedVideoPT/cropped_95-110 (23).avi
Processing video: /content/drive/MyDrive/SegmentedVideoPT/cropped_35-60 (23).avi, Total frames: 450
Extracted 10 features from /content/drive/MyDrive/SegmentedVideoPT/cropped_35-60 (23).avi
Processing video: /content/drive/MyDrive/SegmentedVideoPT/cropped_65-90 (23).avi, Total frames: 450
Extracted 10 features from /content/drive/MyDrive/SegmentedVideoPT/cropped_6

In [None]:
 from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Import required libraries
import torch
import torch.nn as nn
from torchvision import transforms
import cv2
import os
import numpy as np
from google.colab import drive

# Make Google Drive visible under /content/drive
drive.mount('/content/drive')

# Dataset folders, all located directly under base_dir
base_dir = '/content/drive/MyDrive'
pure_train_folder = os.path.join(base_dir, 'SegmentedVideoPT')
adulterated_train_folder = os.path.join(base_dir, 'SegmentedVideoAT')
pure_val_folder = os.path.join(base_dir, 'SegmentedVideoPV')
adulterated_val_folder = os.path.join(base_dir, 'SegmentedVideoAV')
test_folder = os.path.join(base_dir, 'SegmentedTest')

# Pool of video file paths, keyed by split/class. A folder that does not exist
# is reported and keeps an empty list.
video_paths = {}
for key, folder in (
    ('pure_train', pure_train_folder),
    ('adulterated_train', adulterated_train_folder),
    ('pure_val', pure_val_folder),
    ('adulterated_val', adulterated_val_folder),
    ('test', test_folder),
):
    video_paths[key] = []
    if not os.path.exists(folder):
        print(f"Folder {folder} not found")
        continue
    # Keep only files with a recognised video extension (case-insensitive).
    video_paths[key] = [
        os.path.join(folder, name)
        for name in os.listdir(folder)
        if name.lower().endswith(('.mp4', '.avi', '.mov'))
    ]

# Define a simple 3D CNN model
class Simple3DCNN(nn.Module):
    """Three-stage 3D CNN that maps a video clip to one classification logit.

    The first convolution takes 3 input channels and the first fully connected
    layer is sized for clips of ``num_frames`` frames at
    ``input_size`` x ``input_size`` pixels, so ``forward`` expects input of
    shape ``(batch, 3, num_frames, input_size, input_size)``.

    Args:
        num_frames: Number of frames per clip (at least 4).
        input_size: Height and width of each frame in pixels (at least 8).

    Raises:
        ValueError: If ``num_frames`` or ``input_size`` is too small to leave
            any features after the pooling stages.
    """

    def __init__(self, num_frames=10, input_size=224):
        super(Simple3DCNN, self).__init__()
        if num_frames < 4 or input_size < 8:
            raise ValueError(
                "num_frames must be >= 4 and input_size must be >= 8, "
                f"got num_frames={num_frames}, input_size={input_size}"
            )
        self.num_frames = num_frames
        self.input_size = input_size
        self.conv1 = nn.Conv3d(3, 32, kernel_size=(3, 3, 3), padding=(1, 1, 1))
        self.pool1 = nn.MaxPool3d(kernel_size=(1, 2, 2), stride=(1, 2, 2))
        self.conv2 = nn.Conv3d(32, 64, kernel_size=(3, 3, 3), padding=(1, 1, 1))
        self.pool2 = nn.MaxPool3d(kernel_size=(2, 2, 2), stride=(2, 2, 2))
        self.conv3 = nn.Conv3d(64, 128, kernel_size=(3, 3, 3), padding=(1, 1, 1))
        self.pool3 = nn.MaxPool3d(kernel_size=(2, 2, 2), stride=(2, 2, 2))
        # pool1 halves height/width only; pool2 and pool3 halve time, height
        # and width. Overall: time shrinks by 4, each spatial side by 8.
        flat_features = 128 * (num_frames // 4) * (input_size // 8) * (input_size // 8)
        self.fc1 = nn.Linear(flat_features, 512)
        self.fc2 = nn.Linear(512, 1)  # Output logit for binary classification
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Return one logit per clip, shape ``(batch, 1)``."""
        x = self.pool1(self.relu(self.conv1(x)))
        x = self.pool2(self.relu(self.conv2(x)))
        x = self.pool3(self.relu(self.conv3(x)))
        x = x.flatten(1)  # Keep the batch axis, flatten everything else
        x = self.dropout(self.relu(self.fc1(x)))
        x = self.fc2(x)
        return x

# Build the 3D CNN for 10-frame clips and place it on the GPU when available
model = Simple3DCNN(num_frames=10)
if torch.cuda.is_available():
    model = model.cuda()
model.train()  # Start in training mode

# Per-frame preprocessing: array -> PIL image -> 224x224 -> tensor -> normalised
preprocess = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),  # Spatial size fed to the 3D CNN
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),  # ImageNet-like channel statistics
])

# Function to extract frames and prepare video clip for 3D CNN
def extract_video_clip(video_path, num_frames=10):
    """Sample evenly spaced frames from a video and stack them into one clip.

    Each sampled frame is converted to RGB and passed through the
    module-level ``preprocess`` pipeline. If some frames cannot be read, the
    clip is padded with copies of the last frame that was read.

    Args:
        video_path: Path of the video file to read.
        num_frames: Number of evenly spaced frame positions to sample.

    Returns:
        A tensor with the frame axis moved to position 1, i.e. the stacked
        ``(T, C, H, W)`` frames permuted to ``(C, T, H, W)``. ``None`` when
        the video cannot be opened, reports no frames, or no frame could be
        read.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print(f"Error: Could not open video {video_path}")
            return None

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        print(f"Processing video: {video_path}, Total frames: {total_frames}")
        if total_frames <= 0:
            # A non-positive count gives no valid positions to seek to.
            print("No frames in video.")
            return None

        # Integer arithmetic keeps every index strictly below total_frames,
        # so no clamping of the last index is required.
        frame_indices = [i * total_frames // num_frames for i in range(num_frames)]
        if not frame_indices:
            # num_frames <= 0: fall back to the first frame only.
            frame_indices = [0]

        clip = []
        for idx in frame_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
            ret, frame = cap.read()
            if ret:
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                clip.append(preprocess(frame_rgb))
            else:
                print(f"Failed to read frame {idx} from video {video_path}")
    finally:
        # Release the capture on every exit path, including exceptions
        # raised while preprocessing a frame.
        cap.release()

    if not clip:
        # Nothing could be read; torch.stack would fail on an empty list.
        return None
    if len(clip) < num_frames:
        # Pad with last frame if needed
        clip += [clip[-1]] * (num_frames - len(clip))

    # Shape: (T, C, H, W) to (C, T, H, W)
    return torch.stack(clip).permute(1, 0, 2, 3)


# Prepare training data and train the model
# DataLoader / TensorDataset provide the mini-batching used for training and validation
from torch.utils.data import DataLoader, TensorDataset

# Run every batch on whichever device the model's parameters already live on.
device = next(model.parameters()).device


def _collect_clips(split_name, pure_paths, adulterated_paths):
    """Load one clip tensor per usable video of a split.

    Args:
        split_name: Word used in the progress messages (e.g. 'training').
        pure_paths: Video paths labelled 0 (pure).
        adulterated_paths: Video paths labelled 1 (adulterated).

    Returns:
        ``(clips, labels)``: clip tensors and matching 0/1 labels. Videos for
        which no clip could be extracted are skipped.
    """
    clips, labels = [], []
    for label, paths in [('pure', pure_paths), ('adulterated', adulterated_paths)]:
        print(f"Processing {label} {split_name} videos...")
        for video_path in paths:
            video_clip = extract_video_clip(video_path, num_frames=10)
            if video_clip is not None:
                clips.append(video_clip)
                labels.append(0 if label == 'pure' else 1)
        print(f"Finished processing {label} {split_name} videos.")
    return clips, labels


# Prepare data in batches
train_clips, train_labels_list = _collect_clips(
    'training', video_paths['pure_train'], video_paths['adulterated_train']
)

print(f"Number of training data samples: {len(train_clips)}")
if not train_clips:
    print("No training data collected. Exiting.")
else:
    # Seed so batch shuffling and dropout are repeatable across runs.
    torch.manual_seed(42)

    train_dataset = TensorDataset(torch.stack(train_clips), torch.FloatTensor(train_labels_list).reshape(-1, 1))
    train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)

    # Train the model
    criterion = nn.BCEWithLogitsLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    num_epochs = 6
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0.0
        for batch_clips, batch_labels in train_loader:
            batch_clips = batch_clips.to(device)
            batch_labels = batch_labels.to(device)
            optimizer.zero_grad()
            outputs = model(batch_clips)
            loss = criterion(outputs, batch_labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        # Report every epoch: with only 6 epochs, an "every 100 epochs"
        # condition would never print anything.
        print(f'Epoch [{epoch+1}/{num_epochs}], Average Loss: {total_loss / len(train_loader):.4f}')

    # Validation
    print("Preparing validation data...")
    val_data, val_labels = _collect_clips(
        'validation', video_paths['pure_val'], video_paths['adulterated_val']
    )

    print(f"Number of validation data samples: {len(val_data)}")
    if not val_data:
        print("No validation data collected. Skipping validation.")
    else:
        val_labels_tensor = torch.FloatTensor(val_labels).reshape(-1, 1)
        val_loader = DataLoader(
            TensorDataset(torch.stack(val_data), val_labels_tensor), batch_size=4
        )

        model.eval()
        with torch.no_grad():
            # Run the model in small batches so the whole validation set never
            # has to sit on the device at once; logits are gathered on the CPU
            # and scored together, which gives the same loss and accuracy.
            val_outputs = torch.cat(
                [model(batch_clips.to(device)).cpu() for batch_clips, _ in val_loader]
            )
            val_loss = criterion(val_outputs, val_labels_tensor)
            val_accuracy = ((torch.sigmoid(val_outputs).round() == val_labels_tensor).float().mean()).item()
            print(f'Validation Loss: {val_loss.item():.4f}, Accuracy: {val_accuracy:.4f}')

    # Test prediction (first 4 test videos only)
    print("Performing test predictions...")
    model.eval()
    for video_path in video_paths['test'][:4]:
        video_name = os.path.splitext(os.path.basename(video_path))[0]
        video_clip = extract_video_clip(video_path, num_frames=10)
        if video_clip is None:
            print(f'Could not extract features for test video {video_name}. Skipping prediction.')
            continue

        # Add the batch axis expected by the model.
        test_data_tensor = video_clip.unsqueeze(0).to(device)
        with torch.no_grad():
            prediction = torch.sigmoid(model(test_data_tensor))
        probability = prediction.item()
        predicted_label = 0 if probability < 0.5 else 1
        print(f'Test video {video_name}: Predicted label {predicted_label} (0=Pure, 1=Adulterated), Probability: {probability:.4f}')

    print("Feature extraction, training, validation, and testing completed.")

In [None]:
# Uninstall existing PyTorch and torchvision
!pip uninstall torch torchvision -y

# Install specific versions known for better stability
!pip install torch==1.13.1+cu116 torchvision==0.14.1+cu116 --extra-index-url https://download.pytorch.org/whl/cu116