**工作记录 - CIFAR100InstanceSample函数调试与自定义数据集读取**

**日期：** 2024年3月29日

**目标：**

调试 CIFAR100InstanceSample 数据集类（继承自 torchvision 的 datasets.CIFAR100），使其能够成功读取并处理自定义的数据集。

**工作内容：**

1. **理解函数原理**
   - 查阅CIFAR100InstanceSample函数代码，理解其读取和处理CIFAR-100数据集的方式。

2. **准备自定义数据集**
   - 整理自定义数据集的文件夹结构：按类别放入以标签编号（0、1、2……）命名的子文件夹，确保每张图像都能像CIFAR-100一样对应一个整数标签。
   - 检查数据集中的图像文件，确保格式正确。

3. **修改函数以适应自定义数据集**
   - 修改数据集的路径为自定义数据集的存放位置。
   - 调整数据加载逻辑，以匹配自定义数据集的格式。
   - 更新数据预处理步骤，以符合自定义数据集的特点。

4. **测试与调试**
   - 运行修改后的CIFAR100InstanceSample函数，测试读取自定义数据集的功能。
   - 根据测试结果调试代码，解决读取过程中遇到的问题。

**备注：**

- 调试过程中，主要关注数据集的路径、格式和加载逻辑。
- 后续可进一步优化数据预处理步骤，提高数据处理效率。

In [1]:
from __future__ import print_function

import os
import socket
import numpy as np
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from PIL import Image

"""
mean = {
    'cifar100': (0.5071, 0.4867, 0.4408),
}

std = {
    'cifar100': (0.2675, 0.2565, 0.2761),
}
"""


def get_data_folder():
    """Return the server-dependent directory used to store datasets.

    The directory is chosen from the machine's hostname prefix and falls
    back to ``./data/`` on any other host. It is created if it is missing.

    Returns:
        str: path of the data directory.
    """
    hostname = socket.gethostname()
    if hostname.startswith('visiongpu'):
        data_folder = '/data/vision/phillipi/rep-learn/datasets'
    elif hostname.startswith('yonglong-home'):
        data_folder = '/home/yonglong/Data/data'
    else:
        data_folder = './data/'

    # exist_ok avoids the check-then-create race when several processes
    # call this function at the same time.
    os.makedirs(data_folder, exist_ok=True)

    return data_folder


class CIFAR100Instance(datasets.CIFAR100):
    """CIFAR-100 dataset whose items also carry their dataset index."""

    def __getitem__(self, index):
        """Return ``(img, target, index)`` for the sample at ``index``.

        ``img`` and ``target`` are exactly what the parent class yields.
        """
        image, label = super().__getitem__(index)
        return (image, label, index)


def get_cifar100_dataloaders(batch_size=128, num_workers=8, is_instance=False):
    """Build the CIFAR-100 training and test data loaders.

    Args:
        batch_size: training batch size; the test loader uses half of it,
            but never fewer than one sample per batch.
        num_workers: worker processes for the training loader; the test
            loader uses half as many.
        is_instance: if True, the training set is ``CIFAR100Instance`` (its
            items also carry their index) and the training-set size is
            returned as a third value.

    Returns:
        ``(train_loader, test_loader)``, or
        ``(train_loader, test_loader, n_data)`` when ``is_instance`` is True.
    """
    data_folder = get_data_folder()

    # Same normalization constants for training and test images.
    normalize = transforms.Normalize((0.5071, 0.4867, 0.4408), (0.2675, 0.2565, 0.2761))
    train_transform = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        normalize,
    ])
    test_transform = transforms.Compose([
        transforms.ToTensor(),
        normalize,
    ])

    train_cls = CIFAR100Instance if is_instance else datasets.CIFAR100
    train_set = train_cls(root=data_folder,
                          download=True,
                          train=True,
                          transform=train_transform)
    train_loader = DataLoader(train_set,
                              batch_size=batch_size,
                              shuffle=True,
                              num_workers=num_workers)

    test_set = datasets.CIFAR100(root=data_folder,
                                 download=True,
                                 train=False,
                                 transform=test_transform)
    # max(1, ...) keeps batch_size=1 from yielding a zero test batch size,
    # which DataLoader rejects.
    test_loader = DataLoader(test_set,
                             batch_size=max(1, int(batch_size / 2)),
                             shuffle=False,
                             num_workers=int(num_workers / 2))

    if is_instance:
        return train_loader, test_loader, len(train_set)
    return train_loader, test_loader


