In [None]:
#Parsing Data
import json

# Load JSON files
def load_json(file_path):
    """Read the JSON document stored at ``file_path`` and return the decoded object.

    The file is opened explicitly as UTF-8 so the result does not depend on
    the platform's default text encoding (e.g. cp1252 on Windows).

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the content is not valid JSON.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        return json.load(f)

# Parse annotations
def parse_annotations(json_data):
    """Reduce raw annotation entries to the two fields used downstream.

    Args:
        json_data: Mapping of video name -> annotation dict.

    Returns:
        A dict mapping each video name to
        ``{"transitions": ..., "total_frames": ...}`` where ``transitions`` is
        the entry's "transitions" value (``[]`` if absent) and ``total_frames``
        is its "frame_num" value (``0`` if absent).
    """
    return {
        name: {
            "transitions": info.get("transitions", []),
            "total_frames": info.get("frame_num", 0),
        }
        for name, info in json_data.items()
    }

# File paths (update with actual paths).
# Both annotation files live under the same project directory; the train path
# previously contained a stray trailing "t" in the folder name.
train_file = "E:\\Shot-Boundary-Detection\\annotations\\train.json"
gradual_file = "E:\\Shot-Boundary-Detection\\annotations\\only_gradual.json"

# Load data
train_data = load_json(train_file)
gradual_data = load_json(gradual_file)

# Parse data
parsed_train_data = parse_annotations(train_data)
parsed_gradual_data = parse_annotations(gradual_data)

# Merge both datasets for training.
# A video name present in both files keeps only the only_gradual entry, so
# report collisions instead of dropping annotations silently.
duplicate_videos = sorted(set(parsed_train_data) & set(parsed_gradual_data))
if duplicate_videos:
    print(f"Warning: {len(duplicate_videos)} video(s) appear in both annotation files; "
          "the only_gradual entry overrides the train entry.")
combined_data = {**parsed_train_data, **parsed_gradual_data}

# Save structured data for future processing
with open("parsed_data.json", "w", encoding="utf-8") as outfile:
    json.dump(combined_data, outfile, indent=4)

print("Parsing complete! Data saved to parsed_data.json")

In [None]:
#Frame Extraction
import json
import cv2
import os

# Load JSON annotations
def load_json(file_path):
    """Read the JSON document stored at ``file_path`` and return the decoded object.

    The file is opened explicitly as UTF-8 so the result does not depend on
    the platform's default text encoding (e.g. cp1252 on Windows).

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the content is not valid JSON.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        return json.load(f)

# Extract frames from videos at transition points
def extract_frames(video_path, transitions, output_dir):
    """Save the frames listed in ``transitions`` from one video as JPEG files.

    Args:
        video_path: Path of the video file to read.
        transitions: Iterable of transitions; every frame number contained in
            each transition entry is extracted (entries are iterated as-is,
            they are not expanded into ranges).
        output_dir: Directory that receives the images; created if missing.

    Each image is written as ``<video file name>_frame<frame number>.jpg``.
    Problems (video cannot be opened, a frame cannot be read or written) are
    printed and skipped rather than raised.
    """
    os.makedirs(output_dir, exist_ok=True)
    cap = cv2.VideoCapture(video_path)

    if not cap.isOpened():
        print(f"Error opening video file: {video_path}")
        cap.release()
        return

    video_name = os.path.basename(video_path)
    try:
        for transition in transitions:
            for frame_num in transition:
                # Seek to the requested frame, then decode it.
                cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
                ret, frame = cap.read()
                if not ret:
                    print(f"Failed to extract frame {frame_num} from {video_path}")
                    continue
                frame_filename = os.path.join(output_dir, f"{video_name}_frame{frame_num}.jpg")
                # imwrite signals failure through its return value, not an exception.
                if not cv2.imwrite(frame_filename, frame):
                    print(f"Failed to write {frame_filename}")
    finally:
        # Release the capture even if seeking/decoding/writing raised.
        cap.release()

# Define dataset paths (update with actual paths).
# Videos are read from <video_root_dir>/<dataset>/ and frames are written to
# <output_root_dir>/<dataset>/. The video root matches the one used by the
# non-transition extraction cell of this notebook.
video_root_dir = "E:\\Shot-Boundary-Detection\\videos"  # Input directory containing one folder per dataset
output_root_dir = "E:\\Shot-Boundary-Detection\\extracted_frames"  # Output directory for frames
# NOTE(review): the training cell loads images from "<split>/transition" and
# "<split>/non_transition" subfolders — confirm whether these frames should be
# written into a "transition" subfolder.
dataset_paths = {
    "train": "E:\\Shot-Boundary-Detection\\annotations\\train.json",
    "only_gradual": "E:\\Shot-Boundary-Detection\\annotations\\only_gradual.json",
    "test": "E:\\Shot-Boundary-Detection\\annotations\\test.json"  # Assuming you will provide the test.json
}

# Process each dataset
for dataset, json_file in dataset_paths.items():
    if not os.path.exists(json_file):
        print(f"Skipping {dataset}: {json_file} not found.")
        continue

    # Load annotations
    dataset_data = load_json(json_file)

    # Define dataset-specific paths
    video_subdir = os.path.join(video_root_dir, dataset)
    output_subdir = os.path.join(output_root_dir, dataset)

    # Extract frames for each video; a video without a "transitions" entry
    # simply produces no frames.
    for video, details in dataset_data.items():
        video_path = os.path.join(video_subdir, video)
        extract_frames(video_path, details.get("transitions", []), output_subdir)

