In [1]:
!unzip /content/sample_data/archive.zip -d/content/sample_data/monet/monet_tfrec

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  inflating: /content/sample_data/monet/monet_tfrec/images/62764_2019.jpg  
  inflating: /content/sample_data/monet/monet_tfrec/images/62765_2019.jpg  
  inflating: /content/sample_data/monet/monet_tfrec/images/62766_2019.jpg  
  inflating: /content/sample_data/monet/monet_tfrec/images/62767_2019.jpg  
  inflating: /content/sample_data/monet/monet_tfrec/images/62768_2019.jpg  
  inflating: /content/sample_data/monet/monet_tfrec/images/62769_2019.jpg  
  inflating: /content/sample_data/monet/monet_tfrec/images/6276_2003.jpg  
  inflating: /content/sample_data/monet/monet_tfrec/images/62770_2019.jpg  
  inflating: /content/sample_data/monet/monet_tfrec/images/62771_2019.jpg  
  inflating: /content/sample_data/monet/monet_tfrec/images/62772_2019.jpg  
  inflating: /content/sample_data/monet/monet_tfrec/images/62773_2019.jpg  
  inflating: /content/sample_data/monet/monet_tfrec/images/62774_2019.jpg  
  inflating: /content/sa

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
from torchvision.utils import save_image
from torchvision.utils import make_grid
from torch.utils.data import DataLoader
import torchvision
import matplotlib.pyplot as plt
from torchvision import datasets, transforms
from tqdm.notebook import tqdm
import math
import os
from glob import glob


In [3]:
# Root folder handed to ImageFolder when the dataset is built.
DIR = '/content/sample_data/monet'
# Side length (pixels) used for both the resize and the centre crop; images per batch.
image_size, batch_size = 128, 256
# (mean, std) triples, one value per colour channel, unpacked into transforms.Normalize.
stats = ((0.5,) * 3, (0.5,) * 3)


In [4]:
# Preprocessing applied to every image: resize, centre-crop to a square of
# side `image_size`, convert to a tensor, then normalise with `stats`.
transform = transforms.Compose([
    transforms.Resize(image_size),  # ensure a uniform size
    transforms.CenterCrop(image_size),
    transforms.ToTensor(),
    transforms.Normalize(mean=stats[0], std=stats[1]),
])

# Image dataset rooted at DIR, with the preprocessing above applied on load.
dataset = datasets.ImageFolder(root=DIR, transform=transform)

# Shuffled batch loader. NOTE: despite its name, `train_dataset` is a DataLoader.
train_dataset = DataLoader(
    dataset=dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=3,
    pin_memory=True,
)



In [None]:
def denorm(img):
    """Undo the normalisation: scale by the first std in `stats`, then add the first mean.

    Only the first channel's statistics are used, for every channel of `img`.
    """
    scale = stats[1][0]
    shift = stats[0][0]
    return img * scale + shift

In [None]:
def show_images(images, nmax=49):
    """Plot up to `nmax` images from a normalised batch as a grid, 7 per row.

    Args:
        images: batch tensor indexed along its first dimension; it may live on
            any device and may require grad.
        nmax: maximum number of images to draw.
    """
    fig, ax = plt.subplots(figsize=(10, 10))
    ax.set_xticks([]); ax.set_yticks([])
    # Slice first so only the images being drawn are copied, then move them to
    # host memory: matplotlib cannot consume a CUDA tensor.
    imd = denorm(images.detach()[:nmax]).cpu()
    # make_grid yields channels-first data; imshow wants channels last.
    ax.imshow(make_grid(imd, nrow=7).permute(1, 2, 0))

def show_batch(dl, nmax=49):
    """Display the first batch produced by `dl`; do nothing if it produces none.

    `dl` must yield `(images, labels)` pairs; the labels are ignored.
    """
    first = next(iter(dl), None)
    if first is None:
        return
    images, _ = first
    show_images(images, nmax)

In [None]:
# Sanity check: draw a grid from the first batch of the (CPU-side) loader.
show_batch(train_dataset)

In [None]:
def get_default_device():
    """Return the CUDA device when one is available, otherwise the CPU device."""
    kind = 'cuda' if torch.cuda.is_available() else 'cpu'
    return torch.device(kind)

