In [1]:
!pip install mediapipe opencv-python-headless tqdm torch cupy-cuda11x

Collecting mediapipe
  Downloading mediapipe-0.10.18-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.7 kB)
Collecting cupy-cuda11x
  Downloading cupy_cuda11x-13.3.0-cp310-cp310-manylinux2014_x86_64.whl.metadata (2.7 kB)
Collecting protobuf<5,>=4.25.3 (from mediapipe)
  Downloading protobuf-4.25.5-cp37-abi3-manylinux2014_x86_64.whl.metadata (541 bytes)
Collecting sounddevice>=0.4.4 (from mediapipe)
  Downloading sounddevice-0.5.1-py3-none-any.whl.metadata (1.4 kB)
Downloading mediapipe-0.10.18-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (36.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m36.1/36.1 MB[0m [31m56.5 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[?25hDownloading cupy_cuda11x-13.3.0-cp310-cp310-manylinux2014_x86_64.whl (96.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m96.6/96.6 MB[0m [31m18.5 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[?25hDownloading protobuf-4.25.5-cp37-abi3-manyl

In [6]:
!pip uninstall cupy cupy-cuda11x -y

  pid, fd = os.forkpty()


Found existing installation: cupy 13.3.0
Uninstalling cupy-13.3.0:
  Successfully uninstalled cupy-13.3.0
Found existing installation: cupy-cuda11x 13.3.0
Uninstalling cupy-cuda11x-13.3.0:
  Successfully uninstalled cupy-cuda11x-13.3.0


In [7]:
!pip install cupy-cuda11x

Collecting cupy-cuda11x
  Using cached cupy_cuda11x-13.3.0-cp310-cp310-manylinux2014_x86_64.whl.metadata (2.7 kB)
Using cached cupy_cuda11x-13.3.0-cp310-cp310-manylinux2014_x86_64.whl (96.6 MB)
Installing collected packages: cupy-cuda11x
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
cudf 24.10.1 requires cubinlinker, which is not installed.
cudf 24.10.1 requires libcudf==24.10.*, which is not installed.
cudf 24.10.1 requires ptxcompiler, which is not installed.
cuml 24.10.0 requires cuvs==24.10.*, which is not installed.
cuml 24.10.0 requires nvidia-cublas, which is not installed.
cuml 24.10.0 requires nvidia-cufft, which is not installed.
cuml 24.10.0 requires nvidia-curand, which is not installed.
cuml 24.10.0 requires nvidia-cusolver, which is not installed.
cuml 24.10.0 requires nvidia-cusparse, which is not installed.
cudf 24.10.1 requires cuda-pyth

In [4]:
import cv2
import mediapipe as mp
import numpy as np
import os
from tqdm import tqdm

# Initialize MediaPipe Pose
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(static_image_mode=False, min_detection_confidence=0.5, min_tracking_confidence=0.5)

def preprocess_video(video_path, output_folder):
    cap = cv2.VideoCapture(video_path)
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    
    preprocessed_data = []
    
    for frame_idx in tqdm(range(frame_count)):
        ret, frame = cap.read()
        if not ret:
            break
        
        # Convert the image to RGB and process it with MediaPipe
        image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = pose.process(image_rgb)
        
        if results.pose_landmarks:
            landmarks = results.pose_landmarks.landmark
            
            # Extract relevant landmarks (e.g., shoulders, elbows, wrists)
            left_shoulder = [landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].x,
                             landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].y]
            right_shoulder = [landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].x,
                              landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].y]
            left_elbow = [landmarks[mp_pose.PoseLandmark.LEFT_ELBOW.value].x,
                          landmarks[mp_pose.PoseLandmark.LEFT_ELBOW.value].y]
            right_elbow = [landmarks[mp_pose.PoseLandmark.RIGHT_ELBOW.value].x,
                           landmarks[mp_pose.PoseLandmark.RIGHT_ELBOW.value].y]
            left_wrist = [landmarks[mp_pose.PoseLandmark.LEFT_WRIST.value].x,
                          landmarks[mp_pose.PoseLandmark.LEFT_WRIST.value].y]
            right_wrist = [landmarks[mp_pose.PoseLandmark.RIGHT_WRIST.value].x,
                           landmarks[mp_pose.PoseLandmark.RIGHT_WRIST.value].y]
            
            # Calculate angles
            left_arm_angle = calculate_angle(left_shoulder, left_elbow, left_wrist)
            right_arm_angle = calculate_angle(right_shoulder, right_elbow, right_wrist)
            
            preprocessed_data.append({
                'frame': frame_idx,
                'left_arm_angle': left_arm_angle,
                'right_arm_angle': right_arm_angle,
                'landmarks': [(landmark.x, landmark.y, landmark.z) for landmark in landmarks]
            })
    
    cap.release()
    
    # Save preprocessed data
    np.save(os.path.join(output_folder, f"{os.path.basename(video_path)}_preprocessed.npy"), preprocessed_data)

def calculate_angle(a, b, c):
    a = np.array(a)
    b = np.array(b)
    c = np.array(c)
    radians = np.arctan2(c[1] - b[1], c[0] - b[0]) - np.arctan2(a[1] - b[1], a[0] - b[0])
    angle = np.abs(radians * 180.0 / np.pi)
    if angle > 180.0:
        angle = 360 - angle
    return float(angle)

# Main preprocessing loop
input_folder = '/kaggle/input/pushups-all2/pushups_all_2'
output_folder = '/kaggle/working/preprocessed_data'
os.makedirs(output_folder, exist_ok=True)

for video_file in os.listdir(input_folder):
    if video_file.endswith('.mp4'):
        video_path = os.path.join(input_folder, video_file)
        preprocess_video(video_path, output_folder)

print("Preprocessing complete!")

  0%|          | 0/212 [00:00<?, ?it/s]W0000 00:00:1733225585.603161     115 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1733225585.660109     115 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1733225585.683148     114 landmark_projection_calculator.cc:186] Using NORM_RECT without IMAGE_DIMENSIONS is only supported for the square ROI. Provide IMAGE_DIMENSIONS or use PROJECTION_MATRIX.
100%|██████████| 212/212 [00:05<00:00, 40.26it/s]
100%|██████████| 58/58 [00:01<00:00, 43.28it/s]
100%|██████████| 172/172 [00:03<00:00, 43.22it/s]
100%|██████████| 113/113 [00:02<00:00, 38.49it/s]
100%|██████████| 83/83 [00:02<00:00, 40.69it/s]
100%|██████████| 148/148 [00:03<00:00, 42.05it/s]
100%|██████████| 156/156 [00:03<00:00, 41.37it/s]
100%|██████████| 75/75 [00:01<00:00, 41.19it

Preprocessing complete!





In [5]:
!pwd


  pid, fd = os.forkpty()


/kaggle/working


In [6]:
!ls -ltra ./preprocessed_data

total 12248
drwxr-xr-x 4 root root   4096 Dec  3 11:32 ..
-rw-r--r-- 1 root root 239830 Dec  3 11:33 wrong28.mp4_preprocessed.npy
-rw-r--r-- 1 root root  65348 Dec  3 11:33 correct40.mp4_preprocessed.npy
-rw-r--r-- 1 root root 194510 Dec  3 11:33 wrong36.mp4_preprocessed.npy
-rw-r--r-- 1 root root 127663 Dec  3 11:33 wrong46.mp4_preprocessed.npy
-rw-r--r-- 1 root root  93673 Dec  3 11:33 correct34.mp4_preprocessed.npy
-rw-r--r-- 1 root root 167318 Dec  3 11:33 correct28.mp4_preprocessed.npy
-rw-r--r-- 1 root root 176382 Dec  3 11:33 correct32.mp4_preprocessed.npy
-rw-r--r-- 1 root root  84609 Dec  3 11:33 correct6.mp4_preprocessed.npy
-rw-r--r-- 1 root root 105003 Dec  3 11:33 wrong13.mp4_preprocessed.npy
-rw-r--r-- 1 root root  89141 Dec  3 11:33 correct4.mp4_preprocessed.npy
-rw-r--r-- 1 root root  67614 Dec  3 11:33 correct39.mp4_preprocessed.npy
-rw-r--r-- 1 root root  84609 Dec  3 11:33 correct49.mp4_preprocessed.npy
-rw-r--r-- 1 root root  77811 Dec  3 11:33 correct30.mp4_preproc

In [7]:
import numpy as np

# Specify the path to your .npy file
file_path = '/kaggle/working/preprocessed_data/wrong32.mp4_preprocessed.npy'

# Load the data from the .npy file
data = np.load(file_path, allow_pickle=True)

# Print the type of data to understand its structure
print(type(data))
print(data.shape)
print(len(data))
# If it's a list of dictionaries, you can inspect individual elements
print(data[0])  # Print the first element to see its structure

# Optionally, print more elements or specific keys if needed
for item in data[:5]:  # View the first 5 entries
    print(item)

<class 'numpy.ndarray'>
(65,)
65
{'frame': 1, 'left_arm_angle': 168.8868973843826, 'right_arm_angle': 169.29243989846563, 'landmarks': [(0.8566663861274719, 0.37731680274009705, -0.09211250394582748), (0.866101861000061, 0.35935208201408386, -0.07533276826143265), (0.8658832311630249, 0.3563706576824188, -0.07546903938055038), (0.8658827543258667, 0.3526790738105774, -0.07558848708868027), (0.8654646873474121, 0.35698917508125305, -0.12583020329475403), (0.8646005392074585, 0.3527193069458008, -0.1258518546819687), (0.8639950752258301, 0.3477899134159088, -0.12584808468818665), (0.8498237729072571, 0.31134480237960815, 0.0257448460906744), (0.8488703966140747, 0.30833902955055237, -0.20123735070228577), (0.837600827217102, 0.3745482265949249, -0.04886886849999428), (0.8362727165222168, 0.36985698342323303, -0.11491266638040543), (0.7276058197021484, 0.33265307545661926, 0.15302757918834686), (0.7189415097236633, 0.33552035689353943, -0.27987566590309143), (0.698556661605835, 0.57811665

# 3d-Resnet

# Model Training

In [40]:
input_folder = '/kaggle/working/preprocessed_data'
sequence_length = 100
batch_size = 16
num_epochs = 50
learning_rate = 0.001
weight_decay = 1e-5
print("Done")

Done


In [52]:
import os
import time
from typing import List, Tuple

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, Dataset


# Dataset Class
class PushupDataset(Dataset):
    def __init__(self, data: List[np.ndarray], labels: List[int], sequence_length: int = 100):
        """
        Initialize the PushupDataset.

        Args:
            data: List of numpy arrays containing sequence data
            labels: List of labels (0 for incorrect, 1 for correct)
            sequence_length: Target sequence length for padding/truncating
        """
        self.data = data
        self.labels = labels
        self.sequence_length = sequence_length

    def __len__(self) -> int:
        return len(self.data)

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        sequence = self.data[idx]
        label = self.labels[idx]

        # Pad or truncate sequence
        if len(sequence) < self.sequence_length:
            pad_length = self.sequence_length - len(sequence)
            sequence = np.pad(sequence, ((0, pad_length), (0, 0)), mode='constant')
        elif len(sequence) > self.sequence_length:
            sequence = sequence[:self.sequence_length]

        # Permute the sequence dimensions to match the model's expected input
        sequence = torch.FloatTensor(sequence).permute(1, 0)  # Shape: [num_features, sequence_length]
        return sequence, torch.LongTensor([label])


# Model Classes
class Conv3DBlock(nn.Module):
    def __init__(self, in_channels: int, out_channels: int, kernel_size: int = 3,
                 stride: int = 1, padding: int = 1):
        super().__init__()
        self.conv = nn.Conv3d(in_channels, out_channels, kernel_size, stride, padding)
        self.bn = nn.BatchNorm3d(out_channels)
        self.relu = nn.ReLU(inplace=False)  # Ensure inplace=False

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.relu(self.bn(self.conv(x)))


class ResidualBlock(nn.Module):
    def __init__(self, in_channels: int, out_channels: int):
        super().__init__()
        self.conv1 = Conv3DBlock(in_channels, out_channels)
        self.conv2 = Conv3DBlock(out_channels, out_channels)
        self.downsample = None
        if in_channels != out_channels:
            self.downsample = nn.Sequential(
                nn.Conv3d(in_channels, out_channels, kernel_size=1, stride=1),
                nn.BatchNorm3d(out_channels)
            )
        self.relu = nn.ReLU(inplace=False)  # Ensure inplace=False

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        identity = x
        out = self.conv1(x)
        out = self.conv2(out)
        if self.downsample is not None:
            identity = self.downsample(x)
        out = out + identity  # Change from in-place addition
        return self.relu(out)


class ResNet3D(nn.Module):
    def __init__(self, block: nn.Module, layers: List[int], num_classes: int = 2,
                 input_channels: int = 8):
        super().__init__()
        self.in_channels = 64
        self.conv1 = Conv3DBlock(input_channels, 64, kernel_size=7, stride=2, padding=3)
        self.maxpool = nn.MaxPool3d(kernel_size=3, stride=2, padding=1)
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)
        self.avgpool = nn.AdaptiveAvgPool3d((1, 1, 1))
        self.fc = nn.Linear(512, num_classes)

    def _make_layer(self, block: nn.Module, out_channels: int, blocks: int,
                    stride: int = 1) -> nn.Sequential:
        layers = []
        layers.append(block(self.in_channels, out_channels))
        self.in_channels = out_channels
        for _ in range(1, blocks):
            layers.append(block(out_channels, out_channels))
        return nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # x shape: [batch_size, channels, sequence_length]
        x = x.unsqueeze(2).unsqueeze(-1)  # Shape: [batch_size, channels, 1, sequence_length, 1]
        x = self.conv1(x)
        x = self.maxpool(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x


def resnet3d18(num_classes: int = 2, input_channels: int = 8) -> ResNet3D:
    return ResNet3D(ResidualBlock, [2, 2, 2, 2], num_classes, input_channels)


# Training Function
def train_model(model: nn.Module, train_loader: DataLoader, val_loader: DataLoader,
                num_epochs: int, learning_rate: float, weight_decay: float,
                device: torch.device = None) -> nn.Module:
    """
    Train the model.

    Args:
        model: The ResNet3D model to train
        train_loader: DataLoader for training data
        val_loader: DataLoader for validation data
        num_epochs: Number of epochs to train
        learning_rate: Initial learning rate
        weight_decay: Weight decay for optimizer
        device: Device to train on (default: None, will use cuda if available)

    Returns:
        Trained model
    """
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}", flush=True)

    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=5, factor=0.5)

    best_val_loss = float('inf')
    for epoch in range(num_epochs):
        # Training phase
        model.train()
        train_loss = 0.0
        train_correct = 0
        train_total = 0

        for batch_data, batch_labels in train_loader:
            batch_data = batch_data.to(device)
            batch_labels = batch_labels.squeeze().to(device)

            optimizer.zero_grad()
            outputs = model(batch_data)
            loss = criterion(outputs, batch_labels)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            _, predicted = outputs.max(1)
            train_total += batch_labels.size(0)
            train_correct += predicted.eq(batch_labels).sum().item()

        # Validation phase
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0

        with torch.no_grad():
            for batch_data, batch_labels in val_loader:
                batch_data = batch_data.to(device)
                batch_labels = batch_labels.squeeze().to(device)

                outputs = model(batch_data)
                loss = criterion(outputs, batch_labels)
                val_loss += loss.item()

                _, predicted = outputs.max(1)
                val_total += batch_labels.size(0)
                val_correct += predicted.eq(batch_labels).sum().item()

        # Calculate metrics
        train_loss /= len(train_loader)
        val_loss /= len(val_loader)
        train_accuracy = 100. * train_correct / train_total
        val_accuracy = 100. * val_correct / val_total

        print(f'Epoch [{epoch+1}/{num_epochs}]', flush=True)
        print(f'Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}%', flush=True)
        print(f'Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.2f}%', flush=True)

        # Learning rate scheduling
        scheduler.step(val_loss)

        # Save best model
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'val_loss': val_loss,
            }, 'best_model.pth')
            print("Saved new best model", flush=True)

        print('-' * 60, flush=True)

    print('Training completed.', flush=True)
    return model


# Feature Extraction Function
def extract_features(frame: dict) -> List[float]:
    """Extract features from a single frame"""
    return [
        frame['left_arm_angle'],
        frame['right_arm_angle'],
        frame['landmarks'][11][1],  # LEFT_SHOULDER y-coordinate
        frame['landmarks'][12][1],  # RIGHT_SHOULDER y-coordinate
        frame['landmarks'][13][1],  # LEFT_ELBOW y-coordinate
        frame['landmarks'][14][1],  # RIGHT_ELBOW y-coordinate
        frame['landmarks'][15][1],  # LEFT_WRIST y-coordinate
        frame['landmarks'][16][1]   # RIGHT_WRIST y-coordinate
    ]


# Data Preparation Function
def prepare_data(input_folder: str, sequence_length: int = 100,
                 batch_size: int = 16) -> Tuple[DataLoader, DataLoader]:
    """
    Prepare data for training and validation with progress tracking and optimizations.
    """
    start_time = time.time()
    all_data = []
    all_labels = []
    total_files = len([f for f in os.listdir(input_folder) if f.endswith('_preprocessed.npy')])
    processed_files = 0

    print(f"Starting to process {total_files} files...", flush=True)

    for file in os.listdir(input_folder):
        if file.endswith('_preprocessed.npy'):
            try:
                print(f"Processing file {processed_files + 1}/{total_files}: {file}", flush=True)
                file_start_time = time.time()

                # Load data
                data = np.load(os.path.join(input_folder, file), allow_pickle=True)

                # Pre-allocate features array for better performance
                features = np.zeros((len(data), 8))

                # Extract features
                for i, frame in enumerate(data):
                    features[i] = extract_features(frame)

                all_data.append(features)
                label = 1 if "correct" in file.lower() else 0
                all_labels.append(label)

                processed_files += 1
                file_time = time.time() - file_start_time
                print(f"Processed {file} in {file_time:.2f} seconds", flush=True)

            except Exception as e:
                print(f"Error processing file {file}: {str(e)}", flush=True)
                continue

    if not all_data:
        raise ValueError("No valid data files found in the input folder")

    print("\nNormalizing data...", flush=True)

    # Optimize normalization by processing all sequences at once
    all_data_flat = np.vstack([np.array(seq) for seq in all_data])
    scaler = StandardScaler()
    all_data_normalized_flat = scaler.fit_transform(all_data_flat)

    # Reshape back to original structure
    cumulative_lengths = np.cumsum([len(seq) for seq in all_data])
    all_data_normalized = np.split(all_data_normalized_flat, cumulative_lengths[:-1])

    # Standardize sequence lengths
    all_data_normalized = [
        seq[:sequence_length] if len(seq) > sequence_length
        else np.pad(seq, ((0, sequence_length - len(seq)), (0, 0)), mode='constant')
        for seq in all_data_normalized
    ]

    print("Splitting data into train and validation sets...", flush=True)
    X_train, X_val, y_train, y_val = train_test_split(
        all_data_normalized, all_labels, test_size=0.2, random_state=42
    )

    # Create datasets and data loaders
    train_dataset = PushupDataset(X_train, y_train, sequence_length)
    val_dataset = PushupDataset(X_val, y_val, sequence_length)

    train_loader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=0,  # Set to 0 to avoid issues with multiprocessing
        pin_memory=True
    )

    val_loader = DataLoader(
        val_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=0,
        pin_memory=True
    )

    total_time = time.time() - start_time
    print(f"\nData preparation completed in {total_time:.2f} seconds!")
    print(f"Number of training batches: {len(train_loader)}")
    print(f"Number of validation batches: {len(val_loader)}")

    return train_loader, val_loader


# Main Execution
if __name__ == "__main__":
    # Hyperparameters
    input_folder = "/kaggle/working/preprocessed_data"  # Replace with your data folder path
    sequence_length = 100
    batch_size = 16
    num_epochs = 25
    learning_rate = 0.001
    weight_decay = 1e-4

    # Prepare data
    train_loader, val_loader = prepare_data(
        input_folder=input_folder,
        sequence_length=sequence_length,
        batch_size=batch_size
    )

    # Initialize model
    model = resnet3d18(num_classes=2, input_channels=8)

    # Start training
    print("Starting training...", flush=True)
    trained_model = train_model(
        model=model,
        train_loader=train_loader,
        val_loader=val_loader,
        num_epochs=num_epochs,
        learning_rate=learning_rate,
        weight_decay=weight_decay
    )
    print("Training completed.", flush=True)


Starting to process 100 files...
Processing file 1/100: wrong11.mp4_preprocessed.npy
Processed wrong11.mp4_preprocessed.npy in 0.00 seconds
Processing file 2/100: wrong24.mp4_preprocessed.npy
Processed wrong24.mp4_preprocessed.npy in 0.00 seconds
Processing file 3/100: wrong49.mp4_preprocessed.npy
Processed wrong49.mp4_preprocessed.npy in 0.00 seconds
Processing file 4/100: wrong2.mp4_preprocessed.npy
Processed wrong2.mp4_preprocessed.npy in 0.00 seconds
Processing file 5/100: wrong47.mp4_preprocessed.npy
Processed wrong47.mp4_preprocessed.npy in 0.00 seconds
Processing file 6/100: correct18.mp4_preprocessed.npy
Processed correct18.mp4_preprocessed.npy in 0.00 seconds
Processing file 7/100: wrong5.mp4_preprocessed.npy
Processed wrong5.mp4_preprocessed.npy in 0.00 seconds
Processing file 8/100: correct20.mp4_preprocessed.npy
Processed correct20.mp4_preprocessed.npy in 0.00 seconds
Processing file 9/100: correct41.mp4_preprocessed.npy
Processed correct41.mp4_preprocessed.npy in 0.00 seco

In [53]:
import os
import time
from typing import List, Tuple

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, Dataset
import joblib  # Import joblib to save the scaler


# Dataset Class
class PushupDataset(Dataset):
    def __init__(self, data: List[np.ndarray], labels: List[int], sequence_length: int = 100):
        """
        Initialize the PushupDataset.

        Args:
            data: List of numpy arrays containing sequence data
            labels: List of labels (0 for incorrect, 1 for correct)
            sequence_length: Target sequence length for padding/truncating
        """
        self.data = data
        self.labels = labels
        self.sequence_length = sequence_length

    def __len__(self) -> int:
        return len(self.data)

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        sequence = self.data[idx]
        label = self.labels[idx]

        # Pad or truncate sequence
        if len(sequence) < self.sequence_length:
            pad_length = self.sequence_length - len(sequence)
            sequence = np.pad(sequence, ((0, pad_length), (0, 0)), mode='constant')
        elif len(sequence) > self.sequence_length:
            sequence = sequence[:self.sequence_length]

        # Permute the sequence dimensions to match the model's expected input
        sequence = torch.FloatTensor(sequence).permute(1, 0)  # Shape: [num_features, sequence_length]
        return sequence, torch.LongTensor([label])


# Model Classes
class Conv3DBlock(nn.Module):
    def __init__(self, in_channels: int, out_channels: int, kernel_size: int = 3,
                 stride: int = 1, padding: int = 1):
        super().__init__()
        self.conv = nn.Conv3d(in_channels, out_channels, kernel_size, stride, padding)
        self.bn = nn.BatchNorm3d(out_channels)
        self.relu = nn.ReLU(inplace=False)  # Ensure inplace=False

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.relu(self.bn(self.conv(x)))


class ResidualBlock(nn.Module):
    def __init__(self, in_channels: int, out_channels: int):
        super().__init__()
        self.conv1 = Conv3DBlock(in_channels, out_channels)
        self.conv2 = Conv3DBlock(out_channels, out_channels)
        self.downsample = None
        if in_channels != out_channels:
            self.downsample = nn.Sequential(
                nn.Conv3d(in_channels, out_channels, kernel_size=1, stride=1),
                nn.BatchNorm3d(out_channels)
            )
        self.relu = nn.ReLU(inplace=False)  # Ensure inplace=False

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        identity = x
        out = self.conv1(x)
        out = self.conv2(out)
        if self.downsample is not None:
            identity = self.downsample(x)
        out = out + identity  # Change from in-place addition
        return self.relu(out)


class ResNet3D(nn.Module):
    def __init__(self, block: nn.Module, layers: List[int], num_classes: int = 2,
                 input_channels: int = 8):
        super().__init__()
        self.in_channels = 64
        self.conv1 = Conv3DBlock(input_channels, 64, kernel_size=7, stride=2, padding=3)
        self.maxpool = nn.MaxPool3d(kernel_size=3, stride=2, padding=1)
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)
        self.avgpool = nn.AdaptiveAvgPool3d((1, 1, 1))
        self.fc = nn.Linear(512, num_classes)

    def _make_layer(self, block: nn.Module, out_channels: int, blocks: int,
                    stride: int = 1) -> nn.Sequential:
        layers = []
        layers.append(block(self.in_channels, out_channels))
        self.in_channels = out_channels
        for _ in range(1, blocks):
            layers.append(block(out_channels, out_channels))
        return nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # x shape: [batch_size, channels, sequence_length]
        x = x.unsqueeze(2).unsqueeze(-1)  # Shape: [batch_size, channels, 1, sequence_length, 1]
        x = self.conv1(x)
        x = self.maxpool(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x


def resnet3d18(num_classes: int = 2, input_channels: int = 8) -> ResNet3D:
    return ResNet3D(ResidualBlock, [2, 2, 2, 2], num_classes, input_channels)


# Training Function
def train_model(model: nn.Module, train_loader: DataLoader, val_loader: DataLoader,
                num_epochs: int, learning_rate: float, weight_decay: float,
                device: torch.device = None) -> nn.Module:
    """
    Train the model.

    Args:
        model: The ResNet3D model to train
        train_loader: DataLoader for training data
        val_loader: DataLoader for validation data
        num_epochs: Number of epochs to train
        learning_rate: Initial learning rate
        weight_decay: Weight decay for optimizer
        device: Device to train on (default: None, will use cuda if available)

    Returns:
        Trained model
    """
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}", flush=True)

    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=5, factor=0.5)

    best_val_loss = float('inf')
    for epoch in range(num_epochs):
        # Training phase
        model.train()
        train_loss = 0.0
        train_correct = 0
        train_total = 0

        for batch_data, batch_labels in train_loader:
            batch_data = batch_data.to(device)
            batch_labels = batch_labels.squeeze().to(device)

            optimizer.zero_grad()
            outputs = model(batch_data)
            loss = criterion(outputs, batch_labels)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            _, predicted = outputs.max(1)
            train_total += batch_labels.size(0)
            train_correct += predicted.eq(batch_labels).sum().item()

        # Validation phase
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0

        with torch.no_grad():
            for batch_data, batch_labels in val_loader:
                batch_data = batch_data.to(device)
                batch_labels = batch_labels.squeeze().to(device)

                outputs = model(batch_data)
                loss = criterion(outputs, batch_labels)
                val_loss += loss.item()

                _, predicted = outputs.max(1)
                val_total += batch_labels.size(0)
                val_correct += predicted.eq(batch_labels).sum().item()

        # Calculate metrics
        train_loss /= len(train_loader)
        val_loss /= len(val_loader)
        train_accuracy = 100. * train_correct / train_total
        val_accuracy = 100. * val_correct / val_total

        print(f'Epoch [{epoch+1}/{num_epochs}]', flush=True)
        print(f'Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}%', flush=True)
        print(f'Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.2f}%', flush=True)

        # Learning rate scheduling
        scheduler.step(val_loss)

        # Save best model
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'val_loss': val_loss,
            }, 'best_model.pth')
            print("Saved new best model", flush=True)

        print('-' * 60, flush=True)

    print('Training completed.', flush=True)

    # Save the final trained model
    torch.save(model.state_dict(), 'final_trained_model.pth')
    print("Final trained model saved as 'final_trained_model.pth'", flush=True)

    return model


# Feature Extraction Function
def extract_features(frame: dict) -> List[float]:
    """Extract features from a single frame"""
    return [
        frame['left_arm_angle'],
        frame['right_arm_angle'],
        frame['landmarks'][11][1],  # LEFT_SHOULDER y-coordinate
        frame['landmarks'][12][1],  # RIGHT_SHOULDER y-coordinate
        frame['landmarks'][13][1],  # LEFT_ELBOW y-coordinate
        frame['landmarks'][14][1],  # RIGHT_ELBOW y-coordinate
        frame['landmarks'][15][1],  # LEFT_WRIST y-coordinate
        frame['landmarks'][16][1]   # RIGHT_WRIST y-coordinate
    ]


# Data Preparation Function
def prepare_data(input_folder: str, sequence_length: int = 100,
                 batch_size: int = 16) -> Tuple[DataLoader, DataLoader]:
    """
    Prepare data for training and validation with progress tracking and optimizations.
    """
    start_time = time.time()
    all_data = []
    all_labels = []
    total_files = len([f for f in os.listdir(input_folder) if f.endswith('_preprocessed.npy')])
    processed_files = 0

    print(f"Starting to process {total_files} files...", flush=True)

    for file in os.listdir(input_folder):
        if file.endswith('_preprocessed.npy'):
            try:
                print(f"Processing file {processed_files + 1}/{total_files}: {file}", flush=True)
                file_start_time = time.time()

                # Load data
                data = np.load(os.path.join(input_folder, file), allow_pickle=True)

                # Pre-allocate features array for better performance
                features = np.zeros((len(data), 8))

                # Extract features
                for i, frame in enumerate(data):
                    features[i] = extract_features(frame)

                all_data.append(features)
                label = 1 if "correct" in file.lower() else 0
                all_labels.append(label)

                processed_files += 1
                file_time = time.time() - file_start_time
                print(f"Processed {file} in {file_time:.2f} seconds", flush=True)

            except Exception as e:
                print(f"Error processing file {file}: {str(e)}", flush=True)
                continue

    if not all_data:
        raise ValueError("No valid data files found in the input folder")

    print("\nNormalizing data...", flush=True)

    # Optimize normalization by processing all sequences at once
    all_data_flat = np.vstack([np.array(seq) for seq in all_data])
    scaler = StandardScaler()
    all_data_normalized_flat = scaler.fit_transform(all_data_flat)

    # Save the scaler for later use
    joblib.dump(scaler, 'scaler.pkl')
    print("Scaler saved as 'scaler.pkl'", flush=True)

    # Reshape back to original structure
    cumulative_lengths = np.cumsum([len(seq) for seq in all_data])
    all_data_normalized = np.split(all_data_normalized_flat, cumulative_lengths[:-1])

    # Standardize sequence lengths
    all_data_normalized = [
        seq[:sequence_length] if len(seq) > sequence_length
        else np.pad(seq, ((0, sequence_length - len(seq)), (0, 0)), mode='constant')
        for seq in all_data_normalized
    ]

    print("Splitting data into train and validation sets...", flush=True)
    X_train, X_val, y_train, y_val = train_test_split(
        all_data_normalized, all_labels, test_size=0.2, random_state=42
    )

    # Create datasets and data loaders
    train_dataset = PushupDataset(X_train, y_train, sequence_length)
    val_dataset = PushupDataset(X_val, y_val, sequence_length)

    train_loader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=0,  # Set to 0 to avoid issues with multiprocessing
        pin_memory=True
    )

    val_loader = DataLoader(
        val_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=0,
        pin_memory=True
    )

    total_time = time.time() - start_time
    print(f"\nData preparation completed in {total_time:.2f} seconds!")
    print(f"Number of training batches: {len(train_loader)}")
    print(f"Number of validation batches: {len(val_loader)}")

    return train_loader, val_loader