class CIFAR100InstanceSample(datasets.CIFAR100):
    """CIFAR-100 dataset that can also return contrastive sample indices.

    For every class it keeps the indices of the samples of that class
    (``cls_positive``) and of all other classes (``cls_negative``). When
    ``is_sample`` is True each item additionally carries one positive index
    followed by ``k`` randomly drawn negative indices.
    """

    def __init__(self, root, train=True,
                 transform=None, target_transform=None,
                 download=False, k=4096, mode='exact', is_sample=True, percent=1.0):
        """Load CIFAR-100 and build the per-class index tables.

        Args:
            root, train, transform, target_transform, download: forwarded to
                ``datasets.CIFAR100``.
            k: number of negative indices drawn per item.
            mode: ``'exact'`` uses the item itself as the positive,
                ``'relax'`` draws a random sample of the same class.
            is_sample: whether ``__getitem__`` returns sample indices.
            percent: if strictly between 0 and 1, only a random subset of
                each class's negatives is kept; its size is that fraction of
                class 0's negative count.
        """
        super().__init__(root=root, train=train, download=download,
                         transform=transform, target_transform=target_transform)
        self.k = k
        self.mode = mode
        self.is_sample = is_sample

        num_classes = 100

        # Indices of the samples that belong to each class.
        positives = [[] for _ in range(num_classes)]
        for sample_idx in range(len(self.data)):
            positives[self.targets[sample_idx]].append(sample_idx)

        # Negatives of a class: the positives of every other class, in class order.
        negatives = []
        for cls in range(num_classes):
            others = []
            for other_cls in range(num_classes):
                if other_cls != cls:
                    others.extend(positives[other_cls])
            negatives.append(others)

        positives = [np.asarray(indices) for indices in positives]
        negatives = [np.asarray(indices) for indices in negatives]

        if 0 < percent < 1:
            keep = int(len(negatives[0]) * percent)
            negatives = [np.random.permutation(indices)[0:keep] for indices in negatives]

        self.cls_positive = np.asarray(positives)
        self.cls_negative = np.asarray(negatives)

    def __getitem__(self, index):
        """Return the item at ``index``.

        Returns:
            ``(img, target, index)`` when ``is_sample`` is False, otherwise
            ``(img, target, index, sample_idx)`` where ``sample_idx`` holds
            the positive index followed by ``k`` negative indices.

        Raises:
            NotImplementedError: if ``mode`` is neither ``'exact'`` nor ``'relax'``.
        """
        raw, target = self.data[index], self.targets[index]

        # Convert the image array into a PIL image before applying transforms.
        img = Image.fromarray(raw)

        if self.transform is not None:
            img = self.transform(img)

        if self.target_transform is not None:
            target = self.target_transform(target)

        if not self.is_sample:
            # No sampling requested: return the plain item.
            return img, target, index

        if self.mode == 'exact':
            pos_idx = index
        elif self.mode == 'relax':
            pos_idx = np.random.choice(self.cls_positive[target], 1)[0]
        else:
            raise NotImplementedError(self.mode)

        # Sample with replacement only when fewer than k negatives are available.
        candidates = self.cls_negative[target]
        with_replacement = self.k > len(candidates)
        neg_idx = np.random.choice(candidates, self.k, replace=with_replacement)
        sample_idx = np.hstack((np.asarray([pos_idx]), neg_idx))
        return img, target, index, sample_idx


def get_cifar100_dataloaders_sample(batch_size=128, num_workers=8, k=4096, mode='exact',
                                    is_sample=True, percent=1.0):
    """Build CIFAR-100 loaders whose training set is ``CIFAR100InstanceSample``.

    Args:
        batch_size: training batch size; the test loader uses half of it,
            but never fewer than one sample per batch.
        num_workers: worker processes for the training loader; the test
            loader uses half as many.
        k, mode, is_sample, percent: forwarded to ``CIFAR100InstanceSample``.

    Returns:
        ``(train_loader, test_loader, n_data)`` where ``n_data`` is the size
        of the training set.
    """
    data_folder = get_data_folder()
    normalize = transforms.Normalize((0.5071, 0.4867, 0.4408), (0.2675, 0.2565, 0.2761))
    # Transform passed to the training set; the classic augmentations are
    # intentionally disabled for now.
    train_transform = transforms.Compose([
        # transforms.RandomCrop(32, padding=4),
        # transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        normalize,
    ])
    test_transform = transforms.Compose([
        transforms.ToTensor(),
        normalize,
    ])
    # Originally CIFAR100InstanceSample; swap in a custom dataset class here
    # to train on other data.
    train_set = CIFAR100InstanceSample(root=data_folder,
                                       download=True,
                                       train=True,
                                       transform=train_transform,
                                       k=k,
                                       mode=mode,
                                       is_sample=is_sample,
                                       percent=percent)
    n_data = len(train_set)
    train_loader = DataLoader(train_set,
                              batch_size=batch_size,
                              shuffle=True,
                              num_workers=num_workers)

    test_set = datasets.CIFAR100(root=data_folder,
                                 download=True,
                                 train=False,
                                 transform=test_transform)
    # max(1, ...) keeps batch_size=1 from yielding a zero test batch size,
    # which DataLoader rejects.
    test_loader = DataLoader(test_set,
                             batch_size=max(1, int(batch_size / 2)),
                             shuffle=False,
                             num_workers=int(num_workers / 2))

    return train_loader, test_loader, n_data