def to_device(data, device):
    """Move `data` to `device`.

    A list or tuple is processed element by element (recursively) and always
    comes back as a list; anything else is moved with a non-blocking `.to()`.
    """
    if not isinstance(data, (list, tuple)):
        return data.to(device, non_blocking=True)

    moved = []
    for item in data:
        moved.append(to_device(item, device))
    return moved

class DeviceDataLoader():
    """Thin wrapper that moves every batch of a loader onto a device."""

    def __init__(self, dl, device):
        # Wrapped loader and the target device for its batches.
        self.dl, self.device = dl, device

    def __iter__(self):
        """Lazily yield each batch of the wrapped loader after moving it."""
        yield from (to_device(batch, self.device) for batch in self.dl)

    def __len__(self):
        """Number of batches in the wrapped loader."""
        n_batches = len(self.dl)
        return n_batches

In [None]:
# Pick the compute device once; the models and the wrapped loader all use it.
device = get_default_device()
# Bare expression so the notebook displays which device was chosen.
device

In [None]:
# Wrap the loader so each batch is moved to `device` as it is iterated.
train_dl = DeviceDataLoader(train_dataset, device)

In [None]:
def _disc_stage(in_channels, out_channels):
    """One halving stage: 4x4 stride-2 conv (no bias) -> batch norm -> LeakyReLU(0.2)."""
    return [
        nn.Conv2d(in_channels, out_channels, kernel_size=4, stride=2, padding=1, bias=False),
        nn.BatchNorm2d(out_channels),
        nn.LeakyReLU(0.2, inplace=True),
    ]


# Channel widths at the stage boundaries. With a 3 x 128 x 128 input the feature
# maps are 64x64 -> 32x32 -> 16x16 -> 8x8 -> 4x4 after the five stages.
_disc_widths = (3, 64, 128, 256, 512, 512)

discriminator = nn.Sequential(
    # Convolutional trunk: 3 x 128 x 128 -> 512 x 4 x 4
    *(
        layer
        for c_in, c_out in zip(_disc_widths, _disc_widths[1:])
        for layer in _disc_stage(c_in, c_out)
    ),

    # Classifier head on the flattened 512 x 4 x 4 features
    nn.Flatten(),
    nn.Linear(512 * 4 * 4, 200),
    nn.LeakyReLU(0.2, inplace=True),

    # Single sigmoid output per image
    nn.Linear(200, 1),
    nn.Sigmoid(),
)

In [None]:
# Move the discriminator's weights to the selected device.
discriminator = to_device(discriminator, device)

In [None]:
# Total number of scalar weights held by the discriminator.
num_parameters = sum(map(torch.Tensor.numel, discriminator.parameters()))
print("Number of parameters:", num_parameters)

In [None]:
# Number of channels in the latent_size x 1 x 1 noise tensor fed to the generator.
latent_size = 160

In [None]:
def _gen_stage(in_channels, out_channels, stride=2, padding=1):
    """One upsampling stage: 4x4 transposed conv (no bias) -> batch norm -> in-place ReLU."""
    return [
        nn.ConvTranspose2d(in_channels, out_channels, kernel_size=4,
                           stride=stride, padding=padding, bias=False),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(True),
    ]


generator = nn.Sequential(
    # in: latent_size x 1 x 1
    *_gen_stage(latent_size, 512, stride=1, padding=0),  # out: 512 x 4 x 4
    *_gen_stage(512, 512),                               # out: 512 x 8 x 8
    *_gen_stage(512, 256),                               # out: 256 x 16 x 16
    *_gen_stage(256, 128),                               # out: 128 x 32 x 32
    *_gen_stage(128, 64),                                # out: 64 x 64 x 64

    # Final projection to 3 channels, squashed to [-1, 1] by tanh.
    nn.ConvTranspose2d(64, 3, kernel_size=4, stride=2, padding=1, bias=False),
    nn.Tanh(),
    # out: 3 x 128 x 128
)

In [None]:
# Total number of scalar weights held by the generator.
num_parameters = sum(map(torch.Tensor.numel, generator.parameters()))
print("Number of parameters:", num_parameters)

