<a href="https://colab.research.google.com/github/Dimble777/Action-Recognition-AI-777/blob/main/Action_recognition_777.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# ============================================================
# 📦 Setup (Colab environment)
# ============================================================
!pip install tensorflow opencv-python

#from google.colab import drive
#drive.mount('/content/drive')

from google.colab import files
import zipfile, os, numpy as np, cv2, tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.applications import MobileNetV2


# ============================================================
# ⚙️ Constants (update as needed)
# ============================================================
dataset_path = "/content/drive/MyDrive/data"  # path to extracted dataset
IMG_SIZE = (224, 224)
IMG_HEIGHT, IMG_WIDTH = IMG_SIZE
CHANNELS = 3
SEQ_LEN = 16
BATCH_SIZE = 4
EPOCHS = 8
NUM_ACTION_CLASSES = 5      # number of your real action classes
NUM_CLASSES_WITH_BG = NUM_ACTION_CLASSES + 1  # +1 for background

cls_loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
box_loss = tf.keras.losses.MeanSquaredError()


# ============================================================
# 🏷️ Action labels mapping
# ============================================================
# Maps a predicted class index to a human-readable action name.
# Only index 0 is named here, while NUM_ACTION_CLASSES is 5 and the background
# class uses index NUM_CLASSES_WITH_BG - 1; lookups for any other index fall
# back to whatever default the caller passes to ``.get``.
# NOTE(review): add the remaining action names once they are known.
ACTION_LABELS = {
    0: "using phone",

}




# ============================================================
# 🔍 Helper: Parse YOLO label file
# ============================================================
def parse_yolo_file(label_file):
    """Read the first annotation from a YOLO-format label file.

    Args:
        label_file: Path to a text file whose lines look like
            ``<class> <cx> <cy> <w> <h>``.

    Returns:
        ``(cls, cx, cy, w, h)`` taken from the first non-blank line, with
        ``cls`` as ``int`` and the other four values as ``float``. Returns
        ``None`` when the file does not exist, contains no non-blank line,
        the first line has fewer than five fields, or a field is not numeric.
    """
    if not os.path.exists(label_file):
        return None
    with open(label_file, "r") as f:
        rows = [row.strip() for row in f if row.strip()]
    if not rows:
        return None
    parts = rows[0].split()
    if len(parts) < 5:
        return None
    try:
        cls = int(parts[0])
        cx, cy, w, h = map(float, parts[1:5])
    except ValueError:
        # Malformed numbers are reported like every other invalid file:
        # as "no annotation" rather than as an exception.
        return None
    return (cls, cx, cy, w, h)

# ============================================================
# 🎞️ Dataset generator
# ============================================================
def video_clip_generator(split):
    """Yield fixed-length clips with per-frame targets for one dataset split.

    Each sub-directory of ``dataset_path/<split>`` is expected to contain an
    ``images`` folder and a ``labels`` folder holding one YOLO text file per
    image. Frames are grouped into non-overlapping windows of ``SEQ_LEN``.

    Args:
        split: Name of the split directory (e.g. ``"train"`` or ``"val"``).

    Yields:
        A tuple ``(frames, bbox_targets, class_targets, mask)``:
        * ``frames``: float32 array ``(SEQ_LEN, H, W, 3)`` scaled to [0, 1].
        * ``bbox_targets``: float32 ``(SEQ_LEN, 4)`` boxes as
          ``[x_min, y_min, x_max, y_max]`` in normalized [0, 1] image
          fractions; all zeros for frames without a usable label.
        * ``class_targets``: int32 ``(SEQ_LEN,)``; the background index
          ``NUM_CLASSES_WITH_BG - 1`` for frames without a usable label.
        * ``mask``: float32 ``(SEQ_LEN,)``, 1.0 where a label was read.
        A window is dropped when any of its images cannot be decoded.
    """
    split_dir = os.path.join(dataset_path, split)
    # Filter out non-directory entries like .DS_Store
    video_folders = sorted(
        f for f in os.listdir(split_dir)
        if os.path.isdir(os.path.join(split_dir, f))
    )

    for video_folder in video_folders:
        images_dir = os.path.join(split_dir, video_folder, "images")
        labels_dir = os.path.join(split_dir, video_folder, "labels")
        # Filter out non-image files (like .DS_Store)
        frames = sorted(
            f for f in os.listdir(images_dir)
            if f.lower().endswith(('.png', '.jpg', '.jpeg'))
        )

        # Non-overlapping temporal windows of SEQ_LEN frames.
        for start in range(0, len(frames) - SEQ_LEN + 1, SEQ_LEN):
            clip_imgs = []
            bbox_targets = np.zeros((SEQ_LEN, 4), dtype=np.float32)
            class_targets = np.full((SEQ_LEN,), NUM_CLASSES_WITH_BG - 1, dtype=np.int32)
            mask = np.zeros((SEQ_LEN,), dtype=np.float32)

            for i, img_name in enumerate(frames[start:start + SEQ_LEN]):
                img_path = os.path.join(images_dir, img_name)
                # splitext handles every extension (and any letter case) the
                # filter above lets through.
                lbl_path = os.path.join(labels_dir, os.path.splitext(img_name)[0] + ".txt")

                img = cv2.imread(img_path)
                if img is None:
                    print(f"Warning: Could not load image {img_path}. Skipping frame.")
                    continue  # the short clip is discarded below

                clip_imgs.append(cv2.resize(img, IMG_SIZE) / 255.0)

                if not os.path.exists(lbl_path):
                    continue
                with open(lbl_path, "r") as f:
                    line = f.readline().strip().split()
                if len(line) < 5:
                    print(f"Warning: Empty or invalid label file {lbl_path}. Skipping label.")
                    continue
                try:
                    cls = int(line[0])
                    # Only the first four values after the class id; extra
                    # columns (e.g. a confidence score) are ignored.
                    x, y, w, h = map(float, line[1:5])
                except ValueError:
                    print(f"Warning: Invalid data format in label file {lbl_path}. Skipping label.")
                    continue

                # Keep corners as normalized fractions: the bbox head ends in
                # a sigmoid and the inference code multiplies its output by
                # the frame width/height, so pixel-scaled targets could never
                # be matched by the network.
                corners = [x - w / 2, y - h / 2, x + w / 2, y + h / 2]
                bbox_targets[i] = np.clip(corners, 0.0, 1.0)
                class_targets[i] = cls
                mask[i] = 1.0

            # Only yield a clip if it has the required sequence length
            if len(clip_imgs) == SEQ_LEN:
                yield np.array(clip_imgs, dtype=np.float32), bbox_targets, class_targets, mask