print("Frame extraction complete! Check the 'extracted_frames/' directory.")


In [None]:
import json
import cv2
import os
import random

def load_metadata(json_path):
    """Read the JSON annotation file at ``json_path`` and return the decoded object.

    The file is opened explicitly as UTF-8 so the result does not depend on
    the platform's default text encoding.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the content is not valid JSON.
    """
    with open(json_path, 'r', encoding='utf-8') as f:
        return json.load(f)

def get_non_transition_frames(video_metadata, total_samples):
    """Randomly pick frame numbers that lie outside every annotated transition.

    Args:
        video_metadata: Mapping of video name -> dict with "frame_num"
            (converted with ``int``) and "transitions", a list of
            ``[start, end]`` pairs treated as inclusive frame ranges.
        total_samples: Overall number of frames wanted; it is divided evenly
            across videos, with a minimum of one frame per video.

    Returns:
        Dict mapping video name -> list of sampled frame numbers. Videos with
        no frame outside a transition are omitted. An empty ``video_metadata``
        yields an empty dict.
    """
    non_transition_frames = {}
    total_videos = len(video_metadata)
    if total_videos == 0:
        # Nothing to sample from; also avoids dividing by zero below.
        return non_transition_frames

    samples_per_video = max(1, total_samples // total_videos)  # Distribute frames evenly

    for video, data in video_metadata.items():
        total_frames = int(data["frame_num"])

        # Collect every frame covered by a transition (end frame inclusive).
        transition_frames = set()
        for transition in data["transitions"]:
            transition_frames.update(range(transition[0], transition[1] + 1))

        possible_frames = [i for i in range(total_frames) if i not in transition_frames]

        if possible_frames:
            non_transition_frames[video] = random.sample(
                possible_frames, min(samples_per_video, len(possible_frames))
            )

    return non_transition_frames

def extract_frames(video_folder, output_folder, non_transition_frames):
    """Write the selected frames of each video to ``output_folder`` as JPEGs.

    Args:
        video_folder: Directory containing the video files.
        output_folder: Directory that receives the images; created if missing.
        non_transition_frames: Mapping of video file name -> iterable of frame
            numbers to extract.

    Images are named ``<video>_frame<frame number>.jpg``. Missing or unreadable
    videos and unreadable frames are reported with ``print`` and skipped.

    Note: this definition has the same name as the transition-frame extractor
    defined in an earlier cell but takes different arguments.
    """
    os.makedirs(output_folder, exist_ok=True)

    for video, frames in non_transition_frames.items():
        video_file = os.path.join(video_folder, video)
        if not os.path.exists(video_file):
            print(f"Video not found: {video_file}")
            continue

        cap = cv2.VideoCapture(video_file)
        try:
            if not cap.isOpened():
                print(f"Error opening video file: {video_file}")
                continue

            for frame_no in frames:
                cap.set(cv2.CAP_PROP_POS_FRAMES, frame_no)
                ret, frame = cap.read()
                if not ret:
                    print(f"Failed to extract frame {frame_no} from {video_file}")
                    continue
                frame_filename = os.path.join(output_folder, f"{video}_frame{frame_no}.jpg")
                cv2.imwrite(frame_filename, frame)
        finally:
            # Runs on normal completion, on `continue`, and on exceptions.
            cap.release()

# Seed Python's RNG so random.sample inside get_non_transition_frames picks
# the same frames every time this cell is run.
RANDOM_SEED = 42
random.seed(RANDOM_SEED)

# Load metadata
train_metadata = load_metadata("E:\\Shot-Boundary-Detection\\annotations\\train.json")
test_metadata = load_metadata("E:\\Shot-Boundary-Detection\\annotations\\test.json")
gradual_metadata = load_metadata("E:\\Shot-Boundary-Detection\\annotations\\only_gradual.json")

# Define number of frames to extract per dataset
# NOTE(review): presumably chosen to match the number of transition frames in
# each split (the final message calls the result "balanced") — confirm.
total_train_samples = 297282
total_test_samples = 14384
total_gradual_samples = 20581

# Select non-transition frame numbers for every video of each split
non_transition_train = get_non_transition_frames(train_metadata, total_train_samples)
non_transition_test = get_non_transition_frames(test_metadata, total_test_samples)
non_transition_gradual = get_non_transition_frames(gradual_metadata, total_gradual_samples)

# Define paths
video_root = "E:\\Shot-Boundary-Detection\\videos"  # Change this to the root directory containing train, test, only_gradual folders
output_root = "C:\\Users\\sanjay\\Downloads\\non_transition_frames"

# Extract frames and save them in corresponding folders
extract_frames(os.path.join(video_root, "train"), os.path.join(output_root, "train"), non_transition_train)
extract_frames(os.path.join(video_root, "test"), os.path.join(output_root, "test"), non_transition_test)
extract_frames(os.path.join(video_root, "only_gradual"), os.path.join(output_root, "only_gradual"), non_transition_gradual)

print("Balanced non-transition frame extraction complete.")


In [None]:
import os
import torch
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader, ConcatDataset
from PIL import Image
import torch.nn as nn
import torch.optim as optim
from torchvision import models
from torch.optim.lr_scheduler import ReduceLROnPlateau

# Root folder of the extracted frames; each split is a subfolder of it.
dataset_root = r"C:\Users\divya\Downloads\extracted_frames"

# Random augmentation steps applied between resizing and tensor conversion.
_augmentations = [
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=30),
    transforms.ColorJitter(brightness=0.3, contrast=0.3),
]

