**Dual Stage Attention Prediction Module**

In [None]:
# Initialization
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import cv2
import glob
import os
import pandas as pd
from google.colab.patches import cv2_imshow
from tensorflow.keras import layers, models, losses, optimizers
from math import log10, sqrt
from sklearn.preprocessing import MinMaxScaler

# Load the UCSD Ped1 training frames as fixed-length grayscale sequences.
# Every sub-folder of `im_dir` is treated as one video clip; each clip is cut
# into consecutive, non-overlapping sequences of FRAMES_PER_SEQUENCE frames so
# that no sequence ever spans two different clips.
FRAMES_PER_SEQUENCE = 20
IMAGE_SIZE = (128, 128)  # (width, height) as expected by cv2.resize

im_dir = "/content/drive/MyDrive/Train_Data"
list_of_files = sorted(os.listdir(im_dir))

clip_sequences = []
for im_folder in list_of_files:  # iterate over each clip folder
    folder_path = os.path.join(im_dir, im_folder)
    if not os.path.isdir(folder_path):
        # Stray files next to the clip folders (e.g. .DS_Store) are not clips.
        continue

    clip_frames = []
    for image_file in sorted(os.listdir(folder_path)):  # iterate over each image
        image_path = os.path.join(folder_path, image_file)
        image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)  # Read as grayscale
        if image is None:
            # cv2.imread signals an unreadable / non-image file by returning
            # None instead of raising, which would crash cv2.resize below.
            print(f"Skipping unreadable file: {image_path}")
            continue
        clip_frames.append(cv2.resize(image, IMAGE_SIZE))

    # Keep only whole sequences; a trailing partial sequence is dropped.
    usable = len(clip_frames) - len(clip_frames) % FRAMES_PER_SEQUENCE
    if usable != len(clip_frames):
        print(f"Dropping {len(clip_frames) - usable} trailing frame(s) from {folder_path}")
    if usable:
        clip_sequences.append(
            np.array(clip_frames[:usable]).reshape(
                -1, FRAMES_PER_SEQUENCE, IMAGE_SIZE[1], IMAGE_SIZE[0], 1
            )
        )

if not clip_sequences:
    raise ValueError(f"No image sequences found under {im_dir}")

# Shape: (num_sequences, FRAMES_PER_SEQUENCE, height, width, 1). The sequence
# count is inferred from the data rather than hard-coded.
all_images = np.concatenate(clip_sequences, axis=0)

# Split into training and validation sets (90% / 10% of the sequences)
indexes = np.arange(all_images.shape[0])
np.random.shuffle(indexes)
split_at = int(0.9 * all_images.shape[0])
train_index = indexes[:split_at]
val_index = indexes[split_at:]

# Normalize to [0, 1]. Casting to float32 first avoids the float64 array that
# `uint8 / 255.0` would otherwise produce (twice the memory, and Keras would
# cast it back to float32 anyway).
train_dataset = all_images[train_index].astype(np.float32) / 255.0
val_dataset = all_images[val_index].astype(np.float32) / 255.0

# Helper function to create shifted frames
def create_shifted_frames(data):
    """Split 5-D sequence data into next-frame (input, target) pairs.

    Args:
        data: Array indexed as (sample, frame, dim, dim, dim).

    Returns:
        A tuple ``(x, y)`` where ``x`` holds every frame except the last one
        of each sample and ``y`` holds every frame except the first one, so
        ``y[:, t]`` is the frame that follows ``x[:, t]``.
    """
    # Inputs drop the final frame; targets drop the first frame.
    return data[:, :-1, :, :, :], data[:, 1:, :, :, :]

# Build the (input, target) pairs for the training and validation splits
x_train, y_train = create_shifted_frames(train_dataset)
x_val, y_val = create_shifted_frames(val_dataset)

# Report the resulting array shapes
print("Training Dataset Shapes:", x_train.shape, y_train.shape)
print("Validation Dataset Shapes:", x_val.shape, y_val.shape)