In [2]:
import os
import numpy as np
from PIL import Image
from torch.utils.data import Dataset

class CustomDatasetWithSampling(Dataset):
    """Image-folder dataset that can also return contrastive sample indices.

    Images are collected from ``root_dir/<label>/`` for every label in
    ``0 .. num_classes - 1``; only ``.png`` and ``.jpg`` files are used.
    """

    def __init__(self, root_dir, k=4096, mode='exact', is_sample=True, percent=1.0, transform=None,
                 num_classes=5):
        """
        Args:
            root_dir (string): dataset directory containing one sub-folder
                per class, named after the integer label.
            k (int): number of negative indices drawn per item.
            mode (string): positive sampling mode ('exact' or 'relax').
            is_sample (bool): whether ``__getitem__`` returns sample indices.
            percent (float): if strictly between 0 and 1, fraction of each
                class's negatives that is kept (chosen at random).
            transform (callable, optional): transform applied to each image.
            num_classes (int): number of class sub-folders to read.
        """
        self.root_dir = root_dir
        self.k = k
        self.mode = mode
        self.is_sample = is_sample
        self.transform = transform
        self.num_classes = num_classes

        # Walk the class folders and collect image paths with their labels.
        self.images = []
        self.labels = []
        for label in range(num_classes):
            label_folder = os.path.join(self.root_dir, str(label))
            # Sorted so that sample indices are reproducible across runs;
            # os.listdir returns entries in arbitrary order.
            for img_file in sorted(os.listdir(label_folder)):
                if img_file.endswith(('.png', '.jpg')):
                    self.images.append(os.path.join(label_folder, img_file))
                    self.labels.append(label)

        # Build the per-class positive and negative index lists.
        positives = [[] for _ in range(num_classes)]
        negatives = [[] for _ in range(num_classes)]
        for idx, label in enumerate(self.labels):
            positives[label].append(idx)
            # A sample is a negative for every class other than its own.
            for other_label in range(num_classes):
                if other_label != label:
                    negatives[other_label].append(idx)

        # Integer arrays (even when empty) so they can be sampled and used as indices.
        self.cls_positive = [np.array(indices, dtype=np.int64) for indices in positives]
        self.cls_negative = [np.array(indices, dtype=np.int64) for indices in negatives]

        # Optionally keep only a random fraction of each class's negatives.
        if 0 < percent < 1:
            for i in range(num_classes):
                n = int(len(self.cls_negative[i]) * percent)
                self.cls_negative[i] = np.random.permutation(self.cls_negative[i])[:n]

    def __len__(self):
        """Return the number of collected images."""
        return len(self.images)

    def __getitem__(self, index):
        """Return the item at ``index``.

        Returns:
            ``(image, label, index)`` when ``is_sample`` is False, otherwise
            ``(image, label, index, sample_idx)`` where ``sample_idx`` holds
            the positive index followed by ``k`` negative indices.

        Raises:
            NotImplementedError: if ``mode`` is neither 'exact' nor 'relax'.
        """
        img_path = self.images[index]
        image = Image.open(img_path).convert('L')  # convert to a single-channel image
        label = self.labels[index]

        if self.transform:
            image = self.transform(image)

        if not self.is_sample:
            return image, label, index

        # Pick the positive index according to the sampling mode.
        if self.mode == 'exact':
            pos_idx = index
        elif self.mode == 'relax':
            pos_idx = np.random.choice(self.cls_positive[label], 1)[0]
        else:
            raise NotImplementedError("Sampling mode not implemented.")

        # Draw the negatives, with replacement only when fewer than k exist.
        replace = self.k > len(self.cls_negative[label])
        neg_idx = np.random.choice(self.cls_negative[label], self.k, replace=replace)
        sample_idx = np.hstack((np.array([pos_idx]), neg_idx))

        return image, label, index, sample_idx