# Full pipeline: resize -> augment -> tensor -> normalise each channel with
# mean 0.5 and std 0.5.
transform = transforms.Compose(
    [transforms.Resize((224, 224))]
    + _augmentations
    + [
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    ]
)

# ✅ Custom Dataset Class
class ShotBoundaryDataset(Dataset):
    """Frame-level dataset of transition / non-transition JPEG images.

    ``image_dir`` is scanned for a ``transition`` folder (label 1) and a
    ``non_transition`` folder (label 0); every ``.jpg`` file found becomes one
    sample. Files are listed in sorted order so that an index always refers to
    the same image. A missing class folder is reported and skipped.

    Args:
        image_dir: Directory containing the two class folders.
        transform: Optional callable applied to the RGB PIL image in
            ``__getitem__``.
    """

    def __init__(self, image_dir, transform=None):
        self.image_dir = image_dir
        self.transform = transform
        self.image_paths = []
        self.labels = []

        # Load transition (1) and non-transition (0) frames
        for category, label in [("transition", 1), ("non_transition", 0)]:
            class_dir = os.path.join(image_dir, category)
            if not os.path.isdir(class_dir):
                # Make an empty/partial dataset visible instead of failing later.
                print(f"Warning: class folder not found, skipping: {class_dir}")
                continue
            # os.listdir returns entries in arbitrary order; sort for stability.
            for file in sorted(os.listdir(class_dir)):
                if file.endswith(".jpg"):
                    self.image_paths.append(os.path.join(class_dir, file))
                    self.labels.append(label)

    def __len__(self):
        """Return the number of image files collected."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``.

        The image is loaded as RGB and passed through ``self.transform`` when
        one was given.
        """
        img_path = self.image_paths[idx]
        # Context manager closes the underlying file as soon as the pixel data
        # has been converted.
        with Image.open(img_path) as img:
            image = img.convert("RGB")
        label = self.labels[idx]

        if self.transform:
            image = self.transform(image)

        return image, label

# ✅ Load training datasets (train + only_gradual) with the augmenting transform
train_dataset = ShotBoundaryDataset(os.path.join(dataset_root, "train"), transform=transform)
gradual_dataset = ShotBoundaryDataset(os.path.join(dataset_root, "only_gradual"), transform=transform)

# ✅ Combine both datasets for training
combined_train_dataset = ConcatDataset([train_dataset, gradual_dataset])

# ✅ Load test dataset for validation
# Validation must be deterministic: same resize and normalisation as training,
# but without the random flip / rotation / colour jitter, otherwise the
# reported metrics change from run to run.
val_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])
val_dataset = ShotBoundaryDataset(os.path.join(dataset_root, "test"), transform=val_transform)

# ✅ Define DataLoaders
train_loader = DataLoader(combined_train_dataset, batch_size=64, shuffle=True, num_workers=0)  # Windows fix
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False, num_workers=0)

print("✅ Dataset successfully loaded!")
print(f"Training samples: {len(combined_train_dataset)}, Validation samples: {len(val_dataset)}")




In [None]:
#new
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.models as models
from tqdm import tqdm
import numpy as np



# Model: ResNet + BiLSTM
class ShotBoundaryModel(nn.Module):
    """ResNet-18 frame encoder followed by a bidirectional LSTM classifier.

    Args:
        cnn_feature_dim: Size of the per-frame feature vector fed to the LSTM
            (512 for the ResNet-18 backbone used here).
        lstm_hidden_dim: Hidden size of each LSTM direction.
        lstm_layers: Number of stacked LSTM layers.
        num_classes: Number of output classes.

    ``forward`` accepts either a batch of frame sequences shaped
    ``(batch, seq_len, C, H, W)`` or a batch of single frames shaped
    ``(batch, C, H, W)``, which is treated as sequences of length 1. It returns
    logits of shape ``(batch, num_classes)``.
    """

    def __init__(self, cnn_feature_dim=512, lstm_hidden_dim=256, lstm_layers=1, num_classes=2):
        super(ShotBoundaryModel, self).__init__()
        resnet = models.resnet18(pretrained=True)
        # Keep every ResNet child except the last one (the classification layer).
        self.feature_extractor = nn.Sequential(*list(resnet.children())[:-1])
        self.cnn_feature_dim = cnn_feature_dim
        self.lstm = nn.LSTM(input_size=cnn_feature_dim,
                            hidden_size=lstm_hidden_dim,
                            num_layers=lstm_layers,
                            batch_first=True,
                            bidirectional=True)
        self.fc = nn.Linear(lstm_hidden_dim * 2, num_classes)

    def forward(self, x):
        if x.dim() == 4:
            # Plain image batch (as produced by an image DataLoader):
            # add a sequence axis of length 1.
            x = x.unsqueeze(1)
        batch_size, seq_len, C, H, W = x.size()
        # reshape (not view) so sliced / non-contiguous inputs are accepted too.
        x = x.reshape(batch_size * seq_len, C, H, W)
        features = self.feature_extractor(x)  # (B*seq_len, 512, 1, 1)
        features = features.reshape(batch_size, seq_len, self.cnn_feature_dim)
        lstm_out, (h_n, _) = self.lstm(features)
        # Split h_n into (layer, direction, batch, hidden) and take the last
        # layer's final forward and backward states.
        h_n = h_n.view(self.lstm.num_layers, 2, batch_size, self.lstm.hidden_size)
        h_forward = h_n[-1, 0, :, :]
        h_backward = h_n[-1, 1, :, :]
        h = torch.cat((h_forward, h_backward), dim=1)
        out = self.fc(h)
        return out

