In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np  # linear algebra
import pandas as pd  # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Recursively list every file under the read-only Kaggle input mount, one full path per line.
for folder, _, names_in_folder in os.walk('/kaggle/input'):
    for name in names_in_folder:
        full_path = os.path.join(folder, name)
        print(full_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All"
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

# Monet Style GAN — Kaggle Kernel
This notebook mirrors the local baseline but is streamlined for execution inside Kaggle's *I'm Something of a Painter Myself* competition environment. It trains a DCGAN on the provided Monet paintings and exports the required `images.zip` submission bundle.

## Dependencies
In Kaggle's runtime the correct CPU/GPU builds of PyTorch are already available. Uncomment the next cell only if you encounter a missing dependency in a custom environment.

In [2]:
# %pip install --quiet torch torchvision

In [3]:
import math
import random
import zipfile
from pathlib import Path
from typing import Iterable, List

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms, utils
from PIL import Image

from tqdm.auto import tqdm

def seed_everything(seed: int = 42) -> None:
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random`` module, NumPy's global generator and PyTorch
    (CPU generator plus all CUDA devices), then puts cuDNN into
    deterministic mode with benchmarking switched off.

    Args:
        seed: Value handed to each generator.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    cudnn_flags = torch.backends.cudnn
    cudnn_flags.deterministic = True
    cudnn_flags.benchmark = False

# Fix all RNG seeds up front so repeated runs start from the same state.
seed_everything(2025)

# Use CUDA when PyTorch reports it as available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device  # bare expression so the selected device is shown as the cell output

  from .autonotebook import tqdm as notebook_tqdm


device(type='cpu')

## Data Pipeline
Kaggle exposes the Monet dataset under `/kaggle/input/gan-getting-started/monet_jpg`. The helper below confirms availability and prepares a lightweight augmentation stack.

In [4]:
# Folder holding the Monet JPEGs inside the Kaggle competition mount.
DATA_ROOT = Path('/kaggle/input/gan-getting-started/monet_jpg')
if not DATA_ROOT.exists():
    # Fail fast: nothing below can run without the dataset.
    raise FileNotFoundError('Expected Monet data under /kaggle/input/gan-getting-started/monet_jpg')

IMAGE_SIZE = 256
# Larger batches and background loader workers are used only when running on CUDA.
BATCH_SIZE, NUM_WORKERS = (16, 2) if device.type == 'cuda' else (8, 0)

train_transform = transforms.Compose([
    # Random crop covering 80-100% of the image area, resized to IMAGE_SIZE.
    transforms.RandomResizedCrop(IMAGE_SIZE, scale=(0.8, 1.0), ratio=(0.9, 1.1)),
    transforms.RandomHorizontalFlip(),
    # Mild colour perturbation.
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.15, hue=0.03),
    transforms.ToTensor(),
    # Shift and scale each channel by 0.5 so pixel values land in [-1, 1].
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

FileNotFoundError: Expected Monet data under /kaggle/input/gan-getting-started/monet_jpg

In [None]:
class MonetDataset(Dataset):
    """Dataset over the ``*.jpg`` files found directly inside one folder.

    Each item is a ``(transformed_image, 0)`` pair; the constant ``0`` is a
    placeholder label.
    """

    def __init__(self, root: Path, transform: transforms.Compose):
        """Index the JPEGs under ``root`` (sorted by path) and keep the transform.

        Args:
            root: Directory that is globbed for ``*.jpg`` files.
            transform: Callable applied to every RGB image on access.

        Raises:
            RuntimeError: If ``root`` contains no ``*.jpg`` files.
        """
        jpeg_paths = sorted(root.glob('*.jpg'))
        if len(jpeg_paths) == 0:
            raise RuntimeError(f'No JPEGs found in {root}.')
        self.files = jpeg_paths
        self.transform = transform

    def __len__(self) -> int:
        """Return the number of indexed image files."""
        return len(self.files)

    def __getitem__(self, idx: int):
        """Load image ``idx`` as RGB and return ``(transform(image), 0)``."""
        with Image.open(self.files[idx]) as handle:
            # convert() yields a new image object that is used after the file is closed.
            rgb_image = handle.convert('RGB')
        return self.transform(rgb_image), 0

# Index the Monet folder and wrap it in a shuffling loader.
dataset = MonetDataset(DATA_ROOT, train_transform)
data_loader = DataLoader(
    dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=NUM_WORKERS,
    pin_memory=device.type == 'cuda',  # pinned host memory is only requested on CUDA
    drop_last=True,  # incomplete final batches are discarded
)
len(dataset)

### Exploratory Batch
Quick visual sanity check ensures augmentations look reasonable before training.

In [None]:
# Draw one augmented batch and tile up to 16 of its images, four per row.
sample_batch, _ = next(iter(data_loader))
grid = utils.make_grid(sample_batch[:16], nrow=4, normalize=True, value_range=(-1, 1))

fig, ax = plt.subplots(figsize=(6, 6))
# make_grid returns channels-first data; imshow expects channels last.
ax.imshow(grid.permute(1, 2, 0).cpu().numpy())
ax.set_axis_off()
ax.set_title('Random Monet paintings (augmented)')
fig.tight_layout()
plt.show()

## DCGAN Model
Generator and discriminator mirror the architecture recommended in the competition discussion threads with minor depth tweaks for 256×256 resolution.

In [None]:
LATENT_DIM = 128  # number of channels in the generator's noise input
GEN_FEATURES = 64  # base channel count passed to Generator as feature_maps
DISC_FEATURES = 64  # base channel count passed to Discriminator as feature_maps

def gen_block(in_ch: int, out_ch: int) -> Iterable[nn.Module]:
    """Return the layers of one generator upsampling stage.

    The stage is a bias-free 4x4 transposed convolution (stride 2, padding 1,
    which doubles height and width), followed by batch normalisation and an
    in-place ReLU.

    Args:
        in_ch: Number of channels entering the stage.
        out_ch: Number of channels leaving the stage.

    Returns:
        A 3-tuple ``(ConvTranspose2d, BatchNorm2d, ReLU)``.
    """
    upsample = nn.ConvTranspose2d(in_ch, out_ch, kernel_size=4, stride=2, padding=1, bias=False)
    normalise = nn.BatchNorm2d(out_ch)
    activate = nn.ReLU(inplace=True)
    return upsample, normalise, activate

def disc_block(in_ch: int, out_ch: int, *, use_bn: bool = True) -> Iterable[nn.Module]:
    """Return the layers of one discriminator downsampling stage.

    The stage is a bias-free 4x4 convolution (stride 2, padding 1, which
    halves height and width), optionally followed by batch normalisation,
    and always ends with an in-place LeakyReLU with negative slope 0.2.

    Args:
        in_ch: Number of channels entering the stage.
        out_ch: Number of channels leaving the stage.
        use_bn: Keyword-only switch; when False the BatchNorm layer is omitted.

    Returns:
        A list of two or three modules in application order.
    """
    downsample = nn.Conv2d(in_ch, out_ch, kernel_size=4, stride=2, padding=1, bias=False)
    optional_norm = [nn.BatchNorm2d(out_ch)] if use_bn else []
    return [downsample, *optional_norm, nn.LeakyReLU(0.2, inplace=True)]

class Generator(nn.Module):
    """DCGAN generator built from transposed convolutions.

    A stride-1 stem turns a 1x1 spatial input into a 4x4 map with
    ``feature_maps * 16`` channels. Five ``gen_block`` stages and a final
    transposed convolution each double the resolution (4 -> 256), and a Tanh
    bounds the output to [-1, 1].

    Args:
        latent_dim: Channel count of the input noise tensor.
        feature_maps: Base width; stage widths are multiples of this value.
        channels: Number of output image channels.
    """

    def __init__(self, latent_dim: int, feature_maps: int, channels: int = 3):
        super().__init__()
        # Channel width after the stem and after each successive upsampling stage.
        widths = [
            feature_maps * 16,
            feature_maps * 8,
            feature_maps * 4,
            feature_maps * 2,
            feature_maps,
            feature_maps // 2,
        ]

        stack: List[nn.Module] = [
            nn.ConvTranspose2d(latent_dim, widths[0], 4, 1, 0, bias=False),
            nn.BatchNorm2d(widths[0]),
            nn.ReLU(inplace=True),
        ]
        # Consecutive width pairs describe the input/output channels of each stage.
        for wider, narrower in zip(widths, widths[1:]):
            stack.extend(gen_block(wider, narrower))
        stack.append(nn.ConvTranspose2d(widths[-1], channels, 4, 2, 1, bias=False))
        stack.append(nn.Tanh())

        self.net = nn.Sequential(*stack)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run the noise tensor ``x`` through the layer stack."""
        generated = self.net(x)
        return generated

class Discriminator(nn.Module):
    """DCGAN discriminator built from strided convolutions.

    An un-normalised stem convolution and five ``disc_block`` stages each
    halve the resolution; a final 4x4 stride-1 convolution reduces the map
    to a single channel. No sigmoid is applied, so the outputs are raw
    logits.

    Args:
        feature_maps: Base width; stage widths are multiples of this value.
        channels: Number of input image channels.
    """

    def __init__(self, feature_maps: int, channels: int = 3):
        super().__init__()
        # Channel width after the stem and after each successive downsampling stage.
        widths = [
            feature_maps // 2,
            feature_maps,
            feature_maps * 2,
            feature_maps * 4,
            feature_maps * 8,
            feature_maps * 16,
        ]

        stack: List[nn.Module] = [
            nn.Conv2d(channels, widths[0], 4, 2, 1, bias=False),
            nn.LeakyReLU(0.2, inplace=True),
        ]
        # Consecutive width pairs describe the input/output channels of each stage.
        for narrower, wider in zip(widths, widths[1:]):
            stack.extend(disc_block(narrower, wider))
        stack.append(nn.Conv2d(widths[-1], 1, 4, 1, 0, bias=False))

        self.net = nn.Sequential(*stack)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Score the images in ``x`` and flatten the result to one dimension."""
        logits = self.net(x)
        return logits.view(-1)

### Architectural Rationale
- **Generator:** Progressive transposed convolutions double spatial resolution each block, keeping feature maps high until late layers for richer textures.
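- **Discriminator:** Mirrors the generator's depth with stride-2 convolutions that halve the resolution at every stage. LeakyReLU(0.2) keeps gradients flowing for negative activations, and batch normalisation follows every convolution except the first layer and the final scoring layer.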
- **Latent size:** `LATENT_DIM = 128` balances expressivity and convergence time; experiment with 256 for more detail if GPU budget allows.
- **Feature scaling:** Adjust `GEN_FEATURES` / `DISC_FEATURES` to grow or shrink the network based on leaderboard feedback.

In [None]:
def init_weights(module: nn.Module) -> None:
    """Re-initialise one module in place; intended for ``nn.Module.apply``.

    Dispatch is by class name: names containing ``Conv`` get weights drawn
    from N(0, 0.02); names containing ``BatchNorm`` get weights drawn from
    N(1, 0.02) and a zero bias. Every other module is left untouched.

    Args:
        module: The module to (possibly) re-initialise.
    """
    kind = type(module).__name__
    if 'Conv' in kind:
        nn.init.normal_(module.weight.data, mean=0.0, std=0.02)
        return
    if 'BatchNorm' in kind:
        nn.init.normal_(module.weight.data, mean=1.0, std=0.02)
        nn.init.constant_(module.bias.data, 0)

# Build both networks on the target device, then re-initialise them with init_weights.
# NOTE: keep this statement order — init_weights draws from the seeded RNG (and module
# construction presumably does too), so reordering would change the starting weights.
generator = Generator(LATENT_DIM, GEN_FEATURES).to(device)
discriminator = Discriminator(DISC_FEATURES).to(device)
generator.apply(init_weights)
discriminator.apply(init_weights)

# The discriminator returns raw logits (no sigmoid), hence the logits-based BCE loss.
criterion = nn.BCEWithLogitsLoss()

# Optimiser hyperparameters for the generator (G) and discriminator (D).
G_LR = 2e-4
D_LR = 2e-4
BETA1, BETA2 = 0.5, 0.999
# Targets used by the training loop: smoothed labels for the discriminator update,
# and a hard 1.0 target for the generator update.
REAL_LABEL = 0.9
FAKE_LABEL = 0.1
GEN_TARGET = 1.0
# Upper bound on the number of preview grids kept in memory during training.
MAX_PREVIEW_SNAPSHOTS = 12

optimizer_g = torch.optim.Adam(generator.parameters(), lr=G_LR, betas=(BETA1, BETA2))
optimizer_d = torch.optim.Adam(discriminator.parameters(), lr=D_LR, betas=(BETA1, BETA2))
# Each scheduler multiplies its learning rate by 0.99 per step(); the training loop
# steps them once per epoch.
scheduler_g = torch.optim.lr_scheduler.ExponentialLR(optimizer_g, gamma=0.99)
scheduler_d = torch.optim.lr_scheduler.ExponentialLR(optimizer_d, gamma=0.99)

# A fixed batch of 64 latent vectors, reused for every preview grid so snapshots are comparable.
fixed_noise = torch.randn(64, LATENT_DIM, 1, 1, device=device)

## Training Loop
Three epochs finish within the GPU runtime limit. Increase `NUM_EPOCHS` for stronger generators when working offline or scheduling longer runs.

### Training Strategy Notes
- **Label smoothing:** Discriminator real labels use 0.9 and fake 0.1 to temper gradients and curb overconfidence.
- **Preview snapshots:** The loop captures up to `MAX_PREVIEW_SNAPSHOTS` grids so we can inspect quality progression inline.
- **Scheduler:** A mild exponential decay keeps learning stable across epochs; tweak `G_LR`, `D_LR`, or the schedulers to explore TTUR or longer runs.
- **Extend capacity:** Increase `GEN_FEATURES`/`DISC_FEATURES` for wider models (more channels per layer; the number of layers stays the same) when GPU time budget allows.
- **Further upgrades:** Consider feature matching, perceptual losses, or EMA on the generator weights as next experiments when iterating.

In [None]:
NUM_EPOCHS = 3
LOG_INTERVAL = 100
CHECKPOINT_DIR = Path('/kaggle/working/checkpoints')
SAMPLES_DIR = Path('/kaggle/working/samples')
CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True)
SAMPLES_DIR.mkdir(parents=True, exist_ok=True)

best_g_loss = math.inf  # lowest epoch-mean generator loss seen so far
history = []            # one dict of epoch-mean losses per epoch
preview_history = []    # up to MAX_PREVIEW_SNAPSHOTS preview grids for inline display

steps_per_epoch = len(data_loader)
if steps_per_epoch == 0:
    # Without this guard the epoch summary below would fail with an opaque NameError.
    raise RuntimeError('data_loader yields no batches; lower BATCH_SIZE or disable drop_last.')

for epoch in range(1, NUM_EPOCHS + 1):
    generator.train()
    discriminator.train()
    g_loss_sum = 0.0
    d_loss_sum = 0.0

    for step, (real_imgs, _) in enumerate(data_loader, start=1):
        real_imgs = real_imgs.to(device)
        batch_size = real_imgs.size(0)

        # ---- Discriminator update: real images vs. generated images (smoothed labels) ----
        noise = torch.randn(batch_size, LATENT_DIM, 1, 1, device=device)
        # No generator gradients are needed for this step, so skip building the graph.
        with torch.no_grad():
            fake_imgs = generator(noise)
        optimizer_d.zero_grad(set_to_none=True)
        real_targets = torch.full((batch_size,), REAL_LABEL, device=device)
        fake_targets = torch.full((batch_size,), FAKE_LABEL, device=device)
        real_logits = discriminator(real_imgs)
        fake_logits = discriminator(fake_imgs)
        loss_d = criterion(real_logits, real_targets) + criterion(fake_logits, fake_targets)
        loss_d.backward()
        optimizer_d.step()

        # ---- Generator update: fresh noise, try to make the discriminator output GEN_TARGET ----
        noise = torch.randn(batch_size, LATENT_DIM, 1, 1, device=device)
        optimizer_g.zero_grad(set_to_none=True)
        generated = generator(noise)
        gen_logits = discriminator(generated)
        gen_targets = torch.full((batch_size,), GEN_TARGET, device=device)
        loss_g = criterion(gen_logits, gen_targets)
        loss_g.backward()
        optimizer_g.step()

        g_loss_sum += loss_g.item()
        d_loss_sum += loss_d.item()

        # Also log on the last step of the epoch: when an epoch has fewer than
        # LOG_INTERVAL steps the modulo test alone would never fire and no
        # preview would ever be saved.
        if step % LOG_INTERVAL == 0 or step == steps_per_epoch:
            print(f'Epoch {epoch:03d}/{NUM_EPOCHS} | Step {step:04d}/{len(data_loader)} | D: {loss_d.item():.3f} | G: {loss_g.item():.3f}')
            # Render the preview in eval mode so the fixed-noise batch does not
            # update the generator's BatchNorm running statistics, then resume training mode.
            generator.eval()
            with torch.no_grad():
                preview = generator(fixed_noise).cpu()
            generator.train()
            preview_grid = utils.make_grid(preview, nrow=8, normalize=True, value_range=(-1, 1))
            utils.save_image(preview_grid, SAMPLES_DIR / f"epoch_{epoch:03d}_step_{step:04d}.png")
            if len(preview_history) < MAX_PREVIEW_SNAPSHOTS:
                preview_history.append({"epoch": epoch, "step": step, "grid": preview_grid})

    scheduler_g.step()
    scheduler_d.step()

    # Epoch means (not the last batch's values) drive both the history and checkpointing.
    mean_g = g_loss_sum / steps_per_epoch
    mean_d = d_loss_sum / steps_per_epoch
    history.append({"epoch": epoch, "generator_loss": mean_g, "discriminator_loss": mean_d})
    if mean_g < best_g_loss:
        best_g_loss = mean_g
        torch.save(generator.state_dict(), CHECKPOINT_DIR / 'generator_best.pt')
        torch.save(discriminator.state_dict(), CHECKPOINT_DIR / 'discriminator_best.pt')
        print(f'Saved new best generator checkpoint (epoch {epoch}).')

In [None]:
# Tabulate and plot the per-epoch losses recorded by the training cell.
if not history:
    print('Run the training cell first to populate metrics.')
else:
    history_df = pd.DataFrame(history)
    display(history_df.head())

    loss_columns = ['generator_loss', 'discriminator_loss']
    ax = history_df.plot(x='epoch', y=loss_columns, marker='o', figsize=(6, 4))
    ax.set(title='Training Losses by Epoch', xlabel='Epoch', ylabel='Loss')
    ax.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

### Preview Evolution
Review a handful of saved generator outputs to gauge visual improvements across training.

In [None]:
# Lay the captured preview grids out three per row; unused panels stay blank.
if not preview_history:
    print('No preview grids captured yet. Run the training cell to populate preview_history.')
else:
    n_cols = 3
    n_rows = math.ceil(len(preview_history) / n_cols)
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(n_cols * 4, n_rows * 4))
    panels = np.atleast_1d(axes).ravel()

    for position, panel in enumerate(panels):
        if position < len(preview_history):
            snapshot = preview_history[position]
            # Grids are stored channels-first; imshow expects channels last.
            panel.imshow(snapshot["grid"].permute(1, 2, 0).numpy())
            panel.set_title(f"Epoch {snapshot['epoch']} | Step {snapshot['step']}")
        panel.axis('off')

    plt.tight_layout()
    plt.show()

## Generate Submission Images
Sampling 8,000 latents falls within the competition's required range of 7,000–10,000 images. Images are first written to `/kaggle/working/submission_images`, then zipped into `images.zip`, after which the temporary folder is removed.

In [None]:
SUBMISSION_IMAGES = 8_000
GEN_BATCH = 64
OUTPUT_ZIP = Path('/kaggle/working/images.zip')
TEMP_IMAGE_DIR = Path('/kaggle/working/submission_images')

# Prefer the best checkpoint saved during training; otherwise keep the in-memory weights.
checkpoint_path = CHECKPOINT_DIR / 'generator_best.pt'
if checkpoint_path.exists():
    best_state = torch.load(checkpoint_path, map_location=device)
    generator.load_state_dict(best_state)
generator.eval()
TEMP_IMAGE_DIR.mkdir(parents=True, exist_ok=True)

# Generate the images batch by batch and write each one as a numbered JPEG.
with torch.no_grad(), tqdm(total=SUBMISSION_IMAGES, desc='Generating Monet-style images') as progress:
    for first_index in range(0, SUBMISSION_IMAGES, GEN_BATCH):
        # The final batch may be smaller than GEN_BATCH.
        current = min(GEN_BATCH, SUBMISSION_IMAGES - first_index)
        noise = torch.randn(current, LATENT_DIM, 1, 1, device=device)
        fake_batch = generator(noise).cpu()
        for offset, image in enumerate(fake_batch):
            filename = TEMP_IMAGE_DIR / f'monet_{first_index + offset:05d}.jpg'
            utils.save_image(image, filename, normalize=True, value_range=(-1, 1))
        progress.update(current)

# Bundle the JPEGs (flat, no directory prefix) into the submission archive.
with zipfile.ZipFile(OUTPUT_ZIP, 'w', compression=zipfile.ZIP_DEFLATED) as archive:
    for jpeg in sorted(TEMP_IMAGE_DIR.glob('*.jpg')):
        archive.write(jpeg, arcname=jpeg.name)
print(f'Created submission archive at {OUTPUT_ZIP}')

# Remove the temporary JPEGs and then the emptied folder.
for jpeg in TEMP_IMAGE_DIR.glob('*.jpg'):
    jpeg.unlink()
TEMP_IMAGE_DIR.rmdir()

In [None]:
MODEL_EXPORT = Path('/kaggle/working/best_models.zip')
FILES_TO_EXPORT = [
    CHECKPOINT_DIR / checkpoint_name
    for checkpoint_name in ('generator_best.pt', 'discriminator_best.pt')
]

# Pack whichever checkpoints exist into one archive; report the ones that are missing.
with zipfile.ZipFile(MODEL_EXPORT, 'w', compression=zipfile.ZIP_DEFLATED) as archive:
    for checkpoint in FILES_TO_EXPORT:
        if not checkpoint.exists():
            print(f'Skipping missing file: {checkpoint}')
            continue
        archive.write(checkpoint, arcname=checkpoint.name)

print(f'Packed checkpoints into {MODEL_EXPORT}')

## Next Steps
1. Use `Save & Run All` to generate a notebook version with `images.zip` and `best_models.zip` in the output tab.
2. Submit the archive via `kaggle competitions submit -c gan-getting-started -f images.zip -m "Your message"` or upload it on the competition page.
3. Download `best_models.zip` from the output tab to reuse the trained generator/discriminator in future experiments or publish them as a Kaggle Dataset.
4. Track each submission in a change log (message + settings) so you can correlate leaderboard shifts with hyperparameter tweaks.
5. Iterate on architecture or training schedule as needed for leaderboard improvements.