# Main Execution
if __name__ == "__main__":
    # Hyperparameters
    input_folder = "/kaggle/working/preprocessed_data"  # Replace with your data folder path
    sequence_length = 100
    batch_size = 16
    num_epochs = 25
    learning_rate = 0.001
    weight_decay = 1e-4

    # Prepare data
    train_loader, val_loader = prepare_data(
        input_folder=input_folder,
        sequence_length=sequence_length,
        batch_size=batch_size
    )

    # Initialize model
    model = resnet3d18(num_classes=2, input_channels=8)

    # Start training
    print("Starting training...", flush=True)
    trained_model = train_model(
        model=model,
        train_loader=train_loader,
        val_loader=val_loader,
        num_epochs=num_epochs,
        learning_rate=learning_rate,
        weight_decay=weight_decay
    )
    print("Training completed.", flush=True)


Starting to process 100 files...
Processing file 1/100: wrong11.mp4_preprocessed.npy
Processed wrong11.mp4_preprocessed.npy in 0.00 seconds
Processing file 2/100: wrong24.mp4_preprocessed.npy
Processed wrong24.mp4_preprocessed.npy in 0.00 seconds
Processing file 3/100: wrong49.mp4_preprocessed.npy
Processed wrong49.mp4_preprocessed.npy in 0.00 seconds
Processing file 4/100: wrong2.mp4_preprocessed.npy
Processed wrong2.mp4_preprocessed.npy in 0.00 seconds
Processing file 5/100: wrong47.mp4_preprocessed.npy
Processed wrong47.mp4_preprocessed.npy in 0.00 seconds
Processing file 6/100: correct18.mp4_preprocessed.npy
Processed correct18.mp4_preprocessed.npy in 0.00 seconds
Processing file 7/100: wrong5.mp4_preprocessed.npy
Processed wrong5.mp4_preprocessed.npy in 0.00 seconds
Processing file 8/100: correct20.mp4_preprocessed.npy
Processed correct20.mp4_preprocessed.npy in 0.00 seconds
Processing file 9/100: correct41.mp4_preprocessed.npy
Processed correct41.mp4_preprocessed.npy in 0.00 seco

