In [None]:
!!wget --no-check-certificate 'https://drive.google.com/uc?export=download&id=1j71BjISymIyAqDbsuxNjBj4nqx9ozF0e' -O validation_data_cleaned.json

['--2025-02-17 05:39:44--  https://drive.google.com/uc?export=download&id=1j71BjISymIyAqDbsuxNjBj4nqx9ozF0e',
 'Resolving drive.google.com (drive.google.com)... 142.250.141.100, 142.250.141.113, 142.250.141.139, ...',
 'Connecting to drive.google.com (drive.google.com)|142.250.141.100|:443... connected.',
 'HTTP request sent, awaiting response... 303 See Other',
 'Location: https://drive.usercontent.google.com/download?id=1j71BjISymIyAqDbsuxNjBj4nqx9ozF0e&export=download [following]',
 '--2025-02-17 05:39:44--  https://drive.usercontent.google.com/download?id=1j71BjISymIyAqDbsuxNjBj4nqx9ozF0e&export=download',
 'Resolving drive.usercontent.google.com (drive.usercontent.google.com)... 142.251.2.132, 2607:f8b0:4023:c0d::84',
 'Connecting to drive.usercontent.google.com (drive.usercontent.google.com)|142.251.2.132|:443... connected.',
 'HTTP request sent, awaiting response... 200 OK',
 'Length: 88335 (86K) [application/octet-stream]',
 'Saving to: ‘validation_data_cleaned.json’',
 '',
 ''

In [None]:
!pip install torch torchvision eva-decord tqdm numpy mamba-ssm triton

Collecting eva-decord
  Downloading eva_decord-0.6.1-py3-none-manylinux2010_x86_64.whl.metadata (449 bytes)
Collecting mamba-ssm
  Downloading mamba_ssm-2.2.4.tar.gz (91 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m91.8/91.8 kB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Using cached nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Using cached nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Using cached nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Usi

In [None]:
import torch
import torch.nn as nn
import decord
import cv2
import json
import numpy as np
from typing import Dict, List, Tuple
import torchvision.transforms as T
from tqdm import tqdm

In [None]:
import os
import json
import logging
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.utils.data import Dataset, DataLoader
from torch.nn.parallel import DistributedDataParallel
from torch.optim.lr_scheduler import CosineAnnealingLR, LinearLR, ChainedScheduler
import decord
import numpy as np
from mamba_ssm import Mamba
from einops import rearrange
import torchvision.transforms as T
from tqdm.notebook import tqdm
from typing import Dict, List, Tuple
from collections import defaultdict

In [None]:
class FeatureAggregatedBiS6(nn.Module):
    """Bi-directional S6 (Mamba) block with multi-scale temporal aggregation.

    The input is first passed through several parallel 1-D convolutions with
    different kernel sizes, then scanned forwards and backwards along time by
    two independent Mamba layers.  A residual connection followed by a
    LayerNorm over the channel axis closes the block.

    Args:
        dim: Number of channels of the input and output feature map.
        kernel_sizes: Kernel size of each parallel temporal convolution.
        expansion: Passed to ``Mamba`` as its ``expand`` argument.

    Shape:
        Input and output are both ``[B, dim, T]``.
    """
    def __init__(self, dim: int, kernel_sizes: List[int] = [3,5,7], expansion: int = 2):
        super().__init__()
        # Padding k//2 on the left and (k-1)//2 on the right adds k-1 samples in
        # total, so every branch preserves the temporal length for any k.
        self.convs = nn.ModuleList([
            nn.Sequential(
                nn.ConstantPad1d((k//2, (k-1)//2), 0),
                nn.Conv1d(dim, dim, k),
                nn.GELU()
            ) for k in kernel_sizes
        ])

        self.s6_fwd = Mamba(
            d_model=dim,
            d_state=16,
            d_conv=4,
            expand=expansion
        )
        self.s6_bwd = Mamba(
            d_model=dim,
            d_state=16,
            d_conv=4,
            expand=expansion
        )
        self.norm = nn.LayerNorm(dim)
        # Learnable scalar applied to the summed convolution branches.
        self.gate = nn.Parameter(torch.ones(1))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply the block to ``x`` of shape ``[B, dim, T]`` and return the same shape."""
        residual = x

        # Multi-scale temporal aggregation
        x = sum(conv(x) for conv in self.convs) * self.gate

        # Bi-directional processing on a channel-last layout
        x = x.permute(0, 2, 1)  # [B, T, C]
        # The backward branch scans the time-reversed sequence and is flipped
        # back so both branches are aligned step by step.
        x = self.s6_fwd(x) + self.s6_bwd(x.flip(1)).flip(1)

        # LayerNorm(dim) normalises the LAST axis, so the residual sum has to be
        # normalised while channels are last; only afterwards is the tensor
        # returned to the [B, C, T] layout expected by the caller.
        x = self.norm(x + residual.permute(0, 2, 1))
        return x.permute(0, 2, 1)

class DualBiS6TAL(nn.Module):
    """Dual-path S6 architecture for temporal action localization.

    A 3-D convolutional stem reduces each frame to a feature vector, a stack of
    ``FeatureAggregatedBiS6`` blocks models the sequence, and a four-level
    temporal pyramid (each level halves the length) is resampled to the base
    length and concatenated along channels before the prediction heads.

    Args:
        num_classes: Number of action classes predicted per time step.
        dim: Channel width of the temporal features.
        recur_steps: Number of stacked temporal blocks before the pyramid.

    Note:
        Every pyramid level halves the temporal length, so the clip needs at
        least ``2 ** 4 = 16`` time steps for the last level to be non-empty.
    """
    def __init__(self, num_classes: int, dim: int = 128, recur_steps: int = 4):
        super().__init__()
        num_levels = 4
        # The heads consume the base feature map plus one map per pyramid level,
        # all concatenated on the channel axis.
        head_in = dim * (num_levels + 1)

        # Feature extractor
        self.encoder = nn.Sequential(
            nn.Conv3d(3, dim, kernel_size=(3,7,7), stride=(1,2,2), padding=(1,3,3)),
            nn.GELU(),
            nn.MaxPool3d(kernel_size=(1,3,3), stride=(1,2,2), padding=(0,1,1)),
            nn.BatchNorm3d(dim)
        )

        # Temporal modeling
        self.temporal_blocks = nn.ModuleList([
            FeatureAggregatedBiS6(dim)
            for _ in range(recur_steps)
        ])

        # Pyramid branches
        self.pyramid = nn.ModuleList([
            nn.Sequential(
                FeatureAggregatedBiS6(dim),
                nn.MaxPool1d(2, stride=2)
            ) for _ in range(num_levels)
        ])

        # Prediction heads
        self.cls_head = nn.Sequential(
            nn.Conv1d(head_in, dim//2, 3, padding=1),
            nn.GELU(),
            nn.Conv1d(dim//2, num_classes, 1)
        )

        self.reg_head = nn.Sequential(
            nn.Conv1d(head_in, dim//2, 3, padding=1),
            nn.GELU(),
            nn.Conv1d(dim//2, 2, 1),
            nn.Tanh()  # Bounded regression outputs
        )

    def forward(self, x: torch.Tensor) -> Dict:
        """Run the network on a clip of shape ``[B, 3, T, H, W]``.

        Returns:
            Dict with ``'cls_logits'`` of shape ``[B, num_classes, T]`` and
            ``'reg_outputs'`` of shape ``[B, 2, T]`` (bounded to [-1, 1] by tanh).
        """
        # Initial features
        x = self.encoder(x)  # [B, C, T, H, W]
        x = x.flatten(3).mean(-1)  # [B, C, T] after spatial average pooling

        # Temporal modeling
        for block in self.temporal_blocks:
            x = block(x)

        # Multi-scale pyramid: each branch consumes the previous, shorter level
        pyramid_features = [x]
        for branch in self.pyramid:
            x = branch(x)
            pyramid_features.append(x)

        # Resample every level to the base length and merge on channels
        base_len = pyramid_features[0].shape[-1]
        merged = torch.cat([
            nn.functional.interpolate(f, size=base_len)
            for f in pyramid_features
        ], dim=1)

        return {
            'cls_logits': self.cls_head(merged),
            'reg_outputs': self.reg_head(merged)
        }

class TemporalAttention(nn.Module):
    """Multi-head self-attention along the time axis of a ``[B, C, T]`` tensor.

    ``dim`` has to be divisible by ``num_heads``: the fused projection is
    reshaped into ``num_heads`` heads of ``dim // num_heads`` channels each.
    """
    def __init__(self, dim: int, num_heads: int = 4):
        super().__init__()
        self.num_heads = num_heads
        self.head_dim = dim // num_heads
        self.scale = self.head_dim ** -0.5  # 1 / sqrt(head_dim)
        self.qkv = nn.Linear(dim, dim * 3)  # fused query/key/value projection
        self.proj = nn.Linear(dim, dim)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Attend over time; input and output are both ``[B, C, T]``."""
        batch, channels, steps = x.shape
        tokens = x.transpose(1, 2)  # [B, T, C]

        # Split the fused projection into (q, k, v), each [B, heads, T, head_dim]
        packed = self.qkv(tokens).reshape(batch, steps, 3, self.num_heads, self.head_dim)
        query, key, value = packed.permute(2, 0, 3, 1, 4).unbind(0)

        weights = torch.matmul(query, key.transpose(-2, -1)) * self.scale
        weights = weights.softmax(dim=-1)

        # Merge the heads back into a single channel axis
        mixed = torch.matmul(weights, value).transpose(1, 2).reshape(batch, steps, channels)
        return self.proj(mixed).transpose(1, 2)

class RefinedFeatureAggregatedBiS6(nn.Module):
    """Bi-directional S6 block preceded by temporal self-attention.

    Pipeline: parallel multi-scale convolutions -> residual temporal attention
    -> LayerNorm -> forward and backward Mamba scans (summed) -> LayerNorm.
    Input and output are both ``[B, dim, T]``.
    """
    def __init__(self, dim: int, kernel_sizes: List[int] = [3,5,7], expansion: int = 2):
        super().__init__()

        def branch(k: int) -> nn.Sequential:
            # k//2 left + (k-1)//2 right padding keeps the temporal length.
            return nn.Sequential(
                nn.ConstantPad1d((k//2, (k-1)//2), 0),
                nn.Conv1d(dim, dim, k),
                nn.GELU()
            )

        self.convs = nn.ModuleList([branch(k) for k in kernel_sizes])

        self.temporal_attn = TemporalAttention(dim)

        # Two independent scans: one over the sequence, one over its reverse.
        self.s6_fwd = Mamba(d_model=dim, d_state=16, d_conv=4, expand=expansion)
        self.s6_bwd = Mamba(d_model=dim, d_state=16, d_conv=4, expand=expansion)

        self.norm1 = nn.LayerNorm(dim)
        self.norm2 = nn.LayerNorm(dim)
        self.gate = nn.Parameter(torch.ones(1))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply the block to ``x`` of shape ``[B, dim, T]``."""
        # Multi-scale temporal aggregation, scaled by the learnable gate
        aggregated = sum([conv(x) for conv in self.convs]) * self.gate

        # Residual temporal attention, then LayerNorm in channel-last layout
        attended = aggregated + self.temporal_attn(aggregated)
        attended = self.norm1(attended.permute(0, 2, 1)).permute(0, 2, 1)

        # Bi-directional scan on [B, T, C]; the reversed scan is flipped back
        seq = attended.permute(0, 2, 1)
        forward_out = self.s6_fwd(seq)
        backward_out = self.s6_bwd(seq.flip(1)).flip(1)

        fused = self.norm2(forward_out + backward_out)
        return fused.permute(0, 2, 1)

class RefinedDualBiS6TAL(nn.Module):
    """Dual-path S6 localization network with attention blocks and a confidence head.

    A 3-D convolutional stem is followed by ``recur_steps`` attention-augmented
    S6 blocks and a four-level temporal pyramid.  Every pyramid level is
    linearly resampled to the base length and concatenated with the base
    features (``dim * 5`` channels) before three per-time-step heads.
    """
    def __init__(self, num_classes: int, dim: int = 128, recur_steps: int = 4):
        super().__init__()
        half = dim // 2
        fused = dim * 5  # base features + four pyramid levels

        stem_conv = nn.Conv3d(3, dim, kernel_size=(3,7,7), stride=(1,2,2), padding=(1,3,3))
        stem_pool = nn.MaxPool3d(kernel_size=(1,3,3), stride=(1,2,2), padding=(0,1,1))
        self.encoder = nn.Sequential(stem_conv, nn.GELU(), stem_pool, nn.BatchNorm3d(dim))

        # Temporal modeling with attention
        blocks = []
        for _ in range(recur_steps):
            blocks.append(RefinedFeatureAggregatedBiS6(dim))
        self.temporal_blocks = nn.ModuleList(blocks)

        # Multi-scale pyramid: every level halves the temporal length
        levels = []
        for _ in range(4):
            levels.append(nn.Sequential(
                RefinedFeatureAggregatedBiS6(dim),
                nn.MaxPool1d(2, stride=2)
            ))
        self.pyramid = nn.ModuleList(levels)

        # Per-class logits
        self.cls_head = nn.Sequential(
            nn.Conv1d(fused, dim, 1),
            nn.GELU(),
            nn.Conv1d(dim, half, 3, padding=1),
            nn.GELU(),
            nn.Conv1d(half, num_classes, 1)
        )

        # Two regression values per step, bounded to [-1, 1]
        self.reg_head = nn.Sequential(
            nn.Conv1d(fused, dim, 1),
            nn.GELU(),
            nn.Conv1d(dim, half, 3, padding=1),
            nn.GELU(),
            nn.Conv1d(half, 2, 1),
            nn.Tanh()
        )

        # One confidence value per step in [0, 1]
        self.conf_head = nn.Sequential(
            nn.Conv1d(fused, half, 3, padding=1),
            nn.GELU(),
            nn.Conv1d(half, 1, 1),
            nn.Sigmoid()
        )

    def forward(self, x: torch.Tensor) -> Dict:
        """Map a clip ``[B, 3, T, H, W]`` to per-step predictions.

        Returns:
            Dict with ``'cls_logits'`` ``[B, num_classes, T]``, ``'reg_outputs'``
            ``[B, 2, T]`` and ``'confidence'`` ``[B, 1, T]``.
        """
        # Stem, then spatial average pooling -> [B, C, T]
        feats = self.encoder(x).flatten(3).mean(-1)

        for block in self.temporal_blocks:
            feats = block(feats)

        # Walk down the pyramid, resampling every level back to the base length
        steps = feats.shape[-1]
        collected = [feats]
        level = feats
        for branch in self.pyramid:
            level = branch(level)
            collected.append(nn.functional.interpolate(level, size=steps, mode='linear'))

        merged = torch.cat(collected, dim=1)

        return {
            'cls_logits': self.cls_head(merged),
            'reg_outputs': self.reg_head(merged),
            'confidence': self.conf_head(merged)
        }

In [None]:
class VideoProcessor:
    """Reads a video file and converts it into a normalised clip tensor.

    NOTE(review): a later cell defines another ``VideoProcessor`` with different
    sampling parameters, which replaces this one once that cell is executed.
    """
    def __init__(self, frame_size: int = 128, clip_len: int = 8):
        self.frame_size = frame_size
        self.clip_len = clip_len
        resize = T.Resize((frame_size, frame_size))
        # Per-channel statistics (the commonly used ImageNet values)
        normalize = T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        self.transform = T.Compose([resize, normalize])

    def load_video(self, video_path: str) -> Tuple[torch.Tensor, float]:
        """Sample ``clip_len`` evenly spaced frames and preprocess them.

        Returns:
            ``(frames, fps)`` where ``frames`` is laid out as ``(C, T, H, W)``
            and ``fps`` is the average frame rate reported by decord.
        """
        reader = decord.VideoReader(video_path)
        frame_count = len(reader)
        fps = reader.get_avg_fps()

        # Evenly spaced indices from the first to the last frame
        sample_idx = np.linspace(0, frame_count-1, num=self.clip_len, dtype=int)
        clip = torch.from_numpy(reader.get_batch(sample_idx).asnumpy()).float() / 255.0

        # (T, H, W, C) -> (T, C, H, W) for the torchvision transforms
        clip = self.transform(clip.permute(0, 3, 1, 2))

        # (T, C, H, W) -> (C, T, H, W), the layout the model consumes
        return clip.permute(1, 0, 2, 3), fps

class InferenceEngine:
    """Loads a trained ``RefinedDualBiS6TAL`` checkpoint and decodes its outputs.

    Args:
        checkpoint_path: File containing a dict with a ``'model_state_dict'`` entry.
        class_labels_path: JSON file mapping class name -> class index.
        device: Requested device; falls back to CPU when CUDA is unavailable.
    """
    def __init__(self, checkpoint_path: str, class_labels_path: str, device: str = 'cuda'):
        self.device = torch.device(device if torch.cuda.is_available() else 'cpu')

        # Load class labels and build the reverse (index -> name) lookup
        with open(class_labels_path, 'r') as f:
            self.class_to_idx = json.load(f)
        self.idx_to_class = {v: k for k, v in self.class_to_idx.items()}

        # Initialize model
        self.model = RefinedDualBiS6TAL(
            num_classes=len(self.class_to_idx),
            dim=64,
            recur_steps=2
        ).to(self.device)

        # NOTE: torch.load unpickles the file; only load checkpoints from a
        # trusted source.
        checkpoint = torch.load(checkpoint_path, map_location=self.device)
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.model.eval()

    def predict(self, frames: torch.Tensor, confidence_threshold: float = 0.5) -> List[Dict]:
        """Run the model on one clip and return the detections above the threshold.

        Args:
            frames: Clip tensor without batch axis, ``(C, T, H, W)``.
            confidence_threshold: Minimum ``sigmoid(class logit) * confidence``.

        Returns:
            One dict per (time step, class) pair above the threshold, ordered by
            time step then class, with keys ``'action'``, ``'start_frame'``,
            ``'end_frame'`` and ``'confidence'``.  Boundaries are expressed in
            model time steps and always satisfy ``0 <= start <= end <= T``.
        """
        with torch.no_grad():
            outputs = self.model(frames.unsqueeze(0).to(self.device))

            # The heads emit [channels, T] per sample; decode in [T, channels].
            cls_scores = torch.sigmoid(outputs['cls_logits'][0]).permute(1, 0)  # (T, C)
            reg_outputs = outputs['reg_outputs'][0].permute(1, 0)  # (T, 2)
            confidence = outputs['confidence'][0].permute(1, 0)  # (T, 1)

            num_steps = cls_scores.shape[0]
            scores = cls_scores * confidence  # confidence broadcasts over classes

            predictions = []
            # nonzero() walks the matrix row by row: time step first, then class.
            for t, c in torch.nonzero(scores > confidence_threshold).tolist():
                center = t + float(reg_outputs[t, 0])
                # The tanh head may emit a negative value; a negative length is
                # treated as zero so the segment can never end before it starts.
                length = max(0.0, float(reg_outputs[t, 1]) * 100)  # Scale factor from training

                start_time = min(max(center - length / 2, 0.0), num_steps)
                end_time = min(max(center + length / 2, 0.0), num_steps)

                predictions.append({
                    'action': self.idx_to_class[c],
                    'start_frame': int(start_time),
                    'end_frame': int(end_time),
                    'confidence': float(scores[t, c])
                })

        return predictions

class VideoAnnotator:
    """Writes a copy of a video with the active predictions drawn on each frame."""
    def __init__(self, output_path: str):
        self.output_path = output_path

    def annotate_video(self, video_path: str, predictions: List[Dict], fps: float):
        """Create an annotated copy of ``video_path`` at ``self.output_path``.

        Args:
            video_path: Source video.
            predictions: Dicts with ``'action'``, ``'start_frame'``,
                ``'end_frame'`` and ``'confidence'``; a prediction is drawn on
                every frame whose index lies in ``[start_frame, end_frame]``.
            fps: Frame rate of the written video.
        """
        cap = cv2.VideoCapture(video_path)
        out = None
        try:
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

            # Create output video writer
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(
                self.output_path,
                fourcc,
                fps,
                (width, height)
            )

            frame_idx = 0
            with tqdm(total=int(cap.get(cv2.CAP_PROP_FRAME_COUNT))) as pbar:
                while cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        break

                    # Find active predictions for current frame
                    active_preds = [
                        p for p in predictions
                        if p['start_frame'] <= frame_idx <= p['end_frame']
                    ]

                    # Stack the labels vertically so simultaneous predictions
                    # stay readable instead of being drawn on top of each other.
                    for row, pred in enumerate(active_preds):
                        text = f"{pred['action']} ({pred['confidence']:.2f})"
                        cv2.putText(
                            frame,
                            text,
                            (50, 50 + row * 40),
                            cv2.FONT_HERSHEY_SIMPLEX,
                            1,
                            (0, 255, 0),
                            2
                        )

                    out.write(frame)
                    frame_idx += 1
                    pbar.update(1)
        finally:
            # Release both handles even when reading or writing fails, so the
            # output file is finalised and the input is not left locked.
            cap.release()
            if out is not None:
                out.release()

def run_inference(
    video_path: str,
    checkpoint_path: str,
    class_labels_path: str,
    output_path: str,
    confidence_threshold: float = 0.5
):
    """Run the full pipeline: load the clip, predict, write the annotated video.

    Args:
        video_path: Video to analyse.
        checkpoint_path: Model checkpoint passed to ``InferenceEngine``.
        class_labels_path: Class-label file passed to ``InferenceEngine``.
        output_path: Destination of the annotated video.
        confidence_threshold: Minimum score for a prediction to be kept.

    Returns:
        The list of predictions, with ``'start_frame'`` / ``'end_frame'``
        expressed as frame indices of the source video.
    """
    # Initialize components
    processor = VideoProcessor()
    engine = InferenceEngine(checkpoint_path, class_labels_path)
    annotator = VideoAnnotator(output_path)

    # Process video
    frames, fps = processor.load_video(video_path)

    # Run inference
    predictions = engine.predict(frames, confidence_threshold)

    # The model works on a clip of evenly spaced frames, so its boundaries are
    # clip steps.  The annotator compares boundaries with raw frame indices, so
    # map step i of a T-step clip onto frame i * (N - 1) / (T - 1) of the
    # N-frame video (the spacing used when the clip was sampled).
    clip_len = frames.shape[1]
    total_frames = len(decord.VideoReader(video_path))
    if clip_len > 1 and total_frames > 0:
        last_frame = total_frames - 1
        step = last_frame / (clip_len - 1)
        for pred in predictions:
            for key in ('start_frame', 'end_frame'):
                pred[key] = min(last_frame, max(0, int(round(pred[key] * step))))

    # Create annotated video
    annotator.annotate_video(video_path, predictions, fps)

    return predictions


In [None]:
import os
import json
import torch
import torch.nn as nn
import decord
import cv2
import numpy as np
from typing import Dict, List, Tuple
import torchvision.transforms as T
from tqdm import tqdm

# Modify the VideoProcessor class in the inference code
class VideoProcessor:
    """Reads a video file and converts it into a normalised clip tensor.

    The clip contains ``num_clips * temporal_stride`` frames sampled evenly
    over the whole video.
    """
    def __init__(self, frame_size: int = 128, num_clips: int = 32, temporal_stride: int = 4):
        self.frame_size = frame_size
        self.num_clips = num_clips
        self.temporal_stride = temporal_stride
        resize = T.Resize((frame_size, frame_size))
        # Per-channel statistics (the commonly used ImageNet values)
        normalize = T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        self.transform = T.Compose([resize, normalize])

    def load_video(self, video_path: str) -> Tuple[torch.Tensor, float]:
        """Sample, scale and normalise the frames of ``video_path``.

        Returns:
            ``(frames, fps)`` where ``frames`` is laid out as ``(C, T, H, W)``
            and ``fps`` is the average frame rate reported by decord.
        """
        reader = decord.VideoReader(video_path)
        frame_count = len(reader)
        fps = reader.get_avg_fps()

        # Evenly spaced indices from the first to the last frame
        wanted = self.num_clips * self.temporal_stride
        sample_idx = np.linspace(0, frame_count-1, num=wanted, dtype=int)
        clip = torch.from_numpy(reader.get_batch(sample_idx).asnumpy()).float() / 255.0

        # (T, H, W, C) -> (T, C, H, W) for the torchvision transforms
        clip = self.transform(clip.permute(0, 3, 1, 2))

        # (T, C, H, W) -> (C, T, H, W), the layout the model consumes
        return clip.permute(1, 0, 2, 3), fps

class InferenceEngine:
    """Loads a trained ``RefinedDualBiS6TAL`` checkpoint and decodes its outputs.

    Args:
        checkpoint_path: File containing a dict with a ``'model_state_dict'`` entry.
        class_labels_path: JSON file mapping class name -> class index.
        device: Requested device; falls back to CPU when CUDA is unavailable.
    """
    def __init__(self, checkpoint_path: str, class_labels_path: str, device: str = 'cuda'):
        self.device = torch.device(device if torch.cuda.is_available() else 'cpu')

        # Load class labels and build the reverse (index -> name) lookup
        with open(class_labels_path, 'r') as f:
            self.class_to_idx = json.load(f)
        self.idx_to_class = {v: k for k, v in self.class_to_idx.items()}

        print(self.idx_to_class)

        # Initialize model
        self.model = RefinedDualBiS6TAL(
            num_classes=len(self.class_to_idx),
            dim=64,
            recur_steps=2
        ).to(self.device)

        # NOTE: torch.load unpickles the file; only load checkpoints from a
        # trusted source.
        checkpoint = torch.load(checkpoint_path, map_location=self.device)
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.model.eval()

    def predict(self, frames: torch.Tensor, confidence_threshold: float = 0.5) -> List[Dict]:
        """Run the model on one clip and return the detections above the threshold.

        Args:
            frames: Clip tensor without batch axis, ``(C, T, H, W)``.
            confidence_threshold: Minimum ``sigmoid(class logit) * confidence``.

        Returns:
            One dict per (time step, class) pair above the threshold, ordered by
            time step then class, with keys ``'action'``, ``'start_frame'``,
            ``'end_frame'`` and ``'confidence'``.  Boundaries are expressed in
            model time steps and always satisfy ``0 <= start <= end <= T``.
        """
        with torch.no_grad():
            outputs = self.model(frames.unsqueeze(0).to(self.device))

            # The heads emit [channels, T] per sample; decode in [T, channels].
            cls_scores = torch.sigmoid(outputs['cls_logits'][0].permute(1, 0))  # (T, C)
            reg_outputs = outputs['reg_outputs'][0].permute(1, 0)  # (T, 2)
            confidence = outputs['confidence'][0].permute(1, 0)  # (T, 1)

            num_steps = cls_scores.shape[0]
            scores = cls_scores * confidence  # confidence broadcasts over classes

            predictions = []
            # nonzero() walks the matrix row by row: time step first, then class.
            for t, c in torch.nonzero(scores > confidence_threshold).tolist():
                center = t + float(reg_outputs[t, 0])
                # The tanh head may emit a negative value; a negative length is
                # treated as zero so the segment can never end before it starts.
                length = max(0.0, float(reg_outputs[t, 1]) * 100)  # Scale factor from training

                start_time = min(max(center - length / 2, 0.0), num_steps)
                end_time = min(max(center + length / 2, 0.0), num_steps)

                predictions.append({
                    'action': self.idx_to_class[c],
                    'start_frame': int(start_time),
                    'end_frame': int(end_time),
                    'confidence': float(scores[t, c])
                })

        return predictions

class VideoAnnotator:
    """Writes a copy of a video with the active predictions drawn on each frame."""
    def __init__(self, output_path: str):
        self.output_path = output_path

    def annotate_video(self, video_path: str, predictions: List[Dict], fps: float):
        """Create an annotated copy of ``video_path`` at ``self.output_path``.

        Args:
            video_path: Source video.
            predictions: Dicts with ``'action'``, ``'start_frame'``,
                ``'end_frame'`` and ``'confidence'``; a prediction is drawn on
                every frame whose index lies in ``[start_frame, end_frame]``.
            fps: Frame rate of the written video.
        """
        cap = cv2.VideoCapture(video_path)
        out = None
        try:
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

            # Create output video writer
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(
                self.output_path,
                fourcc,
                fps,
                (width, height)
            )

            frame_idx = 0
            with tqdm(total=int(cap.get(cv2.CAP_PROP_FRAME_COUNT))) as pbar:
                while cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        break

                    # Find active predictions for current frame
                    active_preds = [
                        p for p in predictions
                        if p['start_frame'] <= frame_idx <= p['end_frame']
                    ]

                    # Stack the labels vertically so simultaneous predictions
                    # stay readable instead of being drawn on top of each other.
                    for row, pred in enumerate(active_preds):
                        text = f"{pred['action']} ({pred['confidence']:.2f})"
                        cv2.putText(
                            frame,
                            text,
                            (50, 50 + row * 40),
                            cv2.FONT_HERSHEY_SIMPLEX,
                            1,
                            (0, 255, 0),
                            2
                        )

                    out.write(frame)
                    frame_idx += 1
                    pbar.update(1)
        finally:
            # Release both handles even when reading or writing fails, so the
            # output file is finalised and the input is not left locked.
            cap.release()
            if out is not None:
                out.release()


def run_inference(
    video_path: str,
    checkpoint_path: str,
    class_labels_path: str,
    output_path: str,
    confidence_threshold: float = 0.5
):
    """Run the full pipeline: load the clip, predict, write the annotated video.

    Args:
        video_path: Video to analyse.
        checkpoint_path: Model checkpoint passed to ``InferenceEngine``.
        class_labels_path: Class-label file passed to ``InferenceEngine``.
        output_path: Destination of the annotated video.
        confidence_threshold: Minimum score for a prediction to be kept.

    Returns:
        The list of predictions, with ``'start_frame'`` / ``'end_frame'``
        expressed as frame indices of the source video.
    """
    # Initialize components
    processor = VideoProcessor()
    engine = InferenceEngine(checkpoint_path, class_labels_path)
    annotator = VideoAnnotator(output_path)

    # Process video
    frames, fps = processor.load_video(video_path)

    # Run inference
    predictions = engine.predict(frames, confidence_threshold)

    # The model works on a clip of evenly spaced frames, so its boundaries are
    # clip steps.  The annotator compares boundaries with raw frame indices, so
    # map step i of a T-step clip onto frame i * (N - 1) / (T - 1) of the
    # N-frame video (the spacing used when the clip was sampled).
    clip_len = frames.shape[1]
    total_frames = len(decord.VideoReader(video_path))
    if clip_len > 1 and total_frames > 0:
        last_frame = total_frames - 1
        step = last_frame / (clip_len - 1)
        for pred in predictions:
            for key in ('start_frame', 'end_frame'):
                pred[key] = min(last_frame, max(0, int(round(pred[key] * step))))

    # Create annotated video
    annotator.annotate_video(video_path, predictions, fps)

    return predictions


In [None]:

# Run the whole pipeline on the uploaded clip; the annotated copy is written
# next to the notebook and the detections are kept for inspection.
predictions = run_inference(
    video_path="/content/WhatsApp Video 2025-02-16 at 18.33.40.mp4",
    checkpoint_path="/content/checkpoint_epoch_20.pth",
    class_labels_path="/content/class_labels.txt",
    output_path="annotated_video.mp4",
    confidence_threshold=0.1,
)

# One summary line per detection
for pred in predictions:
    print(
        "Action: {action}, Time: {start_frame}-{end_frame}, Confidence: {confidence:.2f}".format(**pred)
    )

{0: 'Archery', 1: 'Ballet', 2: 'Bathing dog', 3: 'Belly dance', 4: 'Brushing hair', 5: 'Brushing teeth', 6: 'Doing nails', 7: 'Playing guitarra', 8: 'Smoking a cigarette', 9: 'Spinning'}


  checkpoint = torch.load(checkpoint_path, map_location=self.device)










100%|██████████| 6098/6098 [00:32<00:00, 186.54it/s]

Action: Bathing dog, Time: 1--1, Confidence: 0.10
Action: Playing guitarra, Time: 1--1, Confidence: 0.13
Action: Archery, Time: 124-128, Confidence: 0.16
Action: Ballet, Time: 124-128, Confidence: 0.10
Action: Belly dance, Time: 124-128, Confidence: 0.13
Action: Spinning, Time: 124-128, Confidence: 0.11





In [None]:
# Display the raw prediction list returned by run_inference.
# NOTE(review): the stored output of this cell is empty although the previous
# cell printed six detections, so the cells were probably run out of order.
predictions

[]

In [None]:
import cv2
import torch
import numpy as np
from flask import Flask, Response
from threading import Thread

# Rebuild the network and restore its weights.  The checkpoint file stores a
# dict whose 'model_state_dict' entry holds the parameters (this is how
# InferenceEngine reads it), so the loaded object is not a module itself.
# NOTE: torch.load unpickles the file; only load checkpoints from a trusted source.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
checkpoint = torch.load("checkpoint_epoch_20.pth", map_location=device)
state_dict = checkpoint['model_state_dict']
model = RefinedDualBiS6TAL(
    # The last classifier convolution ('cls_head.4') has one output channel per class.
    num_classes=state_dict['cls_head.4.weight'].shape[0],
    dim=64,
    recur_steps=2,
).to(device)
model.load_state_dict(state_dict)
model.eval()

# Preprocessing mirrors VideoProcessor: square resize and per-channel normalisation.
FRAME_SIZE = 128
CHANNEL_MEAN = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1)
CHANNEL_STD = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1)
# The four stride-2 pooling levels of the temporal pyramid need >= 16 steps.
STATIC_CLIP_LEN = 16

def process_frame(frame):
    """Score a single camera frame and return one value per action class.

    The network consumes clips shaped ``[B, 3, T, H, W]``, so the frame is
    repeated along the time axis to form a static clip.  The returned NumPy
    array holds, for every class, the highest ``sigmoid(logit) * confidence``
    over the time steps.

    NOTE(review): a static clip carries no motion information; feeding a
    rolling buffer of recent frames would be needed for true temporal
    inference - confirm the intended behaviour.
    """
    # OpenCV delivers BGR images, while the offline pipeline decodes RGB frames.
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    rgb = cv2.resize(rgb, (FRAME_SIZE, FRAME_SIZE))
    image = torch.from_numpy(rgb).float().permute(2, 0, 1) / 255.0  # [3, H, W]
    image = (image - CHANNEL_MEAN) / CHANNEL_STD

    # [3, H, W] -> [1, 3, T, H, W]
    clip = image.unsqueeze(1).repeat(1, STATIC_CLIP_LEN, 1, 1).unsqueeze(0).to(device)
    with torch.no_grad():
        outputs = model(clip)

    # [C, T] class probabilities weighted by the [1, T] confidence track
    scores = torch.sigmoid(outputs['cls_logits'][0]) * outputs['confidence'][0]
    return scores.max(dim=-1).values.cpu().numpy()

# Flask application that serves the video streams, plus the two capture
# devices behind its routes: index 0 is the default laptop camera and index 1
# an external camera, if one is attached.
app = Flask(__name__)
capture, external_camera = cv2.VideoCapture(0), cv2.VideoCapture(1)

def generate_frames(source):
    """Yield annotated JPEG frames from ``source`` as multipart chunks.

    Every frame read from ``source`` is passed to ``process_frame``, the result
    is drawn onto it, and the frame is emitted as one part of a
    ``multipart/x-mixed-replace`` stream.  The generator ends when ``source``
    stops delivering frames.
    """
    while True:
        success, frame = source.read()
        if not success:
            break

        # Process frame through the model
        result = process_frame(frame)

        # Overlay result on frame
        cv2.putText(frame, f"Prediction: {result}", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

        encoded, buffer = cv2.imencode('.jpg', frame)
        if not encoded:
            # Skip a frame that could not be encoded instead of failing the stream.
            continue

        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')

@app.route('/video_feed/laptop')
def video_feed_laptop():
    """Stream annotated frames from the laptop camera as a multipart response."""
    stream = generate_frames(capture)
    return Response(stream, mimetype='multipart/x-mixed-replace; boundary=frame')

@app.route('/video_feed/external')
def video_feed_external():
    """Stream annotated frames from the external camera as a multipart response."""
    stream = generate_frames(external_camera)
    return Response(stream, mimetype='multipart/x-mixed-replace; boundary=frame')

if __name__ == "__main__":
    # The server listens on every interface, so the interactive debugger (which
    # lets a client execute arbitrary code) must stay disabled.
    app.run(host='0.0.0.0', port=5000, debug=False, threaded=True)