# Metrics computation
def compute_metrics(outputs, targets):
    """Compute binary accuracy, precision, recall and F1 from model logits.

    Args:
        outputs: Tensor of per-class scores; the predicted class is the argmax
            along dimension 1.
        targets: Tensor of ground-truth class indices (1 = positive class).

    Returns:
        ``(accuracy, precision, recall, f1)``. A small epsilon (1e-8) in every
        denominator keeps the ratios defined when a count is zero.
    """
    eps = 1e-8
    predicted = torch.argmax(outputs, dim=1).cpu().numpy()
    actual = targets.cpu().numpy()

    predicted_pos, predicted_neg = predicted == 1, predicted == 0
    actual_pos, actual_neg = actual == 1, actual == 0

    # Confusion-matrix counts
    tp = np.sum(predicted_pos & actual_pos)
    tn = np.sum(predicted_neg & actual_neg)
    fp = np.sum(predicted_pos & actual_neg)
    fn = np.sum(predicted_neg & actual_pos)

    precision = tp / (tp + fp + eps)
    recall = tp / (tp + fn + eps)
    f1 = 2 * precision * recall / (precision + recall + eps)
    accuracy = (tp + tn) / (tp + tn + fp + fn + eps)
    return accuracy, precision, recall, f1

# Training function
def train_model(model, train_loader, test_loader, num_epochs=10, device="cuda"):
    """Train ``model`` with Adam + cross-entropy and evaluate after every epoch.

    Args:
        model: Network to optimise; moved to ``device``.
        train_loader: Loader yielding ``(inputs, labels)`` training batches; its
            ``dataset`` length is used to average the loss.
        test_loader: Loader yielding ``(inputs, labels)`` evaluation batches.
        num_epochs: Number of passes over ``train_loader``.
        device: Device string for the model and batches.

    After each epoch the evaluation metrics are printed, and whenever the F1
    score exceeds the best seen so far the weights are written to
    ``best_shot_boundary_model.pth``.
    """
    model.to(device)
    loss_fn = nn.CrossEntropyLoss()
    adam = optim.Adam(model.parameters(), lr=0.001)
    top_f1 = 0.0

    for epoch_no in range(1, num_epochs + 1):
        epoch_tag = f"Epoch {epoch_no}/{num_epochs}"

        # ---- optimisation pass ----
        model.train()
        weighted_loss_sum = 0.0
        for batch, target in tqdm(train_loader, desc=epoch_tag):
            batch = batch.to(device)
            target = target.to(device)
            adam.zero_grad()
            batch_loss = loss_fn(model(batch), target)
            batch_loss.backward()
            adam.step()
            # Weight by batch size so the epoch value is a per-sample mean.
            weighted_loss_sum += batch_loss.item() * batch.size(0)
        mean_loss = weighted_loss_sum / len(train_loader.dataset)
        print(f"{epoch_tag}, Loss: {mean_loss:.4f}")

        # ---- evaluation pass ----
        model.eval()
        logit_chunks, target_chunks = [], []
        with torch.no_grad():
            for batch, target in test_loader:
                batch = batch.to(device)
                target = target.to(device)
                logit_chunks.append(model(batch))
                target_chunks.append(target)
        acc, prec, rec, f1 = compute_metrics(
            torch.cat(logit_chunks, dim=0),
            torch.cat(target_chunks, dim=0),
        )
        print(f"Test Metrics - Acc: {acc:.4f}, Prec: {prec:.4f}, Recall: {rec:.4f}, F1: {f1:.4f}")

        # Keep only the checkpoint with the highest F1 so far.
        if f1 > top_f1:
            top_f1 = f1
            torch.save(model.state_dict(), "best_shot_boundary_model.pth")
            print("Best model saved!")

    print("Training complete.")

if __name__ == "__main__":
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = ShotBoundaryModel()

    # The loaders built in the dataset cell are named `train_loader`
    # (train + only_gradual combined) and `val_loader` (test split).
    print("Training on combined train and gradual dataset...")
    train_model(model, train_loader, val_loader, num_epochs=10, device=device)


In [None]:
import os
import random
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.models as models
import matplotlib.pyplot as plt
from PIL import Image, ImageDraw, ImageFont

# ✅ Define Model with ResNet + BiLSTM
class ShotBoundaryModel(nn.Module):
    """ResNet-18 + bidirectional LSTM classifier used for inference.

    The layer names (``feature_extractor``, ``lstm``, ``fc``) match the
    training-time model so its saved ``state_dict`` loads directly, and the
    classification head is fed the same quantity as during training: the final
    forward and backward hidden states of the LSTM.

    Args:
        hidden_size: Hidden size of each LSTM direction.
        num_classes: Number of output classes.

    ``forward`` accepts ``(batch, seq_len, C, H, W)`` or ``(batch, C, H, W)``
    (treated as sequences of length 1) and returns ``(batch, num_classes)``
    logits.
    """

    def __init__(self, hidden_size=256, num_classes=2):
        super(ShotBoundaryModel, self).__init__()
        resnet = models.resnet18(pretrained=True)
        self.feature_extractor = nn.Sequential(*list(resnet.children())[:-1])  # Remove FC layer
        self.lstm = nn.LSTM(input_size=512, hidden_size=hidden_size, num_layers=1, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(hidden_size * 2, num_classes)  # BiLSTM has 2x hidden_size

    def forward(self, x):
        if x.dim() == 4:
            # Batch of single images: add a sequence axis of length 1.
            x = x.unsqueeze(1)
        batch_size, seq_len, C, H, W = x.size()  # Expecting (batch, seq_len, C, H, W)
        x = x.reshape(batch_size * seq_len, C, H, W)  # Flatten sequence dimension
        features = self.feature_extractor(x)  # Feature extraction
        features = features.reshape(batch_size, seq_len, -1)  # Restore sequence format
        _, (h_n, _) = self.lstm(features)  # BiLSTM processing
        # h_n is (num_layers * 2, batch, hidden); the last two entries are the
        # final forward and backward states of the last layer. Using them
        # (rather than lstm_out[:, -1]) matches the training model for any
        # sequence length; both are identical when seq_len == 1.
        out = self.fc(torch.cat((h_n[-2], h_n[-1]), dim=1))
        return out

# ✅ Load Model
device = "cuda" if torch.cuda.is_available() else "cpu"
model = ShotBoundaryModel().to(device)
# NOTE(review): torch.load unpickles the file; only load checkpoints from a trusted source.
model.load_state_dict(torch.load(r"C:\Users\divya\Downloads\best_shot_boundary_model.pth", map_location=device))
model.eval()

# ✅ Image Preprocessing
# Inference must see inputs in the same value range as training, so the
# normalisation used by the training transform (mean 0.5, std 0.5 per channel)
# is applied here as well; the random augmentations are intentionally omitted.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
])

# Maps the predicted class index to a display name.
class_labels = {0: "Non-Transition", 1: "Shot Transition"}

