In [None]:
import torch
import pandas as pd
import numpy as np
from torch import nn
from torch.utils.data import DataLoader, Dataset
from torchvision.transforms import ToTensor
from torchvision import transforms
from sklearn.model_selection import train_test_split
from torch.utils.tensorboard import SummaryWriter

writer = SummaryWriter(log_dir='./runs/fashion_mnist_experiment_2')

# 1，获取训练设备
device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)

# 2，加载训练集和测试集
train_data = pd.read_csv('csv_book/train.csv')
test_data = pd.read_csv('csv_book/test.csv')

# 3，数据预处理
X = train_data.drop('label', axis=1).values
y = train_data['label'].values
X = X / 255.0
X = X.reshape(-1, 28, 28, 1)

# 4，训练集和验证集划分
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

# 5，定义 MNISTDataset 类
class MNISTDataset(Dataset):
    def __init__(self, images, labels, transform=None):
        self.images = images
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx]
        label = self.labels[idx]
        if self.transform:
            image = self.transform(image)

        image = image.float()
        return image, label

 # 6, 定义数据预处理（transform）
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5])
])

# 7, 创建 DataLoader 对象
train_dataset = MNISTDataset(X_train, y_train, transform=transform)
test_dataset = MNISTDataset(X_val, y_val, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# 8, 定义神经网络模型
class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28*28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10),
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

# 9, 训练函数（train_loop）
def train_loop(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        pred = model(X)
        loss = loss_fn(pred, y)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * batch_size + len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

        # 每个batch记录一次训练损失
        writer.add_scalar('Loss/train', loss, epoch * len(dataloader) + batch)


        # 记录输入图像
        if batch == 0:  # 只在第一个batch记录
            img_grid = utils.make_grid(X)
            writer.add_image('images', img_grid, epoch)

        # 记录学习率
        for g in optimizer.param_groups:
            writer.add_scalar('Learning Rate', g['lr'], epoch * len(dataloader) + batch)


# 10, 测试函数（test_loop）
def test_loop(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0

    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()

    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

    #  记录测试集的损失和准确率
    writer.add_scalar('Loss/test', test_loss, epoch)
    writer.add_scalar('Accuracy/test', 100 * correct, epoch)

    # 记录每个epoch结束后的测试结果
    writer.add_text('Test Info', f'Epoch {epoch} - Test Accuracy: {100 * correct:.2f}%, Avg Loss: {test_loss:.4f}', epoch)

# 11, 训练模型
model = NeuralNetwork()
loss_fn = nn.CrossEntropyLoss()
learning_rate = 0.01
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

epochs = 3

# 记录模型计算图
dummy_input = torch.randn(1, 1, 28, 28) # 假设输入为 28x28 的灰度图像
writer.add_graph(model, dummy_input)

for epoch in range(epochs):
    print(f"Epoch {epoch+1}\n-------------------------------")
    train_loop(train_loader, model, loss_fn, optimizer)
    test_loop(test_loader, model, loss_fn)

# 训练完成，关闭 writer
writer.close()

print("完成!")
