In [None]:
import os
import json

import cv2
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard import SummaryWriter

from transformers import AutoImageProcessor, AutoModelForPreTraining

In [None]:
# Report which compute backend PyTorch will use in this session.
if not torch.cuda.is_available():
    print("CUDA is not available. PyTorch is using the CPU.")
else:
    print("CUDA is available. PyTorch can use the GPU.")
    # Name of the first visible CUDA device (index 0).
    print(f"Device name: {torch.cuda.get_device_name(0)}")

CUDA is not available. PyTorch is using the CPU.


In [None]:
# Mount Google Drive at /content/drive (Colab only); the data and log paths
# configured in this notebook point inside this mount.

from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Number of frames per video clip (the model docstrings in this notebook assume 16).
num_frames = 16
# Spatial frame size; the model docstrings describe 224x224 inputs.
# NOTE(review): not referenced by any code visible in this notebook — confirm it is still needed.
image_size = 224
# Number of binary rating outputs: default output size of MultiModalModel and the
# number of per-feature metric slots used during evaluation.
FeatureNum = 7

In [None]:
# Root folder on the mounted Drive holding the per-action spreadsheets, the pose JSON
# files and the 'preprocessed_images' directory read by the data loaders.
base_path = "/content/drive/MyDrive/Vision_GYM_Research/Data"
# Directory where SummaryWriter writes its event files and that TensorBoard is pointed at.
LOG_DIR = "/content/drive/MyDrive/Vision_GYM_Research/tensorboard_logs"

In [None]:
def process_action_data(base_path, action_name):
    """
    Build the train/validation/test DataFrames for one exercise.

    Reads ``<action_name>_edited.xlsx`` together with ``front_pose_<action_name>.json``
    and ``lat_pose_<action_name>.json`` from ``base_path``, stores one pose entry per
    spreadsheet row in the ``front_pose`` / ``lat_pose`` columns, shuffles the rows with
    a fixed seed and slices them into 70% / 15% / remainder.

    Args:
        base_path (str): Directory holding the spreadsheet and the two JSON files.
        action_name (str): Action identifier used in the file names (e.g. 'squat').

    Returns:
        tuple: ``(train_df, val_df, test_df)`` DataFrames.

    Raises:
        ValueError: If either pose file does not contain exactly one entry per row.
    """
    table = pd.read_excel(os.path.join(base_path, f"{action_name}_edited.xlsx"))

    # Load the frontal pose file first, then the lateral one, each as a NumPy array.
    pose_arrays = {}
    for view in ("front", "lat"):
        pose_path = os.path.join(base_path, f"{view}_pose_{action_name}.json")
        with open(pose_path, 'r') as handle:
            pose_arrays[view] = np.array(json.load(handle))

    # Refuse to pair poses with rows unless both files line up with the spreadsheet.
    row_count = len(table)
    if len(pose_arrays["front"]) != row_count or len(pose_arrays["lat"]) != row_count:
        raise ValueError("The length of the loaded arrays does not match the DataFrame.")

    # One array per row; the columns are created here if the spreadsheet lacks them.
    table['front_pose'] = list(pose_arrays["front"])
    table['lat_pose'] = list(pose_arrays["lat"])

    # Fixed seed keeps the split reproducible between runs.
    shuffled = table.sample(frac=1, random_state=42).reset_index(drop=True)

    # 70% train, 15% validation; whatever is left (about 15%) becomes the test set.
    train_end = int(row_count * 0.7)
    val_end = train_end + int(row_count * 0.15)

    return shuffled.iloc[:train_end], shuffled.iloc[train_end:val_end], shuffled.iloc[val_end:]

In [None]:
# Define the actions
actions = ['squat', 'deadlift', 'lunges']

# Load and split every action exactly once. process_action_data shuffles with a fixed
# seed, so reusing these results is equivalent to calling it again, but it avoids
# reading each spreadsheet and pose JSON file a second time.
splits = {action: process_action_data(base_path, action) for action in actions}

# Per-action (train, val, test) frames, unpacked from the cached results.
train_df_squat, val_df_squat, test_df_squat = splits['squat']
train_df_dead, val_df_dead, test_df_dead = splits['deadlift']
train_df_lunge, val_df_lunge, test_df_lunge = splits['lunges']

# Concatenate and shuffle DataFrames for each split
train_df = pd.concat([splits[action][0] for action in actions]).sample(frac=1, random_state=1).reset_index(drop=True)
val_df = pd.concat([splits[action][1] for action in actions]).sample(frac=1, random_state=1).reset_index(drop=True)
test_df = pd.concat([splits[action][2] for action in actions]).sample(frac=1, random_state=1).reset_index(drop=True)

