In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from torchvision.models import resnet101
import torch.nn.functional as F
import os
import cv2
import numpy as np
from torchvision import transforms
import re
import torchvision.models as models
import random
from sklearn.metrics import roc_auc_score, accuracy_score, recall_score, f1_score
import matplotlib.pyplot as plt

## Data Processing

In [None]:
# Image pre-processing pipeline applied to every frame before it is stacked
# into a clip tensor. The steps run in the order listed.
_preprocess_steps = [
    # cv2 hands us a NumPy array; the resize step below works on PIL images.
    transforms.ToPILImage(),
    # Every frame is brought to the same 112x112 spatial size.
    transforms.Resize((112, 112)),
    # Back to a float tensor in channel-first layout.
    transforms.ToTensor(),
    # Per-channel normalisation constants.
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
]
transform = transforms.Compose(_preprocess_steps)

def has_less_than_30_images(folder_path, min_images=30,
                            image_extensions=('.png', '.jpg', '.jpeg')):
    """Tell whether a folder holds fewer than ``min_images`` image files.

    Only the top level of ``folder_path`` is scanned (``os.listdir``); a file
    counts as an image when its name ends with one of ``image_extensions``
    (case-sensitive). The default extensions match the ones the frame loader
    in this notebook actually reads, so a folder made of ``.png``/``.jpeg``
    frames is no longer reported as too small.

    Args:
        folder_path: Directory to inspect.
        min_images: Number of images required for the folder to be usable.
            Defaults to 30, the threshold the function name refers to.
        image_extensions: Iterable of accepted file-name suffixes.

    Returns:
        True if fewer than ``min_images`` matching files exist, else False.

    Raises:
        OSError: Propagated from ``os.listdir`` when ``folder_path`` cannot
            be listed.
    """
    # str.endswith accepts a tuple of suffixes, so normalise once up front.
    suffixes = tuple(image_extensions)

    image_count = 0
    for file_name in os.listdir(folder_path):
        if file_name.endswith(suffixes):
            image_count += 1
            # Stop scanning as soon as the threshold is reached.
            if image_count >= min_images:
                return False

    # Comparing here (instead of returning True) keeps min_images <= 0 sane.
    return image_count < min_images

# Extract the number from the file name
def extract_numbers_from_filename(filename):
    """Return every run of digits in ``filename`` as a list of ints.

    The numbers appear in the order they occur in the name, which makes the
    result usable as a sort key for numerically ordering files and folders.
    A name without digits yields an empty list.
    """
    numbers = []
    for digit_run in re.finditer(r'\d+', filename):
        numbers.append(int(digit_run.group()))
    return numbers


def extract_frame(data_dir):
    """Load every clip folder under ``data_dir`` as a ``(frames, label)`` sample.

    Each sub-folder is one clip. Its label comes from the folder name
    (``hand_flapping`` -> 0, ``arm_flapping`` -> 1) and its image files are
    read in numeric order, converted BGR -> RGB, passed through the module
    level ``transform`` and stacked along a new leading frame axis.

    Entries that cannot become a valid sample are skipped with a message
    instead of crashing or silently inheriting the previous folder's label:
    non-directories, folders whose name matches neither class, and folders
    that ``has_less_than_30_images`` reports as too small.

    Args:
        data_dir: Directory containing one sub-folder per clip.

    Returns:
        A list of ``(frames_tensor, label)`` tuples, shuffled in place with
        NumPy's global RNG.

    Raises:
        OSError: If an image file cannot be decoded by ``cv2.imread``.
    """
    samples = []
    # Loop over the folders in the dataset
    folders = sorted(os.listdir(data_dir), key=extract_numbers_from_filename)

    for image_folder in folders:

        print("image_folder : ", image_folder)

        image_folder_path = os.path.join(data_dir, image_folder)
        if not os.path.isdir(image_folder_path):
            # Stray files next to the clip folders cannot be listed.
            print("skipped (not a directory) : ", image_folder)
            continue

        if "hand_flapping" in image_folder:
            label = 0 # hand flapping
        elif "arm_flapping" in image_folder:
            label = 1 # arm flapping
        else:
            # Without this branch the label of the previous folder would be
            # reused (or the name would be unbound on the first folder).
            print("skipped (unknown class) : ", image_folder)
            continue

        # The folder-size check is loop-invariant: do it once per folder
        # rather than once per file.
        if has_less_than_30_images(image_folder_path):
            print("skipped (too few images) : ", image_folder)
            continue

        frames = []
        for image_file in sorted(os.listdir(image_folder_path), key=extract_numbers_from_filename):
            if not image_file.endswith(('.png', '.jpg', '.jpeg')):  # check for image files
                continue

            image_path = os.path.join(image_folder_path, image_file)
            image = cv2.imread(image_path)
            if image is None:
                # cv2.imread signals failure by returning None, not by raising.
                raise OSError(f"Could not read image file: {image_path}")
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            image = transform(image)

            frames.append(image)

        if not frames:
            # torch.stack rejects an empty list; nothing usable in this folder.
            print("skipped (no readable frames) : ", image_folder)
            continue

        # Stack the frames into a tensor of shape (num_frames, 3, 112, 112)
        # NOTE(review): num_frames is whatever the folder contains; batching
        # with the default collate needs every clip to have the same count.
        frames_tensor = torch.stack(frames, dim=0)
        samples.append((frames_tensor, label))

    np.random.shuffle(samples)
    return samples

