In [3]:
import time, math, torch, gc
from torch.utils.data import DataLoader
from torch import optim
from torch import nn, randn, exp, sum
import numpy as np
import pandas as pd
np.set_printoptions(suppress=True)
torch.set_printoptions(sci_mode=False)
import warnings
warnings.filterwarnings('ignore')

In [4]:
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(DEVICE)

cuda


In [5]:
"""读取train data and label"""
import os
import cv2
import numpy as np
from torch.utils.data import Dataset

class LoadPoseData(Dataset):

    def __init__(self, thePath, is_train = True):
        super(LoadPoseData, self).__init__()
        self.dataset = []

        sub_dir = "train" if is_train else "valid"

        for tag in os.listdir(f"{thePath}/{sub_dir}"):
            file_dir = f"{thePath}/{sub_dir}/{tag}"
            for img_file in os.listdir(file_dir):
                img_path = f"{file_dir}/{img_file}"
                if tag == 'fall':
                    self.dataset.append((img_path, 0))
                else:
                    self.dataset.append((img_path, 1))

    def __len__(self):

        return len(self.dataset)

    def __getitem__(self, item):
        
        data = self.dataset[item]

        img = cv2.imread(data[0],cv2.IMREAD_GRAYSCALE) #以灰度图形式读数据

        img = img.reshape(-1)
        img = img/255 #把数据转成[0,1]之间的数据

        tag_one_hot = np.zeros(2)
        tag_one_hot[int(data[1])] = 1

        return np.float32(img),np.float32(tag_one_hot)

In [6]:
import os
import cv2
import numpy as np
import random
from torch.utils.data import Dataset
from imgaug import augmenters as iaa  # 数据增强库

class LoadPoseDataEnhance(Dataset):
    def __init__(self, thePath, is_train=True, augment_minority=True, argument_per = 0.3):
        super(LoadPoseDataEnhance, self).__init__()
        self.dataset = []
        self.augment_minority = augment_minority  # 是否增强少数类
        self.minority_class = 0  # 少数类标签（'fall'）
        self.argument_per = argument_per
        
        sub_dir = "train" if is_train else "valid"

        # 统计各类样本数量
        class_counts = {0: 0, 1: 0}
        for tag in os.listdir(f"{thePath}/{sub_dir}"):
            file_dir = f"{thePath}/{sub_dir}/{tag}"
            for img_file in os.listdir(file_dir):
                img_path = f"{file_dir}/{img_file}"
                label = 0 if tag == 'fall' else 1
                self.dataset.append((img_path, label))
                class_counts[label] += 1

        # 如果需要增强少数类且训练集
        if is_train and augment_minority:
            minority_count = class_counts[self.minority_class]
            majority_count = class_counts[1 - self.minority_class]
            needed_augmentations = int(self.argument_per * majority_count) - minority_count  # 仅补充到30%

            # 从少数类样本中随机选择需要增强的样本
            minority_samples = [x for x in self.dataset if x[1] == self.minority_class]
            augment_samples = random.choices(minority_samples, k=needed_augmentations)

            # 对选中的样本进行增强并添加到数据集
            for sample in augment_samples:
                img_path, label = sample
                img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
                augmented_img = self._augment_image(img)  # 数据增强
                self.dataset.append((augmented_img, label))  # 存储增强后的图像（非路径）

    def _augment_image(self, img):
        """对图像应用随机增强"""

        aug = iaa.Sequential([
            iaa.Affine(
                rotate=(-10, 10),       # 减小旋转范围
                scale=(0.9, 1.1),       # 轻微缩放
                translate_px=(-5, 5)    # 小幅平移
            ),
            iaa.Crop(percent=(0, 0.05)) # 最小化裁剪
            ], random_order = True)

        augmented_img = aug(image=img)
        return augmented_img

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, item):
        data = self.dataset[item]
        
        # 如果存储的是增强后的图像（直接是numpy数组）
        if isinstance(data[0], np.ndarray):
            img = data[0]
        else:
            img = cv2.imread(data[0], cv2.IMREAD_GRAYSCALE)
        
        img = img.reshape(-1)  # 展平 (128,128) -> (16384,)
        img = img / 255.0  # 归一化
        
        tag_one_hot = np.zeros(2)
        tag_one_hot[int(data[1])] = 1
        
        return np.float32(img), np.float32(tag_one_hot)

In [7]:
class CNN(nn.Module):
    def __init__(self):
        # 调用父类（nn.Module）的构造函数，确保模型继承并初始化
        super().__init__()
        self.sequential = nn.Sequential(
            nn.Linear(16384, 100),
            nn.ReLU(),
            nn.Linear(100, 2),
            nn.Softmax(dim=1)
        )

    def forward(self, x):
        return self.sequential(x)

    
cnn = CNN().to(DEVICE)
print(cnn)


CNN(
  (sequential): Sequential(
    (0): Linear(in_features=16384, out_features=100, bias=True)
    (1): ReLU()
    (2): Linear(in_features=100, out_features=2, bias=True)
    (3): Softmax(dim=1)
  )
)


In [8]:
weight_path = "/home/bhennelly/Documents/QIN/thesis_project/pytorch-openpose-direct-v2/weights"

checkpoint_name = 'FLIR_best_checkpoint.pth'

cnn.load_state_dict(torch.load(os.path.join(weight_path, checkpoint_name)))
cnn.to(DEVICE)

CNN(
  (sequential): Sequential(
    (0): Linear(in_features=16384, out_features=100, bias=True)
    (1): ReLU()
    (2): Linear(in_features=100, out_features=2, bias=True)
    (3): Softmax(dim=1)
  )
)

In [9]:
data_path = "/home/bhennelly/Documents/QIN/thesis_project/pytorch-openpose-direct-v2/datasets"

# train_datas = LoadPoseData(data_path, is_train=True)
# trainLoader = DataLoader(train_datas, batch_size = TRAIN_BATCH_SIZE, shuffle = True)

valid_datas = LoadPoseDataEnhance(thePath=data_path, is_train=False, augment_minority=True)
validLoader = DataLoader(valid_datas, batch_size = 10, shuffle=False)

# print(len(trainLoader))
print(len(validLoader))

54


In [10]:
from sklearn import metrics

In [11]:
from matplotlib import pyplot as plt

In [12]:
EPOCH = 10

epoch_loss = []
epoch_ap = []
epoch_f1 = []
epoch_acc = []

for epoch in range(1, EPOCH + 1):
    valid_sum_loss = 0.0
    valid_sum_ap = 0.0
    valid_sum_f1 = 0.0
    valid_sum_acc = 0.0

    for i, (valid_x, valid_y) in enumerate(validLoader):
        print('-'*10)
        valid_x, valid_y = valid_x.to(DEVICE), valid_y.to(DEVICE)

        cnn.eval()
        with torch.no_grad():  # 关闭梯度计算，节省内存
            valid_pred = cnn(valid_x)  # shape: [batch_size, 2]
        print('预测的概率')
        print(valid_pred)

        valid_loss = torch.mean((valid_y - valid_pred)**2)
        valid_sum_loss += valid_loss.cpu().detach().item()

        # 获取真实标签（假设 valid_y 是 one-hot 编码）
        valid_y_label = torch.argmax(valid_y, dim=1).cpu().numpy()  # shape: [batch_size]
        print('真实数据')
        print(valid_y_label)

        # 获取预测标签（0 或 1）
        pred_labels = torch.argmax(valid_pred, dim=1).cpu().numpy()  # shape: [batch_size]
        print('预测数据')
        print(pred_labels)

        valid_acc = (valid_y_label==pred_labels).sum().item()
        valid_acc = valid_acc/len(valid_y_label)
        valid_sum_acc += valid_acc

        # ======================= metrics ============================
        valid_precision, valid_recall, _ = metrics.precision_recall_curve(valid_y_label, pred_labels)
        valid_ap = metrics.average_precision_score(valid_y_label, pred_labels)
        valid_f1 = metrics.f1_score(valid_y_label, pred_labels)  # 需要类别预测，不是概率

        valid_sum_ap += valid_ap
        valid_sum_f1 += valid_f1
    
    valid_avg_loss = valid_sum_loss / len(validLoader)
    valid_avg_ap = valid_sum_ap / len(validLoader)
    valid_avg_f1 = valid_sum_f1 / len(validLoader)
    valid_avg_acc = valid_sum_acc / len(validLoader)

    print(f"Epoch {epoch}: Loss = {valid_avg_loss:.4f}, Acc = {valid_avg_acc:.4f}, \
          AP = {valid_avg_ap:.4f}, F1 = {valid_avg_f1:.4f}")
    
    epoch_loss.append(valid_avg_loss)
    epoch_ap.append(valid_avg_ap)
    epoch_f1.append(valid_avg_f1)
    epoch_acc.append(valid_avg_acc)

    gc.collect()


----------
预测的概率
tensor([[    0.3546,     0.6454],
        [    0.3546,     0.6454],
        [    0.3546,     0.6454],
        [    0.5129,     0.4871],
        [    0.0008,     0.9992],
        [    0.3546,     0.6454],
        [    0.3546,     0.6454],
        [    0.3546,     0.6454],
        [    0.3546,     0.6454],
        [    0.3546,     0.6454]], device='cuda:0')
真实数据
[0 0 0 0 0 0 0 0 0 0]
预测数据
[1 1 1 0 1 1 1 1 1 1]
----------
预测的概率
tensor([[    0.3546,     0.6454],
        [    0.1297,     0.8703],
        [    0.0016,     0.9984],
        [    0.0915,     0.9085],
        [    0.3546,     0.6454],
        [    0.1741,     0.8259],
        [    0.0129,     0.9871],
        [    0.8071,     0.1929],
        [    0.0000,     1.0000],
        [    0.1026,     0.8974]], device='cuda:0')
真实数据
[0 1 1 1 1 1 1 1 1 1]
预测数据
[1 1 1 1 1 1 1 0 1 1]
----------
预测的概率
tensor([[    0.0675,     0.9325],
        [    0.0000,     1.0000],
        [    0.0149,     0.9851],
        [    0.0472,   

In [13]:
for lst in [epoch_loss, epoch_ap, epoch_f1, epoch_acc]:
    print(len(lst))
    print('mean:', np.mean(lst))
    print('min: ', min(lst))
    print('max: ', max(lst))
    print('medium: ', np.median(lst))
    print('-'*10)

10
mean: 0.1530039199731416
min:  0.1530039199731416
max:  0.1530039199731416
medium:  0.1530039199731416
----------
10
mean: 0.9794467306812985
min:  0.9794467306812985
max:  0.9794467306812985
medium:  0.9794467306812985
----------
10
mean: 0.8862018881626718
min:  0.8862018881626716
max:  0.8862018881626716
medium:  0.8862018881626716
----------
10
mean: 0.8162037037037034
min:  0.8162037037037035
max:  0.8162037037037035
medium:  0.8162037037037035
----------