In [None]:
import torchvision.models.video as models

class PretrainedResNet3D(nn.Module):
    """
    Feature extractor built on torchvision's ``r3d_18`` video network.

    The final fully connected layer is replaced by ``nn.Identity`` so the module
    returns the backbone's features instead of class scores, and every parameter
    whose name contains neither ``layer3`` nor ``layer4`` is frozen.

    Args:
        pretrained (bool): Forwarded to ``models.r3d_18``.
    """

    def __init__(self, pretrained=True):
        super().__init__()
        backbone = models.r3d_18(pretrained=pretrained)
        # Drop the classification head; callers consume the raw features.
        backbone.fc = nn.Identity()
        self.resnet3d = backbone

        # Only the last two residual stages stay trainable.
        trainable_stages = ("layer3", "layer4")
        for param_name, weight in self.resnet3d.named_parameters():
            if any(stage in param_name for stage in trainable_stages):
                continue
            weight.requires_grad = False

    def forward(self, x):
        """Run the clip tensor ``x`` through the backbone and return its features."""
        return self.resnet3d(x)

class DualInputResNet3D(nn.Module):
    """
    Two-stream video model: one ResNet3D backbone per camera view, fused by an MLP head.

    Args:
        output_size (int): Width of the final linear layer. Defaults to ``FeatureNum``.
        feature_dim (int): Feature width produced by each backbone; ``fc1`` expects
            ``feature_dim * 2`` inputs.
        dropout_rate (float): Dropout probability applied after the first dense layer.
    """

    # The default used to read `eFatureNum`, a typo that raised NameError as soon as
    # this class definition was executed.
    def __init__(self, output_size=FeatureNum, feature_dim=512, dropout_rate=0.3):
        super(DualInputResNet3D, self).__init__()
        # Pretrained ResNet3D streams for frontal and lateral inputs
        self.resnet3d_frontal = PretrainedResNet3D()
        self.resnet3d_lateral = PretrainedResNet3D()

        # Fully connected layers
        self.fc1 = nn.Linear(feature_dim * 2, 1024)  # Combine features from both inputs
        self.bn_fc1 = nn.BatchNorm1d(1024)
        self.fc2 = nn.Linear(1024, output_size)

        # Activation and dropout
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, frontal, lateral):
        """
        Args:
            frontal: Frontal clip, 5-D with frames on axis 1 and channels on axis 2.
            lateral: Lateral clip with the same layout.

        Returns:
            Tensor of shape [batch_size, output_size].
        """
        # Swap axes 1 and 2 so channels precede frames before entering the backbones.
        frontal = frontal.permute(0, 2, 1, 3, 4).contiguous()
        lateral = lateral.permute(0, 2, 1, 3, 4).contiguous()

        # Process frontal input through ResNet3D
        x_f = self.resnet3d_frontal(frontal)
        x_f = x_f.view(x_f.size(0), -1)  # Flatten features

        # Process lateral input through ResNet3D
        x_l = self.resnet3d_lateral(lateral)
        x_l = x_l.view(x_l.size(0), -1)  # Flatten features

        # Concatenate features from frontal and lateral streams
        x = torch.cat((x_f, x_l), dim=1)

        # Pass through fully connected layers
        x = self.relu(self.bn_fc1(self.fc1(x)))
        x = self.dropout(x)
        x = self.fc2(x)

        return x