# ✅ Function to Get Test Images
def get_test_images(test_folder, num_images=5):
    """Pick up to ``num_images`` random ``.jpg`` paths from the two class folders.

    Images are drawn from ``<test_folder>/transition`` and
    ``<test_folder>/non_transition``. The request is split roughly in half
    between the classes; when ``num_images`` is odd, or one class has too few
    images, the remainder is taken from the other class so that the full
    ``num_images`` are returned whenever enough images exist.

    A missing class folder is treated as empty, so the function returns ``[]``
    instead of raising when nothing is available.

    Returns:
        List of file paths, transition images first.
    """
    def _list_jpgs(folder):
        # Missing folder -> no images (caller reports the empty result).
        if not os.path.isdir(folder):
            return []
        return [os.path.join(folder, f) for f in sorted(os.listdir(folder)) if f.endswith(".jpg")]

    transition_images = _list_jpgs(os.path.join(test_folder, "transition"))
    non_transition_images = _list_jpgs(os.path.join(test_folder, "non_transition"))

    # Start with half for transitions, give the rest to non-transitions, then
    # top transitions back up if non-transitions could not fill their share.
    n_transition = max(0, min(num_images // 2, len(transition_images)))
    n_non_transition = max(0, min(num_images - n_transition, len(non_transition_images)))
    n_transition = max(0, min(num_images - n_non_transition, len(transition_images)))

    selected_transitions = random.sample(transition_images, n_transition)
    selected_non_transitions = random.sample(non_transition_images, n_non_transition)

    return selected_transitions + selected_non_transitions

# ✅ Function to Overlay Text on Images
def overlay_text(image, text):
    """Draw ``text`` in red at position (10, 10) on ``image`` and return it.

    The image is modified in place; the default PIL font is used.
    """
    ImageDraw.Draw(image).text((10, 10), text, fill="red", font=ImageFont.load_default())
    return image

# ✅ Function to Visualize Predictions
def visualize_predictions(test_folder, num_images=5):
    """Classify a random selection of test images and show them in one row.

    Images come from ``get_test_images``. Each one is run through the
    notebook-level ``model`` as a single-frame sequence; images predicted as
    class 1 get a "Shot Transition" overlay. Titles are red for class 1 and
    green otherwise. Prints a message and returns if no image is found.
    """
    sample_paths = get_test_images(test_folder, num_images)
    if len(sample_paths) == 0:
        print("No images found in the test folder.")
        return

    plt.figure(figsize=(num_images * 3, 5))

    for slot, path in enumerate(sample_paths, start=1):
        picture = Image.open(path).convert("RGB")
        # Add batch and sequence axes: [batch=1, seq_len=1, C, H, W]
        batch = transform(picture).unsqueeze(0).unsqueeze(1).to(device)

        with torch.no_grad():
            predicted = torch.argmax(model(batch), dim=1).item()
        caption = class_labels[predicted]
        is_transition = predicted == 1

        if is_transition:
            picture = overlay_text(picture, "Shot Transition")

        plt.subplot(1, num_images, slot)
        plt.imshow(picture)
        plt.title(f"Predicted: {caption}", fontsize=12, color="red" if is_transition else "green")
        plt.axis("off")

    plt.show()

# ✅ Run Visualization on Test Data
# get_test_images looks for "transition" and "non_transition" subfolders
# inside this folder (hard-coded local path; update for your machine).
test_folder = r"C:\Users\divya\Downloads\extracted_frames\test"
visualize_predictions(test_folder, num_images=5)


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

def evaluate_model_refined(y_true=None, y_pred=None):
    """Print accuracy/precision/recall/F1 and plot the confusion matrix.

    Args:
        y_true: Optional sequence of ground-truth labels
            (0 = non-transition, 1 = transition).
        y_pred: Optional sequence of predicted labels, same length as
            ``y_true``.

    When either argument is omitted, labels and predictions are gathered by
    running the notebook-level ``model`` over ``val_loader`` on ``device``.
    Image batches shaped ``(batch, C, H, W)`` are given a sequence axis of
    length 1 before being passed to the model.

    Raises:
        ValueError: if ``y_true`` and ``y_pred`` have different lengths.
    """
    if y_true is None or y_pred is None:
        import torch  # local import: this cell does not import torch itself

        gathered_true, gathered_pred = [], []
        model.eval()
        with torch.no_grad():
            for images, labels in val_loader:
                images = images.to(device)
                if images.dim() == 4:
                    images = images.unsqueeze(1)
                logits = model(images)
                gathered_pred.extend(torch.argmax(logits, dim=1).cpu().tolist())
                gathered_true.extend(torch.as_tensor(labels).cpu().tolist())
        y_true, y_pred = gathered_true, gathered_pred

    y_true = np.asarray(y_true, dtype=int).ravel()
    y_pred = np.asarray(y_pred, dtype=int).ravel()
    if y_true.shape != y_pred.shape:
        raise ValueError("y_true and y_pred must have the same number of elements")
    if y_true.size == 0:
        print("No samples to evaluate.")
        return

    # Confusion-matrix counts (positive class = 1, "Transition")
    tp = int(np.sum((y_pred == 1) & (y_true == 1)))
    tn = int(np.sum((y_pred == 0) & (y_true == 0)))
    fp = int(np.sum((y_pred == 1) & (y_true == 0)))
    fn = int(np.sum((y_pred == 0) & (y_true == 1)))

    # Rows = actual class, columns = predicted class (matches the axis labels below).
    conf_matrix = np.array([[tn, fp], [fn, tp]])

    total = tp + tn + fp + fn
    accuracy = (tp + tn) / total if total else 0.0
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1_score = (2 * precision * recall / (precision + recall)) if (precision + recall) else 0.0

    # Print results
    print("📊 Validation Results (Fixed)")
    print(f"✅ Accuracy:   {accuracy:.2f}")
    print(f"✅ Precision:  {precision:.2f}")
    print(f"✅ Recall:     {recall:.2f}")
    print(f"✅ F1 Score:   {f1_score:.2f}")

    # Plot the confusion matrix
    plt.figure(figsize=(6,5))
    sns.heatmap(
        conf_matrix,
        annot=True,
        fmt="d",
        cmap="Blues",
        xticklabels=["Non-Transition", "Transition"],
        yticklabels=["Non-Transition", "Transition"]
    )
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.title("Confusion Matrix (Fixed)")
    plt.show()

    # Simple bar chart of the metrics
    metrics = {
        "Accuracy":   accuracy,
        "Precision":  precision,
        "Recall":     recall,
        "F1 Score":   f1_score
    }

    plt.figure(figsize=(6,5))
    plt.bar(metrics.keys(), metrics.values(), color=['blue','green','red','purple'])
    plt.ylim(0, 1)
    plt.xlabel("Metrics")
    plt.ylabel("Score")
    plt.title("Evaluation Metrics (Fixed)")
    plt.show()

# Call the refined evaluation: prints the four metrics, then shows the
# confusion-matrix heatmap and the metrics bar chart.
evaluate_model_refined()


In [None]:
import cv2
from PIL import Image
import torchvision.transforms as transforms
import numpy as np
# Function to extract frames from a video
def extract_frames(video_path, frame_interval=5):
    """Read a video sequentially and keep every ``frame_interval``-th frame.

    Returns:
        ``(frames, frame_indices)``: the kept frames as returned by OpenCV and
        their zero-based positions in the video. Both lists are empty when the
        video cannot be opened or contains no readable frame.
    """
    capture = cv2.VideoCapture(video_path)
    sampled, positions = [], []
    index = 0
    while capture.isOpened():
        ok, image = capture.read()
        if not ok:
            # End of stream or decode failure.
            break
        if index % frame_interval == 0:
            sampled.append(image)
            positions.append(index)
        index += 1
    capture.release()
    return sampled, positions
# Transformations to match the model's input.
# The training pipeline normalises each channel with mean 0.5 / std 0.5, so the
# same normalisation is applied at inference time.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
])

