In [8]:
import numpy as np
import torch
import os
import random
from torch.utils.data import Dataset
import torch.nn.functional as F

random.seed(1)
rmb_label = {"ASD": 0, "TD": 1}      # 设置标签

class ABIDEtxtDataset(Dataset):
    def __init__(self, data_dir, transform=None):
        """
        ABIDE_db的Dataset
        :param data_dir: str, 数据集所在路径
        :param transform: torch.transform，数据预处理
        """
        self.label_name = {"ASD": 0, "TD": 1}
        self.data_info = self.get_txt_info(data_dir)  # data_info存储所有txt路径和标签，在DataLoader中通过index读取样本
        self.transform = transform
        print('Number of samples:', len(self.data_info))
        print('Sample info:', self.data_info[0])
        
    def __getitem__(self, index):
        path_txt, label = self.data_info[index]
        print(path_txt)
        txt_data = np.loadtxt(path_txt)
        txt_data_tensor = torch.from_numpy(txt_data)
        print(txt_data_tensor.shape)
        cor = np.corrcoef(txt_data.T)
        # 将对称矩阵转换成上三角矩阵
        tri_matrix = np.triu(cor)
        # 转换为上三角矩阵
        upper_triangular = np.triu(tri_matrix,1)
        idx = np.nonzero(upper_triangular)
        # 将非零数据保存在1维张量中
        non_zero_values = upper_triangular[idx]
        # 将cor转换为Tensor
        cor = torch.from_numpy(non_zero_values).float()
        
        if self.transform is not None:
            txt = self.transform(txt)   # 在这里做transform，转为tensor等等

        return cor, label

    def __len__(self):
        return len(self.data_info)

    @staticmethod
    def get_txt_info(data_dir):
        data_info = list()
        for root, dirs, _ in os.walk(data_dir):
            # 遍历类别
            for sub_dir in dirs:
                txt_names = os.listdir(os.path.join(root, sub_dir))
                txt_names = list(filter(lambda x: x.endswith('.1D'), txt_names))

                # 遍历txt
                for i in range(len(txt_names)):
                    txt_name = txt_names[i]
                    path_txt = os.path.join(root, sub_dir, txt_name)
                    label = rmb_label[sub_dir]
                    data_info.append((path_txt, int(label)))

        return data_info

In [2]:
class SelfAttention(nn.Module):
    def __init__(self, input_channels):
        super(SelfAttention, self).__init__()
        self.query = nn.Conv2d(input_channels, input_channels, kernel_size=1)
        self.key = nn.Conv2d(input_channels, input_channels, kernel_size=1)
        self.value = nn.Conv2d(input_channels, input_channels, kernel_size=1)

    def forward(self, x):
        q = self.query(x)
        k = self.key(x)
        v = self.value(x)

        b, c, h, w = q.size()
        q = q.view(b, c, -1)
        k = k.view(b, c, -1)
        v = v.view(b, c, -1)

        attention_weights = F.softmax(torch.bmm(q.permute(0, 2, 1), k), dim=-1)
        out = torch.bmm(v, attention_weights.permute(0, 2, 1))
        out = out.view(b, c, h, w)
        
        return out
class MyCNN(nn.Module):
    def __init__(self):
        super(MyCNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.relu1 = nn.ReLU(inplace=True)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.relu2 = nn.ReLU(inplace=True)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, padding=1)
        self.relu3 = nn.ReLU(inplace=True)
        self.attention = SelfAttention(64)  # Add the attention layer
        self.fc1 = nn.Linear(64 * 116 * 116, 512)
        self.relu4 = nn.ReLU(inplace=True)
        self.dropout = nn.Dropout(p=0.5)
        self.fc2 = nn.Linear(512, 2)

    def forward(self, x):
        x = self.conv1(x)
        x = self.relu1(x)
        x = self.pool1(x)
        x = self.conv2(x)
        x = self.relu2(x)
        x = self.conv3(x)
        x = self.relu3(x)
        x = self.attention(x)  # Apply the attention layer
        x = x.view(-1, 64 * 116 * 116)
        x = self.fc1(x)
        x = self.relu4(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x


NameError: name 'nn' is not defined

In [None]:
class RNNModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(RNNModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True, dropout=0.5)
        self.dropout = nn.Dropout(p=0.5)
        self.fc = nn.Linear(hidden_size, num_classes)
        self.prelu = nn.PReLU()

    def forward(self, x):
        # 初始化隐藏层状态
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        # 前向传播 RNN
        out, _ = self.rnn(x, h0)
        out = self.dropout(out)
        # 将输出结果展平
        out = out[:, -1, :]
        # 全连接层
        out = self.fc(out)
        return out


In [5]:
class SAE(nn.Module):
    def __init__(self, input_size, hidden_sizes):
        super(SAE, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_size, hidden_sizes[0]),
            nn.ReLU(),
            nn.Linear(hidden_sizes[0], hidden_sizes[1]),
            nn.ReLU(),
            nn.Linear(hidden_sizes[1], hidden_sizes[2]),
            nn.ReLU(),
            nn.Linear(hidden_sizes[2], hidden_sizes[3]),
            nn.ReLU(),
            nn.Linear(hidden_sizes[3], hidden_sizes[4])
        )
        self.decoder = nn.Sequential(
            nn.Linear(hidden_sizes[4], hidden_sizes[3]),
            nn.ReLU(),
            nn.Linear(hidden_sizes[3], hidden_sizes[2]),
            nn.ReLU(),
            nn.Linear(hidden_sizes[2], hidden_sizes[1]),
            nn.ReLU(),
            nn.Linear(hidden_sizes[1], hidden_sizes[0]),
            nn.ReLU(),
            nn.Linear(hidden_sizes[0], input_size)
        )
        self.fc = nn.Linear(hidden_sizes[4], 2)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.encoder(x)
        encoded = x
        x = self.decoder(x)
        x = self.fc(x)
        x = self.sigmoid(x)
        return x

In [7]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt

train_losses = []
train_accs = []
valid_losses = []
valid_accs = []
split_dir = os.path.join("/Users/yangzongxian/Study_profiles/ABIDE_Data/ABIDE_roiaal/aal_split")  
train_dir = os.path.join(split_dir, "train")
valid_dir = os.path.join(split_dir, "valid")
test_dir  = os.path.join(split_dir, "test") 
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')





# 定义超参数
input_size = 116
batch_size = 128
learning_rate = 0.001
num_epochs = 50
hidden_size = [512, 256, 128, 64, 32]
num_layers = 8
num_classes = 2

def collate_fn(batch):
    data = [d[0] for d in batch]
    label = [d[1] for d in batch]
    # 为数据添加一个额外的维度，以便将其转换为形状为 [batch_size, 1, height, width] 的张量
    data = [d.unsqueeze(0) for d in data]
    # 将多个数据合并为一个batch
    data = torch.stack(data, dim=0)
    label = torch.tensor(label)
    return data, label






# 实例化数据集和数据加载器
train_dataset = ABIDEtxtDataset(train_dir, transform=None)
valid_dataset = ABIDEtxtDataset(valid_dir, transform=None)
#CNN 图像模型dataloader
# train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True,collate_fn = collate_fn)
# valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False,collate_fn = collate_fn)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)

# 实例化模型、损失函数和优化器
# model = SqueezeNet2D(2)
# model = MyCNN()
# model = RNNModel(input_size,hidden_size,num_layers,num_classes)
model = SAE(input_size,hidden_size)


model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

print('训练')
# 训练和验证
for epoch in range(num_epochs):
    # 训练阶段
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    for i, (data, label) in enumerate(train_loader):
        data, label = data.to(device), label.to(device)
        optimizer.zero_grad()
        outputs = model(data)
        loss = criterion(outputs, label)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * data.size(0)
        _, predicted = torch.max(outputs.data, 1)
        total += label.size(0)
        correct += (predicted == label).sum().item()

    train_loss = running_loss / len(train_dataset)
    train_acc = correct / total
    train_losses.append(train_loss)
    train_accs.append(train_acc)

    # 验证阶段
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        total = 0
        correct = 0
        for data, label in valid_loader:
            data, label = data.to(device), label.to(device)
            outputs = model(data)
            loss = criterion(outputs, label)
            running_loss += loss.item() * data.size(0)
            _, predicted = torch.max(outputs.data, 1)
            total += label.size(0)
            correct += (predicted == label).sum().item()

    valid_loss = running_loss / len(valid_dataset)
    valid_acc = correct / total
    valid_losses.append(valid_loss)
    valid_accs.append(valid_acc)

    # 打印训练过程中的指标
    print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.4f}, Validation Loss: {valid_loss:.4f}, Validation Accuracy: {valid_acc:.4f}")


# 绘制训练和验证指标的图表
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(train_losses, label='Train Loss')
ax.plot(train_accs, label='Train Accuracy')
ax.plot(valid_losses, label='Validation Loss')
ax.plot(valid_accs, label='Validation Accuracy')
ax.legend()
ax.set_xlabel('Epoch')
ax.set_ylabel('Value')
ax.set_title('Training and Validation Metrics')
plt.show()


Number of samples: 705
Sample info: ('/Users/yangzongxian/Study_profiles/ABIDE_Data/ABIDE_roiaal/aal_split/train/ASD/USM_0050497_rois_aal.1D', 0)
Number of samples: 89
Sample info: ('/Users/yangzongxian/Study_profiles/ABIDE_Data/ABIDE_roiaal/aal_split/valid/ASD/UM_1_0050277_rois_aal.1D', 0)
训练


  c /= stddev[:, None]
  c /= stddev[None, :]


RuntimeError: mat1 and mat2 shapes cannot be multiplied (128x6670 and 116x512)