In [None]:
# Colab only: expose Google Drive inside the runtime so that the dataset and
# output paths under /content/drive resolve.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)


Mounted at /content/drive


In [None]:
# Install dependencies
!pip install tensorflow opencv-python




In [None]:
import os
import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras.applications import VGG19
from tensorflow.keras.layers import (TimeDistributed, ConvLSTM2D, Dense,
                                    Flatten, Input, Multiply, Reshape,
                                    GlobalAveragePooling2D)
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.regularizers import l2
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score, roc_auc_score

In [None]:
# ---- Data ----
# Root folder holding one sub-directory of videos per entry in CLASSES.
DATA_DIR = '/content/drive/MyDrive/action_data'
# The position in this list is the integer label assigned by the loader.
CLASSES = ['punch', 'non_punch']
# Size passed to cv2.resize for every frame.
IMG_SIZE = (224, 224)

# ---- Clip construction ----
# Number of frames per clip (increased for better temporal context).
SEQUENCE_LENGTH = 20
# Step between consecutive sliding-window starts (temporal overlap).
STRIDE = 5

# ---- Regularization ----
DROPOUT_RATE = 0.5
L2_REG = 0.001

# ---- Optimization ----
LEARNING_RATE = 1e-5
BATCH_SIZE = 4
EPOCHS = 10

In [None]:
def temporal_sampling(video_path, max_frames=SEQUENCE_LENGTH):
    """Sample ``max_frames`` RGB frames spread evenly across a video.

    Frame positions are chosen with ``np.linspace`` over the reported frame
    count. Each decoded frame is converted from BGR to RGB and resized to
    ``IMG_SIZE``. If some reads fail, the result is padded with the already
    collected frames in reverse order until it holds ``max_frames`` frames.

    Args:
        video_path: Path of the video file to read.
        max_frames: Number of frames to return.

    Returns:
        ``np.ndarray`` of the sampled frames stacked along axis 0. When no
        frame can be decoded (unreadable file, zero reported frames) an empty
        array with zero frames is returned and a warning is emitted, instead
        of looping forever in the padding step.
    """
    import warnings

    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # A non-positive frame count would make linspace produce negative
        # positions, so only sample when there is something to sample.
        if total_frames > 0 and max_frames > 0:
            frame_indices = np.linspace(0, total_frames - 1, max_frames, dtype=np.int32)
            for idx in frame_indices:
                cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
                ret, frame = cap.read()
                if not ret:
                    continue
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frames.append(cv2.resize(frame, IMG_SIZE))
    finally:
        # Release the capture even if decoding raised.
        cap.release()

    if not frames:
        # Padding an empty list with its own reflection never grows it, so
        # bail out here rather than spin in the loop below.
        warnings.warn(f"No frames could be read from {video_path}")
        # cv2.resize takes (width, height); the resulting array is (height, width, 3).
        return np.empty((0, IMG_SIZE[1], IMG_SIZE[0], 3), dtype=np.uint8)

    # Temporal padding with reflected frames
    while len(frames) < max_frames:
        frames.extend(frames[::-1][:max_frames - len(frames)])

    return np.array(frames)

In [None]:
def load_dataset_with_overlap(data_dir, classes):
    """Load every video under ``data_dir/<class_name>`` as fixed-length clips.

    The label of a clip is the index of its class in ``classes``. Each video
    is passed through ``temporal_sampling`` and the returned frames are cut
    into windows of ``SEQUENCE_LENGTH`` frames whose starts are ``STRIDE``
    frames apart. A video that yields fewer than ``SEQUENCE_LENGTH`` frames
    contributes no clips.

    Args:
        data_dir: Root directory containing one sub-directory per class.
        classes: Class (sub-directory) names; list position is the label.

    Returns:
        Tuple ``(X, y)`` of NumPy arrays: the stacked clips and their labels.
    """
    video_extensions = ('.avi', '.mp4', '.mov', '.mkv')
    X, y = [], []
    for label, class_name in enumerate(classes):
        class_dir = os.path.join(data_dir, class_name)
        # os.listdir returns entries in arbitrary order; sorting keeps the
        # sample order (and therefore any seeded split) reproducible.
        for filename in sorted(os.listdir(class_dir)):
            if not filename.lower().endswith(video_extensions):
                continue
            frames = temporal_sampling(os.path.join(class_dir, filename))

            # Create overlapping sequences
            for start in range(0, len(frames) - SEQUENCE_LENGTH + 1, STRIDE):
                X.append(frames[start:start + SEQUENCE_LENGTH])
                y.append(label)

    return np.array(X), np.array(y)

