In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# class CustomCNN(nn.Module):
#     def __init__(self, num_classes=10, num_conv_layers=3, in_channels=3, hidden_channels=64):
#         super(CustomCNN, self).__init__()
#         self.num_conv_layers = num_conv_layers
#         self.in_channels = in_channels
# 
#         layers = []
#         for i in range(num_conv_layers):
#             layers.append(
#                 nn.Conv2d(
#                     in_channels=in_channels if i == 0 else hidden_channels,
#                     out_channels=hidden_channels,
#                     kernel_size=3,
#                     stride=1,
#                     padding=1
#                 )
#             )
#             layers.append(nn.ReLU())
#             layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
# 
#         self.conv_layers = nn.Sequential(*layers)
#         self.fc1 = nn.Linear(hidden_channels * (32 // (2 ** num_conv_layers)) ** 2, 32)
#         self.fc2 = nn.Linear(32, num_classes)
# 
#     def forward(self, x):
#         x = self.conv_layers(x)
#         x = torch.flatten(x, 1)
#         x = self.fc1(x)
#         x = nn.ReLU(x)
#         x = self.fc2(x)
#         return x

# # Initialize a model with 5 convolutional layers, 3 input channels (e.g., RGB images), and 10 output classes.
# model = CustomCNN(num_classes=3, num_conv_layers=5, in_channels=3, hidden_channels=64)

# print(model)


In [2]:
import os
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader, ConcatDataset
import numpy as np
from torchvision import transforms

class CustomCSVDataset(Dataset):
    def __init__(self, root_dir, transform=None, fixed_length=500, included_classes=None):
        self.root_dir = root_dir
        self.transform = transform
        self.fixed_length = fixed_length
        self.data = []
        self.labels = []

        self.root = sorted(os.listdir(root_dir))  
        # self.class_to_idx = {cls_name: i for i, cls_name in enumerate(self.classes)}

        for subjects_idx, subjects in enumerate(self.root):
            subjects_dir = os.path.join(root_dir, subjects)
            
            if included_classes is not None:
                # for class_index, class_name in enumerate(os.listdir(subjects_dir)):
                for class_index, class_name in enumerate(included_classes):
                        user_dir = os.path.join(subjects_dir, class_name)
                        for csv_file in os.listdir(user_dir):
                            file_path = os.path.join(user_dir, csv_file)
                            # print(file_path)
                            data_frame = pd.read_csv(file_path, index_col=0, header=0, usecols= [0,1,2,3])
                            self.data.append(data_frame.values)
                            self.labels.append(class_index)
                            # print(self.labels)
            else:
                 for class_index, class_name in enumerate(os.listdir(subjects_dir)):
                        user_dir = os.path.join(subjects_dir, class_name)
                        for csv_file in os.listdir(user_dir):
                            file_path = os.path.join(user_dir, csv_file)
                            # print(file_path)
                            data_frame = pd.read_csv(file_path, index_col=0, header=0, usecols= [0,1,2,3])
                            self.data.append(data_frame.values)
                            self.labels.append(class_index)
                            # print(self.labels)
                
    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sample = self.data[idx]
        label = self.labels[idx]

        if len(sample) > self.fixed_length:
            sample = sample[:self.fixed_length]
        else:
            padding = np.zeros((self.fixed_length - len(sample), sample.shape[1]))
            sample = np.vstack((sample, padding))

        if self.transform:
            sample = self.transform(sample)
        
        # (channels, height, width)
        sample = torch.tensor(sample, dtype=torch.float32).permute(1, 0) 
        sample = sample.unsqueeze(1)  # Add dimension

        return sample, label

class ToTensorAndNormalize:
    def __call__(self, sample):
        # 合并通道
        # sample = np.mean(sample, axis=1, keepdims=True)
        # 标准化
        sample = (sample - sample.mean()) / sample.std()
        return torch.tensor(sample, dtype=torch.float32)


# dataset = CustomCSVDataset(root_dir='datasets/Merge_fold/ByNum_spline5', transform=ToTensorAndNormalize(), fixed_length=600, included_classes=['a1t'])
# dataset = CustomCSVDataset(root_dir='datasets/Merge_fold/ByNum_spline5', transform=ToTensorAndNormalize(), fixed_length=600, included_classes=['a1t', 'a7t','a8t'])