In [7]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ConvBlock(nn.Module):
    """Conv2d followed by BatchNorm2d and an optional activation.

    ``activation`` selects the non-linearity applied after batch norm:
    ``'relu'``, ``'mish'`` or ``'hard_swish'``. Any other value returns the
    batch-normalized output unchanged.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0, activation='relu'):
        super().__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride=stride, padding=padding)
        self.bn = nn.BatchNorm2d(out_channels)
        self.activation = activation

    def forward(self, x):
        """Apply convolution, batch norm and the configured activation to ``x``."""
        normed = self.bn(self.conv(x))
        kind = self.activation
        if kind == 'relu':
            return F.relu(normed)
        if kind == 'mish':
            # Mish: y * tanh(softplus(y))
            return normed * torch.tanh(F.softplus(normed))
        if kind == 'hard_swish':
            # Hard Swish: y * relu6(y + 3) / 6
            return normed * F.relu6(normed + 3) / 6
        return normed

class HierarchicalSplitBlock(nn.Module):
    """Split the channels into two chunks and process each with its own ConvBlock.

    Each branch is a 3x3 ``ConvBlock`` (padding 1, Mish activation) that maps
    ``channels // 2`` input channels to twice as many output channels; the
    two branch outputs are concatenated along the channel dimension.
    """

    def __init__(self, channels):
        super().__init__()
        half = channels // 2  # equal split for simplicity
        self.split_channels = half

        # Every branch doubles its channel count.
        branch_out = half * 2
        self.path1 = ConvBlock(half, branch_out, 3, padding=1, activation='mish')
        self.path2 = ConvBlock(half, branch_out, 3, padding=1, activation='mish')

    def forward(self, x):
        """Chunk ``x`` in two along dim 1, run both branches and concatenate."""
        first, second = torch.chunk(x, 2, dim=1)
        first = self.path1(first)
        second = self.path2(second)
        return torch.cat([first, second], dim=1)

class Network(nn.Module):
    """Small classifier: 1x1 conv stem, split block, 1x1 conv, global pooling, linear head."""

    def __init__(self, num_classes):
        super().__init__()
        self.initial_conv = ConvBlock(3, 64, 1)  # expects 3 input channels
        self.hierarchical_block = HierarchicalSplitBlock(64)
        self.final_conv = ConvBlock(128, 128, 1)  # keeps the channel depth at 128
        self.global_avg_pool = nn.AdaptiveAvgPool2d((1, 1))  # global average pooling
        self.classifier = nn.Linear(128, num_classes)  # classification head

    def forward(self, x):
        """Return the class scores for the batch ``x``."""
        features = x
        stages = (self.initial_conv, self.hierarchical_block,
                  self.final_conv, self.global_avg_pool)
        for stage in stages:
            features = stage(features)
        # Flatten everything but the batch dimension for the classifier.
        flat = features.view(features.size(0), -1)
        return self.classifier(flat)

# Example use of the network: build it, print its structure and run one forward pass.
num_classes = 10  # Assuming 10 classes for this example
model = Network(num_classes)
print(model)

# Example input tensor of shape (1, 3, 32, 32), filled with uniform random values
input_tensor = torch.rand(1, 3, 32, 32)

# Forward pass through the network
output = model(input_tensor)
print("Output shape:", output.shape)  # Should be [1, num_classes] indicating the class scores


Network(
  (initial_conv): ConvBlock(
    (conv): Conv2d(3, 64, kernel_size=(1, 1), stride=(1, 1))
    (bn): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (hierarchical_block): HierarchicalSplitBlock(
    (path1): ConvBlock(
      (conv): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (bn): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (path2): ConvBlock(
      (conv): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (bn): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
  )
  (final_conv): ConvBlock(
    (conv): Conv2d(128, 128, kernel_size=(1, 1), stride=(1, 1))
    (bn): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (global_avg_pool): AdaptiveAvgPool2d(output_size=(1, 1))
  (classifier): Linear(in_features=128, out_features=10, bias=True)
)
Output shape: torch.Size([1, 10])


In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# Squeeze-and-Excitation (SE) channel-attention module
class SEBlock(nn.Module):
    """Rescale each channel of the input by a learned gate in (0, 1).

    The gate is computed from the spatial mean of every channel, passed
    through two 1x1 convolutions (ReLU in between) and a sigmoid.

    Args:
        channels: number of input (and output) channels.
        reduction: factor by which the bottleneck between the two 1x1
            convolutions is narrowed.
    """

    def __init__(self, channels, reduction=16):
        super(SEBlock, self).__init__()
        # channels // reduction is 0 whenever channels < reduction; keep at
        # least one bottleneck channel so the gate still depends on the input.
        hidden = max(1, channels // reduction)
        self.fc1 = nn.Conv2d(channels, hidden, kernel_size=1)
        self.fc2 = nn.Conv2d(hidden, channels, kernel_size=1)

    def forward(self, x):
        """Return ``x`` multiplied by its per-channel gate."""
        # Squeeze: average over the two spatial dimensions, keeping them as size 1.
        out = x.mean(dim=(2, 3), keepdim=True)
        out = F.relu(self.fc1(out))
        out = torch.sigmoid(self.fc2(out))
        # Excite: the gate broadcasts over the spatial dimensions.
        return x * out

class ConvBlock(nn.Module):
    """Convolution, batch normalization and an optional activation in one module.

    Supported ``activation`` values are ``'relu'``, ``'mish'`` and
    ``'hard_swish'``; anything else yields the batch-normalized output as is.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0, activation='relu'):
        super().__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride=stride, padding=padding)
        self.bn = nn.BatchNorm2d(out_channels)
        self.activation = activation

    def forward(self, x):
        """Run ``x`` through conv -> batch norm -> activation."""
        y = self.bn(self.conv(x))
        name = self.activation
        if name == 'relu':
            result = F.relu(y)
        elif name == 'mish':
            # Mish: y * tanh(softplus(y))
            result = y * torch.tanh(F.softplus(y))
        elif name == 'hard_swish':
            # Hard Swish: y * relu6(y + 3) / 6
            result = y * F.relu6(y + 3) / 6
        else:
            result = y
        return result

