In [None]:
import os
import sys

local_path = os.getcwd()
# 设置工作目录为项目的主目录
os.chdir(os.path.join(local_path, "../"))  # 使用相对路径将工作目录切换到 project 文件夹
print("Current working directory:", os.getcwd())
project_path = os.path.abspath(os.path.join(local_path, "../../"))
sys.path.append(project_path)   #将模块查找路径切换

In [None]:
import torch
# 检查是否有可用的GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# 设置随机种子

In [None]:
import numpy as np
import random

def set_seed(seed):
    # 设置 Python 内置 random 库的随机种子
    random.seed(seed)
    
    # 设置 NumPy 的随机种子
    np.random.seed(seed)
    
    # 设置 PyTorch 的随机种子
    torch.manual_seed(seed)
    
    # 如果使用 GPU 进行训练，则需设置以下两个以保证完全的可重复性
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)  # 如果使用多块 GPU
    
    # 设置 CuDNN 后端以确保结果一致性
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# 使用示例
set_seed(42)  # 42 是一个示例种子数，您可以根据需求更改

# 加载数据集

## MNIST

In [None]:
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

# 定义数据变换
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

# 下载并加载训练数据集
train_dataset = datasets.MNIST(root='./data', train=True, transform=transform, download=True)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

# 下载并加载测试数据集
test_dataset = datasets.MNIST(root='./data', train=False, transform=transform, download=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# 检查数据加载是否成功
data_iter = iter(train_loader)
images, labels = next(data_iter)
print(f"Images batch shape: {images.shape}")
print(f"Labels batch shape: {labels.shape}")

In [None]:
import torch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Subset
import numpy as np

# 定义数据变换
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])

# 加载完整的 MNIST 数据集
full_train_dataset = datasets.MNIST(root='./data', train=True, transform=transform, download=True)
# 下载并加载测试数据集
test_dataset = datasets.MNIST(root='./data', train=False, transform=transform, download=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# 初始化每个类别的索引列表
class_indices = {i: [] for i in range(10)}

# 遍历数据集，记录每个类别的索引
for idx, (_, label) in enumerate(full_train_dataset):
    if len(class_indices[label]) < 100:  # 每个类别最多选取 100 个样本
        class_indices[label].append(idx)
    if all(len(class_indices[i]) == 100 for i in range(10)):
        break  # 当每个类别都有 100 个样本时停止

# 合并所有类别的索引，形成所需的子集
selected_indices = [idx for indices in class_indices.values() for idx in indices]

# 创建自定义的训练数据集
small_train_dataset = Subset(full_train_dataset, selected_indices)
train_loader = DataLoader(small_train_dataset, batch_size=64, shuffle=True)

# 检查数据加载是否成功
data_iter = iter(train_loader)
images, labels = next(data_iter)
print(f"Images batch shape: {images.shape}")
print(f"Labels batch shape: {labels.shape}")
print(f"Sample labels: {labels}")


## FashionMNIST

In [None]:
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

# 定义数据变换
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

# 下载并加载训练数据集
train_dataset = datasets.FashionMNIST(root='./data', train=True, transform=transform, download=True)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

# 下载并加载测试数据集
test_dataset = datasets.FashionMNIST(root='./data', train=False, transform=transform, download=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# 检查数据加载是否成功
data_iter = iter(train_loader)
images, labels = next(data_iter)
print(f"Images batch shape: {images.shape}")
print(f"Labels batch shape: {labels.shape}")


## CIFAR-10

In [None]:
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

# 定义数据变换
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))  # 对 RGB 三个通道分别进行归一化
])

# 下载并加载训练数据集
train_dataset = datasets.CIFAR10(root='./data', train=True, transform=transform, download=True)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

