In [1]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data
import torchvision.utils as vutils
import numpy as np

In [2]:
batch_size = 16
n_epoch = 50
lr = 0.001
nz = 100
nch = 1
nch_g = 28
nch_d = 28
result_dir = "./gan_results"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("device: {}".format(device))
try:
    os.makedirs(result_dir)
except OSError:
    pass

device: cuda


In [3]:
def loadData(img_path, label_path):
    img_np = np.load(img_path)["arr_0"].reshape(-1, 1, 28, 28)
    img_np = img_np/255
    label_np = np.load(label_path)["arr_0"]
    return torch.utils.data.TensorDataset(torch.from_numpy(img_np),
                                            torch.from_numpy(label_np))

train_data = loadData("kmnist/k49-train-imgs.npz", "kmnist/k49-train-labels.npz")
train_loader = torch.utils.data.DataLoader(train_data, batch_size=batch_size, shuffle=True, drop_last=True)
test_data = loadData("kmnist/k49-test-imgs.npz", "kmnist/k49-test-labels.npz")
test_loader = torch.utils.data.DataLoader(test_data)

In [4]:
def weights_init(m):
    classname = m.__class__.__name__
    if classname.find('Conv') != -1:            
        m.weight.data.normal_(0.0, 0.02)
        m.bias.data.fill_(0)
    elif classname.find('Linear') != -1:
        m.weight.data.normal_(0.0, 0.02)
        m.bias.data.fill_(0)
    elif classname.find('BatchNorm') != -1:
        m.weight.data.normal_(1.0, 0.02)
        m.bias.data.fill_(0)

In [5]:
class Generator(nn.Module):
    def __init__(self, nz=100, nch_g=28, nch=1):
        super(Generator, self).__init__()
        self.layers = nn.ModuleDict({
            'layer0': nn.Sequential(
                nn.ConvTranspose2d(nz, nch_g * 8, 4, 1, 0), 
                nn.BatchNorm2d(nch_g * 8),    
                nn.ReLU()
            ),  # (100, 1, 1) -> (512, 4, 4)
            'layer1': nn.Sequential(
                nn.ConvTranspose2d(nch_g * 8, nch_g * 4, 4, 2, 1),
                nn.BatchNorm2d(nch_g * 4),
                nn.ReLU()
            ),  # (512, 4, 4) -> (256, 8, 8)
            'layer2': nn.Sequential(
                nn.ConvTranspose2d(nch_g * 4, nch_g * 2, 4, 2, 1),
                nn.BatchNorm2d(nch_g * 2),
                nn.ReLU()
            ),  # (256, 8, 8) -> (128, 16, 16)
            'layer3': nn.Sequential(
                nn.ConvTranspose2d(nch_g * 2, nch_g, 4, 2, 1),
                nn.BatchNorm2d(nch_g),
                nn.ReLU()
            ),  # (128, 16, 16) -> (64, 32, 32)
            'layer4': nn.Sequential(
                nn.ConvTranspose2d(nch_g, nch, 4, 2, 1),
                nn.Tanh()
            )   # (64, 32, 32) -> (3, 64, 64)
        })
    
    def forward(self, x):
        for layer in layers.values():
            x = layer(x)
        return x

net_g = Generator(nz=nz, nch_g=nch_g, nch=1).to(device)
net_g.apply(weights_init)
print(net_g)