# Show every frame of one randomly chosen training sequence on a 4x5 grid
data_choice = np.random.choice(len(train_dataset), size=1)[0]
sample_frames = train_dataset[data_choice]
fig, axes = plt.subplots(nrows=4, ncols=5, figsize=(10, 8))

for position, axis in enumerate(axes.flat):
    # Drop the single channel axis so imshow receives a 2-D image
    axis.imshow(np.squeeze(sample_frames[position]), cmap="gray")
    axis.set_title(f"Frame {position + 1}")
    axis.axis("off")
plt.show()

# CBAM Attention Block
import tensorflow.keras.backend as K

def cbam_block(input_tensor, reduction_ratio=8):
    """Apply CBAM-style channel attention followed by spatial attention.

    Args:
        input_tensor: Rank-5 Keras tensor with channels on the last axis (the
            3-D global pooling layers used here require rank 5). The channel
            count must be statically known.
        reduction_ratio: Factor by which the channel count is reduced in the
            hidden layer of the channel-attention MLP.

    Returns:
        A tensor with the same shape as ``input_tensor``, re-weighted first
        per channel and then per position.
    """
    channels = input_tensor.shape[-1]
    # Guard against a zero-unit hidden layer when channels < reduction_ratio.
    hidden_units = max(1, channels // reduction_ratio)

    # Channel Attention
    # CBAM passes both pooled descriptors through the SAME two-layer MLP and
    # applies a single sigmoid to their sum, which keeps the gate in (0, 1).
    shared_hidden = layers.Dense(units=hidden_units, activation="relu")
    shared_output = layers.Dense(units=channels)

    channel_avg_pool = layers.GlobalAveragePooling3D()(input_tensor)
    channel_max_pool = layers.GlobalMaxPooling3D()(input_tensor)
    channel_avg_pool = shared_output(shared_hidden(channel_avg_pool))
    channel_max_pool = shared_output(shared_hidden(channel_max_pool))

    channel_attention = layers.Add()([channel_avg_pool, channel_max_pool])
    channel_attention = layers.Activation("sigmoid")(channel_attention)
    # Make the broadcast over the three pooled axes explicit instead of
    # relying on the merge layer's implicit rank alignment.
    channel_attention = layers.Reshape((1, 1, 1, channels))(channel_attention)
    channel_attention = layers.Multiply()([input_tensor, channel_attention])

    # Spatial Attention
    # Collapse the channel axis with mean and max, keeping it as a size-1 axis.
    spatial_avg_pool = layers.Lambda(
        lambda x: tf.reduce_mean(x, axis=-1, keepdims=True)
    )(channel_attention)
    spatial_max_pool = layers.Lambda(
        lambda x: tf.reduce_max(x, axis=-1, keepdims=True)
    )(channel_attention)
    spatial_attention = layers.Concatenate(axis=-1)([spatial_avg_pool, spatial_max_pool])
    spatial_attention = layers.Conv3D(
        filters=1, kernel_size=7, activation="sigmoid", padding="same"
    )(spatial_attention)
    spatial_attention = layers.Multiply()([channel_attention, spatial_attention])

    return spatial_attention

# DSAPM Model
def _conv_lstm_attention(tensor, filters, kernel_size):
    """Apply the ConvLSTM2D -> BatchNormalization -> CBAM unit used by every block."""
    tensor = layers.ConvLSTM2D(
        filters=filters,
        kernel_size=kernel_size,
        padding="same",
        return_sequences=True,
        activation="relu",
    )(tensor)
    tensor = layers.BatchNormalization()(tensor)
    return cbam_block(tensor)


def _residual_add(shortcut, branch):
    """Add `branch` to `shortcut`, projecting the shortcut if channel counts differ."""
    if shortcut.shape[-1] != branch.shape[-1]:
        # The Add layer rejects inputs whose channel counts differ (64 vs 128
        # between blocks 2/3 and 4/5). A 1x1x1 convolution matches the channel
        # count without mixing neighbouring pixels or frames.
        shortcut = layers.Conv3D(
            filters=branch.shape[-1], kernel_size=1, padding="same"
        )(shortcut)
    return layers.Add()([shortcut, branch])


# Variable-length sequences of frames shaped like the training frames
inp = layers.Input(shape=(None, *x_train.shape[2:]))

# Block 1
x1 = _conv_lstm_attention(inp, filters=64, kernel_size=(5, 5))

# Block 2
x2 = _residual_add(x1, _conv_lstm_attention(x1, filters=64, kernel_size=(5, 5)))

# Block 3 (64 -> 128 channels, shortcut is projected)
x3 = _residual_add(x2, _conv_lstm_attention(x2, filters=128, kernel_size=(3, 3)))

# Block 4
x4 = _residual_add(x3, _conv_lstm_attention(x3, filters=128, kernel_size=(3, 3)))

# Block 5 (128 -> 64 channels, shortcut is projected)
x5 = _residual_add(x4, _conv_lstm_attention(x4, filters=64, kernel_size=(1, 1)))

# Output Layer: one sigmoid-activated channel per frame, matching the
# normalized grayscale targets
x = layers.Conv3D(filters=1, kernel_size=(3, 3, 3), activation="sigmoid", padding="same")(x5)

# Compile Model
model = keras.models.Model(inp, x)
model.compile(loss="binary_crossentropy", optimizer=keras.optimizers.Adam())
model.summary()

# Training
# Stop once val_loss has not improved for 10 epochs; lower the learning rate
# after 5 epochs without improvement.
early_stopping = keras.callbacks.EarlyStopping(patience=10, monitor="val_loss")
reduce_lr = keras.callbacks.ReduceLROnPlateau(patience=5, monitor="val_loss")
training_callbacks = [early_stopping, reduce_lr]

model.fit(
    x=x_train,
    y=y_train,
    validation_data=(x_val, y_val),
    epochs=40,
    batch_size=64,
    callbacks=training_callbacks,
)

# Calculate MSE
# For every training sequence, feed all frames but the last to the model and
# compare its final predicted frame with the held-out last frame.
MSE = []
for example in train_dataset:
    frames = example[:-1]
    original_frame = example[-1]

    # verbose=0 suppresses one progress bar per sequence.
    predicted_frames = model.predict(np.expand_dims(frames, axis=0), verbose=0)
    # Select batch item 0 and the last time step explicitly. np.squeeze would
    # also remove the size-1 channel axis, leaving a 3-D array that cannot be
    # indexed with four subscripts and no longer matches original_frame.
    predicted_frame = predicted_frames[0, -1]

    mse_value = float(np.mean(np.square(predicted_frame - original_frame)))
    MSE.append(mse_value)

print("MSE List:", MSE)

# Calculate PSNR
max_pixel = 1.0  # Normalized pixel range

# PSNR = 20 * log10(max_pixel / sqrt(MSE)); a zero MSE maps to infinity so the
# division by zero is never evaluated.
PSNR = [
    float('inf') if mse_value == 0 else 20 * log10(max_pixel / sqrt(mse_value))
    for mse_value in MSE
]

print("PSNR List:", PSNR)

# Normalize PSNR
# MinMaxScaler works column-wise, so the scores are passed as a single column
# and flattened back to one dimension afterwards.
scaler = MinMaxScaler()
psnr_column = np.array(PSNR).reshape(-1, 1)
norm_PSNR_predicted = scaler.fit_transform(psnr_column).flatten()

# Compute Frame Scores
Predicted_train = norm_PSNR_predicted.tolist()

# Save to Excel
df = pd.DataFrame(Predicted_train, columns=["Frame Score"])
df.to_excel("frame_scores_train.xlsx", index=False)

# Save Model
# Create the output directory first; exist_ok makes re-running the cell safe.
os.makedirs("models", exist_ok=True)
model.save("models/dsapm.h5")