# ============================================================
# 🔄 Create tf.data.Dataset
# ============================================================
def make_dataset(split, batch_size=BATCH_SIZE, shuffle=True):
    """Build a batched ``tf.data.Dataset`` of clips for one split.

    Args:
        split: Split directory name forwarded to ``video_clip_generator``.
        batch_size: Number of clips per batch.
        shuffle: When true, shuffle clips with a 256-element buffer.

    Returns:
        A dataset yielding ``(frames, (bbox_targets, class_targets, mask))``
        batches, the structure ``DetModel.train_step`` unpacks.
    """
    output_signature = (
        tf.TensorSpec(shape=(SEQ_LEN, IMG_SIZE[0], IMG_SIZE[1], 3), dtype=tf.float32),
        tf.TensorSpec(shape=(SEQ_LEN, 4), dtype=tf.float32),
        tf.TensorSpec(shape=(SEQ_LEN,), dtype=tf.int32),
        tf.TensorSpec(shape=(SEQ_LEN,), dtype=tf.float32),
    )
    ds = tf.data.Dataset.from_generator(
        lambda: video_clip_generator(split),
        output_signature=output_signature,
    )
    if shuffle:
        # The buffer holds up to 256 whole clips in memory at once.
        ds = ds.shuffle(256)
    ds = ds.batch(batch_size)
    # Map dataset to (x, (bbox, class, mask))
    ds = ds.map(lambda imgs, bbox_targets, class_targets, mask: (imgs, (bbox_targets, class_targets, mask)))
    # Prefetch goes last so that the finished batches (after the map) are the
    # ones prepared ahead of the training step.
    return ds.prefetch(tf.data.AUTOTUNE)


# ============================================================
# 🧠 Build model
# ============================================================
def build_model(seq_len=SEQ_LEN, img_size=IMG_SIZE, num_classes_with_bg=NUM_CLASSES_WITH_BG):
    """Build the per-frame detector: frozen MobileNetV2 -> BiLSTM -> two heads.

    Args:
        seq_len: Number of frames per input clip.
        img_size: Two-element frame size used for the input shape.
        num_classes_with_bg: Width of the classification head.

    Returns:
        A functional Keras model taking ``(batch, seq_len, H, W, 3)`` and
        returning ``[bbox, class_logits]``: ``bbox`` has four sigmoid outputs
        per frame, ``class_logits`` has ``num_classes_with_bg`` raw scores per
        frame (no softmax is applied).
    """
    inp = layers.Input(shape=(seq_len, img_size[0], img_size[1], 3))
    def frame_cnn():
        # ImageNet-initialised backbone with global average pooling; frozen so
        # only the layers added below are trained.
        # NOTE(review): the data pipeline feeds frames scaled to [0, 1];
        # confirm this matches the input scaling MobileNetV2 expects.
        base = MobileNetV2(input_shape=(img_size[0], img_size[1], 3),
                           include_top=False, weights='imagenet', pooling='avg')
        base.trainable = False
        return base
    # Apply the same backbone to every frame of the clip.
    td = layers.TimeDistributed(frame_cnn())(inp)
    # return_sequences=True keeps one feature vector per frame.
    x = layers.Bidirectional(layers.LSTM(128, return_sequences=True))(td)
    per_frame = layers.TimeDistributed(layers.Dense(128, activation='relu'))(x)
    # Sigmoid bounds each box value to (0, 1).
    bbox_out = layers.TimeDistributed(layers.Dense(4, activation='sigmoid'), name="bbox")(per_frame)
    # Raw logits; the module-level cls_loss is created with from_logits=True.
    class_out = layers.TimeDistributed(layers.Dense(num_classes_with_bg), name="class_logits")(per_frame)
    model = models.Model(inputs=inp, outputs=[bbox_out, class_out])
    return model