In [None]:
def build_attention_lstm_model():
    """Build and compile the VGG19 + ConvLSTM2D binary clip classifier.

    Pipeline: a frozen ImageNet VGG19 (no top) is applied to every frame,
    a ConvLSTM2D models the sequence of feature maps, a per-time-step
    Dense(1) score is soft-maxed over the time axis and multiplied back onto
    the features, and the result is flattened per step, averaged over time
    and classified by a small dense head with a single sigmoid unit.

    The sigmoid output is the probability of label 1.

    Returns:
        A compiled ``tf.keras.Model`` using binary cross-entropy, Adam with
        value clipping, and accuracy / precision / recall metrics.
    """
    # cv2.resize takes IMG_SIZE as (width, height); Keras wants (height, width, channels).
    # Deriving the shape from IMG_SIZE keeps the model in step with the data pipeline.
    frame_shape = (IMG_SIZE[1], IMG_SIZE[0], 3)

    input_layer = Input(shape=(SEQUENCE_LENGTH,) + frame_shape)
    vgg = VGG19(weights='imagenet', include_top=False, input_shape=frame_shape)
    vgg.trainable = False  # use VGG19 purely as a fixed feature extractor

    # Keep spatial dimensions for ConvLSTM2D
    x = TimeDistributed(vgg)(input_layer)
    x = ConvLSTM2D(128, (3, 3), activation='tanh',
                   recurrent_activation='sigmoid',
                   kernel_regularizer=l2(L2_REG),
                   return_sequences=True)(x)

    # Temporal attention: one score per position, normalized over axis 1
    # (the time axis), then broadcast-multiplied onto the feature channels.
    attention = TimeDistributed(Dense(1, activation='tanh'))(x)
    attention = tf.keras.layers.Softmax(axis=1)(attention)
    x = Multiply()([x, attention])

    # Collapse each time step to a vector, then average across time.
    x = TimeDistributed(Flatten())(x)
    x = tf.keras.layers.GlobalAveragePooling1D()(x)
    x = Dense(64, activation='relu', kernel_regularizer=l2(L2_REG))(x)
    x = tf.keras.layers.Dropout(DROPOUT_RATE)(x)
    output = Dense(1, activation='sigmoid')(x)

    model = Model(inputs=input_layer, outputs=output)
    optimizer = Adam(learning_rate=LEARNING_RATE, clipvalue=1.0)
    model.compile(loss='binary_crossentropy',
                  optimizer=optimizer,
                  metrics=['accuracy', tf.keras.metrics.Precision(name='precision'), tf.keras.metrics.Recall(name='recall')])
    return model


In [None]:
# ================== Enhanced Training Pipeline ==================
# Load clips and their integer labels (imports live in the import cell above).
X, y = load_dataset_with_overlap(DATA_DIR, CLASSES)
print('Enhanced data shape:', X.shape, y.shape)

# Normalize pixel values to [0, 1]. Dividing in place avoids holding a second
# full-size float32 copy of the dataset in memory.
X = X.astype('float32')
X /= 255.

# Train/test split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42, shuffle=True
)

# Calculate class weights from the training labels only, so that nothing
# about the held-out split feeds into training.
class_weights = compute_class_weight('balanced', classes=np.array([0, 1]), y=y_train)
class_weights = {0: class_weights[0], 1: class_weights[1]}


Enhanced data shape: (327, 20, 224, 224, 3) (327,)


In [None]:
# Build the model.
# Seed Python, NumPy and TensorFlow first so weight initialization and dropout
# are repeatable across runs (42 matches the random_state of the data split).
tf.keras.utils.set_random_seed(42)
model = build_attention_lstm_model()
model.summary()