In [None]:
class ResidualBlock(nn.Module):
    """
    Residual unit made of three Conv1d -> BatchNorm1d -> ReLU stages plus a projected skip path.

    Args:
        in_channels (int): Channels of the incoming tensor.
        out_channels (int): Channels produced by every convolution in the block.
        stride (int): Stride of the first convolution and of the 1x1 shortcut convolution.
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        # Main path: only the first convolution changes channel count / temporal length.
        self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.bn1 = nn.BatchNorm1d(out_channels)
        self.relu = nn.ReLU()

        self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm1d(out_channels)

        self.conv3 = nn.Conv1d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
        self.bn3 = nn.BatchNorm1d(out_channels)

        # Skip path: 1x1 convolution so the shortcut matches the main path's shape.
        self.shortcut = nn.Conv1d(in_channels, out_channels, kernel_size=1, stride=stride)
        self.shortcut_bn = nn.BatchNorm1d(out_channels)

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        residual = self.shortcut_bn(self.shortcut(x))

        out = self.relu(self.bn1(self.conv1(x)))
        out = self.relu(self.bn2(self.conv2(out)))
        # The main path is activated once more before the skip connection is added.
        out = self.relu(self.bn3(self.conv3(out)))

        return self.relu(out + residual)

class Pose_Model(nn.Module):
    """
    Pose encoder: strided 1D convolutions and residual blocks followed by a GRU.

    Args:
        input_channels (int): Per-frame feature width of the pose input.
        residual_channels1 (int): Output channels of the initial convolution.
        residual_channels2 (int): Output channels of the first residual block.
        residual_channels3 (int): Output channels of the second residual block.
        final_channels (int): Channels after the 1x1 reduction; also the GRU input size.
        gru_hidden_size (int): Size of the GRU hidden state, i.e. of the returned feature.
    """

    def __init__(self, input_channels=5984, residual_channels1=4096 , residual_channels2=1024, residual_channels3=512, final_channels=256, gru_hidden_size=128):
        super().__init__()

        # Stem convolution; stride 2 shortens the temporal axis.
        self.initial_conv = nn.Sequential(
            nn.Conv1d(input_channels, residual_channels1, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm1d(residual_channels1),
            nn.ReLU(),
        )

        # Two residual blocks, each again with stride 2.
        self.residual_block1 = ResidualBlock(residual_channels1, residual_channels2, stride=2)
        self.residual_block2 = ResidualBlock(residual_channels2, residual_channels3, stride=2)

        # 1x1 convolution that only reduces the channel count.
        self.final_conv = nn.Sequential(
            nn.Conv1d(residual_channels3, final_channels, kernel_size=1),
            nn.BatchNorm1d(final_channels),
            nn.ReLU(),
        )

        # GRU that aggregates the remaining temporal steps.
        self.gru = nn.GRU(input_size=final_channels, hidden_size=gru_hidden_size, batch_first=True)

    def forward(self, x):
        """
        Args:
            x: Pose features of shape [batch_size, num_frames, input_channels].

        Returns:
            Final GRU hidden state of shape [batch_size, gru_hidden_size].
        """
        # Unpacking three sizes also rejects inputs that are not 3-D.
        _batch_size, _num_frames, _channels = x.shape

        # Conv1d expects channels before time: [batch_size, input_channels, num_frames].
        features = x.permute(0, 2, 1)

        # Three stride-2 stages in a row: a 16-frame clip is reduced to 2 temporal steps.
        for stage in (self.initial_conv, self.residual_block1, self.residual_block2, self.final_conv):
            features = stage(features)

        # Back to [batch_size, steps, final_channels] for the batch-first GRU.
        _, last_hidden = self.gru(features.permute(0, 2, 1))  # [1, batch_size, gru_hidden_size]
        return last_hidden.squeeze(0)


class DualInputPose(nn.Module):
    """
    Two-view pose network: one ``Pose_Model`` per camera view and a small MLP head.

    Args:
        pose_input_channels (int): Per-frame feature width given to each ``Pose_Model``.
        hidden_size (int): GRU hidden size of each ``Pose_Model``.
        output_size (int): Width of the final linear layer.
    """

    def __init__(self, pose_input_channels=5984, hidden_size=128, output_size=7):
        super().__init__()

        # Independent encoders for the front and lateral views.
        self.front_model = Pose_Model(input_channels=pose_input_channels, gru_hidden_size=hidden_size)
        self.lat_model = Pose_Model(input_channels=pose_input_channels, gru_hidden_size=hidden_size)

        # Head applied to the concatenated (front, lateral) features.
        head_layers = [
            nn.Linear(hidden_size * 2, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, output_size),
        ]
        self.fc = nn.Sequential(*head_layers)

    def forward(self, front_input, lat_input):
        """
        Args:
            front_input: Front-view pose data [batch_size, num_frames, pose_input_channels].
            lat_input: Lateral-view pose data [batch_size, num_frames, pose_input_channels].

        Returns:
            Tensor of shape [batch_size, output_size].
        """
        # Each encoder yields [batch_size, hidden_size]; joined on the feature axis.
        per_view = (self.front_model(front_input), self.lat_model(lat_input))
        return self.fc(torch.cat(per_view, dim=1))

In [None]:
class MultiModalModel(nn.Module):
    """
    Fuses video features and pose features from both camera views into rating logits.

    Args:
        pose_input_channels (int): Per-frame pose feature width passed to ``DualInputPose``.
        image_feature_dim (int): Output width requested from ``DualInputResNet3D``.
        pose_hidden_size (int): Hidden and output width requested from ``DualInputPose``.
        output_size (int): Number of values predicted per sample.
    """

    def __init__(self,
                 pose_input_channels=5984,
                 image_feature_dim=512,
                 pose_hidden_size=128,
                 output_size=FeatureNum):
        super().__init__()

        # Video branch: its last layer is sized to emit image_feature_dim features.
        self.vision_model = DualInputResNet3D(output_size=image_feature_dim)

        # Pose branch: emits pose_hidden_size features.
        self.pose_model = DualInputPose(
            pose_input_channels=pose_input_channels,
            hidden_size=pose_hidden_size,
            output_size=pose_hidden_size,
        )

        # Fusion MLP over the concatenated branch outputs.
        fusion_layers = [
            nn.Linear(image_feature_dim + pose_hidden_size, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, output_size),
        ]
        self.fc = nn.Sequential(*fusion_layers)

    def forward(self, image_frontal, image_lateral, pose_frontal, pose_lateral):
        """
        Run both branches and fuse their outputs.

        Args:
        - image_frontal: Frontal view images [batch_size, num_frames, 3, height, width]
        - image_lateral: Lateral view images [batch_size, num_frames, 3, height, width]
        - pose_frontal: Frontal view pose data [batch_size, num_frames, pose_input_channels]
        - pose_lateral: Lateral view pose data [batch_size, num_frames, pose_input_channels]

        Returns:
        - Output predictions [batch_size, output_size]
        """
        branch_outputs = (
            self.vision_model(image_frontal, image_lateral),  # [batch_size, image_feature_dim]
            self.pose_model(pose_frontal, pose_lateral),      # [batch_size, pose_hidden_size]
        )
        fused = torch.cat(branch_outputs, dim=1)
        return self.fc(fused)


In [None]:
def compute_pairwise_distances(points_tensor):
    """
    Euclidean distance between every unordered pair of landmarks, frame by frame.

    Args:
        points_tensor (torch.Tensor): Tensor of shape [num_frames, num_points, 3]
                                       holding one (x, y, z) point per landmark.

    Returns:
        torch.Tensor: Tensor of shape [num_frames, num_points * (num_points - 1) / 2]
                      (528 columns for 33 landmarks).
    """
    # Unpacking three sizes also rejects tensors that are not 3-D.
    _frame_count, landmark_count, _coord_dim = points_tensor.size()

    # All index pairs (i, j) with i < j.
    index_pairs = torch.combinations(torch.arange(landmark_count), r=2, with_replacement=False)
    first_idx, second_idx = index_pairs[:, 0], index_pairs[:, 1]

    # Difference vector of each pair, then its length along the coordinate axis.
    offsets = points_tensor[:, first_idx] - points_tensor[:, second_idx]
    return torch.norm(offsets, dim=2)


def compute_distances_and_angles_combined(points_tensor):
    """
    Per-frame pose descriptor: pairwise landmark distances followed by triplet cosines.

    For every landmark triplet (i, j, k) with i < j < k the cosine of the angle at the
    middle landmark j is stored; the cosine itself is returned, not the angle.

    Args:
        points_tensor (torch.Tensor): Tensor of shape [num_frames, num_points, 3]
                                       holding one (x, y, z) point per landmark.

    Returns:
        torch.Tensor: Tensor of shape [num_frames, comb(num_points, 2) + comb(num_points, 3)]
                      (528 + 5456 = 5984 columns for 33 landmarks).
    """
    # Unpacking three sizes also rejects tensors that are not 3-D.
    _frame_count, landmark_count, _coord_dim = points_tensor.size()

    # Index sets for all landmark pairs and all landmark triplets.
    pair_idx = torch.combinations(torch.arange(landmark_count), r=2, with_replacement=False)
    triplet_idx = torch.combinations(torch.arange(landmark_count), r=3, with_replacement=False)

    # Distances: [num_frames, num_pairs]
    pair_offsets = points_tensor[:, pair_idx[:, 0]] - points_tensor[:, pair_idx[:, 1]]
    distances = torch.norm(pair_offsets, dim=2)

    # Two arms of each triplet, both measured from the middle landmark.
    arm_a = points_tensor[:, triplet_idx[:, 0]] - points_tensor[:, triplet_idx[:, 1]]
    arm_b = points_tensor[:, triplet_idx[:, 2]] - points_tensor[:, triplet_idx[:, 1]]

    # cos = (a . b) / (|a| |b|); the epsilon guards against zero-length arms.
    numerators = (arm_a * arm_b).sum(dim=2)
    denominators = torch.norm(arm_a, dim=2) * torch.norm(arm_b, dim=2)
    cosines = numerators / (denominators + 1e-8)

    return torch.cat([distances, cosines], dim=1)

In [None]:
class PreprocessedPoseVideoDataset(Dataset):
    """
    Dataset that pairs preprocessed video frames with pose features and labels.

    Each item is built from one row of ``df`` and returned as the tuple
    ``(image_frontal, image_lateral, pose_frontal, pose_lateral, label_class, ratings)``.

    Args:
        df (pd.DataFrame): One row per sample. Rows are read through the columns
            'Num Video Frontal', 'Num Video Lateral', 'NumIdx', 'Action', 'class',
            'front_pose', 'lat_pose' and every column whose name ends in 'F' or 'L'.
        num_frames (int): Number of frame files loaded per video.
        preprocessed_dir (str): Root directory with one sub-folder of ``.npy`` frames per action.
    """

    def __init__(self, df, num_frames, preprocessed_dir):
        self.df = df
        self.num_frames = num_frames
        self.preprocessed_dir = preprocessed_dir

    def __len__(self):
        """Number of rows (samples) in the underlying DataFrame."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load and assemble the sample stored at positional index ``idx``."""
        row = self.df.iloc[idx]

        # Get frontal and lateral video identifiers
        num_video_frontal = row['Num Video Frontal']
        num_video_lateral = row['Num Video Lateral']
        num_idx = row['NumIdx']
        action = row['Action']

        # Load preprocessed frames
        frontal_frames = self._load_preprocessed_frames(num_video_frontal, action, num_idx)
        lateral_frames = self._load_preprocessed_frames(num_video_lateral, action, num_idx)

        # Move the last axis to position 1 and scale by 1/255.
        # NOTE(review): this assumes frames are stored channels-last with 0-255 values — confirm.
        image_frontal = torch.tensor(frontal_frames, dtype=torch.float32).permute(0, 3, 1, 2) / 255.0
        image_lateral = torch.tensor(lateral_frames, dtype=torch.float32).permute(0, 3, 1, 2) / 255.0

        label_class = torch.tensor(row['class'], dtype=torch.long)

        # Rating columns are taken from this dataset's own frame. The previous code read
        # them from the global `train_df_lunge`, which tied every dataset instance to one
        # specific notebook variable and broke for frames with different columns.
        ratings = torch.tensor(self._process_ratings(self.df, row), dtype=torch.float32)

        # Turn the stored landmarks into distance / cosine features per frame
        pose_landmarks_tensor_frontal = torch.tensor(row['front_pose']).float()
        pose_landmarks_tensor_lateral = torch.tensor(row['lat_pose']).float()
        pose_frontal = compute_distances_and_angles_combined(pose_landmarks_tensor_frontal)
        pose_lateral = compute_distances_and_angles_combined(pose_landmarks_tensor_lateral)

        return (image_frontal, image_lateral, pose_frontal, pose_lateral, label_class, ratings)

    def _load_preprocessed_frames(self, num_video, action, num_idx):
        """
        Stack the ``num_frames`` frame files of one video into a single array.

        Files are read from ``<preprocessed_dir>/<action>/<num_video>_idx_<num_idx>_<i>.npy``
        for ``i`` running from 1 to ``num_frames`` inclusive.
        """
        frames = []
        for i in range(1, self.num_frames + 1):
            # Construct the preprocessed file path
            file_name = f"{num_video}_idx_{num_idx}_{i}.npy"
            file_path = os.path.join(self.preprocessed_dir, action, file_name)

            # Load the preprocessed .npy file
            frames.append(np.load(file_path))
        return np.stack(frames, axis=0)

    def _process_ratings(self, df, row):
        """
        Binarise the rating columns of ``row``.

        Args:
            df (pd.DataFrame): Frame whose column names select the rating columns
                (those ending in 'F' or 'L').
            row (pd.Series): Row to read the scores from.

        Returns:
            list: 1 for every score >= 0.5, otherwise 0, in column order.
        """
        relevant_columns = [col for col in df.columns if col.endswith(('F', 'L'))]
        scores = row[relevant_columns].values
        thresholded_scores = np.where(scores >= 0.5, 1, 0)
        return thresholded_scores.tolist()


