In [41]:
# Install Required Libraries (Run in Colab or a new environment)
!pip install torch transformers huggingface_hub opencv-python numpy ipywidgets scikit-learn




In [42]:
import torch
import torch.nn as nn
import numpy as np
import cv2
from transformers import AutoImageProcessor, AutoModelForImageClassification, pipeline
import torch.nn.functional as F
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import logging
from IPython.display import display
import ipywidgets as widgets

# Initialize logging for real-time feedback
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', force=True)


In [43]:
class Config:
    FRAME_SIZE = (224, 224)
    NUM_FRAMES = 32
    CNN_FEATURE_DIM = 512
    LSTM_HIDDEN_DIM = 256
    NUM_CLASSES = 2  # Binary classification (0 or 1)
    DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'


In [44]:
class DiffusionFeatureExtractor:
    def __init__(self, model_name="google/vit-base-patch16-224"):
        try:
            self.feature_extractor = AutoImageProcessor.from_pretrained(model_name, use_fast=True)
            self.model = AutoModelForImageClassification.from_pretrained(model_name).to(Config.DEVICE)
            logging.info("Diffusion model and feature extractor loaded successfully.")
        except Exception as e:
            logging.error("Error loading model:", e)
            raise

    def extract_features_with_reconstruction_error(self, frames):
        inputs = self.feature_extractor(images=frames, return_tensors="pt", do_rescale=False).to(Config.DEVICE)

        # Original frame features
        with torch.no_grad():
            original_features = self.model(**inputs).logits

        # Perturb frames and reprocess to simulate reconstruction
        reconstructed_frames = frames * 0.95
        reconstructed_inputs = self.feature_extractor(images=reconstructed_frames, return_tensors="pt", do_rescale=False).to(Config.DEVICE)

        with torch.no_grad():
            reconstructed_features = self.model(**reconstructed_inputs).logits
        reconstruction_error = (original_features - reconstructed_features).abs()

        # Flatten and interpolate directly to target dimensions
        reconstruction_error = reconstruction_error.flatten(start_dim=1)
        reconstruction_error = reconstruction_error.view(Config.NUM_FRAMES, 1, -1, 1)
        reconstruction_error = F.interpolate(reconstruction_error, size=(32, 32), mode='bilinear', align_corners=False)

        return reconstruction_error


In [45]:
def video_to_frames(video_path, frame_size=(224, 224), num_frames=32):
    cap = cv2.VideoCapture(video_path)
    frames = []
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    if frame_count < num_frames:
        logging.warning(f"Video only has {frame_count} frames, expected {num_frames}.")

    while cap.isOpened() and len(frames) < num_frames:
        ret, frame = cap.read()
        if not ret:
            break
        frame = cv2.resize(frame, frame_size)
        frames.append(frame)
    cap.release()
    frames = np.array(frames) / 255.0
    logging.info(f"Processed {len(frames)} frames from the video.")
    return frames


In [46]:
class CNNLSTMModel(nn.Module):
    def __init__(self):
        super(CNNLSTMModel, self).__init__()
        self.cnn = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
        )
        self.flatten = nn.Flatten()
        self.fc = nn.Linear(32 * 8 * 8, Config.CNN_FEATURE_DIM)
        self.lstm = nn.LSTM(Config.CNN_FEATURE_DIM, Config.LSTM_HIDDEN_DIM, batch_first=True)
        self.classifier = nn.Linear(Config.LSTM_HIDDEN_DIM, Config.NUM_CLASSES)  # Binary output

    def forward(self, x):
        batch_size, num_frames, channels, height, width = x.size()
        cnn_out = []
        for t in range(num_frames):
            frame_features = x[:, t]
            cnn_out.append(self.fc(self.flatten(self.cnn(frame_features))))
        cnn_out = torch.stack(cnn_out, dim=1)
        lstm_out, _ = self.lstm(cnn_out)
        final_output = self.classifier(lstm_out[:, -1, :])
        return final_output


In [47]:
def calculate_metrics(y_true, y_pred):
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, zero_division=0)
    recall = recall_score(y_true, y_pred, zero_division=0)
    f1 = f1_score(y_true, y_pred, zero_division=0)
    logging.info(f"Metrics - Accuracy: {accuracy:.2f}, Precision: {precision:.2f}, Recall: {recall:.2f}, F1 Score: {f1:.2f}")
    return accuracy, precision, recall, f1

def calculate_reconstruction_error(original_features, reconstructed_features):
    error = (original_features - reconstructed_features).abs().mean().item()
    return error


In [48]:
def run_pipeline(video_path, y_true):
    frames = video_to_frames(video_path)
    if len(frames) == 0:
        logging.error("Error: No frames were extracted from the video.")
        return

    diffusion_model = DiffusionFeatureExtractor()
    reconstruction_error_features = diffusion_model.extract_features_with_reconstruction_error(frames)
    reconstruction_error_features = reconstruction_error_features.unsqueeze(0).to(Config.DEVICE)

    cnn_lstm_model = CNNLSTMModel().to(Config.DEVICE)
    cnn_lstm_model.eval()
    with torch.no_grad():
        prediction = cnn_lstm_model(reconstruction_error_features)
    predicted_class = torch.argmax(prediction, dim=1).item()

    accuracy, precision, recall, f1 = calculate_metrics([y_true], [predicted_class])
    logging.info(f"Binary Classification Result: {predicted_class}")

    original_features = diffusion_model.model(**diffusion_model.feature_extractor(images=frames, return_tensors="pt", do_rescale=False)).logits
    reconstructed_frames = frames * 0.95
    reconstructed_features = diffusion_model.model(**diffusion_model.feature_extractor(images=reconstructed_frames, return_tensors="pt", do_rescale=False)).logits
    reconstruction_error = calculate_reconstruction_error(original_features, reconstructed_features)
    logging.info(f"Reconstruction Error (Anomaly Detection): {reconstruction_error:.4f}")

    return predicted_class, accuracy, precision, recall, f1, reconstruction_error