Generator(
  (layers): ModuleDict(
    (layer0): Sequential(
      (0): ConvTranspose2d(100, 224, kernel_size=(4, 4), stride=(1, 1))
      (1): BatchNorm2d(224, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
    )
    (layer1): Sequential(
      (0): ConvTranspose2d(224, 112, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
      (1): BatchNorm2d(112, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
    )
    (layer2): Sequential(
      (0): ConvTranspose2d(112, 56, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
      (1): BatchNorm2d(56, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
    )
    (layer3): Sequential(
      (0): ConvTranspose2d(56, 28, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
      (1): BatchNorm2d(28, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
    )
    (layer4): Sequential(
      (0): ConvTranspose2d(28, 1, kernel

In [6]:
class Discriminator(nn.Module):
    def __init__(self, nch=1, nch_d=28):
        super(Discriminator, self).__init__()
        self.layers = nn.ModuleDict({
            'layer0': nn.Sequential(
                nn.Conv2d(nch, nch_d, 4, 2, 1),
                nn.LeakyReLU(negative_slope=0.2)    
            ),  # (3, 64, 64) -> (64, 32, 32)
            'layer1': nn.Sequential(
                nn.Conv2d(nch_d, nch_d * 2, 4, 2, 1),
                nn.BatchNorm2d(nch_d * 2),
                nn.LeakyReLU(negative_slope=0.2)
            ),  # (64, 32, 32) -> (128, 16, 16)
            'layer2': nn.Sequential(
                nn.Conv2d(nch_d * 2, nch_d * 4, 4, 2, 1),
                nn.BatchNorm2d(nch_d * 4),
                nn.LeakyReLU(negative_slope=0.2)
            ),  # (128, 16, 16) -> (256, 8, 8)
            'layer3': nn.Sequential(
                nn.Conv2d(nch_d * 4, nch_d * 8, 4, 2, 1),
                nn.BatchNorm2d(nch_d * 8),
                nn.LeakyReLU(negative_slope=0.2)
            ),  # (256, 8, 8) -> (512, 4, 4)
            'layer4': nn.Conv2d(nch_d * 8, 1, 4, 1, 0)
            # (512, 4, 4) -> (1, 1, 1)
        })
    
    def forward(self, x):
        for layer in self.layers.values():  
            x = layer(x)
        return x.squeeze()
    
net_d = Discriminator(nch, nch_d=nch_d).to(device)
net_d.apply(weights_init)
print(net_d)

Discriminator(
  (layers): ModuleDict(
    (layer0): Sequential(
      (0): Conv2d(1, 28, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
      (1): LeakyReLU(negative_slope=0.2)
    )
    (layer1): Sequential(
      (0): Conv2d(28, 56, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
      (1): BatchNorm2d(56, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): LeakyReLU(negative_slope=0.2)
    )
    (layer2): Sequential(
      (0): Conv2d(56, 112, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
      (1): BatchNorm2d(112, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): LeakyReLU(negative_slope=0.2)
    )
    (layer3): Sequential(
      (0): Conv2d(112, 224, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
      (1): BatchNorm2d(224, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): LeakyReLU(negative_slope=0.2)
    )
    (layer4): Conv2d(224, 1, kernel_size=(4, 4), stride=(1, 1))
  )
)


In [7]:
criterion = nn.MSELoss()
optimizer_g = torch.optim.Adam(net_g.parameters(), lr=lr)
optimizer_d = torch.optim.Adam(net_d.parameters(), lr=lr)

fixed_noise = torch.randn(batch_size, nz, 1, 1, device=device)

In [8]:
def train(epoch):
    for step, (data, target) in enumerate(train_loader, 0):
        real_image = data.to(device)
        
        sample_size = real_image.size(0)
        noise = torch.randn(sample_size, nz, 1, 1, device=device)
        
        real_target = torch.full((sample_size,), 1, device=device)
        fake_target = torch.full((sample_size,), 0, device=device)
        
        net_d.zero_grad()
        output = net_d(real_image)
        loss_d_real = criterion(output, real_target)
        d_x = output.mean().item()

        fake_image = net_g(noise)
        output = net_d(fake_image.detach())
        loss_d_fake = criterion(output, fake_target)
        d_g_z1 = output.mean().item()
        
        loss_d = loss_d_real + loss_d_fake
        loss_d.backward()
        optimizer_d.step()
        
        net_g.zero_grad()
        output = net_d(fake_image)
        loss_g = criterion(output, real_target)
        loss_g.backward()
        d_g_z2 = output.mean().item()
        optimizer_g.step()
        if((step-1) % batch_size == 0):
            print("Epoch:{}, Loss_G:{:.4}, Loss_D:{:.4}".format(epoch+1, loss_g.item(), loss_d.item()))
    fake_image = netG(fixed_noise_label)   
    vutils.save_image(fake_image.detach(), '{}/fake_samples_epoch_{:03d}.png'.format(result_dir, epoch + 1), normalize=True, nrow=7)

In [9]:
for epoch in range(50):
    train(epoch)

RuntimeError: Input type (torch.cuda.DoubleTensor) and weight type (torch.cuda.FloatTensor) should be the same