In [None]:
# Frontal video id stored in the first row (index label 0) of the combined training frame.
train_df.loc[0, 'Num Video Frontal']

103

In [None]:
def create_dataloader(df, num_frames, batch_size=16, shuffle=True):
    """
    Wrap ``df`` in a ``PreprocessedPoseVideoDataset`` and return a ``DataLoader`` over it.

    Args:
        df (pd.DataFrame): Rows describing the samples to serve.
        num_frames (int): Number of frames loaded per video.
        batch_size (int): Samples per batch.
        shuffle (bool): Whether the loader reshuffles the data every epoch.

    Returns:
        DataLoader: Loader reading frames from ``<base_path>/preprocessed_images``.
    """
    dataset = PreprocessedPoseVideoDataset(df, num_frames, preprocessed_dir=os.path.join(base_path, 'preprocessed_images'))
    return DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=shuffle,
        # Pinned host memory only helps host-to-GPU copies; requesting it without CUDA
        # just produces a warning, so it is enabled only when a GPU is present.
        pin_memory=torch.cuda.is_available()
    )

In [None]:
def train_combined_model(CustomModel, train_dataloader, eval_dataloader, epochs=1000, lr=1e-4,
                         device='cpu', clip_grad_norm=1.0, patience=10):
    """
    Train a multi-label rating model with TensorBoard logging and early stopping.

    Every epoch runs one pass over ``train_dataloader`` (Adam, ``BCEWithLogitsLoss``,
    gradient-norm clipping), evaluates on ``eval_dataloader`` through
    ``evaluate_combined_model``, logs the metrics, steps a ``ReduceLROnPlateau``
    scheduler on the validation loss and writes a checkpoint to the current working
    directory whenever that loss improves.

    Args:
        CustomModel (nn.Module): Model called as
            ``model(image_frontal, image_lateral, pose_frontal, pose_lateral)``.
        train_dataloader: Iterable with ``len()`` yielding 6-tuples of tensors
            ``(image_frontal, image_lateral, pose_frontal, pose_lateral, label_class, ratings)``.
        eval_dataloader: Loader of the same form used for validation.
        epochs (int): Maximum number of epochs.
        lr (float): Initial learning rate.
        device: Device the model and batches are moved to.
        clip_grad_norm (float): Maximum gradient norm.
        patience (int): Epochs without validation improvement before stopping.

    Raises:
        ValueError: If ``train_dataloader`` contains no batches.
    """
    # TensorBoard setup
    writer = SummaryWriter(log_dir=LOG_DIR)

    # try/finally guarantees the event file is flushed and closed even when training is
    # interrupted or a batch raises.
    try:
        num_batches = len(train_dataloader)
        if num_batches == 0:
            raise ValueError("train_dataloader is empty; nothing to train on.")

        CustomModel.to(device)

        # Set up optimizer and loss functions
        optimizer = optim.Adam(CustomModel.parameters(), lr=lr)
        feature_loss = nn.BCEWithLogitsLoss()  # Loss for rating task (binary)

        # Learning rate scheduler
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=5, factor=0.5)
        best_eval_loss = float('inf')
        patience_counter = 0
        print("Training the model...")
        for epoch in range(epochs):
            CustomModel.train()
            running_loss = 0.0
            print(f'Start of Epoch {epoch+1}')
            for batch_idx, batch in enumerate(train_dataloader):
                # Calculate progress percentage
                progress = (batch_idx + 1) / num_batches * 100
                # Unpack batch data and move to the specified device
                (image_frontal, image_lateral, pose_frontal, pose_lateral, label_class, ratings) = [tensor.to(device) for tensor in batch]
                optimizer.zero_grad()
                # The output already lives on the model's device; no extra transfer needed.
                ratings_output = CustomModel(image_frontal, image_lateral, pose_frontal, pose_lateral)

                loss = feature_loss(ratings_output, ratings)

                # Backward pass and optimization
                loss.backward()
                torch.nn.utils.clip_grad_norm_(CustomModel.parameters(), clip_grad_norm)
                optimizer.step()

                running_loss += loss.item()

                # Print progress during each epoch
                print(f"Epoch [{epoch + 1}/{epochs}], Progress: {progress:.2f}%, Batch [{batch_idx + 1}/{len(train_dataloader)}], Loss: {loss.item():.4f}")

            avg_loss = running_loss / num_batches

            # Log training loss to TensorBoard
            writer.add_scalar("Loss/Train", avg_loss, epoch)

            # Evaluate the model after each epoch
            eval_loss, hamming_distances, metrics = evaluate_combined_model(CustomModel, eval_dataloader, feature_loss, device)
            # Accept either a Python float or a 0-dim tensor from the evaluation routine.
            eval_loss = float(eval_loss)
            mean_hamming_distance = sum(hamming_distances.values()) / len(hamming_distances) if len(hamming_distances) > 0 else 0.0

            tp = metrics['TP']
            tn = metrics['TN']
            fp = metrics['FP']
            fn = metrics['FN']

            # Class 1 treats positives as hits; class 0 mirrors it with TN as hits.
            precision_1, recall_1, f1_score_1 = _precision_recall_f1(tp, fp, fn)
            precision_0, recall_0, f1_score_0 = _precision_recall_f1(tn, fn, fp)

            jaccard_index = tp / (tp + fp + fn) if (tp + fp + fn) > 0 else 0.0

            # Log validation metrics to TensorBoard
            writer.add_scalar("Loss/Validation", eval_loss, epoch)
            writer.add_scalar("Loss/Hamming Loss", mean_hamming_distance, epoch)
            writer.add_scalar(f"Metrics/Precision", (precision_1 +precision_0)/2, epoch)
            writer.add_scalar(f"Metrics/Recall", (recall_1 +recall_0)/2, epoch)
            writer.add_scalar(f"Metrics/F1-Score", (f1_score_1 +f1_score_0)/2, epoch)
            writer.add_scalar(f"Metrics/jaccard index", jaccard_index, epoch)

            # Report the learning rate actually used this epoch: the scheduler may have
            # lowered it below the `lr` argument.
            current_lr = optimizer.param_groups[0]['lr']

            # Print summary for each epoch
            print(f"Epoch [{epoch + 1}/{epochs}] with lr: {current_lr} Summary: "
                  f"Train Loss: {avg_loss:.4f}, Eval Loss: {eval_loss:.4f},"
                  f"Precision: {(precision_1 +precision_0)/2:.4f}, Recall: {(recall_1 +recall_0)/2:.4f},"
                  f" F1-Score 0: {f1_score_0:.4f}, F1-Score 1: {f1_score_1:.4f}, F1-Score: {(f1_score_1 +f1_score_0)/2:.4f}"
                  )
            print(f"Hamming Loss: {mean_hamming_distance:.4f}, jaccard index: {jaccard_index:.4f}", )

            scheduler.step(eval_loss)

            # Early stopping and model saving
            if eval_loss < best_eval_loss:
                best_eval_loss = eval_loss
                patience_counter = 0
                torch.save(CustomModel.state_dict(), f"best_rating_model_epoch_{epoch + 1}.pt")
                print("Model checkpoint saved.")
            else:
                patience_counter += 1
                if patience_counter >= patience:
                    print("Early stopping triggered.")
                    break
    finally:
        writer.close()  # Close TensorBoard writer

    print("Training complete.")