def preprocess_frames(frames):
    """Convert OpenCV BGR frames into one stacked tensor.

    Each frame is converted from BGR to RGB, wrapped in a PIL image, passed
    through the notebook-level ``transform``, and the results are stacked along
    a new first dimension.
    """
    tensors = []
    for bgr_frame in frames:
        rgb_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
        tensors.append(transform(Image.fromarray(rgb_frame)))
    return torch.stack(tensors)


def predict_shot_boundaries(video_path, seq_len=16, threshold=0.5):
    """Run the model over consecutive windows of sampled frames.

    Args:
        video_path: Video to analyse.
        seq_len: Number of sampled frames per window; windows do not overlap.
        threshold: Minimum class-1 probability for a window to be reported.

    Returns:
        List of original frame indices, one per window classified as a shot
        boundary (the index of the window's middle frame). Empty when the video
        yields no frames or fewer than ``seq_len`` sampled frames.
    """
    frames, frame_indices = extract_frames(video_path)
    if not frames:
        # torch.stack cannot stack an empty list; report and return nothing.
        print(f"No frames could be read from {video_path}")
        return []

    # Keep the full stack on the CPU; only the active window is moved to the
    # device, which bounds device memory by one window.
    processed_frames = preprocess_frames(frames).unsqueeze(0)  # (1, num_frames, C, H, W)
    num_frames = processed_frames.shape[1]

    shot_boundaries = []
    with torch.no_grad():
        # "+ 1" so the last complete window is included when num_frames is an
        # exact multiple of seq_len.
        for i in range(0, num_frames - seq_len + 1, seq_len):
            sequence = processed_frames[:, i:i+seq_len].to(device)
            outputs = model(sequence)  # Get predictions
            probs = torch.softmax(outputs, dim=1)[:, 1].cpu().numpy()  # Take probability of 'shot boundary' class

            if probs[0] > threshold:
                shot_boundaries.append(frame_indices[i + seq_len // 2])  # Save frame index

    return shot_boundaries

# Run on a video
# Hard-coded local path; relies on `model` and `device` defined in an earlier cell.
video_path = "E:\\Project Implementation\\Sample Video.mp4" # Change this
boundaries = predict_shot_boundaries(video_path)
print("Detected shot boundaries at frames:", boundaries)


In [None]:
import cv2
import torch
import torchvision.models as models
import torch.nn as nn
import torchvision.transforms as transforms
import numpy as np
from PIL import Image

# Define model (ResNet + BiLSTM)
class ShotBoundaryModel(nn.Module):
    """ResNet-18 frame encoder followed by a bidirectional LSTM classifier.

    Args:
        cnn_feature_dim: Size of the per-frame feature vector fed to the LSTM
            (512 for the ResNet-18 backbone used here).
        lstm_hidden_dim: Hidden size of each LSTM direction.
        lstm_layers: Number of stacked LSTM layers.
        num_classes: Number of output classes.

    ``forward`` accepts either a batch of frame sequences shaped
    ``(batch, seq_len, C, H, W)`` or a batch of single frames shaped
    ``(batch, C, H, W)``, which is treated as sequences of length 1. It returns
    logits of shape ``(batch, num_classes)``.
    """

    def __init__(self, cnn_feature_dim=512, lstm_hidden_dim=256, lstm_layers=1, num_classes=2):
        super(ShotBoundaryModel, self).__init__()
        resnet = models.resnet18(pretrained=True)
        # Keep every ResNet child except the last one (the classification layer).
        self.feature_extractor = nn.Sequential(*list(resnet.children())[:-1])
        self.cnn_feature_dim = cnn_feature_dim
        self.lstm = nn.LSTM(input_size=cnn_feature_dim,
                            hidden_size=lstm_hidden_dim,
                            num_layers=lstm_layers,
                            batch_first=True,
                            bidirectional=True)
        self.fc = nn.Linear(lstm_hidden_dim * 2, num_classes)

    def forward(self, x):
        if x.dim() == 4:
            # Batch of single images: add a sequence axis of length 1.
            x = x.unsqueeze(1)
        batch_size, seq_len, C, H, W = x.size()
        # reshape (not view) so sliced / non-contiguous inputs are accepted too.
        x = x.reshape(batch_size * seq_len, C, H, W)
        features = self.feature_extractor(x)
        features = features.reshape(batch_size, seq_len, self.cnn_feature_dim)
        lstm_out, (h_n, _) = self.lstm(features)
        # Split h_n into (layer, direction, batch, hidden) and take the last
        # layer's final forward and backward states.
        h_n = h_n.view(self.lstm.num_layers, 2, batch_size, self.lstm.hidden_size)
        h_forward = h_n[-1, 0, :, :]
        h_backward = h_n[-1, 1, :, :]
        h = torch.cat((h_forward, h_backward), dim=1)
        out = self.fc(h)
        return out

# Load trained model
device = "cuda" if torch.cuda.is_available() else "cpu"
model = ShotBoundaryModel().to(device)
# NOTE(review): torch.load unpickles the file; only load checkpoints from a trusted source.
model.load_state_dict(torch.load("C:\\Users\\divya\\Downloads\\best_shot_boundary_model.pth", map_location=device))
model.eval()

# Preprocessing function
# Mirrors the deterministic part of the training transform, including the
# per-channel normalisation (mean 0.5, std 0.5) the model was trained with.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
])