In [54]:
!cd /
!ls -ltra

  pid, fd = os.forkpty()


total 520884
drwxr-xr-x 5 root root      4096 Dec  3 11:30 ..
drwxr-xr-x 2 root root      4096 Dec  3 11:30 .virtual_documents
drwxr-xr-x 2 root root      4096 Dec  3 11:37 preprocessed_data
-rw-r--r-- 1 root root       807 Dec  3 13:41 scaler.pkl
-rw-r--r-- 1 root root 399991746 Dec  3 13:41 best_model.pth
drwxr-xr-x 4 root root      4096 Dec  3 13:41 .
-rw-r--r-- 1 root root 133366830 Dec  3 13:41 final_trained_model.pth


# Testing

In [55]:
# Import necessary libraries
import os
import cv2
import mediapipe as mp
import numpy as np
import torch
import torch.nn as nn
from sklearn.preprocessing import StandardScaler
import joblib
from tqdm import tqdm

# Define the model architecture (must match the training architecture)
class Conv3DBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size=3,
                 stride=1, padding=1):
        super().__init__()
        self.conv = nn.Conv3d(in_channels, out_channels, kernel_size, stride, padding)
        self.bn = nn.BatchNorm3d(out_channels)
        self.relu = nn.ReLU(inplace=False)

    def forward(self, x):
        return self.relu(self.bn(self.conv(x)))