In [None]:
# Training callbacks:
#  - stop when the monitored metric has not improved for 5 epochs and roll
#    back to the best weights seen;
#  - halve the learning rate after 3 epochs without improvement.
training_callbacks = [
    tf.keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True),
    tf.keras.callbacks.ReduceLROnPlateau(factor=0.5, patience=3),
]

# Fit on the training split, holding out 20% of it for validation.
history = model.fit(
    x=X_train,
    y=y_train,
    batch_size=BATCH_SIZE,
    epochs=EPOCHS,
    validation_split=0.2,
    class_weight=class_weights,
    callbacks=training_callbacks,
)

In [None]:
def temporal_smoothing(predictions, window_size=7):
    """Centered moving average that keeps the input length.

    Each output value is the mean of the input values that fall inside a
    window of ``window_size`` centered on it. Near the ends only the values
    that actually exist are averaged, so edge values are not pulled toward
    zero by implicit padding.

    Args:
        predictions: 1-D sequence of scores ordered in time.
        window_size: Width of the averaging window.

    Returns:
        1-D float ``np.ndarray`` with exactly ``len(predictions)`` elements,
        including when the input is shorter than the window or empty.
    """
    values = np.asarray(predictions, dtype=float).ravel()
    if values.size == 0 or window_size <= 1:
        return values.copy()

    kernel = np.ones(window_size)
    # Windowed sums and the number of real samples contributing to each sum.
    sums = np.convolve(values, kernel, mode='full')
    counts = np.convolve(np.ones_like(values), kernel, mode='full')
    # Same centering as np.convolve(..., mode='same') for a window no longer
    # than the input, but the slice is always len(values) long.
    start = (window_size - 1) // 2
    return (sums / counts)[start:start + values.size]

# Score the held-out clips. The test samples are independent clips in shuffled
# order (train_test_split with shuffle=True), so neighbouring entries are not
# consecutive in time: averaging across them would blend unrelated samples.
# The raw per-clip probabilities are therefore thresholded directly, which
# also keeps the thresholded metrics and ROC-AUC on the same scores.
y_pred_prob = model.predict(X_test).flatten()
optimal_threshold = 0.35  # Determine from validation data
y_pred = (y_pred_prob > optimal_threshold).astype(int)

# Calculate metrics.
# NOTE: labels follow the order of CLASSES, and scikit-learn's binary metrics
# default to pos_label=1, so precision/recall/F1 describe CLASSES[1].
print("Confusion Matrix:\n", confusion_matrix(y_test, y_pred))
print("Accuracy:", accuracy_score(y_test, y_pred))
print("Precision:", precision_score(y_test, y_pred))
print("Recall:", recall_score(y_test, y_pred))
print("F1 Score:", f1_score(y_test, y_pred))
print("ROC-AUC:", roc_auc_score(y_test, y_pred_prob))

NameError: name 'model' is not defined

In [None]:
def dynamic_thresholding(predictions, base_threshold=0.3, sensitivity=0.1):
    """Adaptive threshold based on prediction distribution.

    The threshold is ``base_threshold`` raised by ``sensitivity`` times the
    mean of ``predictions``.

    Args:
        predictions: Sequence of prediction scores seen so far.
        base_threshold: Threshold used when the mean prediction is zero or
            when there are no predictions yet.
        sensitivity: How strongly the mean prediction shifts the threshold.

    Returns:
        The threshold as a float. With no predictions, ``base_threshold`` is
        returned unchanged rather than NaN.
    """
    values = np.asarray(predictions)
    if values.size == 0:
        # np.mean of an empty sequence is NaN (with a RuntimeWarning), and
        # any comparison against NaN is False.
        return base_threshold
    return base_threshold + (np.mean(values) * sensitivity)

