# **Mount Google Drive**

In [1]:
'''from google.colab import drive
drive.mount('/content/drive')'''

"from google.colab import drive\ndrive.mount('/content/drive')"

In [2]:
dataset_path = '/Shot Detection/Datasets/RAIDataset'
video_dir = f"{dataset_path}/videos"
frames_dir = f"{dataset_path}/Frames"

In [3]:
# Checking the connection.
import os
print(os.listdir(video_dir)[:3])
print(os.listdir(frames_dir)[:3])

['7.mp4', '6.mp4', '4.mp4']
['scenes_8.txt', 'scenes_9.txt', 'scenes_10.txt']


# **Install Required Libraries**

In [4]:
%pip install tensorflow opencv-python numpy

Note: you may need to restart the kernel to use updated packages.


# **Load Video Paths**

In [5]:
def load_video_paths(video_dir):
  videos = []
  for file in os.listdir(video_dir):
    if file.endswith(".mp4"):
      videos.append(os.path.join(video_dir, file))
  # Sort the videos as the order should match the annotations.
  return sorted(videos)

all_videos = load_video_paths(video_dir)
print(all_videos[:3])

['/Users/rishabhmathur/Documents/Development/Machine Learning/Projects/Shot Detection/Datasets/RAIDataset/videos/1.mp4', '/Users/rishabhmathur/Documents/Development/Machine Learning/Projects/Shot Detection/Datasets/RAIDataset/videos/10.mp4', '/Users/rishabhmathur/Documents/Development/Machine Learning/Projects/Shot Detection/Datasets/RAIDataset/videos/2.mp4']


# **Load and Parse Annotations**

In [6]:
def load_annotations(ann_file):
  scenes = []
  with open(ann_file, 'r') as f:
    for line in f:
      start, end = map(int, line.strip().split())
      scenes.append((start, end))

  return scenes

def get_annotaion_file(video_path):
  video_num = os.path.splitext(os.path.basename(video_path))[0]
  return os.path.join(frames_dir, f"scenes_{video_num}.txt")

def is_shot_boundary(frame_idx, scenes):
  for start, _ in scenes:
    if frame_idx == start:
      return True
  return False

# Example Usage: where we are printing first 3 scenes of a video.
video_path = all_videos[0]
ann_file = get_annotaion_file(video_path)
scenes = load_annotations(ann_file)

print(scenes[:3])

[(1, 717), (718, 1590), (1591, 2333)]


# **Frame Extraction and Dataset Creation**

In [7]:
import cv2
import numpy as np
import tensorflow as tf

def extract_frames(video_path, sample_rate=1, target_size=(224, 224)):
  frames = []
  cap = cv2.VideoCapture(video_path)
  frame_count = 0

  while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
      break

    if frame_count % sample_rate == 0:
      frame = cv2.resize(frame, target_size)
      frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
      frames.append(frame)

    frame_count += 1

  cap.release()
  return np.array(frames) / 255.0

