.
## **Preprocessing part**

Install the necessary packages and import the required modules

In [None]:
!pip install torch torchvision
!pip install opencv-python

import json
import numpy as np
import cv2
import torch
import os
from torch.utils.data import Dataset
from torchvision import transforms
from google.colab import drive
from tqdm import tqdm

In [None]:
# Mount Google Drive at /content/drive so the files stored there are reachable from this runtime.
from google.colab import drive
drive.mount('/content/drive')

Augmentation

In [None]:
# Root folder of the dataset on the mounted Drive; augmented clips go to a sub-folder of it.
base_dir = '/content/drive/My Drive/dataset/'
output_dir = base_dir + 'augmented-examples/'
# exist_ok=True keeps the cell re-runnable and avoids the check-then-create race.
os.makedirs(output_dir, exist_ok=True)
def keystoint(x):
    """Return a new dict holding the items of ``x`` with every key cast to ``int``.

    JSON object keys are always strings, so this is handy as a ``json.load``
    ``object_hook`` when the keys are really integers.
    """
    converted = {}
    for key, value in x.items():
        converted[int(key)] = value
    return converted

def rotateVideo(path, output_dir, video_id, degree):
    """Rotate every frame of a video about the frame centre and save the result.

    The output is written to ``output_dir + video_id + "_rotate_" + str(degree) + ".mp4"``
    with the 'mp4v' codec, using the frame size and FPS reported by the source video.

    :param path: path of the source video
    :param output_dir: prefix of the output file; it is concatenated as-is, so it
        must end with a path separator
    :param video_id: identifier used to build the output file name
    :param degree: rotation angle in degrees, passed to ``cv2.getRotationMatrix2D``
    """
    video = cv2.VideoCapture(path)
    out_rotate = None
    try:
        frame_width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = video.get(cv2.CAP_PROP_FPS)

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out_rotate = cv2.VideoWriter(output_dir + video_id + "_rotate_" + str(degree) + ".mp4", fourcc, fps, (frame_width, frame_height))

        # The matrix depends only on the frame size and the angle, so build it once
        # instead of once per frame.
        rotation_matrix = cv2.getRotationMatrix2D((frame_width / 2, frame_height / 2), degree, 1)

        while video.isOpened():
            success, frame = video.read()
            if not success:
                break

            rotated_frame = cv2.warpAffine(frame, rotation_matrix, (frame_width, frame_height))
            out_rotate.write(rotated_frame)
    finally:
        # Release the capture and the writer even if a frame operation raised.
        video.release()
        if out_rotate is not None:
            out_rotate.release()

def translateVideo(path, output_dir, video_id, translate=(0, 0)):
    """Shift every frame of a video by a fixed pixel offset and save the result.

    The output is written to
    ``output_dir + video_id + "_translate_" + str(translate[0]) + "_" + str(translate[1]) + ".mp4"``
    with the 'mp4v' codec, using the frame size and FPS reported by the source video.

    :param path: path of the source video
    :param output_dir: prefix of the output file; it is concatenated as-is, so it
        must end with a path separator
    :param video_id: identifier used to build the output file name
    :param translate: ``(tx, ty)`` offsets placed in the last column of the 2x3
        affine matrix given to ``cv2.warpAffine``
    """
    video = cv2.VideoCapture(path)
    out_translate = None
    try:
        frame_width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = video.get(cv2.CAP_PROP_FPS)

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out_translate = cv2.VideoWriter(output_dir + video_id + "_translate_" + str(translate[0]) + "_" + str(translate[1]) + ".mp4", fourcc, fps, (frame_width, frame_height))

        # The affine matrix is the same for every frame, so build it once.
        M = np.float32([[1, 0, translate[0]], [0, 1, translate[1]]])

        while video.isOpened():
            success, frame = video.read()
            if not success:
                break

            dst = cv2.warpAffine(frame, M, (frame_width, frame_height))
            out_translate.write(dst)
    finally:
        # Release the capture and the writer even if a frame operation raised.
        video.release()
        if out_translate is not None:
            out_translate.release()

def augmentVideo(annotation_dict, labels_dict, data_dir=base_dir + 'examples/', output_dir=base_dir + 'augmented-examples/', max_count=2000):
    """Augment the videos of under-represented actions with rotations and translations.

    For every video whose action has at most ``max_count`` examples, four
    augmented clips are written to ``output_dir`` (rotation by 30 and 330 degrees,
    horizontal translation by +32 and -32 pixels). The annotations of the new
    clips are saved to ``output_dir + 'augmented_annotation_dict.json'``.

    :param annotation_dict: path to the JSON file mapping video id -> label id
    :param labels_dict: path to the JSON file mapping label id -> action name
    :param data_dir: prefix of the source videos (``data_dir + video_id + ".mp4"``)
    :param output_dir: prefix for the augmented videos and the new annotation file
    :param max_count: an action is augmented when its example count is <= this value
    """
    from collections import Counter

    with open(annotation_dict, 'r') as f:
        annotations = json.load(f)

    # JSON keys are strings; keystoint turns the label ids back into ints so that
    # they match the values stored in the annotation file.
    with open(labels_dict, 'r') as f:
        labels = json.load(f, object_hook=keystoint)

    # The writers below fail if the target folder is missing (e.g. custom output_dir).
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # Count the distribution of actions
    count_dict = Counter(labels[label_id] for label_id in annotations.values())

    # Actions with at most max_count examples; a set gives O(1) membership tests.
    filtered_actions = {action for action, count in count_dict.items() if count <= max_count}

    rotations = (30, 330)
    translations = ((32, 0), (-32, 0))

    # Prepare to augment videos
    augmented_annotation = dict()
    pbar = tqdm(list(annotations.items()))
    for video_id, action in pbar:
        if labels[action] not in filtered_actions:
            continue

        path = data_dir + video_id + ".mp4"

        # The annotation keys mirror the file names produced by rotateVideo / translateVideo.
        for degree in rotations:
            augmented_annotation[video_id + "_rotate_" + str(degree)] = action
            rotateVideo(path, output_dir, video_id, degree)

        for shift in translations:
            augmented_annotation[video_id + "_translate_" + str(shift[0]) + "_" + str(shift[1])] = action
            translateVideo(path, output_dir, video_id, shift)

    # Save new annotations
    # NOTE(review): the file lands inside output_dir, while other cells of this notebook
    # read base_dir + 'augmented_annotation_dict.json' - confirm the file is moved/copied.
    with open(output_dir + 'augmented_annotation_dict.json', 'w') as fp:
        json.dump(augmented_annotation, fp)

if __name__ == "__main__":
    # Run the augmentation with the dataset's annotation and label files.
    augmentVideo(
        annotation_dict=base_dir + "annotation_dict.json",
        labels_dict=base_dir + "labels_dict.json",
    )


Loading data

In [None]:
import json

def load_dict_from_json(file_path):
    """Parse the JSON document stored at ``file_path`` and return the resulting object."""
    with open(file_path) as handle:
        parsed = json.load(handle)
    return parsed

def count_samples(annotation_dict_path, label_dict_path):
    """Count the annotated samples overall and per class.

    :param annotation_dict_path: JSON file mapping video id -> label index
    :param label_dict_path: JSON file mapping label index (string key) -> label name
    :return: ``(total_samples, class_counts)``; the total includes every annotation,
        while ``class_counts`` has one entry per label name except "discard"
    """
    annotations = load_dict_from_json(annotation_dict_path)
    label_names = load_dict_from_json(label_dict_path)

    # Start every non-discard class at zero so classes without samples are still reported.
    class_counts = dict.fromkeys(
        (name for name in set(label_names.values()) if name != "discard"), 0
    )

    for label_index in annotations.values():
        name = label_names[str(label_index)]
        if name == "discard":
            continue
        class_counts[name] += 1

    return len(annotations), class_counts

# Paths of annotation_dict and label_dict
annotation_dict_path = '/content/drive/My Drive/dataset/annotation_dict.json'
label_dict_path = '/content/drive/My Drive/dataset/labels_dict.json'