def _precision_recall_f1(hits, false_alarms, misses):
    """Return (precision, recall, F1) from raw counts, using 0 where a ratio is undefined."""
    precision = hits / (hits + false_alarms) if (hits + false_alarms) > 0 else 0
    recall = hits / (hits + misses) if (hits + misses) > 0 else 0
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    return precision, recall, f1_score


In [None]:
def evaluate_combined_model(CustomSwinTransformerModel, dataloader, feature_loss, device):
    """
    Evaluates the model and computes evaluation metrics for each feature.

    Args:
        CustomSwinTransformerModel (torch.nn.Module): The rating prediction model, called as
            ``model(image_frontal, image_lateral, pose_frontal, pose_lateral)``.
        dataloader (DataLoader): Yields 6-tuples of tensors
            ``(image_frontal, image_lateral, pose_frontal, pose_lateral, label_class, ratings)``.
        feature_loss (nn.Module): The loss function for ratings.
        device (str): The device to perform evaluation on ('cpu' or 'cuda').

    Returns:
        float: Average evaluation loss per batch (NaN if the dataloader yields no batches).
        dict: Hamming loss per feature index.
        dict: TP, TN, FP, FN summed over all features.
    """
    CustomSwinTransformerModel.eval()
    num_features = FeatureNum
    metrics = {
        feature_idx: {'TP': 0, 'TN': 0, 'FP': 0, 'FN': 0} for feature_idx in range(num_features)
    }
    total_samples = {feature_idx: 0 for feature_idx in range(num_features)}
    total_loss = 0.0
    num_batches = 0

    with torch.no_grad():
        for batch in dataloader:
            # Move batch data to the device
            image_frontal, image_lateral, pose_frontal, pose_lateral, label_class, ratings = [
                tensor.to(device) for tensor in batch
            ]

            # Predict ratings using the model
            ratings_output = CustomSwinTransformerModel(image_frontal, image_lateral, pose_frontal, pose_lateral)

            # .item() keeps the running total a plain Python float instead of a tensor.
            total_loss += feature_loss(ratings_output, ratings).item()
            num_batches += 1

            # Sigmoid activation for binary predictions
            predicted_ratings = torch.sigmoid(ratings_output) > 0.5
            # Boolean targets so that `~` is a logical NOT; on uint8 it is a bitwise
            # complement, which only produced correct counts by accident.
            actual_ratings = ratings.bool()

            # Compute TP, TN, FP, FN for each feature
            for feature_idx in range(num_features):
                preds = predicted_ratings[:, feature_idx]
                trues = actual_ratings[:, feature_idx]
                counts = metrics[feature_idx]

                counts['TP'] += (preds & trues).sum().item()
                counts['TN'] += (~preds & ~trues).sum().item()
                counts['FP'] += (preds & ~trues).sum().item()
                counts['FN'] += (~preds & trues).sum().item()
                total_samples[feature_idx] += trues.numel()

    # Hamming loss per feature: share of samples predicted wrongly.
    hamming_distances = {}
    for feature_idx in range(num_features):
        wrong = metrics[feature_idx]['FP'] + metrics[feature_idx]['FN']
        seen = total_samples[feature_idx]
        hamming_distances[feature_idx] = wrong / seen if seen > 0 else 0.0

    # With no batches there is no loss to average; NaN signals "no data" without raising.
    avg_loss = total_loss / num_batches if num_batches > 0 else float('nan')

    total_metrics = {'TP': 0, 'TN': 0, 'FP': 0, 'FN': 0}
    # Print TP, TN, FP, FN for each feature
    for feature_idx in range(num_features):
        print(f"Feature {feature_idx}:")
        print(f"  TP: {metrics[feature_idx]['TP']}")
        print(f"  TN: {metrics[feature_idx]['TN']}")
        print(f"  FP: {metrics[feature_idx]['FP']}")
        print(f"  FN: {metrics[feature_idx]['FN']}")
        print(f"  Hamming Loss: {hamming_distances[feature_idx]:.4f}")
        for key in total_metrics:
            total_metrics[key] += metrics[feature_idx][key]

    return avg_loss, hamming_distances, total_metrics


In [None]:
# Training loader for the lunge split; reshuffled every epoch (create_dataloader's default).
dataloader = create_dataloader(train_df_lunge, num_frames, batch_size=2)
# Validation loader: the metrics do not depend on batch order, so shuffling is turned
# off to keep evaluation passes deterministic.
eval_dataloader = create_dataloader(val_df_lunge, num_frames, batch_size=2, shuffle=False)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
print(len(dataloader.dataset))  # Number of samples in the training dataset
print(dataloader.dataset[0][0].shape)  # Shape of the first sample's frontal image tensor (tuple element 0)


74
torch.Size([16, 3, 224, 224])


In [None]:
# Clear previous TensorBoard runs if needed
!rm -rf /content/drive/MyDrive/Vision_GYM_Research/tensorboard_logs

# Start TensorBoard in Colab
%load_ext tensorboard
%tensorboard --logdir $LOG_DIR

<IPython.core.display.Javascript object>

In [None]:
# Display the TensorBoard UI for LOG_DIR again (same command as at the end of the previous cell).
%tensorboard --logdir $LOG_DIR

In [None]:
# Release cached GPU memory that PyTorch's CUDA allocator is holding but not using.
torch.cuda.empty_cache()

In [None]:
# Initialize the model with its default sizes (FeatureNum outputs).
rating_model = MultiModalModel()
# Prefer the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print(device)

# Train the model: up to 1000 epochs starting at lr=4e-4; train_combined_model stops
# early once the validation loss has not improved for `patience` epochs.
train_combined_model(rating_model, dataloader, eval_dataloader, epochs=1000, lr=4e-4, device=device)



cpu
Training the model...
Start of Epoch 1
Epoch [1/1000], Progress: 2.70%, Batch [1/37], Loss: 0.7046


In [None]:
# Train the same rating_model instance again, this time starting at lr=1e-4.
# train_combined_model builds a fresh optimizer, scheduler and early-stopping state on every call.
train_combined_model(rating_model, dataloader, eval_dataloader, epochs=1000, lr=1e-4, device=device)