def create_tf_dataset(video_paths, seq_length=3, batch_size=8):
  def generator():
    for video_path in video_paths:
      frames = extract_frames(video_path)
      ann_file = get_annotaion_file(video_path)
      scenes = load_annotations(ann_file)

      for i in range(len(frames) - seq_length):
        # Extract sequence of frames from starting
        seq = frames[i:i+seq_length]
        mid_idx = i+seq_length // 2
        label = 1 if is_shot_boundary(mid_idx+1, scenes) else 0
        yield (seq, seq[seq_length // 2]), label

  # Defining the output types and shapes for the dataset
  output_types = ((tf.float32, tf.float32), tf.int32)
  output_shapes = (( (seq_length, 224, 224, 3), (224, 224, 3)), ())
  dataset = tf.data.Dataset.from_generator(generator, output_types, output_shapes)

  return dataset.batch(batch_size).prefetch(1)


# Splitting videos into:-
#  - Train
#  - Test
#  - Validation
from sklearn.model_selection import train_test_split

train_videos, test_videos = train_test_split(all_videos, test_size=0.2, random_state=42)
train_videos, val_videos = train_test_split(train_videos, test_size=0.25, random_state=42)

# Creating datasets:-
#   - Train
#   - Test
#   - Validation
train_dataset = create_tf_dataset(train_videos)
test_dataset = create_tf_dataset(test_videos)
val_dataset = create_tf_dataset(val_videos)

Instructions for updating:
Use output_signature instead
Instructions for updating:
Use output_signature instead


# **Model Architecture**

In [8]:
# Initialize and train the model
class PatchEmbedd(tf.keras.layers.Layer):
  def __init__(self, img_size, patch_size, embed_dim):
    super().__init__()
    self.proj = tf.keras.layers.Conv2D(embed_dim, patch_size, strides=patch_size)
    self.flatten = tf.keras.layers.Reshape(target_shape=(-1, embed_dim))

  def call(self, x):
    x = self.proj(x)
    x = self.flatten(x)
    return x

def vit_model(img_size=224, patch_size=16, embed_dim=768, depth=12, num_heads=12):
  inputs = tf.keras.Input(shape=(img_size, img_size, 3))
  patches = PatchEmbedd(img_size, patch_size, embed_dim)(inputs)

  num_patches = (img_size // patch_size) ** 2
  positions = tf.range(start=0, limit=num_patches, delta=1)
  pos_embed = tf.keras.layers.Embedding(input_dim=num_patches+1, output_dim=embed_dim)(positions)
  x = patches + pos_embed

  for _ in range(depth):
    x = tf.keras.layers.LayerNormalization(epsilon=1e-6)(x)
    attn_output = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)(x,x)
    x = x + attn_output
    x = tf.keras.layers.LayerNormalization(epsilon=1e-6)(x)
    x = tf.keras.layers.Dense(units=embed_dim*4, activation=tf.nn.gelu)(x)
    x = tf.keras.layers.Dense(units=embed_dim)(x)
    x = x + patches

  x = tf.keras.layers.GlobalAveragePooling1D()(x)
  return tf.keras.Model(inputs=inputs, outputs=x)

def create_combined_model(img_size=224, lstm_units=256, seq_length=3):
  vit = vit_model(img_size)
  vit.trainable = False

  frame_input = tf.keras.Input(shape=(img_size, img_size, 3))
  frame_features = vit(frame_input)

  sequence_input = tf.keras.Input(shape=(seq_length, img_size, img_size, 3))
  x = tf.keras.layers.TimeDistributed(vit)(sequence_input)
  x = tf.keras.layers.LSTM(lstm_units, return_sequences=True)(x)
  x = tf.keras.layers.LSTM(lstm_units)(x)
  x = tf.keras.layers.Dense(64, activation='relu')(x)
  x = tf.keras.layers.Dense(1, activation='sigmoid')(x)

  model = tf.keras.Model(inputs=[sequence_input, frame_input], outputs=x)
  model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
  return model

seq_length = 3
model = create_combined_model(seq_length=seq_length)
model.fit(train_dataset, epochs=1, validation_data=val_dataset, steps_per_epoch=20, validation_steps=10)



<keras.src.callbacks.History at 0x297b2ce50>

In [1]:
# Evaluate on test set
test_loss, test_acc = model.evaluate(test_dataset, steps=10)
print(f"Test Accuracy: {test_acc}")

def split_video(video_path, model, threshold=0.5):
    frames = extract_frames(video_path)
    boundaries = [0]  # Always consider the first frame as a boundary

    for i in range(len(frames) - seq_length):
        seq = frames[i:i+seq_length]
        middle_frame = seq[seq_length // 2]
        pred = model.predict([np.expand_dims(seq, axis=0), np.expand_dims(middle_frame, axis=0)])
        if pred[0][0] > threshold:
            boundaries.append(i + seq_length // 2 + 1)  # +1 to match 1-indexing

    return boundaries

def save_video_parts(video_path, boundaries, output_dir):
    os.makedirs(output_dir, exist_ok=True)
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = None
    current_part = 0
    frame_count = 0

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret: break

        frame_count += 1
        if frame_count in boundaries:
            if out is not None:
                out.release()
            current_part += 1
            video_name = os.path.splitext(os.path.basename(video_path))[0]
            out_path = f'{output_dir}/{video_name}_part_{current_part}.mp4'
            out = cv2.VideoWriter(out_path, fourcc, fps, (width, height))

        if out is not None:
            out.write(frame)

    cap.release()
    if out is not None:
        out.release()

# Example: Split a video
video_to_split = 'videos/6.mp4'
detected_boundaries = split_video(video_to_split, model)
output_dir = 'Shot Detection/video parts'  # Save parts back to Drive
save_video_parts(video_to_split, detected_boundaries, output_dir)
print(f"Video parts saved to {output_dir}")

NameError: name 'model' is not defined