# Compute the total number of samples and the number of samples per class
total_samples, class_counts = count_samples(annotation_dict_path, label_dict_path)

print("Total samples:", total_samples)
print("Class counts:")
for label, count in class_counts.items():
    print(f"{label}: {count}")


Total samples: 37085
Class counts:
block: 996
walk: 11749
defense: 3866
pass: 1070
no_action: 6490
run: 5924
shoot: 426
dribble: 3490
pick: 712
ball in hand: 2362


In [None]:
import json

def load_dict_from_json(file_path):
    """Parse the JSON document stored at ``file_path`` and return the resulting object."""
    with open(file_path) as handle:
        parsed = json.load(handle)
    return parsed

def count_samples(annotation_dict_path, label_dict_path):
    """Count the annotated samples overall and per class.

    :param annotation_dict_path: JSON file mapping video id -> label index
    :param label_dict_path: JSON file mapping label index (string key) -> label name
    :return: ``(total_samples, class_counts)``; the total includes every annotation,
        while ``class_counts`` has one entry per label name except "discard"
    """
    annotations = load_dict_from_json(annotation_dict_path)
    label_names = load_dict_from_json(label_dict_path)

    # Start every non-discard class at zero so classes without samples are still reported.
    class_counts = dict.fromkeys(
        (name for name in set(label_names.values()) if name != "discard"), 0
    )

    for label_index in annotations.values():
        name = label_names[str(label_index)]
        if name == "discard":
            continue
        class_counts[name] += 1

    return len(annotations), class_counts

# Paths of annotation_dict (augmented clips) and label_dict
# NOTE(review): augmentVideo saves this file under 'augmented-examples/' by default,
# whereas it is read from the dataset root here - confirm the file was moved/copied.
annotation_dict_path = '/content/drive/My Drive/dataset/augmented_annotation_dict.json'
label_dict_path = '/content/drive/My Drive/dataset/labels_dict.json'

# Compute the total number of samples and the number of samples per class
total_samples, class_counts = count_samples(annotation_dict_path, label_dict_path)

print("Total samples:", total_samples)
print("Class counts:")
for label, count in class_counts.items():
    print(f"{label}: {count}")

Total samples: 12814
Class counts:
block: 3982
walk: 0
defense: 0
pass: 4280
no_action: 0
run: 0
shoot: 1704
dribble: 0
pick: 2848
ball in hand: 0


In [None]:
import json
import numpy as np
import cv2

import torch
from torch.utils.data import Dataset
from torchvision import transforms
base_dir = '/content/drive/My Drive/dataset/'
class BasketballDataset(Dataset):
    """SpaceJam: a Dataset for Basketball Action Recognition.

    Every item is a dict with the keys ``video_id``, ``action`` (one-hot tensor
    over ``NUM_CLASSES`` classes), ``class`` (the label as stored in the
    annotation file) and either ``video`` (decoded clip) or ``joints`` (pose data).
    """

    # Size of the one-hot action encoding and number of frames each clip must have.
    NUM_CLASSES = 10
    FRAMES_PER_CLIP = 16

    def __init__(self, annotation_dict, augmented_dict, video_dir=base_dir+"examples/", augmented_dir=base_dir+"augmented-examples/", augment=True, transform=None, poseData=False):
        """
        :param annotation_dict: path to the JSON file mapping video id -> label
        :param augmented_dict: path to the JSON file with the annotations of the
            augmented clips; only read when ``augment`` is truthy
        :param video_dir: prefix of the original clips (also of the ``.npy`` pose files)
        :param augmented_dir: prefix of the augmented clips, used as a fallback location
        :param augment: whether the augmented annotations are appended to the dataset
        :param transform: stored on the instance; not applied anywhere in this class
        :param poseData: when true and ``augment`` is false, items carry pose joints
            instead of decoded video
        """
        with open(annotation_dict) as f:
            self.video_list = list(json.load(f).items())

        # Always define these attributes: __getitem__ reads self.augment and
        # VideoToNumpy reads self.augmented_dir, including when augment=False.
        self.augment = bool(augment)
        self.augmented_dir = augmented_dir

        if self.augment:
            with open(augmented_dict) as f:
                # extend with augmented data
                self.video_list.extend(json.load(f).items())

        self.video_dir = video_dir
        self.poseData = poseData
        self.transform = transform

    def __len__(self):
        """Number of annotated clips (original plus augmented, when enabled)."""
        return len(self.video_list)

    def __getitem__(self, idx):
        """Build the sample dict for the clip at position ``idx``."""
        video_id, label = self.video_list[idx]
        # Row `label` of the identity matrix is the one-hot encoding (float64).
        action = torch.from_numpy(np.eye(self.NUM_CLASSES)[label])

        if self.poseData and not self.augment:
            # NOTE(review): allow_pickle=True executes pickled code on load - only use with trusted files.
            joints = np.load(self.video_dir + video_id + ".npy", allow_pickle=True)
            sample = {'video_id': video_id, 'joints': joints, 'action': action, 'class': label}
        else:
            video = self.VideoToNumpy(video_id)
            sample = {'video_id': video_id, 'video': torch.from_numpy(video).float(), 'action': action, 'class': label}

        return sample

    def keystoint(self, x):
        """Return a copy of ``x`` with every key cast to ``int``."""
        return {int(k): v for k, v in x.items()}

    def VideoToNumpy(self, video_id):
        """Decode a clip into a float array of shape (channels, frames, height, width).

        The clip is looked up in ``video_dir`` first and in ``augmented_dir`` second.

        :raises Exception: if the file cannot be opened from either location
        :raises AssertionError: if the clip does not have exactly 16 frames
        """
        video = cv2.VideoCapture(self.video_dir + video_id + ".mp4")

        if not video.isOpened():
            video = cv2.VideoCapture(self.augmented_dir + video_id + ".mp4")
        if not video.isOpened():
            raise Exception("Video file not readable")

        video_frames = []
        try:
            while video.isOpened():
                success, frame = video.read()
                if not success:
                    break

                # Move the channel axis first: (H, W, C) -> (C, H, W)
                video_frames.append(np.transpose(frame, (2, 0, 1)).astype(float))
        finally:
            video.release()

        assert len(video_frames) == self.FRAMES_PER_CLIP
        # (frames, C, H, W) -> (C, frames, H, W)
        return np.transpose(np.asarray(video_frames), (1,0,2,3))


if __name__ == "__main__":

    basketball_dataset = BasketballDataset(annotation_dict= base_dir +"annotation_dict.json",
                                           augmented_dict= base_dir+"augmented_annotation_dict.json")

    # Index the dataset once: every __getitem__ call decodes the clip from disk again.
    sample = basketball_dataset[1]
    print(sample['action'])
    print(sample['class'])
    print(len(basketball_dataset))

tensor([0., 0., 0., 0., 0., 0., 0., 0., 1., 0.], dtype=torch.float64)
8
49899


## **Model**

Majority Class

In [None]:
import json

def load_annotations(paths):
    """Merge the annotation dicts stored in several JSON files into one dict.

    Files are read in the given order; when a key occurs in more than one file,
    the value from the later file is kept.
    """
    merged = {}
    for json_path in paths:
        with open(json_path, 'r') as handle:
            loaded = json.load(handle)
        merged.update(loaded)
    return merged

def find_majority_class(annotations):
    """Return ``(label, count)`` for the most frequent value in ``annotations``.

    Ties go to the label that was seen first.
    """
    tally = {}
    for label in annotations.values():
        tally[label] = tally.get(label, 0) + 1
    top_label = max(tally, key=lambda candidate: tally[candidate])
    return top_label, tally[top_label]

def evaluate_baseline(majority_class, annotations):
    """Accuracy obtained by predicting ``majority_class`` for every annotation."""
    hits = 0
    for action in annotations.values():
        if action == majority_class:
            hits += 1
    return hits / len(annotations)