# 下载并加载测试数据集
test_dataset = datasets.CIFAR10(root='./data', train=False, transform=transform, download=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# 检查数据加载是否成功
data_iter = iter(train_loader)
images, labels = next(data_iter)
print(f"Images batch shape: {images.shape}")
print(f"Labels batch shape: {labels.shape}")


## KMNIST

In [None]:
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

# 定义数据变换
transform = transforms.Compose([
    transforms.ToTensor(),
])

# 下载并加载训练数据集
train_dataset = datasets.KMNIST(root='./data', train=True, transform=transform, download=True)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

# 下载并加载测试数据集
test_dataset = datasets.KMNIST(root='./data', train=False, transform=transform, download=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# 检查数据加载是否成功
data_iter = iter(train_loader)
images, labels = next(data_iter)
print(f"Images batch shape: {images.shape}")
print(f"Labels batch shape: {labels.shape}")

## EMNIST Balanced
存在一些问题

In [None]:
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

# 定义数据变换
transform = transforms.Compose([
    transforms.ToTensor(),
])

# 下载并加载训练数据集
train_dataset = datasets.EMNIST(root='./data', split='balanced', train=True, transform=transforms.ToTensor(), download=True)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

# 下载并加载测试数据集
test_dataset = datasets.EMNIST(root='./data', split='balanced', train=False, transform=transforms.ToTensor(), download=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# 检查数据加载是否成功
data_iter = iter(train_loader)
images, labels = next(data_iter)
print(f"Images batch shape: {images.shape}")
print(f"Labels batch shape: {labels.shape}")

## 通用的数据加载器

In [None]:
import torch
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

def get_dataloader(dataset_name, batch_size=64, root='./data', train=True, selected_classes=None, class_counts=None):
    """
    根据 dataset_name 加载不同的数据集，并应用对应的 Normalize 参数。
    
    参数:
        dataset_name (str): 数据集名称，可选 'MNIST', 'FashionMNIST', 'KMNIST', 'CIFAR10', 'CIFAR100'。
        batch_size (int): 数据加载的 batch size。
        root (str): 数据集下载或加载的路径。
        train (bool): 是否加载训练集，False 表示加载测试集。
        selected_classes (list, optional): 需要选择的类别列表，如果为 None，则选择所有类别。
        class_counts (list, optional): 每个类别选择的样本数量，和 selected_classes 一一对应。如果为 None，则选择每个类别的全部样本。
        
    返回:
        DataLoader: 配置好的数据加载器。
    """
    # 定义 Normalize 参数
    normalize_params = {
        'MNIST': ((0.1307,), (0.3081,)),
        'FashionMNIST': ((0.2860,), (0.3530,)),
        'KMNIST': ((0.1904,), (0.3475,)),
        'CIFAR10': ((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
        'CIFAR100': ((0.5071, 0.4867, 0.4408), (0.2675, 0.2565, 0.2761))
    }

    # 检查指定的数据集名称是否在支持的列表中
    if dataset_name not in normalize_params:
        raise ValueError(f"Unsupported dataset name: {dataset_name}")

    # 获取对应的 Normalize 参数
    mean, std = normalize_params[dataset_name]

    # 定义数据变换
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean, std)
    ])

    # 根据 dataset_name 选择数据集
    if dataset_name == 'MNIST':
        dataset = datasets.MNIST(root=root, train=train, transform=transform, download=True)
    elif dataset_name == 'FashionMNIST':
        dataset = datasets.FashionMNIST(root=root, train=train, transform=transform, download=True)
    elif dataset_name == 'KMNIST':
        dataset = datasets.KMNIST(root=root, train=train, transform=transform, download=True)
    elif dataset_name == 'CIFAR10':
        dataset = datasets.CIFAR10(root=root, train=train, transform=transform, download=True)
    elif dataset_name == 'CIFAR100':
        dataset = datasets.CIFAR100(root=root, train=train, transform=transform, download=True)
    else:
        raise ValueError(f"Unsupported dataset name: {dataset_name}")

    # 筛选指定类别和数量的样本
    if selected_classes is not None and class_counts is not None:
        # 创建一个空的列表，用于存储符合条件的索引
        selected_indices = []
        class_counts_dict = {cls: count for cls, count in zip(selected_classes, class_counts)}

        # 遍历数据集，筛选出符合条件的样本
        for idx, (_, label) in enumerate(dataset):
            if label in selected_classes and class_counts_dict[label] > 0:
                selected_indices.append(idx)
                class_counts_dict[label] -= 1

            # 如果每个类别的数据量已经达到要求，跳过
            if all(count == 0 for count in class_counts_dict.values()):
                break

        # 创建一个 Subset，返回选择的样本
        dataset = Subset(dataset, selected_indices)

    # 如果没有指定类别，且需要筛选每个类别的数量
    elif selected_classes is None and class_counts is not None:
        # 创建一个空的列表，用于存储符合条件的索引
        selected_indices = []
        class_counts_dict = {i: count for i, count in zip(range(10), class_counts)}  # 默认为10类（MNIST，CIFAR-10，EMNIST等）

        # 遍历数据集，筛选出符合条件的样本
        for idx, (_, label) in enumerate(dataset):
            if class_counts_dict[label] > 0:
                selected_indices.append(idx)
                class_counts_dict[label] -= 1

            # 如果所有类别的数据量已经达到要求，跳过
            if all(count == 0 for count in class_counts_dict.values()):
                break

        # 创建一个 Subset，返回选择的样本
        dataset = Subset(dataset, selected_indices)

    # 创建 DataLoader
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=train)
    return dataloader

selected_classes = [0, 1]
class_counts = [100, 100]

# 假设我们选择 CIFAR-10 数据集，并且不指定类别，但是限制每种目标的数目
#class_counts = [100] * 10

train_loader = get_dataloader('CIFAR10', batch_size=64, train=True, selected_classes=selected_classes, class_counts=class_counts)
test_loader = get_dataloader('CIFAR10', batch_size=64, train=False)

# 检查数据加载是否成功
data_iter = iter(train_loader)
images, labels = next(data_iter)
print(f"Images batch shape: {images.shape}")
print(f"Labels batch shape: {labels.shape}")

# 定义FBM的单层网络

In [None]:
import torch.nn as nn
# 定义单层神经网络模型
class SingleLayerNN(nn.Module):
    def __init__(self, input_size, output_size):
        super(SingleLayerNN, self).__init__()
        self.input_size = input_size
        self.linear = nn.Linear(input_size, output_size)

    def forward(self, x):
        x = x.view(-1, self.input_size)  # 展平图像
        out = self.linear(x)
        out = (torch.tanh(out) + 1)/2.0
        return out

# 定义Loss函数

In [None]:
class FBMLoss(nn.Module):
    def __init__(self, input_size, lam, df, alpha):
        super(FBMLoss, self).__init__()
        self.len_out = input_size
        self.lam =lam
        self.d_f = df
        self.alpha = alpha

    def forward(self, inputs, targets, weights):
        normal = 0.5 * self.lam / self.len_out * torch.norm(weights, p=2) ** 2
        #print("normal", normal)
        loss = fast_FermiBose(inputs, targets, self.d_f, self.alpha) + normal
        #print("diff", fast_FermiBose(inputs, targets, self.d_f) - FermiBose(inputs, targets, self.d_f))
        return loss


## 通过向量化操作加速

In [None]:
def fast_FermiBose(sample, labels, d_f, alpha):
    batch, hidden1_features = sample.shape
    labels = F.one_hot(labels).float()
    # 使用广播计算每对样本之间的 L2 距离的平方和
    sample_diff = sample.unsqueeze(1) - sample.unsqueeze(0)  # 扩展维度并相减，得到 (batch, batch, outdim)
    D_matrix = torch.sum(sample_diff**2, dim=2)/hidden1_features  # 对最后一个维度求和，得到 (batch, batch) 矩阵

    # 计算phi(.)
    phi_matrix = F.relu(d_f-D_matrix)

    # 计算标签矩阵的乘积，结果是 (batch_size, batch_size)
    label_matrix = labels @ labels.T

    # 计算bose_loss
    bose_loss = torch.mul(D_matrix, label_matrix)

    # 计算fermi_loss
    fermi_loss = torch.mul(phi_matrix, 1-label_matrix)

    # 总loss
    loss_matrix = bose_loss + alpha*fermi_loss

    # 如果只需要上三角部分（排除重复项和自相似项）
    loss_matrix = torch.triu(loss_matrix, diagonal=1)
    #cal_count = (loss_matrix > 0.0).sum().item()+1  #统计真正计算的fermi-bose对数量

    # 将结果进行 sum（可以选择性地只关注上三角部分的和）
    total_loss = loss_matrix.sum()

    #return total_loss/cal_count
    return 2*total_loss/batch**2

## 通过循环直观理解

In [None]:
#用循环写这个
def FermiBose(sample, labels, d_f, alpha):
    batch, hidden1_features = sample.shape
    loss = 0
    distance = nn.PairwiseDistance(p=2,keepdim=True)
    ReLU = nn.ReLU()

    cal_count = 1

    for i in range(batch):
        for j in range(i+1, batch):
            D = distance(sample[i], sample[j]) ** 2 / hidden1_features
            if labels[i] == labels[j]:
                loss += D.sum()
                #print("bose", D)
                #if D > 0 : cal_count+=1
            else:
                loss += alpha * ReLU(d_f-D).sum()
                #print("fermi", D)
                #if ReLU(d_f-D).sum() > 0.0: 
                #    #print("dont, count", d_f, D, ReLU(d_f-D))
                #    cal_count+=1
            
    #return loss/cal_count
    return 2*loss/batch**2

# 训练FBM

In [None]:
import torch.optim as optim
import torch.nn.functional as F

# 定义超参数
input_size = 28 * 28  # MNIST图像大小是28x28
hidden_dim =1000
num_classes = 10      # MNIST有10个类别
learning_rate = 0.01
num_epochs = 20
batch_size = 64
alpha = 1.0
df = 0.02

# 实例化模型、定义损失函数和优化器
model = SingleLayerNN(input_size, hidden_dim).to(device)
criterion = FBMLoss(hidden_dim, 0.01, df, alpha)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

model.train()
# 训练模型
for epoch in range(num_epochs):
    for images, labels in train_loader:
        # 将图像和标签移动到 GPU 上
        images = images.view(-1, 28 * 28).to(device)  # 展平图像并转移到 GPU
        labels = labels.to(device)  # 标签移动到 GPU
        #labels_one_hot = F.one_hot(labels, num_classes=num_classes).float()
        
        # 前向传播
        outputs = model(images)
        #loss = criterion(outputs, labels_one_hot, model.linear.weight)
        loss = criterion(outputs, labels, model.linear.weight)
        
        # 反向传播和优化
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

# 定义最后的全连接层

In [None]:
import torch.nn as nn
# 定义单层神经网络模型
class MLP(nn.Module):
    def __init__(self, input_size, num_classes):
        super(MLP, self).__init__()
        self.input_size = input_size
        self.linear = nn.Linear(input_size, num_classes)

    def forward(self, x):
        x = x.view(-1, self.input_size)  # 展平图像
        out = self.linear(x)
        #out = F.softmax(out)
        return out

# 训练全连接层

In [None]:
model.eval()

# 定义损失函数和优化器
model2 = MLP(hidden_dim, num_classes).to(device)
criterion2 = nn.CrossEntropyLoss()  # 使用交叉熵损失
optimizer = optim.Adam(model2.parameters(), lr=learning_rate)  # 使用随机梯度下降优化器

model2.train()
# 训练模型
epochs = 30
for epoch in range(epochs):
    for images, labels in train_loader:
        # 将图像展平为一维向量，并将标签进行 one-hot 编码
        images = images.view(-1, 28 * 28).to(device)  # 展平图像
        labels_one_hot = F.one_hot(labels, num_classes=num_classes).float().to(device)  # 将标签转换为 one-hot 编码

        # 前向传播
        with torch.no_grad():
            deal_images = model(images)
        outputs = model2(deal_images)

        # 计算损失
        loss = criterion2(outputs, labels_one_hot)

        # 反向传播和优化
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
    print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}")

# 测试准确性

In [None]:
import torch

# 设置模型为评估模式
model.eval()
model2.eval()

# 准确率计数
correct = 0
total = 0

# 禁用梯度计算，加速测试过程
with torch.no_grad():
    for images, labels in test_loader:
        # 将数据加载到 GPU
        images = images.view(-1, 28 * 28).to(device)
        labels = labels.to(device)

        # 前向传播
        outputs = model(images)
        outputs = model2(outputs)
        
        # 获取预测结果
        _, predicted = torch.max(outputs, 1)
        
        # 更新计数
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

# 计算准确率
accuracy = 100 * correct / total
print(f'Accuracy on the test dataset: {accuracy:.2f}%')