class HierarchicalSplitBlock(nn.Module):
    """Split the channels into two chunks and process each with its own 1x1 ConvBlock.

    Both branches keep their channel count (``channels // 2`` in and out,
    ReLU activation); their outputs are concatenated along dim 1.
    """

    def __init__(self, channels):
        super().__init__()
        half = channels // 2
        self.split_channels = half
        self.path1 = ConvBlock(half, half, 1, activation='relu')
        self.path2 = ConvBlock(half, half, 1, activation='relu')

    def forward(self, x):
        """Chunk ``x`` in two along dim 1, run both branches and concatenate."""
        first, second = torch.chunk(x, 2, dim=1)
        first = self.path1(first)
        second = self.path2(second)
        return torch.cat([first, second], dim=1)

class BalancedLayer(nn.Module):
    """Reweight channels with softmax weights computed from their spatial means."""

    def __init__(self, channels):
        super().__init__()
        self.fc = nn.Conv2d(channels, channels, kernel_size=1)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Return ``x`` scaled per channel by weights that sum to 1 over dim 1."""
        pooled = x.mean(dim=(2, 3), keepdim=True)
        logits = self.fc(pooled)
        weights = self.softmax(logits)
        return x * weights

class Hsnet(nn.Module):
    """Classifier chaining a 1x1 conv stem, split block, SE block, balanced layer and linear head."""

    def __init__(self, num_classes):
        super().__init__()
        self.initial_conv = ConvBlock(3, 32, 1)  # reduced initial channels
        self.hierarchical_block = HierarchicalSplitBlock(32)
        self.se_block = SEBlock(32)
        self.balanced_layer = BalancedLayer(32)
        self.final_conv = ConvBlock(32, 64, 1)  # adjusted final channels
        self.global_avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.classifier = nn.Linear(64, num_classes)

    def forward(self, x):
        """Return the class scores for the batch ``x``."""
        features = x
        stages = (self.initial_conv, self.hierarchical_block, self.se_block,
                  self.balanced_layer, self.final_conv, self.global_avg_pool)
        for stage in stages:
            features = stage(features)
        # Flatten everything but the batch dimension for the classifier.
        flat = features.view(features.size(0), -1)
        return self.classifier(flat)

# Example training loop
def train_model(model, train_loader, criterion, optimizer, num_epochs=10):
    """Train ``model`` and print the mean batch loss after every epoch.

    Args:
        model: module to train; it is switched to training mode.
        train_loader: iterable yielding ``(inputs, labels)`` batches.
        criterion: callable computing the loss from ``(outputs, labels)``.
        optimizer: optimizer that updates the model's parameters.
        num_epochs: number of passes over ``train_loader``.
    """
    model.train()
    for epoch in range(num_epochs):
        running_loss = 0.0
        num_batches = 0
        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            num_batches += 1
        # Average over the batches actually seen: this avoids a division by
        # zero on an empty loader and does not require the loader to define len().
        mean_loss = running_loss / num_batches if num_batches else float('nan')
        print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {mean_loss}")

# Data augmentation
# NOTE(review): `transforms` is not imported in this cell; the recorded output of
# this cell is "NameError: name 'transforms' is not defined".
transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomCrop(32, padding=4),
    transforms.ToTensor(),
])

# Create the custom dataset and its data loader
# NOTE(review): `CustomDataset` is not defined in the visible source and `DataLoader`
# is not imported in this cell -- confirm where they are expected to come from.
data = torch.randn(1000, 3, 32, 32)  # example data
labels = torch.randint(0, 10, (1000,))  # example labels
dataset = CustomDataset(data, labels, transform=transform)
train_loader = DataLoader(dataset, batch_size=32, shuffle=True)

# Compute the class weights and create the loss function
# NOTE(review): `calculate_class_weights` and `WeightedCrossEntropyLoss` are not
# defined in the visible source -- confirm before running.
class_weights = calculate_class_weights(labels)
criterion = WeightedCrossEntropyLoss(class_weights)

# Create the model and the optimizer
model = Hsnet(num_classes=10)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Train the model
train_model(model, train_loader, criterion, optimizer)


NameError: name 'transforms' is not defined

In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import timm

class CustomModel(nn.Module):
    """Wrap a base model and optionally return softmax probabilities with its output."""

    def __init__(self, base_model):
        super().__init__()
        self.base_model = base_model

    def forward(self, x, before=False):
        """Run the base model on ``x``.

        Returns:
            The base model's output when ``before`` is False. Otherwise the
            tuple ``(final_output, soft_label, soft_no_softmax)``:
            ``soft_label`` is the softmax of the output over dim 1 and
            ``soft_no_softmax`` is the same unprocessed output tensor again.
        """
        logits = self.base_model(x)
        if not before:
            return logits

        # Probability distribution over dim 1; the raw output is returned twice.
        probabilities = F.softmax(logits, dim=1)
        return logits, probabilities, logits

# Create a custom model instance wrapping a randomly initialized timm ResNet-50
base_model = timm.create_model('resnet50', pretrained=False)
model = CustomModel(base_model)

# Simulated batch of input images: 8 random tensors of shape (3, 224, 224)
images = torch.randn(8, 3, 224, 224)

# Call the model with before=True and unpack its three outputs
outputs = model(images, before=True)
output, soft_label, soft_no_softmax = outputs

print("Final Output:", output)
print("Soft Label:", soft_label)
print("Soft No Softmax:", soft_no_softmax)


Final Output: tensor([[-0.0382,  0.1532,  0.1824,  ...,  0.2673, -0.1966,  0.3453],
        [-0.0425,  0.2310,  0.0764,  ...,  0.1513, -0.1323,  0.4988],
        [-0.0453,  0.2080,  0.0991,  ...,  0.1463, -0.1402,  0.3995],
        ...,
        [-0.0988,  0.1952,  0.2048,  ...,  0.3187, -0.0922,  0.3970],
        [-0.0826,  0.1793,  0.0706,  ...,  0.2799, -0.1421,  0.3789],
        [-0.1023,  0.2133,  0.1069,  ...,  0.2155, -0.1124,  0.4104]],
       grad_fn=<AddmmBackward0>)
Soft Label: tensor([[0.0009, 0.0011, 0.0011,  ..., 0.0013, 0.0008, 0.0014],
        [0.0009, 0.0012, 0.0010,  ..., 0.0011, 0.0008, 0.0016],
        [0.0009, 0.0012, 0.0011,  ..., 0.0011, 0.0008, 0.0014],
        ...,
        [0.0009, 0.0012, 0.0012,  ..., 0.0013, 0.0009, 0.0014],
        [0.0009, 0.0011, 0.0010,  ..., 0.0013, 0.0008, 0.0014],
        [0.0009, 0.0012, 0.0011,  ..., 0.0012, 0.0009, 0.0014]],
       grad_fn=<SoftmaxBackward0>)
Soft No Softmax: tensor([[-0.0382,  0.1532,  0.1824,  ...,  0.2673, -0.196