# Majority-class baseline: report the accuracy of always predicting the most frequent label.
if __name__ == "__main__":
    # Define the paths to your annotation files
    annotation_paths = [
        '/content/drive/My Drive/dataset/annotation_dict.json',
        '/content/drive/My Drive/dataset/augmented_annotation_dict.json'
    ]

    # Load and combine annotations from both files
    annotations = load_annotations(annotation_paths)

    # Find the majority class
    majority_class, count = find_majority_class(annotations)
    print(f"The majority class is '{majority_class}' with {count} instances.")

    # Evaluate the baseline model
    accuracy = evaluate_baseline(majority_class, annotations)
    print(f"Baseline accuracy using the majority class is {accuracy:.2%}.")

The majority class is '9' with 11749 instances.
Baseline accuracy using the majority class is 23.55%.


In [None]:
import json
from sklearn.metrics import f1_score, precision_score, recall_score

def load_annotations(paths):
    """Merge the annotation dicts stored in several JSON files into one dict.

    Files are read in the given order; when a key occurs in more than one file,
    the value from the later file is kept.
    """
    merged = {}
    for json_path in paths:
        with open(json_path, 'r') as handle:
            loaded = json.load(handle)
        merged.update(loaded)
    return merged

def find_majority_class(annotations):
    """Return ``(label, count)`` for the most frequent value in ``annotations``.

    Ties go to the label that was seen first.
    """
    tally = {}
    for label in annotations.values():
        tally[label] = tally.get(label, 0) + 1
    top_label = max(tally, key=lambda candidate: tally[candidate])
    return top_label, tally[top_label]

def evaluate_baseline(majority_class, annotations):
    """Precision, recall and F1 of always predicting ``majority_class``.

    The scores are restricted to ``majority_class`` through ``labels=[majority_class]``.

    :return: tuple ``(precision, recall, f1)``
    """
    y_true = list(annotations.values())
    # The baseline predicts the majority class for every instance.
    y_pred = [majority_class for _ in y_true]

    # Same scoring options for the three metrics.
    scoring = dict(average='macro', labels=[majority_class])
    precision = precision_score(y_true, y_pred, **scoring)
    recall = recall_score(y_true, y_pred, **scoring)
    f1 = f1_score(y_true, y_pred, **scoring)

    return precision, recall, f1

# Majority-class baseline: report precision, recall and F1 restricted to the most frequent label.
if __name__ == "__main__":
    # Define the paths to your annotation files
    annotation_paths = [
        '/content/drive/My Drive/dataset/annotation_dict.json',
        '/content/drive/My Drive/dataset/augmented_annotation_dict.json'
    ]

    # Load and combine annotations from both files
    annotations = load_annotations(annotation_paths)

    # Find the majority class
    majority_class, count = find_majority_class(annotations)
    print(f"The majority class is '{majority_class}' with {count} instances.")

    # Evaluate the baseline model
    precision, recall, f1 = evaluate_baseline(majority_class, annotations)
    print(f"Baseline precision for the majority class is {precision:.2%}.")
    print(f"Baseline recall for the majority class is {recall:.2%}.")
    print(f"Baseline F1 score for the majority class is {f1:.2%}.")


The majority class is '9' with 11749 instances.
Baseline precision for the majority class is 23.55%.
Baseline recall for the majority class is 100.00%.
Baseline F1 score for the majority class is 38.12%.


R(2+1)D CNN Model

In [None]:
import numpy as np
from sklearn.metrics import f1_score, precision_score, recall_score

def get_acc_f1_precision_recall(pred_classes, ground_truths, labels=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]):
    """
    Given two linear arrays of predicted classes and ground truths, return accuracy, f1 score, precision and recall
    :param pred_classes: classes predicted by model
    :param ground_truths: ground truths for predictions
    :param labels: class labels included in the micro-averaged f1, precision and recall (never mutated here)
    :return: tuple of accuracy, f1, precision, recall
    """
    # Convert to arrays so that `==` is element-wise even when plain lists are passed;
    # comparing two lists would give a single bool and a meaningless accuracy.
    pred_classes = np.asarray(pred_classes)
    ground_truths = np.asarray(ground_truths)

    # The debug prints of the full prediction / ground-truth arrays were dropped:
    # they flooded the cell output on every call.
    accuracy = np.mean((pred_classes == ground_truths)).astype(np.float64)
    f1 = f1_score(ground_truths, pred_classes, labels=labels, average='micro')
    precision = precision_score(ground_truths, pred_classes, labels=labels, average='micro')
    recall = recall_score(ground_truths, pred_classes, labels=labels, average='micro')

    return accuracy, f1, precision, recall

In [None]:
pip install vidaug


In [None]:
from __future__ import print_function
from __future__ import division

import numpy as np
import copy
import time
from tqdm import tqdm
from easydict import EasyDict
from vidaug import augmentors as vidaug
from sklearn.metrics import confusion_matrix

import torch
from torch.nn import Dropout
import torch.nn as nn
import torch.optim as optim
import torchvision
from torchvision import models
from torch.utils.data import DataLoader, random_split
import matplotlib.pyplot as plt
import os
import sys

# Replace with the full path of the parent directory that contains your utils folder
sys.path.append('/content/drive/My Drive/dataset')

# Now import the project-local checkpoint helpers
from utils.checkpoints import init_session_history, save_weights, load_weights, write_history

# Use CUDA as the default device when it is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Hyper-parameters, dataset sizes and paths shared by the training code below.
args = EasyDict({
    'base_model_name': 'r2plus1d_multiclass',
    'pretrained': True,

    # training/model params
    'lr': 0.0001,
    'start_epoch': 1,
    'num_epochs': 6,
    'layers_list': ['layer3', 'layer4', 'fc'],
    'continue_epoch': False,
    # The trailing comma was missing here, which made the whole dict literal a syntax error.
    'device': 'cuda' if torch.cuda.is_available() else 'cpu',

    # Dataset params
    'num_classes': 10,
    'batch_size': 25,
    'n_total': 49899,
    'test_n': 4990,
    'val_n': 9980,

    # Path params
    'annotation_path': base_dir+"annotation_dict.json",
    'augmented_annotation_path':base_dir+"augmented_annotation_dict.json",
    'model_path': base_dir+"model-checkpoints/",
    'history_path': base_dir+"histories/history3.txt"
})

def train_model(model, dataloaders, criterion, optimizer, args, start_epoch=1, num_epochs=25):
    """
    Trains the 3D CNN Model
    :param model: Model object that we will train
    :param dataloaders: A dictionary with the 'train' and 'val' dataloaders
    :param criterion: Pytorch Criterion Instance
    :param optimizer: Pytorch Optimizer Instance
    :param args: configuration object passed to the checkpoint/history helpers (``args.history_path`` is read here)
    :param start_epoch: number of the first epoch
    :param num_epochs: exclusive upper bound of the epoch counter, i.e. the loop runs
        ``range(start_epoch, num_epochs)``
    :return: model, train_loss_history, val_loss_history, train_acc_history, val_acc_history, train_f1_score, val_f1_score, plot_epoch
    """

    # Initializes Session History in the history file
    init_session_history(args)
    since = time.time()

    train_acc_history = []
    val_acc_history = []
    train_loss_history = []
    val_loss_history = []
    train_f1_score = []
    val_f1_score = []
    plot_epoch = []

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    # NOTE(review): with start_epoch=1 this runs num_epochs - 1 epochs - confirm that is intended.
    for epoch in range(start_epoch, num_epochs):

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
                train_pred_classes = []
                train_ground_truths = []
            else:
                model.eval()  # Set model to evaluate mode
                val_pred_classes = []
                val_ground_truths = []

            running_loss = 0.0
            running_corrects = 0
            samples_seen = 0

            pbar = tqdm(dataloaders[phase])
            # Iterate over data.
            for sample in pbar:
                inputs = sample["video"].to(device)
                labels = sample["action"].to(device)
                # Labels are one-hot encoded; convert them to class indices once per batch.
                targets = torch.max(labels, 1)[1]

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward
                # track history if only in train
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    loss = criterion(outputs, targets)

                    _, preds = torch.max(outputs, 1)

                    if phase == 'train':
                        train_pred_classes.extend(preds.detach().cpu().numpy())
                        train_ground_truths.extend(targets.detach().cpu().numpy())
                    else:
                        val_pred_classes.extend(preds.detach().cpu().numpy())
                        val_ground_truths.extend(targets.detach().cpu().numpy())

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # statistics
                batch_count = inputs.size(0)
                running_loss += loss.item() * batch_count
                running_corrects += torch.sum(preds == targets)
                samples_seen += batch_count

                # running_loss is weighted by batch size, so divide by the number of
                # samples seen (not by a batch counter) to show the mean loss so far.
                pbar.set_description('Phase: {} || Epoch: {} || Loss {:.5f} '.format(phase, epoch, running_loss / samples_seen))

            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            # epoch_acc stays a tensor; it is appended to the accuracy histories as such.
            epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)

            # Calculate elapsed time
            time_elapsed = time.time() - since
            print(phase, ' training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

            # For Checkpointing and Confusion Matrix
            if phase == 'val':
                val_acc_history.append(epoch_acc)
                val_loss_history.append(epoch_loss)
                val_pred_classes = np.asarray(val_pred_classes)
                val_ground_truths = np.asarray(val_ground_truths)
                val_accuracy, val_f1, val_precision, val_recall = get_acc_f1_precision_recall(
                    val_pred_classes, val_ground_truths
                )
                val_f1_score.append(val_f1)
                val_confusion_matrix = np.array_str(confusion_matrix(val_ground_truths, val_pred_classes, labels=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]))
                print('Epoch: {} || Val_Acc: {} || Val_Loss: {}'.format(
                    epoch, val_accuracy, epoch_loss
                ))
                print(f'val: \n{val_confusion_matrix}')

                # Deep Copy Model if best accuracy
                if epoch_acc > best_acc:
                    best_acc = epoch_acc
                    best_model_wts = copy.deepcopy(model.state_dict())

                # set current loss to val loss for write history
                val_loss = epoch_loss

            if phase == 'train':
                train_acc_history.append(epoch_acc)
                train_loss_history.append(epoch_loss)
                train_pred_classes = np.asarray(train_pred_classes)
                train_ground_truths = np.asarray(train_ground_truths)
                train_accuracy, train_f1, train_precision, train_recall = get_acc_f1_precision_recall(
                    train_pred_classes, train_ground_truths
                )
                train_f1_score.append(train_f1)
                train_confusion_matrix = np.array_str(confusion_matrix(train_ground_truths, train_pred_classes, labels=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]))
                print('Epoch: {} || Train_Acc: {} || Train_Loss: {}'.format(
                    epoch, train_accuracy, epoch_loss
                ))
                print(f'train: \n{train_confusion_matrix}')
                plot_epoch.append(epoch)

                # set current loss to train loss for write history
                train_loss = epoch_loss

        # Save Weights
        model_name = save_weights(model, args, epoch, optimizer)

        # Write History after train and validation phase
        write_history(
            args.history_path,
            model_name,
            train_loss,
            val_loss,
            train_accuracy,
            val_accuracy,
            train_f1,
            val_f1,
            train_precision,
            val_precision,
            train_recall,
            val_recall,
            train_confusion_matrix,
            val_confusion_matrix
        )

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model, train_loss_history, val_loss_history, train_acc_history, val_acc_history, train_f1_score, val_f1_score, plot_epoch

def check_accuracy(loader, model):
    """Evaluate ``model`` on ``loader`` and print the number of correct predictions and the accuracy.

    Reads the module-level ``device`` and ``args`` (``batch_size`` and ``test_n`` drive
    the progress text). The model is put in eval mode for the pass and switched to
    train mode afterwards.
    """
    num_correct = 0
    num_samples = 0
    model.eval()

    with torch.no_grad():
        progress = tqdm(loader)
        for batch_no, sample in enumerate(progress, start=1):
            videos = sample["video"].to(device=device)
            targets = sample["action"].to(device=device).argmax(1)

            predictions = model(videos).argmax(1)

            num_correct += (predictions == targets).sum()
            num_samples += predictions.size(0)

            # Fraction of args.test_n covered so far, assuming full batches.
            progress.set_description('Progress: {}'.format(batch_no * args.batch_size / args.test_n))

        print(f'Got {num_correct} / {num_samples} with accuracy {float(num_correct)/float(num_samples)*100:.2f}')

    model.train()

