In [None]:
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from torch.optim.lr_scheduler import StepLR, ReduceLROnPlateau
from torch.utils.data import DataLoader, TensorDataset
from torchvision import datasets, transforms

In [None]:
# Basic building block
def resnet_block(inputs, num_filters=16, kernel_size=3, strides=1, activation='relu', weight_decay=0.0005):
    """Build a Conv2d -> BatchNorm2d -> (optional) ReLU stack as a single module.

    Args:
        inputs: Number of input channels of the convolution.
        num_filters: Number of output channels of the convolution.
        kernel_size: Convolution kernel size (an int).
        strides: Convolution stride (an int).
        activation: ``'relu'`` appends an ``nn.ReLU``; any other value leaves
            the block without an activation layer.
        weight_decay: Not used by this function; kept so existing call sites
            that pass it keep working.

    Returns:
        An ``nn.Sequential`` holding the layers in application order.
    """
    # PyTorch rejects padding='same' for strided convolutions, so strided
    # blocks fall back to the explicit symmetric padding kernel_size // 2.
    padding = 'same' if strides == 1 else kernel_size // 2
    layers = [
        nn.Conv2d(inputs, num_filters, kernel_size=kernel_size, stride=strides,
                  padding=padding, bias=False),
        # BatchNorm2d takes the channel count, not the preceding module.
        nn.BatchNorm2d(num_filters),
    ]
    if activation == 'relu':
        layers.append(nn.ReLU())
    return nn.Sequential(*layers)

# A 20-layer ResNet-style network
class ResNet(nn.Module):
    """Stack of 1 stem block, 3 stages of 6 blocks each, and a linear classifier.

    That is 19 blocks produced by ``resnet_block`` plus one ``nn.Linear``.

    NOTE(review): the stages are plain sequential stacks; ``forward`` adds no
    residual (skip) connections despite the class name.

    Args:
        input_shape: ``(channels, height, width)`` of the input. Only the
            channel count is read; the classifier size is fixed to
            ``64 * 4 * 4`` features.
        num_classes: Number of output logits.
    """

    def __init__(self, input_shape=(3, 32, 32), num_classes=10):
        super().__init__()
        self.conv1 = resnet_block(input_shape[0], num_filters=16)
        # Each stage is told the width produced by the previous one, so the
        # first block of a stage can change the channel count.
        self.blocks2_7 = self._make_layer(resnet_block, 16, 6, in_channels=16)
        self.blocks8_13 = self._make_layer(resnet_block, 32, 6, stride=2, in_channels=16)
        self.blocks14_19 = self._make_layer(resnet_block, 64, 6, stride=2, in_channels=32)
        self.avgpool = nn.AvgPool2d(kernel_size=2)
        self.flatten = nn.Flatten()
        # Assumes a 32x32 input where each stride-2 stage halves the spatial
        # size (32 -> 16 -> 8) and the 2x2 pooling leaves 4x4.
        self.fc = nn.Linear(64 * 4 * 4, num_classes)

    def _make_layer(self, block, num_filters, num_blocks, stride=1, in_channels=None):
        """Build one stage of ``num_blocks`` blocks that outputs ``num_filters`` channels.

        Args:
            block: Factory called as ``block(in_channels, num_filters=..., kernel_size=..., strides=...)``.
            num_filters: Output channels of every block in the stage.
            num_blocks: Number of blocks in the stage.
            stride: Stride of the first block only; later blocks use the
                factory's default stride.
            in_channels: Channels entering the stage; defaults to
                ``num_filters`` when omitted.

        Returns:
            An ``nn.Sequential`` with the stage's blocks.
        """
        if in_channels is None:
            in_channels = num_filters
        layers = [block(in_channels, num_filters=num_filters, kernel_size=3, strides=stride)]
        layers.extend(
            block(num_filters, num_filters=num_filters, kernel_size=3)
            for _ in range(1, num_blocks)
        )
        return nn.Sequential(*layers)

    def forward(self, x):
        """Apply stem, the three stages, pooling, flattening and the classifier."""
        x = self.conv1(x)
        x = self.blocks2_7(x)
        x = self.blocks8_13(x)
        x = self.blocks14_19(x)
        x = self.avgpool(x)
        x = self.flatten(x)
        x = self.fc(x)
        return x