# dataloader = DataLoader(dataset, batch_size=8, shuffle=True)
# (batch_size, channels, height, width)

# for inputs, labels in dataloader:
#     print(labels)
    # print(inputs.shape, labels.shape)



In [3]:
import random
#   设置种子

def seed_everything(seed=11):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

seed_everything()



In [4]:
class CustomCNN(nn.Module):
    def __init__(self, num_classes=3):
        super(CustomCNN, self).__init__()
        self.conv_layers = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=(3, 3), padding=(1, 1)),  #(batch_size, 32, 600, 3)
            # nn.Conv2d(1, 32, kernel_size=(3, 3), padding=(1, 1)),  # (batch_size, 32, 600, 1)
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(1, 2)),  # (batch_size, 32, 300, 3)
            nn.Conv2d(32, 64, kernel_size=(3, 3), padding=(1, 1)),  # (batch_size, 64, 300, 3)
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(1, 2)),  #(batch_size, 64, 150, 3)
            
            nn.Conv2d(64, 128, kernel_size=(3, 3), padding=(1, 1)),  #(batch_size, 128, 150, 3)
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(1, 2))  # (batch_size, 128, 75, 3)
        )
        self.fc = nn.Sequential(
            nn.Linear(128 * 75, 512), # (out1 * out 2)
            nn.ReLU(),
            # nn.Linear(512,256),
            # nn.ReLU(),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        x = self.conv_layers(x)
        # print(x.shape)
        x = torch.flatten(x, 1)
        # print(x.shape)
        x = self.fc(x)
        return x
    


In [5]:
import torch.optim as optim
from torch.utils.data import random_split




# model = CustomCNN(num_classes=3, num_conv_layers=5, in_channels=3, hidden_channels=64)

model = CustomCNN(num_classes=3)
criterion = nn.CrossEntropyLoss()

train_dataset0 = CustomCSVDataset(root_dir='datasets/Merge_fold_0713/ByNum_spline3', transform=ToTensorAndNormalize(), fixed_length=600, included_classes=['a1t', 'a7t','a8t'])
train_dataset1 = CustomCSVDataset(root_dir='datasets/Merge_fold_0713/ByNum_spline4', transform=ToTensorAndNormalize(), fixed_length=600, included_classes=['a1t', 'a7t','a8t'])
train_dataset2 = CustomCSVDataset(root_dir='datasets/Merge_fold_0713/ByNum_spline5', transform=ToTensorAndNormalize(), fixed_length=600, included_classes=['a1t', 'a7t','a8t'])


finetune_dataset = CustomCSVDataset(root_dir='datasets/USC-HAD-clean', transform=ToTensorAndNormalize(), fixed_length=600, included_classes=['a1t', 'a7t','a8t'])
test_dataset = CustomCSVDataset(root_dir='datasets/USC-HAD-clean', transform=ToTensorAndNormalize(), fixed_length=600, included_classes=['a1t', 'a7t','a8t'])


ori_train_dataset = CustomCSVDataset(root_dir='datasets/Merge_fold_0720/ByNum_spline3', transform=ToTensorAndNormalize(), fixed_length=600, included_classes=['a1t', 'a7t','a8t'])

# Create data loaders
train_dataloaderO = DataLoader(train_dataset0, batch_size=8, shuffle=True)
train_dataloader1 = DataLoader(train_dataset1, batch_size=8, shuffle=True)
train_dataloader2 = DataLoader(train_dataset2, batch_size=8, shuffle=True)
finetune_dataloader = DataLoader(finetune_dataset, batch_size=8, shuffle=True)


dataset1 = train_dataloaderO.dataset


test_dataloader = DataLoader(test_dataset, batch_size=8, shuffle=False)

ori_dataloader = DataLoader(ori_train_dataset, batch_size=8, shuffle=True)

train_ratio = 0.6
test_ratio = 0.4

total_length = len(finetune_dataset)
train_length = int(total_length * train_ratio)
test_length = total_length - train_length

finetune_train_dataset, finetune_test_dataset = random_split(finetune_dataset, [train_length, test_length])

finetune_train_dataloader = DataLoader(finetune_train_dataset, batch_size=8, shuffle=True)


dataset2 = finetune_train_dataloader.dataset
finetune_test_dataloader = DataLoader(finetune_test_dataset, batch_size=8, shuffle=False)


combined_dataset = ConcatDataset([dataset1, dataset2])

# 创建新的 DataLoader
combined_dataloader = DataLoader(combined_dataset, batch_size=8, shuffle=True)
# 
# # Training loop
# num_epochs = 10
# for epoch in range(num_epochs):
#     for inputs, labels in dataloader:
#         # Zero the parameter gradients
#         optimizer.zero_grad()
# 
#         # Forward pass
#         outputs = model(inputs)
#         loss = criterion(outputs, labels)
# 
#         # Backward pass and optimize
#         loss.backward()
#         optimizer.step()
# 
#     print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {loss.item():.4f}")


In [6]:

def validate(model, dataloader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in dataloader:
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    model.train()
    return correct / total

optimizer = optim.AdamW(model.parameters(), lr=0.001)

model.train()
num_epochs = 40
best_accuracy = 0.0
# 
# best_model_path = 'virtual_train_model.pth'
# model_path = os.path.join('Model', best_model_path)
# for epoch in range(num_epochs):
#     # for inputs, labels in train_dataloaderO:
#     for inputs, labels in combined_dataloader:
#     # for inputs, labels in finetune_dataloader:
#     #     print(inputs.shape)
#         optimizer.zero_grad()
#         outputs = model(inputs)
#         # print(outputs, labels)
#         loss = criterion(outputs, labels)
#         loss.backward()
#         optimizer.step()
# 
#     train_accuracy = validate(model, combined_dataloader)
#     test_accuracy = validate(model, finetune_test_dataloader)
#     print(f"0Pretraining Epoch {epoch + 1}/{num_epochs}, Loss: {loss.item():.4f}, Train Accuracy: {train_accuracy:.4f}, Test Accuracy: {test_accuracy:.4f}")
# 
#     if test_accuracy > best_accuracy:
#         best_accuracy = test_accuracy
#         torch.save(model.state_dict(), model_path)
#         print(f"New best model saved with accuracy: {best_accuracy:.4f}")


# model.load_state_dict(torch.load(model_path))
# # optimizer = optim.AdamW(model.fc.parameters(), lr=0.001)
# num_epochs = 20

# best_accuracy = 0.0
# for epoch in range(num_epochs):
#     for inputs, labels in ori_dataloader:
#     # for inputs, labels in finetune_dataloader:
#         optimizer.zero_grad()
#         outputs = model(inputs)
#         # print(outputs, labels)
#         loss = criterion(outputs, labels)
#         loss.backward()
#         optimizer.step()
# 
#     train_accuracy = validate(model, ori_dataloader)
#     test_accuracy = validate(model, finetune_test_dataloader)
#     print(f"1Pretraining Epoch {epoch + 1}/{num_epochs}, Loss: {loss.item():.4f}, Train Accuracy: {train_accuracy:.4f}, Test Accuracy: {test_accuracy:.4f}")
#     
#     if test_accuracy > best_accuracy:
#         best_accuracy = test_accuracy
#         torch.save(model.state_dict(), model_path)
#         print(f"New best model saved with accuracy: {best_accuracy:.4f}")

#
# 

# best_accuracy = 0.0
# best_model_path = 'ori_data_model.pth'
# model_path = os.path.join('Model', best_model_path)
# # num_epochs = 10
# for epoch in range(num_epochs):
#     for inputs, labels in ori_dataloader:
#     # for inputs, labels in finetune_dataloader:
#         optimizer.zero_grad()
#         outputs = model(inputs)
#         # print(outputs, labels)
#         loss = criterion(outputs, labels)
#         loss.backward()
#         optimizer.step()
# 
#     train_accuracy = validate(model, ori_dataloader)
#     test_accuracy = validate(model, finetune_test_dataloader)
#     print(f"Ori Data Epoch {epoch + 1}/{num_epochs}, Loss: {loss.item():.4f}, Train Accuracy: {train_accuracy:.4f}, Test Accuracy: {test_accuracy:.4f}")
# 
#     if test_accuracy > best_accuracy:
#         best_accuracy = test_accuracy
#         torch.save(model.state_dict(), model_path)
#         print(f"New best model saved with accuracy: {best_accuracy:.4f}")

# model.load_state_dict(torch.load(model_path))
# 
# for name, param in model.named_parameters():  # 查看可优化的参数有哪些
#     # if param.requires_grad:
#     # if 'conv_layers.0' in name:
#     #     print(name)
#     #     continue
#     if 'conv_layers.3' in name:
#         print(name)
#         continue
#     if 'fc' in name:
#         print(name)
#         continue
#     param.requires_grad = False
# for name, param in model.named_parameters(): 
#     if param.requires_grad:
#         print(name)

model_name = 'finetune.pth'
model_path = os.path.join('Model', model_name)
# optimizer = optim.AdamW(model.fc.parameters(), lr=0.001)
# # num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    for inputs, labels in finetune_train_dataloader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

    train_accuracy = validate(model, finetune_test_dataloader)
    test_accuracy = validate(model, finetune_test_dataloader)
    print(f"Fine-tuning Epoch {epoch + 1}/{num_epochs}, Loss: {loss.item():.4f}, Train Accuracy: {train_accuracy:.4f}, Test Accuracy: {test_accuracy:.4f}")

    if test_accuracy > best_accuracy:
        best_accuracy = test_accuracy
        torch.save(model.state_dict(), model_path)
        print(f"New best model saved with accuracy: {best_accuracy:.4f}")


# torch.save(model.state_dict(), model_path)

  sample = torch.tensor(sample, dtype=torch.float32).permute(1, 0)


Fine-tuning Epoch 1/40, Loss: 0.3905, Train Accuracy: 0.8095, Test Accuracy: 0.8095
New best model saved with accuracy: 0.8095
Fine-tuning Epoch 2/40, Loss: 0.0401, Train Accuracy: 0.9048, Test Accuracy: 0.9048
New best model saved with accuracy: 0.9048
Fine-tuning Epoch 3/40, Loss: 0.7020, Train Accuracy: 0.8095, Test Accuracy: 0.8095
Fine-tuning Epoch 4/40, Loss: 0.2140, Train Accuracy: 0.9524, Test Accuracy: 0.9524
New best model saved with accuracy: 0.9524
Fine-tuning Epoch 5/40, Loss: 0.0030, Train Accuracy: 0.9762, Test Accuracy: 0.9762
New best model saved with accuracy: 0.9762
Fine-tuning Epoch 6/40, Loss: 0.0015, Train Accuracy: 0.9762, Test Accuracy: 0.9762
Fine-tuning Epoch 7/40, Loss: 0.0088, Train Accuracy: 0.9762, Test Accuracy: 0.9762
Fine-tuning Epoch 8/40, Loss: 0.0025, Train Accuracy: 0.9762, Test Accuracy: 0.9762
Fine-tuning Epoch 9/40, Loss: 0.0024, Train Accuracy: 0.9762, Test Accuracy: 0.9762
Fine-tuning Epoch 10/40, Loss: 0.0162, Train Accuracy: 0.9762, Test Accu

In [7]:
# model = CustomCNN(num_classes=3)
# model.load_state_dict(torch.load(model_path))
# model.eval()
# 
# num_epochs = 5
# with torch.no_grad():
#     for epoch in range(num_epochs):
#         for inputs, labels in finetune_test_dataset:
#             optimizer.zero_grad()
#             outputs = model(inputs)
#             
#         train_accuracy = validate(model, finetune_test_dataset)
#         test_accuracy = validate(model, test_dataloader)
#         print(f"Fine-tuning Epoch {epoch + 1}/{num_epochs}, Loss: {train_accuracy:.4f}, Train Accuracy: {train_accuracy:.4f}, Test Accuracy: {test_accuracy:.4f}")