In [7]:
# Copyright 2024 ichibanmikan
# 
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# 
#     https://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from torch.utils.data import Dataset, DataLoader
import torch
import torch.nn as nn
import torch.nn.functional as F

class encoder_acc(nn.Module):
    """
    CNN layers applied on acc sensor data to generate pre-softmax
    ---
    params for __init__():
        input_size: e.g. 1
        num_classes: e.g. 6
    forward():
        Input: data
        Output: pre-softmax
    """
    def __init__(self, input_size):
        super().__init__()

        # Extract features, 2D conv layers
        self.features = nn.Sequential(
            nn.Conv2d(input_size, 64, 2),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.Dropout(),

            nn.Conv2d(64, 64, 2),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.Dropout(),

            nn.Conv2d(64, 32, 1),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            nn.Dropout(),

            nn.Conv2d(32, 16, 1),
            nn.BatchNorm2d(16),
            nn.ReLU(inplace=True),

            )

    def forward(self, x):
        x = self.features(x)
        x = x.view(x.size(0), 16, -1)#[bsz, 16, 1, 198]

        return x


class encoder_gyr(nn.Module):
    """
    CNN layers applied on acc sensor data to generate pre-softmax
    ---
    params for __init__():
        input_size: e.g. 1
        num_classes: e.g. 6
    forward():
        Input: data
        Output: pre-softmax
    """
    def __init__(self, input_size):
        super().__init__()

        # Extract features, 2D conv layers
        self.features = nn.Sequential(
            nn.Conv2d(input_size, 64, 2),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.Dropout(),

            nn.Conv2d(64, 64, 2),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.Dropout(),

            nn.Conv2d(64, 32, 1),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            nn.Dropout(),

            nn.Conv2d(32, 16, 1),
            nn.BatchNorm2d(16),
            nn.ReLU(inplace=True),

            )

    def forward(self, x):
        x = self.features(x)
        x = x.view(x.size(0), 16, -1)

        return x


class Encoder(nn.Module):
    def __init__(self, input_size):
        super().__init__()

        self.encoder_acc = encoder_acc(input_size)
        self.encoder_gyr = encoder_gyr(input_size)

    def forward(self, x1, x2):
        
        acc_output = self.encoder_acc(x1)
        gyro_output = self.encoder_gyr(x2)

        return acc_output, gyro_output



class FMModel(nn.Module):
    """Model for human-activity-recognition."""
    def __init__(self, input_size, num_classes):
        super().__init__()

        self.encoder = Encoder(input_size)

        self.gru = nn.GRU(198, 120, 2, batch_first=True)

        # Classify output, fully connected layers
        self.classifier = nn.Sequential(

            nn.Linear(1920, 1280),
            nn.BatchNorm1d(1280),
            nn.ReLU(inplace=True),

            nn.Linear(1280, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(inplace=True),

            nn.Linear(128, num_classes),
            )

    def forward(self, data_1, data_2):
        # last_dim_size = data_1.size(-1)
        # split_size = last_dim_size // 2
        # data_acc, data_gyr = torch.split(data_1, split_size, dim=-1)

        acc_output, gyro_output = self.encoder(data_1, data_2)

        fused_feature = (acc_output + gyro_output) / 2 # weighted sum

        fused_feature, _ = self.gru(fused_feature)
        fused_feature = fused_feature.contiguous().view(fused_feature.size(0), 1920)

        output = self.classifier(fused_feature)

        return output
            

关于数据：计划先全部读取出来，然后随机分配


In [8]:
import random
import numpy as np

class rdata:
    def __init__(self, data_dir):
        data_list_1 = []
        data_list_2 = []
        labels_list = []
        for d in os.listdir(data_dir):
            if os.path.isdir(os.path.join(data_dir, d)):
                x_tr_1 = np.load(os.path.join(data_dir, d, 'x_train_1.npy'))
                x_te_1 = np.load(os.path.join(data_dir, d, 'x_test_1.npy'))
                x_tr_2 = np.load(os.path.join(data_dir, d, 'x_train_2.npy'))
                x_te_2 = np.load(os.path.join(data_dir, d, 'x_test_2.npy'))
                y_tr = np.load(os.path.join(data_dir, d, 'y_train.npy'))
                y_te = np.load(os.path.join(data_dir, d, 'y_test.npy'))
                for i in range(len(x_tr_1)):
                    data_list_1.append(x_tr_1[i])
                    labels_list.append(y_tr[i])
                    data_list_2.append(x_tr_2[i])
                for i in range(len(x_te_1)):
                    data_list_1.append(x_te_1[i])
                    labels_list.append(y_te[i])
                    data_list_2.append(x_te_2[i])
        self.data_1 = np.array(data_list_1)
        self.data_2 = np.array(data_list_2)
        self.data_1 = self.data_1.astype("float")
        self.data_2 = self.data_2.astype("float")
        self.labels = labels_list  # 150个整数

class data_set(Dataset):
    def __init__(self, data_1, data_2, data_labels):
        super().__init__()
        data_1 = data_1.unsqueeze(1) 
        data_2 = data_2.unsqueeze(1) 
        self.data_1 = data_1
        self.data_2 = data_2
        self.labels = data_labels

    def __len__(self):
        return len(self.data_1)
    
    def __getitem__(self, index):  
        return self.data_1[index], self.data_2[index], self.labels[index]
    
class data_factory:
    def __init__(self, data_dir):
        self.rdata = rdata(data_dir)
    def get_dataset(self):
        board_0 = round(len(self.rdata.data_1) * 0.7)
        board_1 = round(len(self.rdata.data_1) * 0.7)+round(len(self.rdata.data_1) * 0.15)
        
        train_data_1 = torch.tensor(self.rdata.data_1[:board_0])
        train_data_2 = torch.tensor(self.rdata.data_2[:board_0])
        train_labels = torch.tensor(self.rdata.labels[:board_0])
        
        test_data_1 = torch.tensor(self.rdata.data_1[board_0 : board_1])
        test_data_2 = torch.tensor(self.rdata.data_2[board_0 : board_1])
        test_labels = torch.tensor(self.rdata.labels[board_0 : board_1])  
              
        valid_data_1 = torch.tensor(self.rdata.data_1[board_1:])
        valid_data_2 = torch.tensor(self.rdata.data_2[board_1:])
        valid_labels = torch.tensor(self.rdata.labels[board_1:])  
        datasets = [data_set(train_data_1, train_data_2, train_labels), data_set(test_data_1, test_data_2, test_labels), data_set(valid_data_1, valid_data_2, valid_labels)]
        dataloaders = [DataLoader(datasets[0], shuffle=True, batch_size=64), DataLoader(datasets[1], shuffle=True, batch_size=64), DataLoader(datasets[2], batch_size=64)]
        # return datasets, dataloaders
        return dataloaders


In [9]:
import math
import torch
import numpy as np
import torch.optim as optim
from __future__ import print_function

class TwoCropTransform:
    """Create two crops of the same image"""
    def __init__(self, transform):
        self.transform = transform

    def __call__(self, x):
        return [self.transform(x), self.transform(x)]

def accuracy(output, target, topk=(1,)):
    """Computes the accuracy over the k top predictions for the specified values of k"""
    with torch.no_grad():
        maxk = max(topk)
        batch_size = target.size(0)

        _, pred = output.topk(maxk, 1, True, True)
        pred = pred.t()
        correct = pred.eq(target.view(1, -1).expand_as(pred))

        # print(correct)

        res = []
        for k in topk:
            correct_k = correct[:k].contiguous().view(-1).float().sum(0, keepdim=True)
            res.append(correct_k.mul_(100.0 / batch_size))
        return res
'''作用: 计算模型预测的准确率。支持计算多个 top-k 的准确率，比如 top-1 或 top-5。

output 是模型的输出，通常是未经处理的 logits。
target 是真实标签。
topk 指定需要计算的 k 个准确率，例如 topk=(1, 5) 会计算 top-1 和 top-5 准确率。
函数返回一个列表，包含每个 k 对应的准确率（以百分比表示）。'''


class train_tools:
    def __init__(self, model, config):
        self.model = model
        self.config = config
        self.set_optimizer()
        
    def set_optimizer(self):
        self.optimizer = optim.SGD(self.model.parameters(),
                        lr=self.config.learning_rate,
                        momentum=self.config.momentum,
                        weight_decay=self.config.weight_decay)

    
    def adjust_learning_rate(self, epoch):
        lr = self.config.learning_rate
        if self.config.cosine:
            eta_min = lr * (self.config.lr_decay_rate ** 3)
            lr = eta_min + (lr - eta_min) * (
                    1 + math.cos(math.pi * epoch / self.config.epochs)) / 2
        else:
            steps = np.sum(epoch > np.asarray(self.config.lr_decay_epochs))
            if steps > 0:
                lr = lr * (self.config.lr_decay_rate ** steps)

        for param_group in self.optimizer.param_groups:
            param_group['lr'] = lr


    def warmup_learning_rate(self, epoch, batch_id, total_batches):
        if self.config.warm and epoch <= self.config.warm_epochs:
            p = (batch_id + (epoch - 1) * total_batches) / \
                (self.config.warm_epochs * total_batches)
            lr = self.config.warmup_from + p * (self.config.warmup_to - self.config.warmup_from)

            for param_group in self.optimizer.param_groups:
                param_group['lr'] = lr


    def save_model(self, epoch, save_file):
        print('==> Saving...')
        state = {
            'opt': self.config,
            'model': self.model.state_dict(),
            'optimizer': self.optimizer.state_dict(),
            'epoch': epoch,
        }
        torch.save(state, save_file)
        del state


class AverageMeter:
    """Computes and stores the average and current value"""
    def __init__(self):
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

In [10]:
import time

class Trainer:
    def __init__(self, config, train_loader, valid_loader, device):
        self.config = config
        self.device = device
        self.model = FMModel(1, 13).to(device)
        self.model.train()
        self.criterion = torch.nn.CrossEntropyLoss().to(device)
        self.train_tools = train_tools(self.model, config)
        self.train_loader = train_loader
        self.validater = Validater(self.model, valid_loader, config, self.criterion, device)
        self.best_acc = -100
    def every_epoch_train(self):
        batch_time = AverageMeter()
        data_time = AverageMeter()
        losses = AverageMeter()
        top1 = AverageMeter()
        
        end = time.time()
        for data_1, data_2, labels in self.train_loader:
            data_time.update(time.time() - end)
            bsz = data_1.shape[0]
            
            data_1 = data_1.to(self.device)
            data_2 = data_2.to(self.device)

            labels = labels.to(self.device)
            
            output = self.model(data_1, data_2)
            loss = self.criterion(output, labels)

            acc, _ = accuracy(output, labels, topk=(1, 5))

            # update metric
            losses.update(loss.item(), bsz)
            top1.update(acc[0], bsz)

            # SGD
            self.train_tools.optimizer.zero_grad()
            loss.backward()
            self.train_tools.optimizer.step()

            # measure elapsed time
            batch_time.update(time.time() - end)
            end = time.time()
        print("loss: %f", loss.item())
        return losses.avg
    
    def train(self):
        record_loss = np.zeros(self.config.epochs)
        record_acc = np.zeros(self.config.epochs)
        for epoch in range(0, self.config.epochs + 1):
            self.train_tools.adjust_learning_rate(epoch)
            time1 = time.time()
            loss = self.every_epoch_train()
            time2 = time.time()
            print('epoch {}, total time {:.2f}'.format(epoch, time2 - time1))
            record_loss[epoch-1] = loss
            # evaluation
            loss, val_acc, _ = self.validater.validate()
            record_acc[epoch-1] = val_acc
            if val_acc > self.best_acc:
                self.best_acc = val_acc
                # best_confusion = confusion
            if self.best_acc > 65.01:
                self.train_tools.save_model(epoch, os.path.join(os.getcwd(), 'model/best.pth'))
                break;
        print(record_acc)
class Validater:
    def __init__(self, model, valid_loader, config, criterion, device):
        self.model = model
        self.config = config
        self.criterion = criterion
        self.valid_loader = valid_loader
        self.device = device
    def validate(self):
        self.model.eval()
        batch_time = AverageMeter()
        losses = AverageMeter()
        top1 = AverageMeter()

        confusion = np.zeros((self.config.num_class, self.config.num_class))

        with torch.no_grad():
            end = time.time()
            for data_1, data_2, labels in self.valid_loader:
                
                bsz = labels.shape[0]
                data_1 = data_1.to(self.device)
                data_2 = data_2.to(self.device)
                labels = labels.to(self.device)
                # forward
                output = self.model(data_1, data_2)
                loss = self.criterion(output, labels)

                # update metric
                acc, _ = accuracy(output, labels, topk=(1, 5))
                losses.update(loss.item(), bsz)
                top1.update(acc[0], bsz)

                # calculate and store confusion matrix
                # rows = labels.cpu().numpy()
                # cols = output.max(1)[1].cpu().numpy()
                # for label_index in range(labels.shape[0][0]):
                #     confusion[rows[label_index], cols[label_index]] += 1

                # measure elapsed time
                batch_time.update(time.time() - end)
                end = time.time()

        return losses.avg, top1.avg, confusion

class Tester:
    def __init__(self, model, test_loader, device):
        self.model = model
        self.test_loader = test_loader
        self.device = device
        
    def test(self):
        self.model.eval()
        accs = AverageMeter()

        with torch.no_grad():
            for data_1, data_2, labels in self.test_loader:
                data_1 = data_1.to(self.device)
                data_2 = data_2.to(self.device)
                labels = labels.to(self.device)
                output = self.model(data_1, data_2)
                acc, _ = accuracy(output, labels, topk=(1, 5))

                # calculate and store confusion matrix
                accs.update(acc, data_1.size(0))

        return accs.avg


In [11]:
import json

class Config:
    def __init__(self, config_path) -> None:
        self.config_path = config_path
        self.load_config()

    def load_config(self) -> None:
        with open(self.config_path, 'r') as f:
            config_data = json.load(f)

        self.print_freq = config_data.get('print_freq', 5)
        self.save_freq = config_data.get('save_freq', 20)
        self.batch_size = config_data.get('batch_size', 16)
        self.num_workers = config_data.get('num_workers', 16)
        self.epochs = config_data.get('epochs', 99)
        self.learning_rate = config_data.get('learning_rate', 0.001)
        self.lr_decay_epochs = config_data.get('lr_decay_epochs', '50,100,150')
        self.lr_decay_rate = config_data.get('lr_decay_rate', 0.9)
        self.weight_decay = config_data.get('weight_decay', 0.0001)
        self.momentum = config_data.get('momentum', 0.9)
        self.cosine = config_data.get('cosine', True)
        self.num_class = config_data.get('num_class', 12)

    def __repr__(self) -> str:
        return f"Config({self.__dict__})"


In [None]:
if __name__ == "__main__":
    if torch.backends.mps.is_available():
        device = torch.device("mps")
    elif torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu")
    config = Config(os.path.join(os.getcwd(), 'config.json'))
    data_f = data_factory(os.path.join(os.getcwd(), 'datasets/USC-data/'))
    train_loader, valid_loader, test_loader = data_f.get_dataset()
    tr = Trainer(config, train_loader, valid_loader, device)

    tr.train()
    print(tr.best_acc)
    te = Tester(tr.model, test_loader, device)
    acc = te.test()
    
    print(acc)
    