# Standardization
def standardization(x_train, x_test):
    """Standardize both arrays with statistics computed from ``x_train`` only.

    Mean and standard deviation are reduced over axes 0, 2 and 3 of
    ``x_train``, giving one value per index along axis 1; they are then
    broadcast back as shape ``(1, -1, 1, 1)``. A small constant (1e-7) is
    added to the standard deviation before dividing.

    Args:
        x_train: 4-D array that supplies the statistics.
        x_test: Array transformed with the same statistics.

    Returns:
        Tuple ``(x_train, x_test)`` of standardized arrays.
    """
    reduce_axes = (0, 2, 3)
    center = np.mean(x_train, axis=reduce_axes).reshape(1, -1, 1, 1)
    spread = np.std(x_train, axis=reduce_axes).reshape(1, -1, 1, 1)

    def _apply(batch):
        # Same expression for both arrays: subtract train mean, divide by train std.
        return (batch - center) / (spread + 1e-7)

    return _apply(x_train), _apply(x_test)

# Min-max normalization
def normalize(x_train, x_test):
    """Min-max scale both arrays using the range of ``x_train``.

    ``x_train`` is mapped onto [0, 1]. ``x_test`` goes through the same
    affine map, so both arrays share one scale (matching how
    ``standardization`` reuses the training statistics); test values outside
    the training range therefore land outside [0, 1].

    Args:
        x_train: Array-like that defines the minimum and maximum.
        x_test: Array-like scaled with the training minimum and maximum.

    Returns:
        Tuple ``(x_train, x_test)`` of floating-point arrays. Floating inputs
        keep their dtype; other dtypes are converted to float64.
    """
    def _as_float(values):
        # Convert before subtracting: unsigned-integer arithmetic would wrap
        # around when a test value is below the training minimum.
        arr = np.asarray(values)
        return arr if np.issubdtype(arr.dtype, np.floating) else arr.astype(np.float64)

    x_train = _as_float(x_train)
    x_test = _as_float(x_test)

    lowest = np.min(x_train)
    span = np.max(x_train) - lowest
    # A constant training array has zero range; dividing by 1 maps it to
    # zeros instead of producing NaNs.
    scale = span if span != 0 else 1

    return (x_train - lowest) / scale, (x_test - lowest) / scale


In [None]:
# Training hyperparameters
batch_size = 128
max_epochs = 100
learning_rate = 0.01
lr_decay = 1e-6
lr_drop = 20
num_classes = 10
weight_decay = 0.0005
x_shape = [32, 32, 3]

# Data loading
transform = transforms.Compose([
    transforms.ToTensor(),
])

cifar_train = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
cifar_test = datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)

# The image arrays are read straight from `.data`, so `transform` is not
# involved below. Per torchvision, `.data` holds uint8 images laid out as
# (N, height, width, channels).
x_train_raw, x_val_raw, y_train, y_val = train_test_split(
    cifar_train.data, cifar_train.targets, test_size=0.1, random_state=42)
y_test = cifar_test.targets

# Preprocessing: scale every split, including the test split, so the model
# sees the same value range at train and evaluation time.
x_train, x_val = normalize(x_train_raw, x_val_raw)
_, x_test = normalize(x_train_raw, cifar_test.data)

# Wrap the arrays as TensorDatasets. Conv2d expects channels-first input, so
# the images are permuted from (N, H, W, C) to (N, C, H, W).
train_dataset = TensorDataset(
    torch.as_tensor(x_train, dtype=torch.float32).permute(0, 3, 1, 2),
    torch.LongTensor(y_train))
val_dataset = TensorDataset(
    torch.as_tensor(x_val, dtype=torch.float32).permute(0, 3, 1, 2),
    torch.LongTensor(y_val))
test_dataset = TensorDataset(
    torch.as_tensor(x_test, dtype=torch.float32).permute(0, 3, 1, 2),
    torch.LongTensor(y_test))

# Batch the datasets; only the training split is shuffled.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Step-wise learning-rate schedule
def lr_scheduler(epoch):
    """Return the learning rate for ``epoch``.

    The module-level ``learning_rate`` is halved once for every ``lr_drop``
    whole epochs that have elapsed.
    """
    completed_drops = epoch // lr_drop
    decay = 0.5 ** completed_drops
    return learning_rate * decay

# Data augmentation
transform_augmented = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.ToTensor(),
])

train_dataset_augmented = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform_augmented)
train_loader_augmented = DataLoader(train_dataset_augmented, batch_size=batch_size, shuffle=True)

# Model
model = ResNet(input_shape=(3, 32, 32))

# Optimizer and epoch-wise schedule (halve the LR every `lr_drop` epochs)
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9, weight_decay=weight_decay)
scheduler = StepLR(optimizer, step_size=lr_drop, gamma=0.5)