In [None]:
# Smoke test of the untrained generator. Both the noise and the model are
# still on the CPU here; the generator is moved to `device` in a later cell.
xb = torch.randn(batch_size, latent_size, 1, 1) # one random latent tensor per image in a batch
fake_images = generator(xb)
print(fake_images.shape)
# show_images draws at most its default nmax=49 images from the batch.
show_images(fake_images)

In [None]:
# Move the generator's weights to the selected device.
generator = to_device(generator, device)

In [None]:
def train_discriminator(real_images, opt_d):
    """Run one optimisation step of the discriminator.

    The discriminator is pushed towards 1 on `real_images` and towards 0 on a
    fresh batch of `batch_size` generated images.

    Args:
        real_images: batch of real images, already on `device`.
        opt_d: optimiser holding the discriminator's parameters.

    Returns:
        Tuple `(loss, real_score, fake_score)` of Python floats: the summed
        BCE loss and the mean prediction on the real and the fake batch.
    """
    # Clear discriminator gradients
    opt_d.zero_grad()

    # Pass real images through discriminator
    real_preds = discriminator(real_images)
    real_targets = torch.ones(real_images.size(0), 1, device=device)
    real_loss = F.binary_cross_entropy(real_preds, real_targets)
    real_score = torch.mean(real_preds).item()

    # Generate fake images. Only the discriminator is updated in this step, so
    # build them without an autograd graph: otherwise loss.backward() would
    # back-propagate through the whole generator for gradients nobody uses.
    latent = torch.randn(batch_size, latent_size, 1, 1, device=device)
    with torch.no_grad():
        fake_images = generator(latent)

    # Pass fake images through discriminator
    fake_targets = torch.zeros(fake_images.size(0), 1, device=device)
    fake_preds = discriminator(fake_images)
    fake_loss = F.binary_cross_entropy(fake_preds, fake_targets)
    fake_score = torch.mean(fake_preds).item()

    # Update discriminator weights
    loss = real_loss + fake_loss
    loss.backward()
    opt_d.step()
    return loss.item(), real_score, fake_score

In [None]:
def train_generator(opt_g):
    """Run one optimisation step of the generator.

    A batch of `batch_size` images is generated from fresh noise and scored by
    the discriminator; the generator is updated so those scores move towards 1.

    Args:
        opt_g: optimiser holding the generator's parameters.

    Returns:
        The BCE loss of this step as a Python float.
    """
    opt_g.zero_grad()

    # Forward pass: noise -> images -> discriminator verdicts.
    noise = torch.randn(batch_size, latent_size, 1, 1, device=device)
    verdicts = discriminator(generator(noise))

    # The generator "wins" when every fake is labelled as real (target 1).
    wanted = torch.ones(batch_size, 1, device=device)
    g_loss = F.binary_cross_entropy(verdicts, wanted)

    # Backward pass and parameter update.
    g_loss.backward()
    opt_g.step()

    return g_loss.item()

In [None]:
# Folder (relative to the working directory) that receives the sample grids.
sample_dir = 'generated'
# exist_ok=True keeps the cell re-runnable when the folder already exists.
os.makedirs(sample_dir, exist_ok=True)

In [None]:
def save_samples(index, latent_tensors, show=True):
    """Generate images from `latent_tensors`, save them as a grid and optionally show it.

    The grid is written to `sample_dir` as `generated-images-NNNN.png`, where
    NNNN is `index` zero-padded to four digits, with 7 images per row.

    Args:
        index: integer used in the output file name.
        latent_tensors: batch of latent inputs for the generator (any device).
        show: when True, also draw the same grid with matplotlib.
    """
    # Run on the device the generator actually lives on, rather than guessing
    # from CUDA availability, so the input and the weights always match.
    device = next(generator.parameters()).device

    # Pure inference: no autograd graph is needed for saving or plotting.
    with torch.no_grad():
        fake_images = denorm(generator(latent_tensors.to(device)))

    fake_fname = 'generated-images-{0:0=4d}.png'.format(index)
    save_image(fake_images, os.path.join(sample_dir, fake_fname), nrow=7)
    print('Saving', fake_fname)

    if show:
        fig, ax = plt.subplots(figsize=(10, 10))
        ax.set_xticks([]); ax.set_yticks([])
        # Plot the denormalised images, i.e. exactly what was written to disk;
        # the raw generator output lies in [-1, 1] and would be clipped by imshow.
        ax.imshow(make_grid(fake_images.cpu(), nrow=7).permute(1, 2, 0))