# Define the file path to your dataset
train_data_dir = '/kaggle/input/7015-dataset/train/train'
test_data_dir = '/kaggle/input/7015-dataset/test/test'

# extract_frame shuffles its result with NumPy's global RNG. Seed it before
# loading so the sample order (and therefore which clip each dataset index
# refers to) is the same on every Restart & Run All; the seeding in the model
# cell runs too late to cover this shuffle.
np.random.seed(42)

# Each sample is a (frames_tensor, label) tuple.
train_samples = extract_frame(train_data_dir)
test_samples = extract_frame(test_data_dir)

# Split the list of pairs into parallel tuples of features and labels.
train_features, train_labels = zip(*train_samples)
test_features, test_labels = zip(*test_samples)

## Model

In [None]:
# Reproducibility: push one seed into every RNG the notebook relies on
# (PyTorch, NumPy, Python's `random`), in that order.
seed = 42
for _seed_rng in (torch.manual_seed, np.random.seed, random.seed):
    _seed_rng(seed)

# Ask cuDNN for deterministic kernels and switch off its auto-tuner.
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
# Custom Dataset class
class CustomDataset(Dataset):
    """Index-aligned pairing of a feature sequence with a label sequence.

    Item ``i`` is the tuple ``(features[i], labels[i])``; the dataset length
    is the length of ``features``.
    """

    def __init__(self, features, labels):
        # Both sequences are stored as given; no copy or conversion is made.
        self.features, self.labels = features, labels

    def __len__(self):
        sample_count = len(self.features)
        return sample_count

    def __getitem__(self, idx):
        sample = self.features[idx]
        target = self.labels[idx]
        return sample, target