# Loss function
criterion = nn.CrossEntropyLoss()

# Callback-style objects
# torch.save returns None, so the path (not the call result) is what gets
# stored under `checkpoint`.
checkpoint_path = './resnet.pth'
torch.save(model.state_dict(), checkpoint_path)
checkpoint = checkpoint_path

# LambdaLR multiplies the optimizer's initial LR by the lambda's return value,
# so the absolute rate from the schedule function is converted to a factor
# (1.0, 0.5, 0.25, ...). The function is captured first because the name
# `lr_scheduler` is rebound to the scheduler object on the next line.
lr_schedule_fn = lr_scheduler
lr_scheduler = optim.lr_scheduler.LambdaLR(
    optimizer, lr_lambda=lambda epoch: lr_schedule_fn(epoch) / learning_rate)

# `verbose` is deprecated on ReduceLROnPlateau and is left at its default.
lr_reducer = ReduceLROnPlateau(optimizer, mode='max', factor=0.2, patience=5, min_lr=1e-3)
callbacks = [checkpoint, lr_scheduler, lr_reducer]

In [None]:
# Model training
def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over ``loader``; weights are updated only if ``optimizer`` is given.

    Returns:
        ``(mean per-batch loss, accuracy)``; both are 0.0 for an empty loader.
    """
    training = optimizer is not None
    model.train(training)
    running_loss, correct, total, num_batches = 0.0, 0, 0, 0

    with torch.set_grad_enabled(training):
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            if training:
                optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()

            running_loss += loss.item()
            correct += outputs.argmax(dim=1).eq(labels).sum().item()
            total += labels.size(0)
            num_batches += 1

    # Counting batches here (instead of len(loader)) avoids a division by
    # zero on an empty loader and works for iterables without __len__.
    if num_batches == 0 or total == 0:
        return 0.0, 0.0
    return running_loss / num_batches, correct / total


def train_model(model, train_loader, criterion, optimizer, scheduler, num_epochs, device,
                val_loader=None, checkpoint_path='./resnet.pth'):
    """Train ``model`` for ``num_epochs`` epochs and checkpoint the best weights.

    Args:
        model: Module to train; moved to ``device`` in place.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per training batch.
        scheduler: Object whose ``step()`` is called once per epoch, or
            ``None`` to keep the learning rate untouched.
        num_epochs: Number of passes over ``train_loader``.
        device: Device the model and batches are moved to.
        val_loader: Optional iterable of validation batches. When given, a
            validation pass runs each epoch and checkpointing follows
            validation accuracy; otherwise training accuracy is used.
        checkpoint_path: Where ``model.state_dict()`` is saved whenever the
            monitored accuracy improves.

    Returns:
        Dict with per-epoch lists under ``'loss'``, ``'accuracy'``,
        ``'val_loss'`` and ``'val_accuracy'`` (the validation lists stay
        empty when ``val_loader`` is ``None``).
    """
    model.to(device)
    history = {'loss': [], 'accuracy': [], 'val_loss': [], 'val_accuracy': []}
    best_accuracy = 0.0

    for epoch in range(num_epochs):
        train_loss, train_accuracy = _run_epoch(model, train_loader, criterion, device, optimizer)
        history['loss'].append(train_loss)
        history['accuracy'].append(train_accuracy)

        print(f'Epoch {epoch + 1}/{num_epochs}, '
              f'Training Loss: {train_loss:.4f}, '
              f'Training Accuracy: {100 * train_accuracy:.2f}%')

        monitored_accuracy = train_accuracy
        if val_loader is not None:
            val_loss, val_accuracy = _run_epoch(model, val_loader, criterion, device)
            history['val_loss'].append(val_loss)
            history['val_accuracy'].append(val_accuracy)
            monitored_accuracy = val_accuracy
            print(f'Epoch {epoch + 1}/{num_epochs}, '
                  f'Validation Loss: {val_loss:.4f}, '
                  f'Validation Accuracy: {100 * val_accuracy:.2f}%')

        if monitored_accuracy > best_accuracy:
            best_accuracy = monitored_accuracy
            torch.save(model.state_dict(), checkpoint_path)

        if scheduler is not None:
            scheduler.step()

    return history

# Data loading
transform = transforms.Compose([
    transforms.ToTensor(),
])

cifar_train = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
cifar_test = datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)

# The image arrays are read straight from `.data`, so `transform` is not
# involved below. Per torchvision, `.data` holds uint8 images laid out as
# (N, height, width, channels).
x_train_raw, x_val_raw, y_train, y_val = train_test_split(
    cifar_train.data, cifar_train.targets, test_size=0.1, random_state=42)
y_test = cifar_test.targets

# Preprocessing: scale every split, including the test split, so evaluation
# sees the same value range as training.
x_train, x_val = normalize(x_train_raw, x_val_raw)
_, x_test = normalize(x_train_raw, cifar_test.data)

# Wrap the arrays as TensorDatasets. Conv2d expects channels-first input, so
# the images are permuted from (N, H, W, C) to (N, C, H, W).
train_dataset = TensorDataset(
    torch.as_tensor(x_train, dtype=torch.float32).permute(0, 3, 1, 2),
    torch.LongTensor(y_train))
val_dataset = TensorDataset(
    torch.as_tensor(x_val, dtype=torch.float32).permute(0, 3, 1, 2),
    torch.LongTensor(y_val))
test_dataset = TensorDataset(
    torch.as_tensor(x_test, dtype=torch.float32).permute(0, 3, 1, 2),
    torch.LongTensor(y_test))

# Batch the datasets; only the training split is shuffled.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Model
model = ResNet(input_shape=(3, 32, 32))

# Optimizer and epoch-wise schedule (halve the LR every `lr_drop` epochs)
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9, weight_decay=weight_decay)
scheduler = StepLR(optimizer, step_size=lr_drop, gamma=0.5)

# Loss function
criterion = nn.CrossEntropyLoss()

# Training run; whatever train_model returns is kept for later cells.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
history = train_model(model, train_loader, criterion, optimizer, scheduler, max_epochs, device)


In [None]:
# Report accuracy
# Model evaluation
def evaluate_model(model, test_loader, device):
    """Compute, print and return the classification accuracy over ``test_loader``.

    Args:
        model: Module to evaluate; moved to ``device`` and put in eval mode.
        test_loader: Iterable of ``(inputs, labels)`` batches.
        device: Device the model and batches are moved to.

    Returns:
        Fraction of correctly predicted samples as a float in [0, 1];
        0.0 when the loader yields no samples.
    """
    model.to(device)
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            predicted = model(inputs).argmax(dim=1)
            # Count on-device; avoids collecting every prediction in a list.
            correct += predicted.eq(labels).sum().item()
            total += labels.size(0)

    accuracy = correct / total if total else 0.0
    print(f'Test Accuracy: {100 * accuracy:.2f}%')
    return accuracy
# Test-set accuracy (evaluate_model prints it).
evaluate_model(model, test_loader, device)

# Test-set loss: mean of the per-batch losses.
model.eval()
test_loss_sum, test_batches = 0.0, 0
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        test_loss_sum += criterion(model(inputs), labels).item()
        test_batches += 1
print('Test loss:', test_loss_sum / max(test_batches, 1))

# Plots
# The curves come from a `history` dict with per-epoch lists under
# 'accuracy' / 'loss' (and optionally 'val_accuracy' / 'val_loss'). If no such
# dict exists in the notebook namespace, plotting is skipped.
history = globals().get('history')
if isinstance(history, dict) and history.get('accuracy'):
    train_acc = history['accuracy']
    train_loss = history.get('loss', [])
    val_acc = history.get('val_accuracy', [])
    val_loss = history.get('val_loss', [])
    epochs = range(1, len(train_acc) + 1)

    fig_acc, ax_acc = plt.subplots()
    ax_acc.plot(epochs, train_acc, 'b', label='Training acc')
    if len(val_acc) == len(train_acc):
        ax_acc.plot(epochs, val_acc, 'r', label='Validation acc')
    ax_acc.set_title('ResNet-Training and validation accuracy')
    ax_acc.set_xlabel('Epoch')
    ax_acc.set_ylabel('Accuracy')
    ax_acc.legend()
    fig_acc.savefig("resnet_accuracy.png")

    fig_loss, ax_loss = plt.subplots()
    if len(train_loss) == len(train_acc):
        ax_loss.plot(epochs, train_loss, 'b', label='Training loss')
    if len(val_loss) == len(train_acc):
        ax_loss.plot(epochs, val_loss, 'r', label='Validation loss')
    ax_loss.set_title('ResNet-Training and validation loss')
    ax_loss.set_xlabel('Epoch')
    ax_loss.set_ylabel('Loss')
    ax_loss.legend()
    fig_loss.savefig("resnet_loss.png")
else:
    print('No training history available; skipping plots.')