In [None]:
# One fixed batch of 49 latent tensors (a 7 x 7 grid at 7 images per row),
# reused for every saved sample so the grids are comparable across epochs.
fixed_latent = torch.randn(49, latent_size, 1, 1).to(device)

In [None]:
# Baseline: save (and show) the output of the still-untrained generator as index 0.
save_samples(0, fixed_latent)

In [None]:
def fit(epochs, lr, start_idx=1):
    """Train the GAN for `epochs` passes over `train_dl`.

    Each batch triggers one discriminator step followed by one generator step.
    After every epoch the last batch's metrics are recorded and printed, and a
    sample grid is saved from `fixed_latent`.

    Args:
        epochs: number of passes over the training loader.
        lr: learning rate used for both Adam optimisers.
        start_idx: index given to the first saved sample grid.

    Returns:
        Tuple `(losses_g, losses_d, real_scores, fake_scores)` of lists with
        one entry per epoch.
    """
    torch.cuda.empty_cache()

    # Fresh Adam optimisers for both networks, sharing the same settings.
    adam_betas = (0.5, 0.999)
    opt_d = torch.optim.Adam(discriminator.parameters(), lr=lr, betas=adam_betas)
    opt_g = torch.optim.Adam(generator.parameters(), lr=lr, betas=adam_betas)

    # Per-epoch history.
    losses_g, losses_d, real_scores, fake_scores = [], [], [], []

    for epoch_no in range(1, epochs + 1):
        for real_images, _ in tqdm(train_dl):
            # Discriminator first, then generator, on every batch.
            loss_d, real_score, fake_score = train_discriminator(real_images, opt_d)
            loss_g = train_generator(opt_g)

        # Only the values from the epoch's last batch are kept.
        for series, value in (
            (losses_g, loss_g),
            (losses_d, loss_d),
            (real_scores, real_score),
            (fake_scores, fake_score),
        ):
            series.append(value)

        summary = "Epoch [{}/{}], loss_g: {:.4f}, loss_d: {:.4f}, real_score: {:.4f}, fake_score: {:.4f}".format(
            epoch_no, epochs, loss_g, loss_d, real_score, fake_score)
        print(summary)

        # Save this epoch's sample grid; numbering starts at start_idx.
        save_samples(epoch_no - 1 + start_idx, fixed_latent, show=False)

    return losses_g, losses_d, real_scores, fake_scores

In [None]:
# Training hyperparameters passed to fit(): the learning rate shared by both
# Adam optimisers, and the number of passes over the training loader.
lr = 0.001
epochs = 20

In [None]:
# Run training; `history` is the tuple of four per-epoch lists returned by fit().
history = fit(epochs, lr)

In [None]:
# Unpack the per-epoch history for plotting.
losses_g, losses_d, real_scores, fake_scores = history
# Persist the trained weights (state dicts only) to the working directory.
torch.save(generator.state_dict(), 'G.pth')
torch.save(discriminator.state_dict(), 'D.pth')

In [None]:
from IPython.display import Image

In [None]:
# Sample grid saved after the first epoch (fit() numbers its outputs from 1 by default).
Image('./generated/generated-images-0001.png')

In [None]:
# Sample grid saved after the final epoch. fit() was run with its default
# start_idx=1, so the last file's index equals `epochs`; deriving the name
# keeps this cell valid when `epochs` is changed from 20.
Image(os.path.join(sample_dir, 'generated-images-{0:0=4d}.png'.format(epochs)))

In [None]:
# Per-epoch (last-batch) losses of both networks on one set of axes.
fig, ax = plt.subplots()
ax.plot(losses_d, '-', label='Discriminator')
ax.plot(losses_g, '-', label='Generator')
ax.set_xlabel('epoch')
ax.set_ylabel('loss')
ax.legend()
ax.set_title('Losses');