if __name__ == "__main__":
    print("PyTorch Version: ", torch.__version__)
    print("Torchvision Version: ", torchvision.__version__)
    # current_device() initialises CUDA and fails on a CPU-only runtime,
    # so only query the device when a GPU is present.
    if torch.cuda.is_available():
        print("Current Device: ", torch.cuda.current_device())
        print("Device: ", torch.cuda.device(0))
    print("Cuda Is Available: ", torch.cuda.is_available())
    print("Device Count: ", torch.cuda.device_count())

    # Initialize R(2+1)D Model
    model = models.video.r2plus1d_18(pretrained=args.pretrained, progress=True)

    # Freeze every pretrained parameter first
    for param in model.parameters():
        param.requires_grad = False

    # Change the final fully-connected layer to output args.num_classes classes.
    # NOTE(review): an nn.Sequential(Dropout(0.5), Linear) head used to be assigned here
    # and was then overwritten by this plain Linear layer, so dropout was never part of
    # the trained model. The effective behaviour is kept; re-add dropout deliberately if wanted.
    num_ftrs = model.fc.in_features
    model.fc = nn.Linear(num_ftrs, args.num_classes, bias=True)

    # Unfreeze the layers listed in args.layers_list
    for name, param in model.named_parameters():
        if any(layer in name for layer in args.layers_list):
            param.requires_grad = True
    # New Model is trained with 128x176 images
    print(model)

    print("Params to learn:")
    params_to_update = []
    for name, param in model.named_parameters():
        if param.requires_grad:
            params_to_update.append(param)
            print("\t", name)

    if device.type == 'cuda':
        print(torch.cuda.get_device_name(0))
        print('Memory Usage:')
        print('Allocated:', round(torch.cuda.memory_allocated(0) / 1024 ** 3, 1), 'GB')
        print('Cached:   ', round(torch.cuda.memory_reserved(0) / 1024 ** 3, 1), 'GB')
        print(" ")

    # Transforms
    # NOTE(review): video_augmentation is built but never passed to the dataset below.
    sometimes = lambda aug: vidaug.Sometimes(0.5, aug)  # Used to apply augmentor with 50% probability
    video_augmentation = vidaug.Sequential([
        sometimes(vidaug.Salt()),
        sometimes(vidaug.Pepper()),
    ], random_order=True)

    #Load Dataset
    basketball_dataset = BasketballDataset(annotation_dict=args.annotation_path,
                                           augmented_dict=args.augmented_annotation_path)

    # Fixed-seed splits: first carve out the test set, then the validation set.
    train_subset, test_subset = random_split(
        basketball_dataset, [args.n_total-args.test_n, args.test_n], generator=torch.Generator().manual_seed(1))

    train_subset, val_subset = random_split(
        train_subset, [args.n_total-args.test_n-args.val_n, args.val_n], generator=torch.Generator().manual_seed(1))

    train_loader = DataLoader(dataset=train_subset, shuffle=True, batch_size=args.batch_size)
    val_loader = DataLoader(dataset=val_subset, shuffle=False, batch_size=args.batch_size)
    test_loader = DataLoader(dataset=test_subset, shuffle=False, batch_size=args.batch_size)

    dataloaders_dict = {'train': train_loader, 'val': val_loader}

    # Train
    optimizer_ft = optim.Adam(params_to_update, lr=args.lr)

    criterion = nn.CrossEntropyLoss()

    if args.continue_epoch:
        model = load_weights(model, args)

    if torch.cuda.is_available():
        # Put model into device after updating parameters
        model = model.to(device)
        criterion = criterion.to(device)

    # Train and evaluate
    model, train_loss_history, val_loss_history, train_acc_history, val_acc_history, train_f1_score, val_f1_score, plot_epoch = train_model(
        model,
        dataloaders_dict,
        criterion,
        optimizer_ft,
        args,
        start_epoch=args.start_epoch,
        num_epochs=args.num_epochs)

    # Move the accuracy values from the GPU to the CPU and convert them to NumPy for plotting
    train_acc = [x.cpu().numpy() for x in train_acc_history]
    val_acc = [x.cpu().numpy() for x in val_acc_history]
    epochs = range(1, len(train_acc_history) + 1)

    # Plot the training and validation accuracy
    plt.figure(figsize=(10, 5))
    plt.subplot(121)
    plt.plot(epochs, train_acc, label='Train Accuracy')
    plt.plot(epochs, val_acc, label='Validation Accuracy')
    plt.title('Training and Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.show()

    # Report the epoch number recorded during training, not the 0-based list index.
    best_val_idx = val_loss_history.index(min(val_loss_history))
    best_train_idx = train_loss_history.index(min(train_loss_history))
    print("Best Validation Loss: ", val_loss_history[best_val_idx], "Epoch: ", plot_epoch[best_val_idx])
    print("Best Training Loss: ", train_loss_history[best_train_idx], "Epoch: ", plot_epoch[best_train_idx])


In [None]:
def check_accuracy(loader, model, device):
    """Evaluate ``model`` on ``loader`` and print accuracy, macro precision/recall/F1 and the confusion matrix.

    :param loader: iterable of sample dicts with a "video" tensor and an "action" label tensor
    :param model: model to evaluate; it is put in eval mode and switched to train mode afterwards
    :param device: device the batches are moved to before the forward pass
    """
    # Imported locally so the function does not depend on another cell having
    # imported these names (accuracy_score in particular).
    from sklearn.metrics import accuracy_score, confusion_matrix, f1_score, precision_score, recall_score

    model.eval()  # Set model to evaluation mode

    all_predictions = []
    all_labels = []

    with torch.no_grad():
        pbar = tqdm(loader, desc="Evaluating")
        for sample in pbar:
            videos = sample["video"].to(device)
            labels = sample["action"].to(device)

            # Compute model output
            outputs = model(videos)
            _, preds = torch.max(outputs, 1)

            # If the labels are one-hot encoded, convert them to class indices
            if labels.ndim > 1 and labels.size(1) > 1:
                labels = labels.argmax(dim=1)

            # Append predictions and labels
            all_predictions.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Compute metrics
    accuracy = accuracy_score(all_labels, all_predictions)
    precision = precision_score(all_labels, all_predictions, average='macro', zero_division=0)
    recall = recall_score(all_labels, all_predictions, average='macro', zero_division=0)
    f1 = f1_score(all_labels, all_predictions, average='macro', zero_division=0)
    cm = confusion_matrix(all_labels, all_predictions)

    print(f"Accuracy: {accuracy * 100:.2f}%")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")
    print("Confusion Matrix:")
    print(cm)

    model.train()  # Set model back to training mode

# Usage example
# Relies on `test_loader` and `model` created by the training cell above.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

check_accuracy(test_loader, model, device)

CONV-LSTM Model

資料載入

In [None]:
import json
import numpy as np
import cv2

import torch
from torch.utils.data import Dataset
from torchvision import transforms

base_dir = '/content/drive/My Drive/dataset/'

class VideoDataset(Dataset):
    """SpaceJam: a Dataset for Basketball Action Recognition.

    Each item is one video clip decoded into a float tensor together with its
    integer action label.

    Args:
        annotation_dict: Path to a JSON file mapping video id -> action label.
        augmented_dict: Path to a JSON file with the same layout for the
            augmented clips. Only read when ``augment`` is true.
        video_dir: Directory holding the original ``<video_id>.mp4`` files.
        augmented_dir: Directory holding the augmented ``<video_id>.mp4``
            files; used as a fallback when a clip is not found in
            ``video_dir``.
        augment: When true, the augmented annotations are appended to the
            original ones.
        transform: Stored on the instance; not applied by this class.
        poseData: Stored on the instance; not used by this class.
    """

    # Clips with fewer decoded frames are padded by repeating the last frame.
    MIN_FRAMES = 16

    def __init__(self, annotation_dict, augmented_dict, video_dir=base_dir+"examples/", augmented_dir=base_dir+"augmented-examples/", augment=True, transform=None, poseData=False):
        with open(annotation_dict) as f:
            self.video_list = list(json.load(f).items())

        # Set unconditionally: VideoToNumpy reads self.augmented_dir on its
        # fallback path, which used to raise AttributeError when augment=False.
        self.augment = augment
        self.augmented_dir = augmented_dir
        if augment:
            with open(augmented_dict) as f:
                # Extend with augmented data
                self.video_list.extend(json.load(f).items())

        self.video_dir = video_dir
        self.poseData = poseData
        self.transform = transform

    def __len__(self):
        """Return the number of annotated clips (original plus augmented)."""
        return len(self.video_list)

    def __getitem__(self, idx):
        """Load the clip at ``idx``.

        Returns:
            A dict with 'video_id', 'video' (float tensor of the decoded
            frames) and the integer label under both 'action' and 'class';
            or None when the clip cannot be read.
        """
        video_id, label = self.video_list[idx]
        encoding = int(label)
        video = self.VideoToNumpy(video_id)
        if video is None:
            return None
        return {
            'video_id': video_id,
            'video': torch.from_numpy(video).float(),
            'action': encoding,
            'class': encoding,
        }

    def VideoToNumpy(self, video_id):
        """Decode ``<video_id>.mp4`` into a float32 array scaled to [0, 1].

        The clip is looked up in ``video_dir`` first and in ``augmented_dir``
        second. Each frame is transposed with axes (2, 0, 1), i.e. to
        channel-first assuming OpenCV's (height, width, channel) layout; no
        colour conversion is applied.

        Returns:
            Array of stacked frames (frame axis first), padded to at least
            ``MIN_FRAMES`` frames by repeating the last frame, or None when
            the file cannot be opened or contains no decodable frame.
        """
        video_path = os.path.join(self.video_dir, video_id + ".mp4")
        video = cv2.VideoCapture(video_path)
        if not video.isOpened():
            video.release()
            augmented_video_path = os.path.join(self.augmented_dir, video_id + ".mp4")
            video = cv2.VideoCapture(augmented_video_path)
            if not video.isOpened():
                video.release()
                print(f"Cannot read video: {video_path} or {augmented_video_path}")
                return None

        video_frames = []
        try:
            while True:
                success, frame = video.read()
                if not success:
                    break
                frame = frame.astype(np.float32) / 255.0
                video_frames.append(np.transpose(frame, (2, 0, 1)))
        finally:
            video.release()

        # Padding needs at least one frame to repeat; treat an empty clip
        # like an unreadable one instead of failing with IndexError.
        if not video_frames:
            print(f"No frames decoded from video: {video_id}")
            return None

        # Fewer than MIN_FRAMES frames: repeat the last frame until reached
        while len(video_frames) < self.MIN_FRAMES:
            video_frames.append(video_frames[-1])

        return np.array(video_frames)


if __name__ == "__main__":
    # Smoke test: build the full dataset (original + augmented annotations)
    # and show one sample's label together with the dataset size.
    video_dataset = VideoDataset(annotation_dict=base_dir + "annotation_dict.json",
                                 augmented_dict=base_dir + "augmented_annotation_dict.json")
    # Index once: every video_dataset[i] re-reads and decodes the whole clip.
    sample = video_dataset[1]
    print(sample['action'])
    print(sample['class'])
    print(len(video_dataset))


8
8
49899


參數設置

In [None]:
from easydict import EasyDict

base_dir = '/content/drive/My Drive/dataset/'  # make sure this path is correct

# Configuration shared by the ConvLSTM cells below (attribute-style access via EasyDict).
# NOTE(review): 'base_model_name', 'pretrained', 'layers_list' and 'continue_epoch'
# are not read by the ConvLSTM cells visible in this section — presumably carried
# over from another model's configuration; confirm before relying on them.
args = EasyDict({
    'base_model_name': 'conv-lstm',
    'pretrained': True,
    'lr': 0.0001,  # learning rate given to the Adam optimizer
    'start_epoch': 1,  # first epoch number of the training loop
    'num_epochs': 6,  # last epoch number (inclusive) of the training loop
    'layers_list': ['layer3', 'layer4', 'fc'],
    'continue_epoch': False,
    'num_classes': 10,  # number of action classes used by the metrics
    'batch_size': 25,  # batch size of every DataLoader
    'n_total': 49899,  # total number of clips (matches the dataset length printed above)
    'test_n': 4990,  # number of clips held out for testing
    'val_n': 9980,  # number of clips held out for validation
    'annotation_path': base_dir + "annotation_dict.json",
    'augmented_annotation_path': base_dir + "augmented_annotation_dict.json",
    'device': 'cuda' if torch.cuda.is_available() else 'cpu'  # GPU when available
})


模型定義

In [None]:
import torch
import torch.nn as nn

class ConvLSTMCell(nn.Module):
    """Single convolutional LSTM cell.

    All four gates are produced by one 2-D convolution applied to the
    channel-wise concatenation of the input and the previous hidden state.

    Args:
        input_dim: Number of channels of the input tensor.
        hidden_dim: Number of channels of the hidden and cell states.
        kernel_size: Convolution kernel size, an int or a
            ``(height, width)`` tuple.
        bias: Whether the convolution has a bias term.
    """

    def __init__(self, input_dim, hidden_dim, kernel_size, bias):
        super(ConvLSTMCell, self).__init__()

        self.input_dim = input_dim
        self.hidden_dim = hidden_dim

        self.kernel_size = kernel_size
        # Half-kernel padding keeps the spatial size for odd kernels. Kernel
        # sizes may arrive as tuples such as (3, 3); `tuple // 2` raises
        # TypeError, so tuples are halved per dimension.
        if isinstance(kernel_size, int):
            self.padding = kernel_size // 2
        else:
            self.padding = tuple(k // 2 for k in kernel_size)
        self.bias = bias

        # One convolution yields the i, f, o and g pre-activations at once,
        # hence 4 * hidden_dim output channels.
        self.conv = nn.Conv2d(in_channels=self.input_dim + self.hidden_dim,
                              out_channels=4 * self.hidden_dim,
                              kernel_size=self.kernel_size,
                              padding=self.padding,
                              bias=self.bias)

    def forward(self, input_tensor, cur_state):
        """Advance the cell by one time step.

        Args:
            input_tensor: Input of shape (batch, input_dim, height, width).
            cur_state: Pair ``(h, c)`` of the current hidden and cell states,
                each of shape (batch, hidden_dim, height, width).

        Returns:
            Tuple ``(h_next, c_next)`` with the updated hidden and cell states.
        """
        h_cur, c_cur = cur_state

        combined = torch.cat([input_tensor, h_cur], dim=1)  # concatenate along channel axis
        combined_conv = self.conv(combined)
        cc_i, cc_f, cc_o, cc_g = torch.split(combined_conv, self.hidden_dim, dim=1)
        i = torch.sigmoid(cc_i)  # input gate
        f = torch.sigmoid(cc_f)  # forget gate
        o = torch.sigmoid(cc_o)  # output gate
        g = torch.tanh(cc_g)     # candidate cell state

        c_next = f * c_cur + i * g
        h_next = o * torch.tanh(c_next)

        return h_next, c_next

    def init_hidden(self, batch_size, image_size):
        """Return zero-initialised ``(h, c)`` states.

        The states are created on the device and with the dtype of the
        convolution weights.

        Args:
            batch_size: Number of sequences in the batch.
            image_size: ``(height, width)`` of the feature maps.
        """
        height, width = image_size
        weight = self.conv.weight
        shape = (batch_size, self.hidden_dim, height, width)
        return (torch.zeros(shape, device=weight.device, dtype=weight.dtype),
                torch.zeros(shape, device=weight.device, dtype=weight.dtype))

In [None]:
class ConvLSTM(nn.Module):
    """Multi-layer convolutional LSTM unrolled over a sequence of frames.

    Args:
        input_dim: Number of channels of each input frame.
        hidden_dim: Hidden channels per layer. A single value is repeated for
            every layer; a list must have ``num_layers`` entries.
        kernel_size: Convolution kernel size per layer: an int, a tuple, or a
            list of ints/tuples with ``num_layers`` entries. A single value is
            repeated for every layer.
        num_layers: Number of stacked ConvLSTM cells.
        batch_first: If true the input is (batch, time, channel, height,
            width); otherwise (time, batch, channel, height, width).
        bias: Whether the cell convolutions use a bias term.
        return_all_layers: If true, ``forward`` returns the outputs and final
            states of every layer instead of only the last one.

    Raises:
        ValueError: If ``kernel_size`` has an unsupported type, or the
            per-layer lists do not all have ``num_layers`` entries.
    """

    def __init__(self, input_dim, hidden_dim, kernel_size, num_layers, batch_first=False, bias=True, return_all_layers=False):
        super(ConvLSTM, self).__init__()

        self._check_kernel_size_consistency(kernel_size)

        # Ensure hidden_dim and kernel_size are lists having len == num_layers
        hidden_dim = self._extend_for_multilayer(hidden_dim, num_layers)
        kernel_size = self._extend_for_multilayer(kernel_size, num_layers)
        if not len(hidden_dim) == len(kernel_size) == num_layers:
            raise ValueError('Inconsistent list length.')

        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.kernel_size = kernel_size
        self.num_layers = num_layers
        self.batch_first = batch_first
        self.bias = bias
        self.return_all_layers = return_all_layers

        cell_list = []
        for i in range(self.num_layers):
            # The first layer consumes the frames; deeper layers consume the
            # hidden states of the layer below.
            cur_input_dim = self.input_dim if i == 0 else self.hidden_dim[i - 1]

            cell_list.append(ConvLSTMCell(input_dim=cur_input_dim,
                                          hidden_dim=self.hidden_dim[i],
                                          kernel_size=self.kernel_size[i],
                                          bias=self.bias))

        self.cell_list = nn.ModuleList(cell_list)

    def forward(self, input_tensor, hidden_state=None):
        """Run every layer over the whole input sequence.

        Args:
            input_tensor: 5-D tensor laid out as described by ``batch_first``.
            hidden_state: Optional list with one ``(h, c)`` pair per layer;
                zero states are created when omitted.

        Returns:
            Tuple ``(layer_output_list, last_state_list)``. For each returned
            layer the first list holds the stacked hidden states with shape
            (batch, time, hidden, height, width) and the second the final
            ``[h, c]`` pair. Outputs are batch-first regardless of
            ``batch_first`` (the input is permuted, the output is not
            permuted back).
        """
        if not self.batch_first:
            # (time, batch, channel, height, width) -> (batch, time, channel, height, width)
            input_tensor = input_tensor.permute(1, 0, 2, 3, 4)

        batch_size, seq_len, _, height, width = input_tensor.size()

        if hidden_state is None:
            hidden_state = self._init_hidden(batch_size=batch_size, image_size=(height, width))

        layer_output_list = []
        last_state_list = []

        cur_layer_input = input_tensor

        for layer_idx in range(self.num_layers):

            h, c = hidden_state[layer_idx]
            output_inner = []
            for t in range(seq_len):
                h, c = self.cell_list[layer_idx](input_tensor=cur_layer_input[:, t, :, :, :],
                                                 cur_state=[h, c])
                output_inner.append(h)

            # Stack along the time axis; this becomes the next layer's input
            layer_output = torch.stack(output_inner, dim=1)
            cur_layer_input = layer_output

            layer_output_list.append(layer_output)
            last_state_list.append([h, c])

        if not self.return_all_layers:
            layer_output_list = layer_output_list[-1:]
            last_state_list = last_state_list[-1:]

        return layer_output_list, last_state_list

    def _init_hidden(self, batch_size, image_size):
        """Return one zero-initialised ``(h, c)`` pair per layer."""
        return [cell.init_hidden(batch_size, image_size) for cell in self.cell_list]

    @staticmethod
    def _check_kernel_size_consistency(kernel_size):
        """Validate ``kernel_size``: an int, a tuple, or a list of ints/tuples.

        Ints are accepted because both the cell's padding computation and
        ``nn.Conv2d`` handle them.

        Raises:
            ValueError: For any other type.
        """
        if isinstance(kernel_size, (int, tuple)):
            return
        if isinstance(kernel_size, list) and all(isinstance(elem, (int, tuple)) for elem in kernel_size):
            return
        raise ValueError('`kernel_size` must be an int, a tuple, or a list of ints/tuples')

    @staticmethod
    def _extend_for_multilayer(param, num_layers):
        """Repeat a single value ``num_layers`` times; lists are returned as is."""
        if not isinstance(param, list):
            param = [param] * num_layers
        return param


Dataloader設置

In [None]:
from torch.utils.data import DataLoader, random_split

# Build the full dataset (original + augmented annotations) from the paths in args.
video_dataset = VideoDataset(annotation_dict=args.annotation_path, augmented_dict=args.augmented_annotation_path)

# Split sizes come from the shared configuration instead of being repeated as
# literals here (with the current args: 34929 train / 9980 val / 4990 test).
val_size = args.val_n
test_size = args.test_n
train_size = args.n_total - val_size - test_size

# Make sure the configured total matches the dataset that was actually loaded
total_size = train_size + val_size + test_size
assert total_size == len(video_dataset), "Size mismatch"

# The test set is split off first, then the remainder is divided into train and
# validation. Both splits use a generator seeded with 42 so the partition is
# reproducible across runs.
train_val_dataset, test_dataset = random_split(video_dataset, [train_size + val_size, test_size], generator=torch.Generator().manual_seed(42))
train_dataset, val_dataset = random_split(train_val_dataset, [train_size, val_size], generator=torch.Generator().manual_seed(42))

# Create the DataLoaders; only the training data is shuffled
train_loader = DataLoader(train_dataset, batch_size=args.batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=args.batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=args.batch_size, shuffle=False)



訓練

In [None]:
!pip install torchmetrics

In [None]:
!pip install tqdm

In [None]:
import torch
import torch.nn as nn
from torch.optim import Adam
from torch.utils.data import DataLoader, random_split
from torchmetrics import ConfusionMatrix, Accuracy, Precision, Recall, F1Score
from tqdm import tqdm
import os
base_dir = '/content/drive/My Drive/dataset/'
# Initialise the model, optimizer and loss function.
# NOTE(review): ConvLSTM as defined in this notebook requires input_dim,
# hidden_dim, kernel_size and num_layers, has no `num_classes` parameter, and
# its forward() returns (layer_output_list, last_state_list) rather than class
# scores. This line therefore appears to rely on a different (classifier)
# ConvLSTM definition that is not in the cells shown here — confirm which model
# class is intended before running.
model = ConvLSTM(num_classes=args.num_classes).to(args.device)
optimizer = Adam(model.parameters(), lr=args.lr)
criterion = nn.CrossEntropyLoss()

# Initialise the evaluation metrics (all macro-averaged over the classes).
# The same metric objects are passed to the training, validation and test
# passes below.
metrics = {
    'accuracy': Accuracy(num_classes=args.num_classes, average='macro', task='multiclass').to(args.device),
    'precision': Precision(num_classes=args.num_classes, average='macro', task='multiclass').to(args.device),
    'recall': Recall(num_classes=args.num_classes, average='macro', task='multiclass').to(args.device),
    'f1': F1Score(num_classes=args.num_classes, average='macro', task='multiclass').to(args.device)
}

# Prepare the data loaders (train_dataset / val_dataset / test_dataset must
# already exist in the kernel namespace); only the training data is shuffled.
train_loader = DataLoader(train_dataset, batch_size=args.batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=args.batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=args.batch_size, shuffle=False)

# Prepare the folder and file that store the training history
history_dir = os.path.join(base_dir+"histories")
if not os.path.exists(history_dir):
    os.makedirs(history_dir)
history_path = os.path.join(history_dir, "historyLSTM.txt")

# Write the CSV header: loss, accuracy, precision, recall and F1 for the
# training and the validation pass. Opening with "w" truncates any existing
# history file.
with open(history_path, "w") as f:
    f.write("epoch,train_loss,train_accuracy,train_precision,train_recall,train_f1,validation_loss,validation_accuracy,validation_precision,validation_recall,validation_f1\n")


def process_epoch(mode, model, loader, optimizer, criterion, device, metrics, history_file, epoch=None):
    """Run one full pass over ``loader`` in training or evaluation mode.

    Args:
        mode: 'train' updates the model; any other value runs the model in
            eval mode without gradients or optimizer steps.
        model: Network mapping a video batch to per-class scores.
        loader: Iterable of batches, each a mapping with a 'video' tensor and
            an 'action' label tensor.
        optimizer: Optimizer stepped after every batch in 'train' mode; not
            used otherwise (may be None).
        criterion: Loss function called as ``criterion(outputs, labels)``.
        device: Device the batch tensors are moved to.
        metrics: Dict of name -> torchmetrics metric. The metrics are reset
            at the start of the call so the result covers this pass only.
        history_file: Optional open text file. When given together with
            ``epoch``, one row ``epoch,loss,accuracy,precision,recall,f1`` is
            appended (this requires those four metric names).
        epoch: Epoch number, used for the progress bar and the history row.

    Returns:
        Tuple ``(metrics_results, cm)``: a dict with every metric as a float
        plus 'loss' (mean of the per-batch losses), and the confusion matrix
        computed over the pass. The number of classes is read from the global
        ``args.num_classes``.
    """
    is_training = mode == 'train'
    if is_training:
        model.train()
    else:
        model.eval()

    # The metric objects are shared between calls. Without a reset, compute()
    # would also include every batch seen in earlier passes (e.g. validation
    # results polluted by the training batches of the same epoch).
    for metric in metrics.values():
        metric.reset()

    total_loss = 0
    cm_metric = ConfusionMatrix(num_classes=args.num_classes, task="multiclass").to(device)
    progress_bar = tqdm(loader, desc=f"Epoch {epoch} [{mode.upper()}]", total=len(loader))

    for data in progress_bar:
        videos = data['video'].to(device)
        labels = data['action'].to(device)

        if is_training:
            optimizer.zero_grad()

        # Gradients are only tracked while training
        with torch.set_grad_enabled(is_training):
            outputs = model(videos)
            loss = criterion(outputs, labels)
            if is_training:
                loss.backward()
                optimizer.step()

        batch_loss = loss.item()
        total_loss += batch_loss
        _, predicted = torch.max(outputs, 1)
        cm_metric.update(predicted, labels)

        # Accumulate only; the per-batch metric value is not needed
        for metric in metrics.values():
            metric.update(outputs, labels)

        progress_bar.set_postfix(loss=batch_loss)

    avg_loss = total_loss / len(loader)
    metrics_results = {name: metric.compute().item() for name, metric in metrics.items()}
    metrics_results['loss'] = avg_loss
    cm = cm_metric.compute()

    if history_file and epoch is not None:
        history_file.write(f"{epoch},{avg_loss},{metrics_results['accuracy']},{metrics_results['precision']},{metrics_results['recall']},{metrics_results['f1']}\n")

    return metrics_results, cm

# Training / validation loop: one training pass and one validation pass per
# epoch, then a single CSV row (epoch, train columns, validation columns) is
# appended to the history file and both confusion matrices are printed.
with open(history_path, "a") as f:
    for epoch in range(args.start_epoch, args.num_epochs + 1):
        train_metrics, train_cm = process_epoch(
            'train', model, train_loader, optimizer, criterion, args.device, metrics, None, epoch)
        val_metrics, val_cm = process_epoch(
            'validate', model, val_loader, None, criterion, args.device, metrics, None, epoch)

        # Column order matches the header: loss, accuracy, precision, recall,
        # f1 — first for training, then for validation.
        f.write(",".join(
            [str(epoch)] + [
                str(results[key])
                for results in (train_metrics, val_metrics)
                for key in ('loss', 'accuracy', 'precision', 'recall', 'f1')
            ]
        ) + "\n")

        print(f"Train CM Epoch {epoch}:\n{train_cm}")
        print(f"Validation CM Epoch {epoch}:\n{val_cm}")



# Optional: final evaluation on the test set once all training has finished
def evaluate(model, loader, criterion, device, metrics):
    """Evaluate ``model`` on ``loader`` without gradient tracking.

    Args:
        model: Network mapping a video batch to per-class scores.
        loader: Iterable of batches, each a mapping with a 'video' tensor and
            an 'action' label tensor.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        device: Device the batch tensors are moved to.
        metrics: Dict of name -> torchmetrics metric. The metrics are reset
            at the start of the call so the result covers this loader only.

    Returns:
        Tuple ``(metrics_results, cm)``: a dict with every metric as a float
        plus 'loss' (mean of the per-batch losses), and the confusion matrix
        over the loader. The number of classes is read from the global
        ``args.num_classes``.
    """
    model.eval()

    # The metric objects may already hold state from training / validation
    # passes; clear it so the reported numbers describe this loader alone.
    for metric in metrics.values():
        metric.reset()

    total_loss = 0
    cm_metric = ConfusionMatrix(num_classes=args.num_classes, task="multiclass").to(device)

    with torch.no_grad():
        for data in loader:
            videos = data['video'].to(device)
            labels = data['action'].to(device)
            outputs = model(videos)
            loss = criterion(outputs, labels)
            total_loss += loss.item()

            _, predicted = torch.max(outputs, 1)
            cm_metric.update(predicted, labels)
            for metric in metrics.values():
                metric.update(outputs, labels)

    avg_loss = total_loss / len(loader)
    metrics_results = {name: metric.compute().item() for name, metric in metrics.items()}
    metrics_results['loss'] = avg_loss
    cm = cm_metric.compute()

    return metrics_results, cm

# Final evaluation on the held-out test loader.
# NOTE(review): this rebinds `metrics` from the dict of metric objects to the
# dict of float results returned by evaluate(); afterwards the metric objects
# are no longer reachable under that name.
metrics, test_cm = evaluate(model, test_loader, criterion, args.device, metrics)
print(f"Final Test Metrics: {metrics}")
print(f"Test Confusion Matrix:\n{test_cm}")




Error Analysis (R(2+1)D CNN Model)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import classification_report, confusion_matrix

# Confusion matrix of the R(2+1)D CNN model on the test set
# (rows: true class, columns: predicted class) and the class names.
conf_matrix = np.array([
    [477, 6, 5, 0, 7, 3, 3, 2, 0, 6],
    [5, 487, 3, 3, 3, 4, 0, 2, 0, 0],
    [1, 0, 529, 4, 2, 1, 4, 2, 0, 54],
    [0, 5, 16, 311, 0, 5, 4, 0, 0, 6],
    [2, 4, 0, 0, 218, 1, 0, 0, 0, 0],
    [4, 18, 8, 14, 2, 203, 2, 1, 6, 14],
    [0, 3, 16, 3, 0, 5, 254, 4, 40, 70],
    [2, 1, 2, 0, 0, 0, 2, 316, 2, 11],
    [3, 4, 4, 0, 0, 2, 29, 4, 434, 138],
    [3, 4, 69, 1, 0, 4, 27, 4, 59, 1013]
])

labels = ['block', 'pass', 'run', 'dribble', 'shoot', 'ball in hand', 'defense', 'pick', 'no_action', 'walk']

# Rebuild per-sample true / predicted label lists from the matrix counts so
# that classification_report can be applied.
y_true = []
y_pred = []
for i, row in enumerate(conf_matrix):
    for j, count in enumerate(row):
        y_true.extend([i] * count)
        y_pred.extend([j] * count)

# Generate the classification report
report = classification_report(y_true, y_pred, target_names=labels, output_dict=True)

# Per-class accuracy: correct predictions / samples of that true class.
# With rows as true classes this is the same quantity as the per-class recall.
accuracies = conf_matrix.diagonal() / conf_matrix.sum(axis=1)

# Print the classification report together with the per-class accuracy
print(f"{'Class':<15}{'Precision':<10}{'Recall':<10}{'F1-Score':<10}{'Accuracy':<10}")
for label, accuracy in zip(labels, accuracies):
    precision = report[label]['precision']
    recall = report[label]['recall']
    # Named `f1`, not `f1_score`: a global `f1_score` float would shadow the
    # `f1_score` function that check_accuracy calls and break it on re-run.
    f1 = report[label]['f1-score']
    print(f"{label:<15}{precision:<10.2f}{recall:<10.2f}{f1:<10.2f}{accuracy:<10.2f}")

# Plot the confusion matrix
plt.figure(figsize=(10, 8))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=labels, yticklabels=labels)
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.title('Confusion Matrix for R(2+1)D CNN Model on Test Set')
plt.show()


Error Analysis (CONV-LSTM Model)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import classification_report, confusion_matrix

# Confusion matrix of the CONV-LSTM model on the test set
# (rows: true class, columns: predicted class) and the class names.
conf_matrix = np.array([
    [469,   9,   4,   0,   5,   2,   7,  12,   4,   6],
    [  5, 524,   5,   4,   4,  13,   0,   4,   2,   3],
    [  0,   0, 450,   5,   0,   0,   3,   0,   0,  96],
    [  0,   3,   4, 318,   0,  27,   1,   0,   0,   3],
    [  6,   7,   0,   0, 181,   2,   0,   1,   0,   0],
    [  3,  12,   6,   5,   1, 184,   2,   0,   1,   5],
    [  0,   1,   7,   4,   0,   7, 252,   0,  42,  63],
    [  1,  11,   4,   0,   0,   3,   2, 302,   5,  11],
    [  2,   3,   1,   1,   0,   6,  22,   1, 488, 145],
    [  2,   1,  53,   5,   0,   5,  31,   3, 108, 990]
])

labels = ['block', 'pass', 'run', 'dribble', 'shoot', 'ball in hand', 'defense', 'pick', 'no_action', 'walk']

# Rebuild per-sample true / predicted label lists from the matrix counts so
# that classification_report can be applied.
y_true = []
y_pred = []
for i, row in enumerate(conf_matrix):
    for j, count in enumerate(row):
        y_true.extend([i] * count)
        y_pred.extend([j] * count)

# Generate the classification report
report = classification_report(y_true, y_pred, target_names=labels, output_dict=True)

# Per-class accuracy: correct predictions / samples of that true class.
# With rows as true classes this is the same quantity as the per-class recall.
accuracies = conf_matrix.diagonal() / conf_matrix.sum(axis=1)

# Print the classification report together with the per-class accuracy
print(f"{'Class':<15}{'Precision':<10}{'Recall':<10}{'F1-Score':<10}{'Accuracy':<10}")
for label, accuracy in zip(labels, accuracies):
    precision = report[label]['precision']
    recall = report[label]['recall']
    # Named `f1`, not `f1_score`: a global `f1_score` float would shadow the
    # `f1_score` function that check_accuracy calls and break it on re-run.
    f1 = report[label]['f1-score']
    print(f"{label:<15}{precision:<10.2f}{recall:<10.2f}{f1:<10.2f}{accuracy:<10.2f}")

# Plot the confusion matrix
plt.figure(figsize=(10, 8))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=labels, yticklabels=labels)
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.title('Confusion Matrix for CONV-LSTM Model on Test Set')
plt.show()