In [49]:
def initialize_captioning_model():
    return pipeline("image-to-text", model="Salesforce/blip-image-captioning-base")

def initialize_summarization_model():
    return pipeline("summarization", model="facebook/bart-large-cnn")

captioning_model = initialize_captioning_model()
summarization_model = initialize_summarization_model()




In [50]:
def generate_video_description(video_path, captioning_model, frame_sample_rate=10):
    cap = cv2.VideoCapture(video_path)
    descriptions = []
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    success, frame_id = True, 0

    while success:
        success, frame = cap.read()
        if frame_id % frame_sample_rate == 0 and success:
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            caption = captioning_model(frame_rgb)[0]["generated_text"]
            descriptions.append(caption)
        frame_id += 1

    cap.release()
    return descriptions


In [40]:
def summarize_descriptions(descriptions, summarization_model):
    """
    Summarize the descriptions of frames into an overall video summary.

    Args:
        descriptions (list): List of frame descriptions.
        summarization_model (pipeline): Hugging Face summarization pipeline.

    Returns:
        summary (str): Summarized description of the video content.
    """
    text = " ".join(descriptions)
    summary = summarization_model(
        text,
        max_length=50,  # Adjust based on the desired summary length
        min_length=20,  # Minimum length for summarization
        max_new_tokens=50,  # Controls the number of new tokens generated
        do_sample=False
    )[0]["summary_text"]
    return summary


In [51]:
def run_pipeline_with_description(video_path, y_true):
    # Run original inference
    predicted_class, accuracy, precision, recall, f1, reconstruction_error = run_pipeline(video_path, y_true)

    # Generate frame descriptions and a summary
    descriptions = generate_video_description(video_path, captioning_model)
    video_summary = summarize_descriptions(descriptions, summarization_model)

    # Output results
    print(f"Predicted Class: {predicted_class}")
    print(f"Metrics - Accuracy: {accuracy:.2f}, Precision: {precision:.2f}, Recall: {recall:.2f}, F1 Score: {f1:.2f}")
    print(f"Reconstruction Error: {reconstruction_error:.4f}")
    print("Frame Descriptions:", descriptions)
    print("Video Summary:", video_summary)

    return predicted_class, accuracy, precision, recall, f1, reconstruction_error, descriptions, video_summary


In [52]:
from PIL import Image  # Import PIL to handle image conversion

def generate_video_description(video_path, captioning_model, frame_sample_rate=10):
    """
    Generate descriptions for key frames in the video.

    Args:
        video_path (str): Path to the video file.
        captioning_model (pipeline): Hugging Face image captioning pipeline.
        frame_sample_rate (int): Interval of frames to sample. Higher values mean fewer sampled frames.

    Returns:
        descriptions (list): List of frame descriptions.
    """
    cap = cv2.VideoCapture(video_path)
    descriptions = []
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    success, frame_id = True, 0

    while success:
        success, frame = cap.read()
        if frame_id % frame_sample_rate == 0 and success:
            # Convert the frame to RGB and then to a PIL image
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            pil_image = Image.fromarray(frame_rgb)

            # Generate a description for the frame
            caption = captioning_model(pil_image)[0]["generated_text"]
            descriptions.append(caption)
        frame_id += 1

    cap.release()
    return descriptions


In [54]:
def upload_and_predict_with_description(y_true=1):
    upload_button = widgets.FileUpload(accept=".mp4", multiple=False)
    display(upload_button)

    def on_upload_change(change):
        for name, file_info in upload_button.value.items():
            video_path = f'/content/{name}'
            with open(video_path, 'wb') as f:
                f.write(file_info['content'])

            logging.info("Running pipeline on uploaded video...")

            # Run the pipeline with description generation
            predicted_class, accuracy, precision, recall, f1, reconstruction_error, descriptions, video_summary = run_pipeline_with_description(video_path, y_true)

            # Display the results
            print(f"Predicted Class: {predicted_class}")
            print(f"Metrics - Accuracy: {accuracy:.2f}, Precision: {precision:.2f}, Recall: {recall:.2f}, F1 Score: {f1:.2f}")
            print(f"Reconstruction Error: {reconstruction_error:.4f}")
            print("Frame Descriptions:", descriptions)
            print("Video Summary:", video_summary)

    # Trigger the function when the file is uploaded
    upload_button.observe(on_upload_change, names='value')

# Run this function to display the upload button
upload_and_predict_with_description(y_true=1)  # Set y_true to the ground truth label of the uploaded video


FileUpload(value={}, accept='.mp4', description='Upload')

2024-10-31 05:06:11,793 - INFO - Running pipeline on uploaded video...
2024-10-31 05:06:11,934 - INFO - Processed 32 frames from the video.
2024-10-31 05:06:12,590 - INFO - Diffusion model and feature extractor loaded successfully.
2024-10-31 05:06:56,059 - INFO - Metrics - Accuracy: 1.00, Precision: 1.00, Recall: 1.00, F1 Score: 1.00
2024-10-31 05:06:56,060 - INFO - Binary Classification Result: 1
2024-10-31 05:07:39,070 - INFO - Reconstruction Error (Anomaly Detection): 0.0250