def preprocess_frames(frames):
    """Convert OpenCV BGR frames into one stacked tensor.

    Each frame is converted from BGR to RGB, wrapped in a PIL image, passed
    through the notebook-level ``transform``, and the results are stacked along
    a new first dimension.
    """
    tensors = []
    for bgr_frame in frames:
        rgb_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
        tensors.append(transform(Image.fromarray(rgb_frame)))
    return torch.stack(tensors)

# Function to extract frames
def extract_frames(video_path, frame_interval=5):
    """Read a video sequentially and keep every ``frame_interval``-th frame.

    Returns:
        ``(frames, frame_indices, fps, frame_size)`` where ``frames`` are the
        kept frames as returned by OpenCV, ``frame_indices`` their zero-based
        positions in the video, ``fps`` the frame rate reported by the
        container and ``frame_size`` the ``(width, height)`` in pixels.
    """
    cap = cv2.VideoCapture(video_path)
    frames = []
    frame_indices = []
    try:
        # Keep the fractional rate (e.g. 29.97): truncating it to an int makes
        # a video re-encoded with this value play at the wrong speed.
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_size = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))

        count = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            if count % frame_interval == 0:  # Take every N-th frame
                frames.append(frame)
                frame_indices.append(count)
            count += 1
    finally:
        # Release the capture even if decoding raised.
        cap.release()
    return frames, frame_indices, fps, frame_size

# Predict shot boundaries
def predict_shot_boundaries(video_path, seq_len=16, threshold=0.5):
    """Run the model over consecutive windows of sampled frames.

    Args:
        video_path: Video to analyse.
        seq_len: Number of sampled frames per window; windows do not overlap.
        threshold: Minimum class-1 probability for a window to be reported.

    Returns:
        ``(shot_boundaries, fps, frame_size)``. ``shot_boundaries`` lists the
        original frame index of the middle frame of every window classified as
        a shot boundary; it is empty when the video yields no frames or fewer
        than ``seq_len`` sampled frames. ``fps`` and ``frame_size`` are passed
        through from ``extract_frames``.
    """
    frames, frame_indices, fps, frame_size = extract_frames(video_path)
    if not frames:
        # torch.stack cannot stack an empty list; report and return nothing.
        print(f"No frames could be read from {video_path}")
        return [], fps, frame_size

    # Keep the full stack on the CPU; only the active window is moved to the
    # device, which bounds device memory by one window.
    processed_frames = preprocess_frames(frames).unsqueeze(0)  # (1, num_frames, C, H, W)
    num_frames = processed_frames.shape[1]

    shot_boundaries = []
    with torch.no_grad():
        # "+ 1" so the last complete window is included when num_frames is an
        # exact multiple of seq_len.
        for i in range(0, num_frames - seq_len + 1, seq_len):
            sequence = processed_frames[:, i:i+seq_len].to(device)
            outputs = model(sequence)  # Get predictions
            probs = torch.softmax(outputs, dim=1)[:, 1].cpu().numpy()  # Probability of 'shot boundary' class

            if probs[0] > threshold:
                shot_boundaries.append(frame_indices[i + seq_len // 2])  # Save frame index

    return shot_boundaries, fps, frame_size

# Overlay text on detected shot boundaries
def add_overlay_and_save(video_path, shot_boundaries, output_path, fps, frame_size):
    """Copy a video to ``output_path``, labelling the detected boundary frames.

    Args:
        video_path: Source video.
        shot_boundaries: Iterable of frame indices that receive the red
            "Shot Transition" caption.
        output_path: Destination file, encoded with the "mp4v" codec.
        fps: Frame rate of the output video.
        frame_size: ``(width, height)`` of the output video.

    If the source cannot be opened or the writer cannot be created, an error is
    printed and nothing is reported as saved.
    """
    boundary_frames = set(shot_boundaries)  # constant-time lookup per frame

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error opening video file: {video_path}")
        cap.release()
        return

    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(output_path, fourcc, fps, frame_size)
    if not out.isOpened():
        print(f"Error opening output video for writing: {output_path}")
        cap.release()
        out.release()
        return

    try:
        frame_count = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # If current frame is in the detected shot boundaries, overlay text
            if frame_count in boundary_frames:
                cv2.putText(frame, "Shot Transition", (50, 50),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 3, cv2.LINE_AA)

            out.write(frame)
            frame_count += 1
    finally:
        # Always flush/close both handles, even if reading or writing raised.
        cap.release()
        out.release()

    print(f"Video with shot transitions saved as {output_path}")

# Run on a video
# NOTE(review): the output goes to "Project Preparation" while the input is read
# from "Project Implementation" — confirm the differing folder is intended.
video_path = "E:\\Project Implementation\\Sample Video.mp4"  # Change this
output_path = "E:\\Project Preparation\\output_video.mp4"  # Change this

# Detect boundaries, then write an annotated copy of the video.
shot_boundaries, fps, frame_size = predict_shot_boundaries(video_path)
add_overlay_and_save(video_path, shot_boundaries, output_path, fps, frame_size)
