In [3]:
import os
import cv2
import numpy as np

# Define the path to your dataset directory.
dataset_dir = r"D:\HARenv\HAR_Dataset"

# Define constants for video frame preprocessing.
frame_height = 224
frame_width = 224
frame_count = 64 

In [4]:
# Create a function to preprocess a single video.
def preprocess_video(video_path):
    frames = []
    
    # Open the video file.
    cap = cv2.VideoCapture(video_path)
    
    # Check if the video opened successfully.
    if not cap.isOpened():
        raise Exception(f"Failed to open video: {video_path}")
    
    # Get the total number of frames in the video.
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    
    # Calculate the step size to evenly sample frames.
    step = max(total_frames // frame_count, 1)
    
    for i in range(frame_count):
        # Set the frame position to the current step.
        cap.set(cv2.CAP_PROP_POS_FRAMES, i * step)
        
        # Read the frame.
        ret, frame = cap.read()
        
        if ret:
            # Resize the frame to a consistent size.
            frame = cv2.resize(frame, (frame_width, frame_height))
            # Normalize pixel values to [0, 1] or [-1, 1] depending on your model requirements.
            frame = frame / 255.0  # Normalize to [0, 1]
            # Add the frame to the list.
            frames.append(frame)
    
    # Close the video file.
    cap.release()
    
    # Convert the list of frames to a NumPy array.
    frames = np.array(frames)
    
    return frames

In [6]:
def load_and_preprocess_dataset(dataset_dir):
    data = []
    
    # Loop through the folders in the dataset directory, each representing an activity category.
    for activity_folder in os.listdir(dataset_dir):
        activity_path = os.path.join(dataset_dir, activity_folder)
        
        if os.path.isdir(activity_path):
            for video_file in os.listdir(activity_path):
                video_path = os.path.join(activity_path, video_file)
                
                if video_path.endswith('.mp4'):
                    # Preprocess the video and add it to the data list along with its label.
                    frames = preprocess_video(video_path)
                    label = activity_folder  # You may need to encode labels as integers.
                    data.append((frames, label))
    
    return data
# Load and preprocess the dataset.
dataset = load_and_preprocess_dataset(dataset_dir)

In [7]:
dataset

[(array([[[[0.72941176, 0.68235294, 0.69019608],
           [0.72941176, 0.68235294, 0.69019608],
           [0.7372549 , 0.69019608, 0.69803922],
           ...,
           [0.9254902 , 0.91764706, 0.90196078],
           [0.9254902 , 0.91764706, 0.90196078],
           [0.9254902 , 0.91764706, 0.90196078]],
  
          [[0.7372549 , 0.69019608, 0.69803922],
           [0.74117647, 0.69411765, 0.70196078],
           [0.74117647, 0.69411765, 0.70196078],
           ...,
           [0.9254902 , 0.91764706, 0.90196078],
           [0.9254902 , 0.91764706, 0.90196078],
           [0.9254902 , 0.91764706, 0.90196078]],
  
          [[0.74117647, 0.69411765, 0.70196078],
           [0.74117647, 0.69411765, 0.70196078],
           [0.74117647, 0.69411765, 0.70196078],
           ...,
           [0.9254902 , 0.91764706, 0.90196078],
           [0.9254902 , 0.91764706, 0.90196078],
           [0.9254902 , 0.91764706, 0.90196078]],
  
          ...,
  
          [[0.29019608, 0.17254902, 0.14

In [12]:
# import torch
# from torch.utils.data import DataLoader, random_split

# # Define batch size and other relevant parameters
# batch_size = 32  # Adjust as needed
# dataset_split_ratios = [0.7, 0.15, 0.15]  # Train, Validation, Test split ratios

# # Calculate the number of samples for each split based on ratios
# num_samples = len(dataset)
# split_sizes = [int(ratio * num_samples) for ratio in dataset_split_ratios]

# # Ensure that the splits sum to the total number of samples
# split_sizes[-1] = num_samples - sum(split_sizes[:-1])

# # Split the dataset into training, validation, and test sets
# train_data, val_data, test_data = random_split(dataset, split_sizes)

# # Create data loaders for training, validation, and test sets
# train_loader = DataLoader(CustomVideoDataset(train_data), batch_size=batch_size, shuffle=True)
# val_loader = DataLoader(CustomVideoDataset(val_data), batch_size=batch_size)
# test_loader = DataLoader(CustomVideoDataset(test_data), batch_size=batch_size)

# # Optionally, print the number of samples in each split
# print(f"Number of training samples: {len(train_data)}")
# print(f"Number of validation samples: {len(val_data)}")
# print(f"Number of test samples: {len(test_data)}")

Number of training samples: 73
Number of validation samples: 15
Number of test samples: 17


In [19]:
import torch
import torch.nn as nn
from torchvision.models.video import r3d_18

class SlowFast(nn.Module):
    def __init__(self, num_classes, pretrained_backbone=True):
        super(SlowFast, self).__init__()

        # Slow pathway: ResNet-18
        self.slow_backbone = r3d_18(pretrained=pretrained_backbone)
        
        # Fast pathway: ResNet-18
        self.fast_backbone = r3d_18(pretrained=pretrained_backbone)

        # Combine both pathways
        self.slowfast_fusion = self._create_fusion_layer()

        # Classifier
        self.fc = nn.Linear(512, num_classes)  # Adjust the input size based on your backbone

    def _create_fusion_layer(self):
        # Create the fusion layer to combine the slow and fast pathways.
        # You can implement different fusion strategies based on your requirements.
        # Common methods include concatenation or weighted combination.
        # Here, we'll concatenate the outputs along the channel dimension.
        return nn.Sequential(
            nn.Conv3d(1024, 512, kernel_size=(1, 1, 1), stride=(1, 1, 1), padding=(0, 0, 0)),
            nn.ReLU(inplace=True))

    def forward(self, slow_inputs, fast_inputs):
        # Forward pass through the slow pathway
        slow_features = self.slow_backbone(slow_inputs)

        # Forward pass through the fast pathway
        fast_features = self.fast_backbone(fast_inputs)

        # Combine the slow and fast features using the fusion layer
        combined_features = torch.cat((slow_features, fast_features), dim=2)  # Concatenate along the channel dimension
        fused_features = self.slowfast_fusion(combined_features)

        # Global average pooling
        fused_features = fused_features.mean([3, 4, 2])  # Spatial dimensions

        # Classifier
        logits = self.fc(fused_features)

        return logits

# Initialize the SlowFast model
num_classes = 7  # Replace with the number of classes in your dataset
slowfast_model = SlowFast(num_classes=num_classes, pretrained_backbone=True)

# Print the model architecture
print(slowfast_model)




SlowFast(
  (slow_backbone): VideoResNet(
    (stem): BasicStem(
      (0): Conv3d(3, 64, kernel_size=(3, 7, 7), stride=(1, 2, 2), padding=(1, 3, 3), bias=False)
      (1): BatchNorm3d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU(inplace=True)
    )
    (layer1): Sequential(
      (0): BasicBlock(
        (conv1): Sequential(
          (0): Conv3DSimple(64, 64, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1), bias=False)
          (1): BatchNorm3d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (2): ReLU(inplace=True)
        )
        (conv2): Sequential(
          (0): Conv3DSimple(64, 64, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1), bias=False)
          (1): BatchNorm3d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        )
        (relu): ReLU(inplace=True)
      )
      (1): BasicBlock(
        (conv1): Sequential(
          (0): Conv3DSimple(64, 64, kernel_size=(

In [20]:
from sklearn.model_selection import train_test_split

# Split the dataset into training, validation, and test sets
train_data, val_test_data = train_test_split(dataset, test_size=0.3, random_state=42)
val_data, test_data = train_test_split(val_test_data, test_size=0.5, random_state=42)

In [22]:
import torch
import torch.nn as nn
from torchvision import transforms  # Import the transforms module
from torchvision.models.video import r3d_18
# Data augmentation transforms
transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomResizedCrop(size=256, scale=(0.8, 1.0)),
    transforms.RandomApply([transforms.ColorJitter(0.1, 0.1, 0.1, 0.1)], p=0.2),
    transforms.RandomGrayscale(p=0.2),
    transforms.ToTensor(),
])

train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, num_workers=4)
val_loader = DataLoader(val_data, batch_size=batch_size, num_workers=4)


In [23]:
criterion = nn.CrossEntropyLoss()

In [None]:
num_classes = 7  # Replace with the number of classes in your dataset
model = SlowFast(num_classes=num_classes, pretrained_backbone=True)

# Define the number of training epochs
num_epochs = 10  # You can adjust this value based on your training needsfor epoch in range(num_epochs):
for epoch in range(num_epochs):
    model.train()
    for inputs, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
    
    # Adjust learning rate
    scheduler.step()



In [None]:
best_val_acc = 0.0
for epoch in range(num_epochs):
    model.eval()
    val_loss = 0.0
    val_corrects = 0
    with torch.no_grad():
        for inputs, labels in val_loader:
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
            val_corrects += torch.sum(preds == labels.data)
        val_acc = val_corrects.double() / len(val_data)

        if val_acc > best_val_acc:
            best_val_acc = val_acc
            # Save the best model checkpoint
            torch.save(model.state_dict(), 'best_model.pth')

In [None]:
model.load_state_dict(torch.load('best_model.pth'))
model.eval()
test_corrects = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        outputs = model(inputs)
        _, preds = torch.max(outputs, 1)
        test_corrects += torch.sum(preds == labels.data)
test_acc = test_corrects.double() / len(test_data)