In [None]:
def mark_punches_in_video(input_video_path, output_video_path):
    """Annotate a video with a box and score wherever a punch is detected.

    Every frame is pre-processed the same way as the training data (BGR to
    RGB, resize to ``IMG_SIZE``, scale to [0, 1]) and kept in a rolling
    window of the last ``SEQUENCE_LENGTH`` frames. Once the window is full,
    the global ``model`` scores it for every new frame; the scores are
    averaged over the most recent predictions and compared against
    ``dynamic_thresholding`` of all predictions so far. Frames above the
    threshold get a centered red box, sized by confidence, and a label.
    All frames, annotated or not, are written to the output video.

    NOTE(review): training clips are sampled evenly across a whole video,
    whereas this function scores consecutive frames; confirm that the model
    is expected to transfer between the two time scales.

    Args:
        input_video_path: Path of the video to analyse.
        output_video_path: Path of the annotated ``mp4v`` video to write.

    Raises:
        OSError: If the input cannot be opened or the output cannot be created.
        ValueError: If ``'punch'`` is not present in ``CLASSES``.
    """
    from collections import deque

    smoothing_window = 5

    # Labels are assigned by position in CLASSES and the model's sigmoid is
    # the probability of label 1, so the output only equals the punch
    # probability when 'punch' sits at index 1; otherwise it is the complement.
    punch_is_positive = CLASSES.index('punch') == 1

    cap = cv2.VideoCapture(input_video_path)
    out = None
    try:
        if not cap.isOpened():
            raise OSError(f"Could not open input video: {input_video_path}")

        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
        if not out.isOpened():
            raise OSError(f"Could not open output video for writing: {output_video_path}")

        # Rolling window of pre-processed frames: each frame is converted
        # once, and the deque drops the oldest entry automatically.
        frame_window = deque(maxlen=SEQUENCE_LENGTH)
        prediction_buffer = []

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Preprocess exactly like the training frames: OpenCV decodes to
            # BGR, while the training data was converted to RGB.
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame_window.append(
                cv2.resize(rgb_frame, IMG_SIZE).astype('float32') / 255.0
            )

            # Process when the window has enough frames
            if len(frame_window) == SEQUENCE_LENGTH:
                seq = np.expand_dims(np.stack(list(frame_window)), axis=0)

                # Predict with temporal context
                positive_prob = float(model.predict(seq, verbose=0)[0][0])
                punch_prob = positive_prob if punch_is_positive else 1.0 - positive_prob
                prediction_buffer.append(punch_prob)

                # Temporal smoothing over the most recent predictions (fewer
                # than smoothing_window while the history is still short).
                smoothed_prob = float(np.mean(prediction_buffer[-smoothing_window:]))

                # Dynamic thresholding
                current_threshold = dynamic_thresholding(prediction_buffer)

                # Visual feedback
                if smoothed_prob > current_threshold:
                    # Dynamic bounding box based on prediction confidence
                    box_scale = 0.3 + (smoothed_prob * 0.4)
                    h, w = frame.shape[:2]
                    cx, cy = w // 2, h // 2
                    box_size = int(min(w, h) * box_scale)
                    x1, y1 = cx - box_size // 2, cy - box_size // 2
                    x2, y2 = cx + box_size // 2, cy + box_size // 2

                    cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 3)
                    cv2.putText(frame, f"PUNCH: {smoothed_prob:.2f}", (x1, y1 - 10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)

            out.write(frame)
    finally:
        # Always release the reader and writer, even if prediction fails.
        cap.release()
        if out is not None:
            out.release()

    print(f"Enhanced annotated video saved to {output_video_path}")

[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m35s[0m 5s/step
Confusion Matrix:
 [[26  2]
 [24 14]]
Accuracy: 0.6060606060606061
Precision: 0.875
Recall: 0.3684210526315789
F1 Score: 0.5185185185185185
ROC-AUC: 0.7453007518796994


In [None]:
# Example: annotate one test clip and write the result back to Drive.
input_video = '/content/drive/MyDrive/punch/test/videos/v_Punch_g05_c02.avi'
output_video = '/content/drive/MyDrive/enhanced_vgg19_convlstm.mp4'
mark_punches_in_video(
    input_video_path=input_video,
    output_video_path=output_video,
)

Annotated video saved to /content/drive/MyDrive/vgg19_convlstm.mp4
