普遍宽度: 548 像素, 普遍高度: 822 像素


In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import torchvision.utils as vutils
from torch.utils.data import Dataset, DataLoader
import optuna
from PIL import Image
import os
from torch.utils.tensorboard import SummaryWriter

In [2]:
# Define other missing variables
nz = 100  # Size of the latent vector
ngf = 64  # Size of feature maps in the generator
ndf = 64  # Size of feature maps in the discriminator
nc = 3    # Number of channels (RGB images)
image_size = 64  # Size of the generated images

In [3]:
# 定义生成器（Generator）
class Generator(nn.Module):
    def __init__(self, nz, ngf, nc):
        super(Generator, self).__init__()
        self.nz = nz  # Add the 'nz' attribute
        self.main = nn.Sequential(
            nn.ConvTranspose2d(nz, ngf * 8, 4, 1, 0, bias=False),
            nn.BatchNorm2d(ngf * 8),
            nn.ReLU(True),

            nn.ConvTranspose2d(ngf * 8, ngf * 4, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf * 4),
            nn.ReLU(True),

            nn.ConvTranspose2d(ngf * 4, ngf * 2, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf * 2),
            nn.ReLU(True),

            nn.ConvTranspose2d(ngf * 2, ngf, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf),
            nn.ReLU(True),

            nn.ConvTranspose2d(ngf, nc, 4, 2, 1, bias=False),
            nn.Tanh() # 输出在[-1, 1]范围内的像素值
        )

    def forward(self, x):
        return self.main(x)



# 定义判别器（Discriminator）
class Discriminator(nn.Module):
    def __init__(self, nc, ndf):
        super(Discriminator, self).__init__()
        self.main = nn.Sequential(
            nn.Conv2d(nc, ndf, 4, 2, 1, bias=False),
            nn.LeakyReLU(0.2, inplace=True),

            nn.Conv2d(ndf, ndf * 2, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ndf * 2),
            nn.LeakyReLU(0.2, inplace=True),

            nn.Conv2d(ndf * 2, ndf * 4, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ndf * 4),
            nn.LeakyReLU(0.2, inplace=True),

            nn.Conv2d(ndf * 4, ndf * 8, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ndf * 8),
            nn.LeakyReLU(0.2, inplace=True),

            nn.Conv2d(ndf * 8, 1, 4, 1, 0, bias=False), # 输出一个标量值，表示输入图像的真实性
            nn.Sigmoid() # 输出范围在0-1之间，表示输入图像为真实图像的概率
        )

    def forward(self, x):
        return self.main(x)

In [4]:
# 定义GAN模型部分
class GAN:
    def __init__(self, generator, discriminator, generator_optimizer, discriminator_optimizer, criterion):
        self.generator = generator
        self.discriminator = discriminator
        self.generator_optimizer = generator_optimizer
        self.discriminator_optimizer = discriminator_optimizer
        self.criterion = criterion

        self.discriminator_loss = []  # To track discriminator loss during training

    def train(self, data_loader, num_epochs, log_interval=100, save_img_interval=500):
        self.generator.train()
        self.discriminator.train()

        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.generator.to(device)
        self.discriminator.to(device)

        fixed_noise = torch.randn(64, self.generator.nz, 1, 1, device=device)

        for epoch in range(num_epochs):
            for batch_idx, real_images in enumerate(data_loader):
                real_images = real_images.to(device)
                batch_size = real_images.size(0)

                # 训练判别器（Discriminator）
                self.discriminator_optimizer.zero_grad()
                # 生成假图片
                z = torch.randn(batch_size, self.generator.nz, 1, 1, device=device)
                fake_images = self.generator(z)
                # 计算判别器损失，注意需要detach()假图片，避免梯度传递到生成器
                real_preds = self.discriminator(real_images).view(-1)
                fake_preds = self.discriminator(fake_images.detach()).view(-1)
                d_loss = 0.5 * (torch.mean((real_preds - 1) ** 2) + torch.mean(fake_preds ** 2))
                d_loss.backward()
                self.discriminator_optimizer.step()

                # 训练生成器（Generator）
                self.generator_optimizer.zero_grad()
                # 生成假图片并计算判别器输出
                # Make sure the generator input (z) is also on the same device
                z = torch.randn(batch_size, self.generator.nz, 1, 1, device=device)
                fake_images = self.generator(z)
                fake_preds = self.discriminator(fake_images).view(-1)
                # 计算生成器损失
                g_loss = 0.5 * torch.mean((fake_preds - 1) ** 2)
                g_loss.backward()
                self.generator_optimizer.step()

                # Track discriminator loss
                self.discriminator_loss.append(d_loss.item())
                if batch_idx % log_interval == 0:
                    # Log generated images to TensorBoard
                    with torch.no_grad():
                        fake_images = self.generator(fixed_noise).detach().cpu()
                    fake_images_grid = vutils.make_grid(fake_images, padding=2, normalize=True)
                    self.writer.add_image("Generated Images", fake_images_grid, global_step=epoch * len(data_loader) + batch_idx)

                if batch_idx % save_img_interval == 0:
                    # Save a batch of generated images to a folder
                    with torch.no_grad():
                        fake_images_batch = self.generator(fixed_noise).detach().cpu()
                    os.makedirs(f"data/multiple_stages_images", exist_ok=True)
                    for i in range(fake_images_batch.size(0)):
                        vutils.save_image(fake_images_batch[i], f"data/multiple_stages_images/fake_epoch_{epoch}_batch_{batch_idx}_img_{i}.png")
                        
        self.writer.close()