# ============================================================
# ⚡ Custom training wrapper
# ============================================================
class DetModel(tf.keras.Model):
    """Training wrapper that fits a two-headed detector with a custom step.

    The wrapped ``base_model`` must return ``(bbox_pred, class_logits)``.
    Batches are expected as ``(imgs, (bbox_true, class_true, mask))``.

    The total loss is ``cls_loss + box_loss``. The box loss is weighted by
    ``mask`` so frames without an annotated box (mask 0, all-zero target) do
    not pull the regression head towards the origin. Reported logs are running
    means over the epoch: ``loss``, ``bbox``, ``class_logits`` and
    ``frame_acc``.
    """

    def __init__(self, base_model, **kwargs):
        super().__init__(**kwargs)
        self.model = base_model
        self.cls_acc = tf.keras.metrics.SparseCategoricalAccuracy(name="frame_acc")
        self.loss_tracker = tf.keras.metrics.Mean(name="loss")
        # Trackers named after the log keys the training output already uses.
        self.box_loss_tracker = tf.keras.metrics.Mean(name="bbox")
        self.cls_loss_tracker = tf.keras.metrics.Mean(name="class_logits")

    def call(self, inputs):
        """Forward ``inputs`` through the wrapped model."""
        return self.model(inputs)

    def compile(self, optimizer, cls_loss, box_loss):
        """Store the two loss objects and configure the optimizer.

        Args:
            optimizer: Keras optimizer used by ``train_step``.
            cls_loss: Loss called as ``cls_loss(class_true, class_logits)``.
            box_loss: Loss called as
                ``box_loss(bbox_true, bbox_pred, sample_weight=mask)``.
        """
        super().compile(optimizer=optimizer, loss={'bbox': box_loss, 'class_logits': cls_loss})
        self.cls_loss = cls_loss
        self.box_loss = box_loss

    @property
    def metrics(self):
        # Listing the trackers here lets Keras reset them between epochs.
        return [
            self.loss_tracker,
            self.box_loss_tracker,
            self.cls_loss_tracker,
            self.cls_acc,
        ]

    def _det_forward_losses(self, data, training):
        """Run the wrapped model on one batch and compute both loss terms."""
        imgs, (bbox_true, class_true, mask) = data
        bbox_pred, class_logits = self.model(imgs, training=training)
        cls_l = self.cls_loss(class_true, class_logits)
        # Per-frame weights: unlabeled frames contribute nothing to the box loss.
        box_l = self.box_loss(bbox_true, bbox_pred, sample_weight=mask)
        return cls_l, box_l, class_true, class_logits

    def _det_update_metrics(self, cls_l, box_l, class_true, class_logits):
        """Update all trackers and return their current values as logs."""
        self.loss_tracker.update_state(cls_l + box_l)
        self.box_loss_tracker.update_state(box_l)
        self.cls_loss_tracker.update_state(cls_l)
        self.cls_acc.update_state(class_true, class_logits)
        return {m.name: m.result() for m in self.metrics}

    def train_step(self, data):
        """Run one optimisation step and return the running-mean logs."""
        with tf.GradientTape() as tape:
            cls_l, box_l, class_true, class_logits = self._det_forward_losses(data, training=True)
            loss = cls_l + box_l

        grads = tape.gradient(loss, self.model.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.model.trainable_variables))
        return self._det_update_metrics(cls_l, box_l, class_true, class_logits)

    def test_step(self, data):
        """Evaluate one batch and return the running-mean logs."""
        cls_l, box_l, class_true, class_logits = self._det_forward_losses(data, training=False)
        return self._det_update_metrics(cls_l, box_l, class_true, class_logits)


# ============================================================
# 🚀 Main (fixed)
# ============================================================
if __name__ == "__main__":
    # The whole training run lives under the guard so that every name used by
    # fit() is defined on the same code path.
    train_ds = make_dataset("train").repeat()
    # Validation is not shuffled: evaluation does not depend on sample order
    # and this avoids filling a second shuffle buffer of whole clips.
    val_ds = make_dataset("val", shuffle=False).repeat()

    base_model = build_model()
    det_model = DetModel(base_model)
    optimizer = tf.keras.optimizers.Adam(1e-4)

    det_model.compile(
        optimizer=optimizer,
        cls_loss=cls_loss,
        box_loss=box_loss
    )

    # Count the clips in each split. This runs the generator once per split,
    # i.e. it decodes every image, so it is slow on large datasets.
    num_train = sum(1 for _ in video_clip_generator("train"))
    num_val = sum(1 for _ in video_clip_generator("val"))

    # Both datasets repeat forever, so fit() needs explicit step counts.
    steps_per_epoch = max(1, num_train // BATCH_SIZE)
    validation_steps = max(1, num_val // BATCH_SIZE)

    print(f"Steps per epoch: {steps_per_epoch}, Validation steps: {validation_steps}")

    # Train with proper limits
    history = det_model.fit(
        train_ds,
        validation_data=val_ds,
        epochs=EPOCHS,
        steps_per_epoch=steps_per_epoch,
        validation_steps=validation_steps
    )


Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224_no_top.h5
[1m9406464/9406464[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step
Steps per epoch: 236, Validation steps: 59
Epoch 1/8
[1m236/236[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1622s[0m 6s/step - bbox: 13448.6572 - class_logits: 0.2752 - loss: 13448.9326 - val_bbox: 14110.8721 - val_class_logits: 0.0176 - val_loss: 14110.8896
Epoch 2/8
[1m236/236[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1351s[0m 6s/step - bbox: 13489.6572 - class_logits: 0.0621 - loss: 13489.7188 - val_bbox: 13082.9199 - val_class_logits: 0.0137 - val_loss: 13082.9336
Epoch 3/8
[1m236/236[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1331s[0m 6s/step - bbox: 13447.7314 - class_logits: 0.0543 - loss: 13447.7861 - val_bbox: 13419.2705 - val_class_logits: 0.0171 - val_loss: 13419.2881
Epoch 4/8
[1m236/236[0m [32m━━━━━━━━━━━━━━━━━━

In [None]:
det_model.model.save_weights("/content/drive/MyDrive/storevisite_weights.weights.h5")
print("Model saved successfully!")


Model saved successfully!


In [None]:
det_model.model.load_weights("/content/drive/MyDrive/storevisite_weights.weights.h5")
print("Model loaded successfully.")


Model loaded successfully.


In [None]:
det_model.model.load_weights("/content/drive/MyDrive/storevisite_weights.weights.h5")
print("Model loaded successfully.")


Model loaded successfully.


In [None]:

import os

print("\nSearching for .h5 files in MyDrive...\n")

# The loop variable is `filenames`, not `files`, so this cell does not
# overwrite the `files` module imported from google.colab in the setup cell.
for root, _dirs, filenames in os.walk("/content/drive/MyDrive"):
    for name in filenames:
        if name.endswith(".h5"):
            print(os.path.join(root, name))


Searching for .h5 files in Drive...

/content/drive/MyDrive/det_model.weights.h5
/content/drive/MyDrive/storevisite_weights.weights.h5


In [None]:
# ============================================================
# 🔮 INFERENCE CODE (Run after training is completed)
# ============================================================

import cv2
import numpy as np
import tensorflow as tf

# Load trained model
det_model.model.load_weights("/content/drive/MyDrive/storevisite_weights.weights.h5")
print("Model loaded successfully.")

# Same constants
IMG_SIZE = (224, 224)
SEQ_LEN = 16
ACTION_LABELS = {
    0: "using phone",
}

# ------------------------------------------------------------
# 🔄 Preprocess frames for model input
# ------------------------------------------------------------
def preprocess_frame(frame):
    """Resize ``frame`` to ``IMG_SIZE`` and return it as float32 divided by 255."""
    resized = cv2.resize(frame, IMG_SIZE)
    return resized.astype("float32") / 255.0

# ------------------------------------------------------------
# 🎞️ Convert YOLO output (0–1) to pixel bounding box
# ------------------------------------------------------------
def yolo_to_pixels(bbox, orig_w, orig_h):
    """Scale a fractional ``[x1, y1, x2, y2]`` box to integer pixel coordinates.

    The x values are multiplied by ``orig_w`` and the y values by ``orig_h``;
    each product is truncated with ``int``.

    Returns:
        A tuple ``(x1, y1, x2, y2)`` of ints.
    """
    # One scale factor per box entry, in x, y, x, y order.
    scales = (orig_w, orig_h, orig_w, orig_h)
    return tuple(int(bbox[i] * scale) for i, scale in enumerate(scales))

# ------------------------------------------------------------
# 🎥 Run inference on a video
# ------------------------------------------------------------
def run_inference(video_path):
    """Play a video with the model's latest box and label drawn on each frame.

    Frames are pushed into a sliding buffer of ``SEQ_LEN`` preprocessed
    frames. Once the buffer is full, the global ``det_model.model`` is run on
    it and the prediction for the newest frame is drawn. Playback stops at the
    end of the video or when ``q`` is pressed.

    Args:
        video_path: Path of the video file to read.

    NOTE(review): this uses ``cv2.imshow``, which needs a GUI window; in a
    Colab runtime ``run_inference_colab`` (writes a file instead) is
    presumably the variant to use.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Warning: Could not open video {video_path}.")
        cap.release()
        return

    frames_buffer = []

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            orig_h, orig_w = frame.shape[:2]

            # Preprocess & store in buffer
            frames_buffer.append(preprocess_frame(frame))

            # Only run when buffer full
            if len(frames_buffer) == SEQ_LEN:
                clip = np.expand_dims(np.array(frames_buffer, dtype=np.float32), axis=0)

                bbox_pred, class_logits = det_model.model(clip, training=False)
                bbox_pred = bbox_pred[0].numpy()
                class_logits = class_logits[0].numpy()

                # Use the last frame prediction for drawing
                last_bbox = bbox_pred[-1]
                last_class = np.argmax(class_logits[-1])
                label = ACTION_LABELS.get(last_class, "unknown")

                # Convert box to pixel coordinates
                x1, y1, x2, y2 = yolo_to_pixels(last_bbox, orig_w, orig_h)

                # Draw bounding box + label
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(frame, label, (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)

                # Remove oldest frame
                frames_buffer.pop(0)

            cv2.imshow("Output", frame)

            # Mask to the low byte so the comparison also works on platforms
            # where waitKey returns extra high bits.
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Always free the capture and windows, even if the model call fails.
        cap.release()
        cv2.destroyAllWindows()


# ============================================================
# ▶️ Run inference
# ============================================================
video_path = "/content/drive/MyDrive/test_video.mp4"
run_inference(video_path)


Model loaded successfully.


In [None]:
import cv2
import numpy as np
from google.colab.patches import cv2_imshow

class ShowTrainingOutput(tf.keras.callbacks.Callback):
    """Show the model's prediction on one fixed clip while training runs.

    Keras callbacks are not handed the current batch, and ``self.model.input``
    is not batch data, so the clip to visualise has to be supplied.

    Args:
        sample_clip: Optional array ``(SEQ, H, W, 3)`` with values in [0, 1].
            When omitted, the first clip of the global ``train_ds`` is fetched
            once on first use and cached.
        every_n_batches: Draw only on batches whose index is a multiple of
            this value (default 1, i.e. every batch).
    """

    def __init__(self, sample_clip=None, every_n_batches=1):
        super().__init__()
        self.sample_clip = sample_clip
        self.every_n_batches = max(1, int(every_n_batches))

    def _get_sample_clip(self):
        """Return the cached clip, fetching it from ``train_ds`` if needed."""
        if self.sample_clip is None:
            batch_frames = next(iter(train_ds))[0]
            self.sample_clip = batch_frames[0].numpy()
        return np.asarray(self.sample_clip, dtype=np.float32)

    def on_train_batch_end(self, batch, logs=None):
        if batch % self.every_n_batches:
            return

        clip_np = self._get_sample_clip()
        frame = (clip_np[-1] * 255).astype(np.uint8)

        # Predict on a batch containing just this clip
        bbox_pred, cls_pred = self.model.predict(clip_np[np.newaxis], verbose=0)

        bbox = bbox_pred[0][-1]
        cls_id = np.argmax(cls_pred[0][-1])

        # Convert bbox to pixel coords
        h, w = frame.shape[:2]
        x1 = int(bbox[0] * w)
        y1 = int(bbox[1] * h)
        x2 = int(bbox[2] * w)
        y2 = int(bbox[3] * h)

        # Draw bbox + label
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(frame, f"Class: {cls_id}", (x1, y1 - 5),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)

        # Show output frame in Colab
        cv2_imshow(frame)


In [None]:
class EpochVisualizer(tf.keras.callbacks.Callback):
    """After each epoch, draw the prediction for one training clip.

    Reads the first batch of the global ``train_ds``, predicts on its first
    clip and displays that clip's last frame with the predicted box and class.
    """

    def on_epoch_end(self, epoch, logs=None):
        batch_frames = next(iter(train_ds))[0]   # frames of the first batch
        last_frame = batch_frames[0].numpy()[-1]

        canvas = (last_frame * 255).astype(np.uint8)

        boxes, class_scores = self.model.predict(batch_frames[:1], verbose=0)

        # Outputs for the last frame of the (single) predicted clip.
        box = boxes[0][-1]
        predicted = np.argmax(class_scores[0][-1])

        height, width = canvas.shape[:2]
        left, top = int(box[0] * width), int(box[1] * height)
        right, bottom = int(box[2] * width), int(box[3] * height)

        green = (0, 255, 0)
        cv2.rectangle(canvas, (left, top), (right, bottom), green, 2)
        cv2.putText(canvas, f"Class: {predicted}", (left, top - 5),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, green, 2)

        cv2_imshow(canvas)
        print("Epoch:", epoch + 1)


In [None]:
import cv2
import numpy as np
from google.colab.patches import cv2_imshow

class EpochVisualizer(tf.keras.callbacks.Callback):
    """Epoch-end callback that shows one predicted box and class in Colab.

    The first clip of the first ``train_ds`` batch is run through the model;
    the clip's last frame is displayed with the prediction drawn on it.
    """

    # Drawing style shared by the rectangle and the caption.
    _COLOR = (0, 255, 0)
    _THICKNESS = 2

    def on_epoch_end(self, epoch, logs=None):
        # First clip of one training batch: (SEQ, 224, 224, 3)
        first_batch = next(iter(train_ds))
        clip = first_batch[0][0].numpy()

        # The model takes a batch, so add a leading axis before predicting.
        bbox_pred, class_pred = self.model.predict(
            np.expand_dims(clip, axis=0), verbose=0
        )

        # Last frame of the clip, converted for display.
        frame = (clip[-1] * 255).astype(np.uint8)
        bbox = bbox_pred[0][-1]
        cls_id = np.argmax(class_pred[0][-1])

        # Scale the (0–1) box to pixel coordinates of the displayed frame.
        h, w = frame.shape[:2]
        top_left = (int(bbox[0] * w), int(bbox[1] * h))
        bottom_right = (int(bbox[2] * w), int(bbox[3] * h))

        cv2.rectangle(frame, top_left, bottom_right, self._COLOR, self._THICKNESS)
        cv2.putText(
            frame,
            f"Predicted: {cls_id}",
            (top_left[0], top_left[1] - 10),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.7,
            self._COLOR,
            self._THICKNESS,
        )

        # Show in Colab
        print(f"\n📌 Epoch {epoch+1} Output:")
        cv2_imshow(frame)


In [None]:
import cv2
import numpy as np
from google.colab.patches import cv2_imshow

class EpochVisualizer(tf.keras.callbacks.Callback):
    """After each epoch, show one training frame with its predicted class.

    The model in this notebook consumes whole clips ``(1, SEQ, H, W, 3)``, so
    the complete first clip of a ``train_ds`` batch is passed to ``predict``
    (a single frame does not match the input shape). The class is read from
    the last output when the model returns several outputs, or from the only
    output otherwise.
    """

    def on_epoch_end(self, epoch, logs=None):

        # Get one batch
        batch = next(iter(train_ds))

        frames = batch[0][0].numpy()     # shape = (16, 224, 224, 3)
        last_frame = frames[-1]          # shape = (224, 224, 3)

        # Predict on the full clip with a leading batch axis
        outputs = self.model.predict(np.expand_dims(frames, axis=0), verbose=0)

        # Multi-output models return [bbox, class]; class scores come last.
        class_scores = outputs[-1] if isinstance(outputs, (list, tuple)) else outputs
        scores = np.asarray(class_scores)[0]
        if scores.ndim > 1:
            # Per-frame scores: use the ones belonging to the displayed frame.
            scores = scores[-1]
        class_id = int(np.argmax(scores))

        # Draw class text on frame
        frame_to_show = (last_frame * 255).astype(np.uint8)
        cv2.putText(frame_to_show,
                    f"Pred: {class_id}",
                    (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    1,
                    (0, 255, 0),
                    2)

        print(f"\n📌 Epoch {epoch+1} Output:")
        cv2_imshow(frame_to_show)


In [None]:
# NOTE(review): no cell in this notebook assigns a global named `model`, so on
# a fresh kernel this line raises NameError. Presumably `det_model.model` (the
# network returned by build_model) was intended — confirm before changing.
model.summary()


In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models

SEQ_LEN = 16
IMG_SIZE = (224, 224)

def build_video_model():
    """Build a small CNN + LSTM model with per-frame box and class outputs.

    Input is a clip of ``SEQ_LEN`` frames sized ``IMG_SIZE`` with 3 channels.

    Returns:
        A Keras model with two outputs: four sigmoid box values per frame and
        a two-way softmax class distribution per frame.
    """
    # Input: sequence of frames
    clip_input = layers.Input(shape=(SEQ_LEN, IMG_SIZE[0], IMG_SIZE[1], 3))

    # Per-frame feature extractor: two conv/pool stages, then a dense layer.
    frame_layers = [
        layers.Conv2D(32, (3,3), activation='relu'),
        layers.MaxPooling2D(2,2),
        layers.Conv2D(64, (3,3), activation='relu'),
        layers.MaxPooling2D(2,2),
        layers.Flatten(),
        layers.Dense(128, activation='relu'),
    ]
    frame_encoder = models.Sequential(frame_layers)

    # Encode every frame, then model the time dimension; return_sequences
    # keeps one vector per frame for the per-frame heads.
    frame_features = layers.TimeDistributed(frame_encoder)(clip_input)
    temporal = layers.LSTM(128, return_sequences=True)(frame_features)

    # Head 1: bounding box per frame → 4 values (x1,y1,x2,y2)
    box_head = layers.TimeDistributed(layers.Dense(4, activation='sigmoid'))
    bbox_out = box_head(temporal)

    # Head 2: class per frame
    class_head = layers.TimeDistributed(layers.Dense(2, activation='softmax'))
    cls_out = class_head(temporal)

    return models.Model(inputs=clip_input, outputs=[bbox_out, cls_out])

# NOTE(review): this rebinds the global `det_model`, which earlier cells use
# for the trained DetModel wrapper (they access `det_model.model`). After this
# cell those earlier cells no longer refer to the trained network.
det_model = build_video_model()

# One loss per output, in output order: MSE for the box head and categorical
# cross-entropy for the softmax class head, weighted equally.
# NOTE(review): 'categorical_crossentropy' presumably needs one-hot class
# targets, while video_clip_generator yields integer class ids — confirm
# before fitting this model on that dataset.
det_model.compile(
    optimizer='adam',
    loss=['mse', 'categorical_crossentropy'],
    loss_weights=[1.0, 1.0]
)

det_model.summary()


In [None]:
import cv2
import numpy as np

def load_clip(video_path, num_frames=16, size=(224, 224)):
    """Sample evenly spaced RGB frames from a video file.

    Args:
        video_path: Path of the video to read.
        num_frames: Number of frame positions to sample between the first and
            last frame (positions may repeat for very short videos).
        size: ``(width, height)`` passed to ``cv2.resize``.

    Returns:
        An array ``(n, size[1], size[0], 3)`` of resized RGB frames. ``n`` is
        ``num_frames`` unless a read fails, in which case sampling stops
        early; it is 0 when the video reports no frames.
    """
    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # A non-positive count would produce negative seek positions below.
        if total_frames > 0:
            indices = np.linspace(0, total_frames - 1, num_frames).astype(int)
            for idx in indices:
                cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
                ret, frame = cap.read()
                if not ret:
                    break
                frame = cv2.resize(frame, size)
                frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    finally:
        # Release the capture even if decoding or resizing raises.
        cap.release()

    if not frames:
        # Keep the trailing dimensions so callers can rely on a 4-D result.
        return np.empty((0, size[1], size[0], 3), dtype=np.uint8)
    return np.array(frames)


In [None]:
import cv2
import os
import numpy as np

IMG_SIZE = (224, 224)
SEQ_LEN = 16
ACTION_LABELS = {0: "using phone"}  # Update as needed

def load_clip_with_labels(video_folder, seq_len=SEQ_LEN, img_size=IMG_SIZE):
    """Load the first frames of a video folder together with their YOLO labels.

    Args:
        video_folder: Directory containing ``images`` and ``labels`` folders.
        seq_len: Maximum number of frames to load (taken from the start of the
            sorted image list).
        img_size: ``(width, height)`` passed to ``cv2.resize``; also used to
            scale the normalized label coordinates to pixels.

    Returns:
        ``(clip_imgs, clip_bboxes, clip_classes)`` as NumPy arrays with one
        entry per successfully read image. Images are the resized arrays
        returned by OpenCV (not rescaled). A frame without a readable label
        gets box ``[0, 0, 0, 0]`` and class ``-1``.
    """
    images_dir = os.path.join(video_folder, "images")
    labels_dir = os.path.join(video_folder, "labels")

    frames = sorted([f for f in os.listdir(images_dir) if f.lower().endswith(('.jpg', '.png', '.jpeg'))])
    clip_imgs = []
    clip_bboxes = []
    clip_classes = []

    for img_name in frames[:seq_len]:  # take first SEQ_LEN frames
        img_path = os.path.join(images_dir, img_name)
        lbl_path = os.path.join(labels_dir, img_name.rsplit(".",1)[0] + ".txt")

        img = cv2.imread(img_path)
        if img is None:
            continue
        clip_imgs.append(cv2.resize(img, img_size))

        # default values if no label
        bbox = [0,0,0,0]
        cls = -1

        if os.path.exists(lbl_path):
            with open(lbl_path, "r") as f:
                line = f.readline().strip().split()
            if len(line) >= 5:
                try:
                    parsed_cls = int(line[0])
                    x, y, w, h = map(float, line[1:5])
                except ValueError:
                    # Same policy as the training generator: a malformed
                    # label is reported and the frame is left unlabeled.
                    print(f"Warning: Invalid data format in label file {lbl_path}. Skipping label.")
                else:
                    cls = parsed_cls
                    # Convert YOLO normalized to pixel coordinates
                    bbox = [
                        int((x - w/2) * img_size[0]),
                        int((y - h/2) * img_size[1]),
                        int((x + w/2) * img_size[0]),
                        int((y + h/2) * img_size[1]),
                    ]

        clip_bboxes.append(bbox)
        clip_classes.append(cls)

    return np.array(clip_imgs), np.array(clip_bboxes), np.array(clip_classes)


def visualize_clip(clip_imgs, clip_bboxes, clip_classes):
    """Display clip frames with bounding boxes and labels in an OpenCV window.

    Each frame is shown for 200 ms; pressing ``q`` stops playback early.
    Drawing is done on a copy, so the arrays passed in are left unchanged.

    Args:
        clip_imgs: Sequence of frames in a format ``cv2.imshow`` accepts.
        clip_bboxes: Per-frame ``[x1, y1, x2, y2]`` pixel boxes.
        clip_classes: Per-frame class ids looked up in ``ACTION_LABELS``.
    """
    try:
        for frame, bbox, cls in zip(clip_imgs, clip_bboxes, clip_classes):
            # Plain ints, as in the Colab variant of this helper.
            x1, y1, x2, y2 = map(int, bbox)
            label = ACTION_LABELS.get(cls, "unknown")
            canvas = frame.copy()
            cv2.rectangle(canvas, (x1, y1), (x2, y2), (0,255,0), 2)
            cv2.putText(canvas, label, (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0,255,0), 2)
            cv2.imshow("Clip", canvas)
            if cv2.waitKey(200) & 0xFF == ord('q'):  # 200ms per frame
                break
    finally:
        # Close the window even if drawing or display raised.
        cv2.destroyAllWindows()


In [None]:
from google.colab.patches import cv2_imshow


In [None]:
from google.colab.patches import cv2_imshow
import time

def visualize_clip(clip_imgs, clip_bboxes=None, clip_classes=None, delay=0.2):
    """Display a sequence of frames in Colab with optional boxes and labels.

    Args:
        clip_imgs: Sequence of frames. Floating-point frames are treated as
            [0, 1] values and scaled to 0–255; integer frames (such as those
            returned by ``load_clip_with_labels``) are shown as they are.
        clip_bboxes: Optional per-frame ``[x1, y1, x2, y2]`` pixel boxes.
        clip_classes: Optional per-frame class ids for ``ACTION_LABELS``.
            Boxes are drawn only when both optional arguments are given.
        delay: Seconds to sleep after each frame.
    """
    for i, frame in enumerate(clip_imgs):
        frame = np.asarray(frame)
        if np.issubdtype(frame.dtype, np.floating):
            frame_disp = np.clip(frame * 255, 0, 255).astype(np.uint8)
        else:
            # Already in 0–255; multiplying by 255 would wrap around in uint8.
            frame_disp = frame.astype(np.uint8).copy()

        if clip_bboxes is not None and clip_classes is not None:
            bbox = clip_bboxes[i]
            cls = clip_classes[i]
            label = ACTION_LABELS.get(cls, "unknown")
            x1, y1, x2, y2 = map(int, bbox)
            cv2.rectangle(frame_disp, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(frame_disp, label, (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)

        cv2_imshow(frame_disp)
        time.sleep(delay)  # small pause to simulate video playback


In [None]:
def run_inference_colab(video_path, model, save_path="/content/output.mp4"):
    """Run the model over a video and write the annotated frames to a file.

    A sliding buffer of ``SEQ_LEN`` frames resized to ``IMG_SIZE`` is kept.
    Once it is full, ``model`` is called on it and the prediction for the
    newest frame is drawn. Every frame read is written to the output, drawn
    on or not.

    Args:
        video_path: Path of the input video.
        model: Callable returning ``(bbox_pred, class_logits)`` tensors for a
            ``(1, SEQ_LEN, H, W, 3)`` float clip.
        save_path: Destination of the annotated ``mp4v`` video.
    """
    cap = cv2.VideoCapture(video_path)
    frames_buffer = []
    out = None

    # Write at the source frame rate so playback speed is preserved; fall
    # back to 20 fps when the container does not report one.
    fps = cap.get(cv2.CAP_PROP_FPS)
    if not fps > 0:
        fps = 20

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            orig_h, orig_w = frame.shape[:2]
            frame_resized = cv2.resize(frame, IMG_SIZE).astype("float32") / 255.0
            frames_buffer.append(frame_resized)

            if len(frames_buffer) == SEQ_LEN:
                clip = np.expand_dims(np.array(frames_buffer), axis=0)
                bbox_pred, class_logits = model(clip, training=False)
                bbox_pred = bbox_pred[0].numpy()
                class_logits = class_logits[0].numpy()

                last_bbox = bbox_pred[-1]
                last_class = np.argmax(class_logits[-1])
                label = ACTION_LABELS.get(last_class, "unknown")

                # Convert normalized bbox to pixel coordinates
                x1 = int(last_bbox[0] * orig_w)
                y1 = int(last_bbox[1] * orig_h)
                x2 = int(last_bbox[2] * orig_w)
                y2 = int(last_bbox[3] * orig_h)

                # Draw
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(frame, label, (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
                frames_buffer.pop(0)

            # The writer is created lazily because it needs the frame size.
            if out is None:
                fourcc = cv2.VideoWriter_fourcc(*"mp4v")
                out = cv2.VideoWriter(save_path, fourcc, fps, (orig_w, orig_h))

            out.write(frame)
    finally:
        cap.release()
        # `out` stays None when no frame could be read.
        if out is not None:
            out.release()

    if out is None:
        print(f"Warning: No frames read from {video_path}; nothing was saved.")
    else:
        print(f"Saved output video to {save_path}")


In [None]:
from google.colab import drive
import os

# 1️⃣ Mount Google Drive
drive.mount('/content/drive')

# 2️⃣ Check the folder structure
data_root = "/content/drive/MyDrive/data"
print(os.listdir(data_root))  # This should list your folders like 'train', 'val', 'test', etc.

# 3️⃣ Check the test folder path
test_root = os.path.join(data_root, "test")
if os.path.exists(test_root):
    print("Test folder found:", os.listdir(test_root))
else:
    print("Test folder does NOT exist! Check your folder name.")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
['train', 'test ', 'val']
Test folder does NOT exist! Check your folder name.


In [None]:
# Correct folder path with trailing space
test_root = os.path.join(data_root, "test ")  # note the space at the end

if os.path.exists(test_root):
    print("Test folder found:", os.listdir(test_root))
else:
    print("Test folder still not found! Check the name carefully.")


Test folder found: ['.DS_Store', 'OutPut116', 'OutPut126', 'OutPut123', 'OutPut105', 'OutPut15.2', 'OutPut53', 'OutPut101.1', 'OutPut3', 'OutPut96', 'OutPut131.mp4', 'OutPut90.mp4', 'OutPut0.19.mp4', 'OutPut94.1', 'OutPut0.22.mp4', 'OutPut54.3', 'OutPut108']


In [None]:
from google.colab.patches import cv2_imshow
import os
import cv2
import numpy as np

# Update these constants
SEQ_LEN = 16
IMG_SIZE = (224, 224)
ACTION_LABELS = {0: "using phone"}

def yolo_to_pixels(bbox, orig_w, orig_h):
    """Convert a fractional corner box to integer pixel corners.

    ``bbox`` holds ``x1, y1, x2, y2`` as fractions of the image size; x values
    are scaled by ``orig_w``, y values by ``orig_h``, then truncated to int.
    """
    left, top, right, bottom = bbox[0], bbox[1], bbox[2], bbox[3]
    return (
        int(left * orig_w),
        int(top * orig_h),
        int(right * orig_w),
        int(bottom * orig_h),
    )

def visualize_test_folder(folder_path, model):
    """Run the model over a folder of frames and display the predictions.

    Images in ``folder_path/images`` are processed in sorted order through a
    sliding buffer of ``SEQ_LEN`` frames. Each time the buffer is full, the
    prediction for the newest frame is drawn on it and shown in Colab.
    Unreadable images are reported and skipped.

    Args:
        folder_path: Directory containing an ``images`` sub-folder.
        model: Callable returning ``(bbox_pred, class_logits)`` tensors for a
            ``(1, SEQ_LEN, H, W, 3)`` float32 clip.
    """
    images_dir = os.path.join(folder_path, "images")
    # Same extensions the training generator accepts.
    frames = sorted([f for f in os.listdir(images_dir) if f.lower().endswith(('.jpg', '.png', '.jpeg'))])

    buffer = []

    for img_name in frames:
        img_path = os.path.join(images_dir, img_name)
        frame = cv2.imread(img_path)
        if frame is None:
            # imread returns None instead of raising for bad files.
            print(f"Warning: Could not load image {img_path}. Skipping frame.")
            continue
        orig_h, orig_w = frame.shape[:2]

        # Preprocess
        buffer.append(cv2.resize(frame, IMG_SIZE) / 255.0)

        if len(buffer) == SEQ_LEN:
            clip = np.expand_dims(np.array(buffer, dtype=np.float32), axis=0)
            bbox_pred, class_logits = model(clip, training=False)

            last_bbox = bbox_pred[0][-1].numpy()
            last_class = np.argmax(class_logits[0][-1].numpy())
            label = ACTION_LABELS.get(last_class, "unknown")

            x1, y1, x2, y2 = yolo_to_pixels(last_bbox, orig_w, orig_h)
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(frame, label, (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0,255,0), 2)

            # Show in Colab
            cv2_imshow(frame)

            buffer.pop(0)  # slide window


In [None]:
# `folder_path` was not defined anywhere in the notebook. Use the first
# sub-folder of `test_root` that contains an `images` directory.
test_video_dirs = sorted(
    name for name in os.listdir(test_root)
    if os.path.isdir(os.path.join(test_root, name, "images"))
)
if not test_video_dirs:
    raise FileNotFoundError(f"No sub-folder with an 'images' directory found in {test_root!r}")
folder_path = os.path.join(test_root, test_video_dirs[0])

# NOTE(review): at this point `det_model` is whatever the most recently run
# cell bound it to — confirm it is the trained model before trusting the output.
visualize_test_folder(folder_path, det_model)