class CNNLSTM(nn.Module):
    """Per-frame ResNet-101 features fed through an LSTM, then classified.

    The pretrained ResNet backbone is frozen; its final ``fc`` layer is
    replaced by a new linear projection to ``input_size`` features. That
    projection, the LSTM and the output layer are the trainable parts.

    Args:
        input_size: Size of the per-frame feature vector given to the LSTM.
        hidden_size: LSTM hidden state size.
        num_layers: Number of stacked LSTM layers.
        num_classes: Number of output logits.
        timesteps: Unused; kept so existing callers keep working. The
            sequence length is read from the input tensor in ``forward``.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes, timesteps):
        super(CNNLSTM, self).__init__()
        self.resnet = resnet101(pretrained=True)

        # Freeze the pretrained backbone explicitly. Doing it through
        # requires_grad (rather than wrapping the whole forward pass in
        # torch.no_grad) lets gradients still reach the new head below.
        for backbone_param in self.resnet.parameters():
            backbone_param.requires_grad_(False)

        # Created after the freeze, so its parameters stay trainable.
        self.resnet.fc = nn.Sequential(nn.Linear(self.resnet.fc.in_features, input_size))
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)

        self.fc1 = nn.Linear(hidden_size, num_classes)  # fully connection

    def forward(self, x_3d):
        """Classify a batch of clips.

        Args:
            x_3d: Tensor unpacked as (batch, timesteps, channels, height, width).

        Returns:
            Logits of shape (batch, num_classes), computed from the LSTM
            output at the last time step.
        """
        batch_size, timesteps, channels, height, width = x_3d.size()
        # Fold time into the batch axis so the CNN sees individual frames.
        # reshape (unlike view) also accepts non-contiguous input.
        c_in = x_3d.reshape(batch_size * timesteps, channels, height, width)

        # No autograd graph is recorded for the frozen backbone layers because
        # neither their weights nor (normally) the input require gradients;
        # recording starts at the trainable projection head.
        # NOTE(review): in train mode the backbone's BatchNorm layers still
        # use batch statistics and update their running stats - confirm that
        # is intended for a frozen feature extractor.
        c_out = self.resnet(c_in)

        # Restore the (batch, time, feature) layout expected by the LSTM.
        r_in = c_out.reshape(batch_size, timesteps, -1)

        # Initial hidden/cell state defaults to zeros.
        out, _ = self.lstm(r_in)

        x = self.fc1(out[:, -1, :])
        return x

# Hyperparameters
learning_rate, batch_size = 0.001, 32
input_size, hidden_size = 64, 32
num_layers, num_classes, timesteps = 2, 2, 30

# Run on the GPU when one is visible, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

model = CNNLSTM(
    input_size=input_size,
    hidden_size=hidden_size,
    num_layers=num_layers,
    num_classes=num_classes,
    timesteps=timesteps,
)
model = model.to(device)

# Loss function and optimiser
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Wrap the loaded features/labels in datasets ...
train_dataset = CustomDataset(features=train_features, labels=train_labels)
test_dataset = CustomDataset(features=test_features, labels=test_labels)
# ... and batch them; only the training loader reshuffles each epoch.
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

# train
num_epochs = 30
train_losses = []
train_accuracies = []
test_losses = []
test_accuracies = []
test_predicted_probs = []

# early_stopping
# NOTE(review): early stopping is driven by the test-set loss, so the test
# set is effectively used for model selection - consider a validation split.
early_stopping_patience = 10
early_stopping_counter = 0
best_test_loss = float('inf')

# List used to store performance metric values
accuracy_values = []
f1_score_values = []
auc_values = []

for epoch in range(num_epochs):
    # Training
    test_predicted_probs = []  # rebuilt every epoch; holds the latest epoch only
    model.train()
    train_loss = 0.0
    train_total = 0
    train_correct = 0
    for batch_features, batch_labels in train_loader:
        batch_features = batch_features.to(device)
        batch_labels = batch_labels.to(device)

        optimizer.zero_grad()
        outputs = model(batch_features)
        loss = criterion(outputs, batch_labels)
        loss.backward()
        optimizer.step()

        # criterion returns the batch mean; weight it by the batch size so a
        # short final batch does not count as much as a full one.
        batch_count = batch_labels.size(0)
        train_loss += loss.item() * batch_count
        _, predicted = torch.max(outputs, 1)
        train_total += batch_count
        train_correct += (predicted == batch_labels).sum().item()
    train_loss /= train_total
    train_accuracy = 100 * train_correct / train_total

    # Testing loop
    model.eval()
    test_loss = 0.0
    test_correct = 0
    test_total = 0
    test_true_labels = []
    test_predicted_labels = []

    print(f'Epoch {epoch+1}/{num_epochs}')
    with torch.no_grad():
        for batch_features, batch_labels in test_loader:
            batch_features = batch_features.to(device)
            batch_labels = batch_labels.to(device)

            outputs = model(batch_features)
            loss = criterion(outputs, batch_labels)
            batch_count = batch_labels.size(0)
            test_loss += loss.item() * batch_count
            _, predicted = torch.max(outputs, 1)
            test_total += batch_count
            test_correct += (predicted == batch_labels).sum().item()

            test_true_labels.extend(batch_labels.cpu().numpy())
            test_predicted_labels.extend(predicted.cpu().numpy())
            # The model emits raw logits. Convert them to class probabilities
            # before scoring: a single class logit is not a valid one-vs-rest
            # score because logits are only meaningful relative to each other.
            batch_probs = torch.softmax(outputs, dim=1)
            test_predicted_probs.extend(batch_probs.cpu().numpy())

    test_loss /= test_total
    test_accuracy = 100 * test_correct / test_total

    train_losses.append(train_loss)
    train_accuracies.append(train_accuracy)
    test_losses.append(test_loss)
    test_accuracies.append(test_accuracy)

    # Convert the list to a numpy array for roc_auc_score
    test_predicted_probs = np.array(test_predicted_probs)

    # Compute AUC for each class (one-vs-rest), then average over classes
    auc_scores = []
    for class_idx in range(test_predicted_probs.shape[1]):
        auc = roc_auc_score(np.array(test_true_labels) == class_idx, test_predicted_probs[:, class_idx])
        auc_scores.append(auc)

    avg_auc = np.mean(auc_scores)


    accuracy_values.append(test_accuracy / 100)
    f1_score_values.append(f1_score(test_true_labels, test_predicted_labels, average='weighted'))
    auc_values.append(avg_auc)


    # Print testing loss and accuracy
    print(f'Epoch {epoch+1}/{num_epochs} - Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}% - Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.2f}%')
    # Early stopping check: reset the counter on improvement, otherwise count
    # one more epoch without a new best test loss.
    if test_loss < best_test_loss:
        best_test_loss = test_loss
        early_stopping_counter = 0
    else:
        early_stopping_counter += 1

    # Check if early stopping criteria met
    if early_stopping_counter >= early_stopping_patience:
        print("Early stopping.")
        break

# Final metrics, computed from the labels and predictions of the last
# evaluated epoch: accuracy (as a percentage), weighted recall, weighted F1.
test_accuracy = 100 * accuracy_score(test_true_labels, test_predicted_labels)
test_recall = recall_score(
    test_true_labels, test_predicted_labels, average='weighted'
)
test_f1 = f1_score(
    test_true_labels, test_predicted_labels, average='weighted'
)

# Spread of each per-epoch metric series across the epochs that were run.
accuracy_variance, f1_score_variance, auc_variance = (
    np.var(metric_series)
    for metric_series in (accuracy_values, f1_score_values, auc_values)
)

print(f'Final Test Accuracy: {test_accuracy:.2f}%, Test Recall: {test_recall:.2f}, Test F1 Score: {test_f1:.2f}, Average AUC: {avg_auc:.4f}')
for variance_title, variance_value in (
    ("Accuracy Variance:", accuracy_variance),
    ("F1-score Variance:", f1_score_variance),
    ("AUC Variance:", auc_variance),
):
    print(variance_title, variance_value)
# Learning curves: loss on the left panel, accuracy on the right.
plt.figure(figsize=(12, 5))

# One row per panel:
# (position, train series, train label, test series, test label, title, y label)
_curve_panels = (
    (1, train_losses, 'Train Loss', test_losses, 'Test Loss',
     'Training and Testing Loss per Epoch', 'Loss'),
    (2, train_accuracies, 'Train Accuracy', test_accuracies, 'Test Accuracy',
     'Training and Testing Accuracy per Epoch', 'Accuracy (%)'),
)

for (_position, _train_series, _train_label,
     _test_series, _test_label, _panel_title, _y_label) in _curve_panels:
    plt.subplot(1, 2, _position)
    plt.plot(_train_series, label=_train_label)
    plt.plot(_test_series, label=_test_label)
    plt.title(_panel_title)
    plt.xlabel('Epoch')
    plt.ylabel(_y_label)
    plt.legend()

plt.tight_layout()
plt.show()