class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv1 = Conv3DBlock(in_channels, out_channels)
        self.conv2 = Conv3DBlock(out_channels, out_channels)
        self.downsample = None
        if in_channels != out_channels:
            self.downsample = nn.Sequential(
                nn.Conv3d(in_channels, out_channels, kernel_size=1, stride=1),
                nn.BatchNorm3d(out_channels)
            )
        self.relu = nn.ReLU(inplace=False)

    def forward(self, x):
        identity = x
        out = self.conv1(x)
        out = self.conv2(out)
        if self.downsample is not None:
            identity = self.downsample(x)
        out = out + identity
        return self.relu(out)

class ResNet3D(nn.Module):
    def __init__(self, block, layers, num_classes=2, input_channels=8):
        super().__init__()
        self.in_channels = 64
        self.conv1 = Conv3DBlock(input_channels, 64, kernel_size=7, stride=2, padding=3)
        self.maxpool = nn.MaxPool3d(kernel_size=3, stride=2, padding=1)
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)
        self.avgpool = nn.AdaptiveAvgPool3d((1, 1, 1))
        self.fc = nn.Linear(512, num_classes)

    def _make_layer(self, block, out_channels, blocks, stride=1):
        layers = []
        layers.append(block(self.in_channels, out_channels))
        self.in_channels = out_channels
        for _ in range(1, blocks):
            layers.append(block(out_channels, out_channels))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = x.unsqueeze(2).unsqueeze(-1)  # Shape: [batch_size, channels, 1, sequence_length, 1]
        x = self.conv1(x)
        x = self.maxpool(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x

def resnet3d18(num_classes=2, input_channels=8):
    return ResNet3D(ResidualBlock, [2, 2, 2, 2], num_classes, input_channels)

# Load the trained model
model = resnet3d18(num_classes=2, input_channels=8)
model.load_state_dict(torch.load('final_trained_model.pth', map_location=torch.device('cpu')))
model.eval()

# Load the scaler
scaler = joblib.load('scaler.pkl')

# Initialize MediaPipe Pose for pose estimation
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(static_image_mode=False,
                    min_detection_confidence=0.5,
                    min_tracking_confidence=0.5)

def calculate_angle(a, b, c):
    """Calculate the angle between three points."""
    a = np.array(a)
    b = np.array(b)
    c = np.array(c)
    radians = np.arctan2(c[1] - b[1], c[0] - b[0]) - \
              np.arctan2(a[1] - b[1], a[0] - b[0])
    angle = np.abs(radians * 180.0 / np.pi)
    if angle > 180.0:
        angle = 360 - angle
    return float(angle)

def extract_features(frame):
    """Extract features from a single frame."""
    return [
        frame['left_arm_angle'],
        frame['right_arm_angle'],
        frame['landmarks'][11][1],  # LEFT_SHOULDER y-coordinate
        frame['landmarks'][12][1],  # RIGHT_SHOULDER y-coordinate
        frame['landmarks'][13][1],  # LEFT_ELBOW y-coordinate
        frame['landmarks'][14][1],  # RIGHT_ELBOW y-coordinate
        frame['landmarks'][15][1],  # LEFT_WRIST y-coordinate
        frame['landmarks'][16][1]   # RIGHT_WRIST y-coordinate
    ]

def preprocess_video(video_path, sequence_length=100):
    """Preprocess the video and extract features."""
    cap = cv2.VideoCapture(video_path)
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    preprocessed_data = []
    frames = []  # Store frames for visualization

    for frame_idx in tqdm(range(frame_count), desc="Processing Video"):
        ret, frame = cap.read()
        if not ret:
            break

        frames.append(frame.copy())  # Save the frame for later visualization

        # Convert the image to RGB and process it with MediaPipe
        image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = pose.process(image_rgb)

        if results.pose_landmarks:
            landmarks = results.pose_landmarks.landmark

            # Extract relevant landmarks (e.g., shoulders, elbows, wrists)
            left_shoulder = [landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].x,
                             landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].y]
            right_shoulder = [landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].x,
                              landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].y]
            left_elbow = [landmarks[mp_pose.PoseLandmark.LEFT_ELBOW.value].x,
                          landmarks[mp_pose.PoseLandmark.LEFT_ELBOW.value].y]
            right_elbow = [landmarks[mp_pose.PoseLandmark.RIGHT_ELBOW.value].x,
                           landmarks[mp_pose.PoseLandmark.RIGHT_ELBOW.value].y]
            left_wrist = [landmarks[mp_pose.PoseLandmark.LEFT_WRIST.value].x,
                          landmarks[mp_pose.PoseLandmark.LEFT_WRIST.value].y]
            right_wrist = [landmarks[mp_pose.PoseLandmark.RIGHT_WRIST.value].x,
                           landmarks[mp_pose.PoseLandmark.RIGHT_WRIST.value].y]

            # Calculate angles
            left_arm_angle = calculate_angle(left_shoulder, left_elbow, left_wrist)
            right_arm_angle = calculate_angle(right_shoulder, right_elbow, right_wrist)

            preprocessed_data.append({
                'frame': frame_idx,
                'left_arm_angle': left_arm_angle,
                'right_arm_angle': right_arm_angle,
                'landmarks': [(landmark.x, landmark.y, landmark.z) for landmark in landmarks]
            })
        else:
            # If no landmarks detected, append zeros (or handle accordingly)
            preprocessed_data.append({
                'frame': frame_idx,
                'left_arm_angle': 0.0,
                'right_arm_angle': 0.0,
                'landmarks': [(0.0, 0.0, 0.0)] * 33  # Assuming 33 landmarks
            })

    cap.release()
    return preprocessed_data, frames

def visualize_results(frames, predicted_class, confidence):
    """Overlay the prediction on video frames and save the result."""
    # Define the label and color based on the prediction
    if predicted_class == 1:
        label = f'Correct Form ({confidence * 100:.2f}%)'
        color = (0, 255, 0)  # Green for correct
    else:
        label = f'Incorrect Form ({confidence * 100:.2f}%)'
        color = (0, 0, 255)  # Red for incorrect

    # Initialize video writer
    height, width, _ = frames[0].shape
    output_path = '/kaggle/working/output_video.mp4'
    out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'),
                          20, (width, height))

    for frame in frames:
        # Overlay text on the frame
        cv2.putText(frame, label, (50, 50), cv2.FONT_HERSHEY_SIMPLEX,
                    1.5, color, 3)
        out.write(frame)

    out.release()
    print(f"Visualization saved as {output_path}")

def main():
    # Path to your new video file
    video_path = '/kaggle/input/test-pushup/pushup_002.mp4'  # Update this path

    # Preprocess the video and extract features
    preprocessed_data, frames = preprocess_video(video_path)

    # Extract features from preprocessed data
    features = np.zeros((len(preprocessed_data), 8))
    for i, frame in enumerate(preprocessed_data):
        features[i] = extract_features(frame)

    # Normalize features using the saved scaler
    features_normalized = scaler.transform(features)

    # Standardize sequence length
    sequence_length = 100  # Same as during training
    if len(features_normalized) < sequence_length:
        pad_length = sequence_length - len(features_normalized)
        features_normalized = np.pad(features_normalized, ((0, pad_length), (0, 0)), mode='constant')
    elif len(features_normalized) > sequence_length:
        features_normalized = features_normalized[:sequence_length]

    # Prepare input tensor
    input_tensor = torch.FloatTensor(features_normalized).permute(1, 0).unsqueeze(0)

    # Make prediction
    with torch.no_grad():
        output = model(input_tensor)
        probabilities = torch.softmax(output, dim=1)
        predicted_class = torch.argmax(probabilities, dim=1).item()
        confidence = probabilities[0, predicted_class].item()

    # Output result
    if predicted_class == 1:
        print(f"The push-up form is **correct** with confidence {confidence * 100:.2f}%")
    else:
        print(f"The push-up form is **incorrect** with confidence {confidence * 100:.2f}%")

    # Visualize the results
    visualize_results(frames, predicted_class, confidence)

if __name__ == "__main__":
    main()


  model.load_state_dict(torch.load('final_trained_model.pth', map_location=torch.device('cpu')))
Processing Video:   0%|          | 0/42 [00:00<?, ?it/s]W0000 00:00:1733233968.295797    1434 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1733233968.350436    1434 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
Processing Video: 100%|██████████| 42/42 [00:01<00:00, 31.02it/s]


The push-up form is **correct** with confidence 75.50%
Visualization saved as /kaggle/working/output_video.mp4


In [56]:
!cp final_trained_model.pth scaler.pkl /kaggle/working/


  pid, fd = os.forkpty()


cp: 'final_trained_model.pth' and '/kaggle/working/final_trained_model.pth' are the same file
cp: 'scaler.pkl' and '/kaggle/working/scaler.pkl' are the same file


In [None]:
input_folder = '/kaggle/working/preprocessed_data'
sequence_length = 100
batch_size = 16
num_epochs = 50
learning_rate = 0.001
weight_decay = 1e-5
print("Done")