In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import torchvision
import torchvision.transforms as transforms

# Define the diffusion model


class DiffusionModel(nn.Module):
    def __init__(self, noise_size=256, num_steps=1000):
        super(DiffusionModel, self).__init__()
        self.noise_size = noise_size
        self.num_steps = num_steps

        # Define the diffusion process
        self.diffusion = nn.ModuleList([
            nn.Sequential(
                nn.Conv2d(3, 64, 3, stride=1, padding=1),
                nn.ReLU(),
                nn.Conv2d(64, 64, 3, stride=1, padding=1),
                nn.ReLU(),
                nn.Conv2d(64, 3, 3, stride=1, padding=1)
            ) for _ in range(self.num_steps)
        ])

        # Define the noise generator
        self.noise_generator = nn.Sequential(
            nn.Linear(self.noise_size, 256),
            nn.ReLU(),
            nn.Linear(256, 512),
            nn.ReLU(),
            nn.Linear(512, 1024),
            nn.ReLU(),
            nn.Linear(1024, 3 * 64 * 64),
            nn.Tanh()
        )

    def forward(self, noise):
        # Generate the initial image from noise
        x = self.noise_generator(noise).view(-1, 3, 64, 64)

        # Iterate over the diffusion process
        for i in range(self.num_steps):
            noise_i = torch.randn_like(x)
            x = x + (np.sqrt(2.0) / np.sqrt(self.num_steps)) * noise_i
            x = self.diffusion[i](x)

        return x


# Define the UNet model
class UNetModel(nn.Module):
    def __init__(self):
        super(UNetModel, self).__init__()

        # Define the encoder
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 32, 3, stride=2, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.Conv2d(32, 64, 3, stride=2, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Conv2d(64, 128, 3, stride=2, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.Conv2d(128, 256, 3, stride=2, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.Conv2d(256, 512, 3, stride=2, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU()
        )

        # Define the decoder
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(512, 256, 4, stride=2, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.ConvTranspose2d(256, 128, 4, stride=2, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.ConvTranspose2d(128, 64, 4, stride=2, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 32, 4, stride=2, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.ConvTranspose2d(32, 3, 4, stride=2, padding=1),
            nn.Tanh()
        )

    def forward(self, x):
        # Encode the input image
        x = self.encoder(x)

        # Decode the encoded image
        x = self.decoder(x)

        return x

# Train the diffusion model with the UNet model


def train_diffusion_model_with_unet(dataloader):
    # Define the hyperparameters
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    noise_size = 256
    num_steps = 1000
    batch_size = 32
    num_epochs = 100
    learning_rate = 0.001

    # Define the models
    diffusion_model = DiffusionModel(noise_size, num_steps)
    unet_model = UNetModel()

    # Define the loss function and optimizer
    loss_fn = nn.MSELoss()
    optimizer = optim.Adam(list(diffusion_model.parameters(
    )) + list(unet_model.parameters()), lr=learning_rate)

    # Define the data loader
    # TODO: Define your own data loader

    # Train the models
    for epoch in range(num_epochs):
        for i, (images, _) in enumerate(dataloader):
            # Generate noise
            noise = torch.randn(batch_size, noise_size).to(device)

            # Generate images with the diffusion model
            generated_images = diffusion_model(noise)

            # Generate refined images with the UNet model
            refined_images = unet_model(generated_images)

            # Compute the loss
            loss = loss_fn(refined_images, images.to(device))

            # Backward pass and update the weights
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Print the loss
            if i % 100 == 0:
                print(
                    f"Epoch [{epoch+1}/{num_epochs}] Step [{i}/{len(dataloader)}] Loss: {loss.item():.4f}")


transform = transforms.Compose([
    # transforms.Grayscale(),
    transforms.ToTensor(),
    # transforms.Normalize((0.5,), (0.5,))
])

train_set = torchvision.datasets.ImageFolder(
    './data/char', transform=transform)
train_loader = torch.utils.data.DataLoader(
    train_set, batch_size=None, shuffle=True, num_workers=4)

train_diffusion_model_with_unet(train_loader)


RuntimeError: Expected all tensors to be on the same device, but found at least two devices, cpu and cuda:0! (when checking argument for argument mat1 in method wrapper_addmm)