In [5]:
# 数据加载和预处理

class CustomDataset(Dataset):
    def __init__(self, data_root, transform=None):
        self.data_root = data_root
        self.image_files = [f for f in os.listdir(data_root) if f.lower().endswith(".tif")]
        self.transform = transform

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        img_name = os.path.join(self.data_root, self.image_files[idx])

        try:
            image = Image.open(img_name).convert("RGB")

            if self.transform:
                image = self.transform(image)

            # 返回数据（不返回标签）
            return image
        except Exception as e:
            print(f"Error loading image {img_name}: {e}")
            # 返回一个空tensor，表示数据加载失败
            return torch.empty((3, 64, 64))

def get_data_loader(data_root, batch_size):
    transform = transforms.Compose([
        transforms.Resize(64),
        transforms.CenterCrop(64),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ])

    dataset = CustomDataset(data_root, transform=transform)
    data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=0)

    return data_loader

In [6]:
# Global variable to track the best discriminator loss and the corresponding generator
best_discriminator_loss = float('inf')
best_generator = None

In [7]:
# 主函数
def objective(trial):
    global best_discriminator_loss
    global best_generator

    batch_size = 16
    num_epochs = 200
    lr_g = trial.suggest_float("lr_g", 1e-5, 1e-2, log=True)
    lr_d = trial.suggest_float("lr_d", 1e-5, 1e-2, log=True)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    data_loader = get_data_loader("data/Plaster_side", batch_size)

    nz = 100  # Size of the latent vector

    generator = Generator(nz, ngf, nc).to(device)
    discriminator = Discriminator(nc, ndf).to(device)

    generator_optimizer = optim.Adam(generator.parameters(), lr=lr_g, betas=(0.5, 0.999))
    discriminator_optimizer = optim.Adam(discriminator.parameters(), lr=lr_d, betas=(0.5, 0.999))

    criterion = nn.BCELoss()

    # 定义GAN实例并添加TensorBoard writer
    gan = GAN(generator, discriminator, generator_optimizer, discriminator_optimizer, criterion)
    gan.writer = SummaryWriter(log_dir=f"runs/{trial.number}")

    gan.train(data_loader, num_epochs)

    # Calculate the average discriminator loss as the optimization objective
    avg_discriminator_loss = sum(gan.discriminator_loss) / len(gan.discriminator_loss)

    # Save the best generator's images if the current model performs better
    if avg_discriminator_loss < best_discriminator_loss:
        best_discriminator_loss = avg_discriminator_loss
        best_generator = gan.generator

    return avg_discriminator_loss

In [8]:
# 可视化训练过程
def visualize_training(study):
    # 使用Tensorboard可视化训练过程
    writer = SummaryWriter(log_dir="runs")

    # Get the trials as a DataFrame
    df = study.trials_dataframe()

    # If the 'params' column is not present, use a fallback for hyperparameters
    if 'params' not in df.columns:
        for i in range(len(df)):
            value = df.loc[i, "value"]
            writer.add_scalar("Loss/Discriminator", value, i)
    else:
        for i in range(len(df)):
            value = df.loc[i, "value"]
            params = df.loc[i, "params"]

            writer.add_scalar("Loss/Discriminator", value, i)
            for name, param in params.items():
                writer.add_scalar("Parameters/" + name, param, i)

    writer.close()


In [2]:
%load_ext tensorboard
%tensorboard --logdir 'runs'

The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard
