# Generate Drawings from Checkpoint and Zip Results

This notebook loads a trained checkpoint, generates drawings from all image files, and creates a zip file with the results.


In [1]:
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from PIL import Image
import os
import zipfile
from tqdm import tqdm
import numpy as np

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")


Using device: cpu


## Model Architecture (from original notebook)


In [2]:
class Downblock(nn.Module):
    """Encoder stage: reflection pad -> conv -> [norm] -> [activation] -> [dropout].

    The layers are stored in ``self.model`` (an ``nn.Sequential``) in exactly
    that order, so the convolution always sits at index 1.

    Args:
        in_ch: Number of input channels of the convolution.
        out_ch: Number of output channels of the convolution.
        kernel_size: Convolution kernel size.
        stride: Convolution stride.
        padding: Amount of reflection padding applied *before* the convolution
            (the convolution itself is built with padding 0).
        use_bias: Whether the convolution has a bias term.
        apply_norm: If True, add a normalisation layer after the convolution.
        use_inst_norm: Choose ``InstanceNorm2d`` (True) or ``BatchNorm2d`` (False).
        activation: ``'gelu'``, ``'leaky'`` (LeakyReLU, slope 0.2), ``''`` for no
            activation; any other value yields an in-place ReLU.
        dropout_ratio: Dropout probability; no dropout layer is added when 0.
    """

    def __init__(
        self,
        # dims
        in_ch, out_ch,
        # conv
        kernel_size=3, stride=1, padding=0, use_bias=True,
        # norm
        apply_norm=True, use_inst_norm=True,
        # activation
        activation='relu',
        # dropout
        dropout_ratio=0.0
        ):
        super().__init__()

        # Padding is done by an explicit ReflectionPad2d layer; the conv keeps
        # the original's arguments (padding 0, padding_mode='reflect').
        stages = [
            nn.ReflectionPad2d(padding),
            nn.Conv2d(
                in_ch, out_ch,
                kernel_size, stride, 0,
                bias=use_bias,
                padding_mode='reflect'
            ),
        ]

        # Optional normalisation
        if apply_norm:
            norm_cls = nn.InstanceNorm2d if use_inst_norm else nn.BatchNorm2d
            stages.append(norm_cls(out_ch))

        # Optional activation; unknown names fall back to ReLU
        if activation != '':
            known_activations = {
                'gelu': lambda: nn.GELU(),
                'leaky': lambda: nn.LeakyReLU(negative_slope=0.2, inplace=True),
            }
            make_act = known_activations.get(activation, lambda: nn.ReLU(inplace=True))
            stages.append(make_act())

        # Optional dropout
        if dropout_ratio > 0.0:
            stages.append(nn.Dropout(dropout_ratio))

        self.model = nn.Sequential(*stages)

    def forward(self, x):
        """Run ``x`` through the sequential stack and return the result."""
        return self.model(x)

In [3]:
class Upblock(nn.Module):
    """Decoder stage built around a transposed convolution.

    ``self.model`` is an ``nn.Sequential`` whose first layer (index 0) is always
    the ``ConvTranspose2d``. The remaining layers depend on the flags:

    * ``is_pre_activation=False``: conv-transpose -> [norm] -> [activation] -> [dropout]
    * ``is_pre_activation=True``:  conv-transpose -> [activation] -> [norm] -> [dropout]

    Note that, despite its name, ``is_pre_activation`` places the activation
    directly *after* the transposed convolution (at index 1), not before it.

    An alternative "upsample + reflection pad + conv" formulation was noted in
    the original code as a checkerboard-artifact fix but is not used here.

    Args:
        in_ch: Input channels of the transposed convolution.
        out_ch: Output channels of the transposed convolution.
        kernel_size, stride, padding, output_padding: Forwarded to
            ``nn.ConvTranspose2d``.
        use_bias: Whether the transposed convolution has a bias term.
        apply_norm: If True, add a normalisation layer.
        use_inst_norm: Choose ``InstanceNorm2d`` (True) or ``BatchNorm2d`` (False).
        activation: ``'gelu'``, ``''`` for none; any other value yields an
            in-place ReLU.
        dropout_ratio: Dropout probability; no dropout layer is added when 0.
        is_pre_activation: Controls activation placement (see above).
    """

    def __init__(
        self,
        # dims
        in_ch, out_ch,
        # conv transpose
        kernel_size=3, stride=2, padding=1, output_padding=1, use_bias=True,
        # norm
        apply_norm=True, use_inst_norm=True,
        # activation
        activation='gelu',
        # dropout
        dropout_ratio=0.5,
        # UNet specific
        is_pre_activation=False
        ):
        super().__init__()

        # The transposed convolution is always the first layer.
        stages = [
            nn.ConvTranspose2d(
                in_ch, out_ch,
                kernel_size=kernel_size,
                stride=stride,
                padding=padding,
                output_padding=output_padding,
                bias=use_bias
            )
        ]

        # Zero-or-one normalisation layer
        norm_part = []
        if apply_norm:
            norm_part.append(
                nn.InstanceNorm2d(out_ch) if use_inst_norm else nn.BatchNorm2d(out_ch)
            )

        # Zero-or-one activation layer
        act_part = []
        if activation != '':
            act_part.append(nn.GELU() if activation == 'gelu' else nn.ReLU(inplace=True))

        # Activation goes right after the conv-transpose when is_pre_activation
        # is set, otherwise after the normalisation layer.
        if is_pre_activation:
            stages.extend(act_part + norm_part)
        else:
            stages.extend(norm_part + act_part)

        # Optional dropout always comes last
        if dropout_ratio > 0.0:
            stages.append(nn.Dropout(dropout_ratio))

        self.model = nn.Sequential(*stages)

    def forward(self, x):
        """Run ``x`` through the sequential stack and return the result."""
        return self.model(x)

In [4]:
class UNetGenerator(nn.Module):
    """U-Net style generator built from ``Downblock`` / ``Upblock`` stages.

    The encoder is a chain of ``num_downs`` stride-2 ``Downblock``s; the decoder
    is a chain of ``num_downs`` stride-2 ``Upblock``s. After every decoder stage
    except the last, the output is concatenated (channel dimension) with the
    output of the mirrored encoder stage, and a ``tanh`` is applied to the final
    decoder output.

    Args:
        in_ch: Number of channels of the input image.
        out_ch: Number of channels of the generated image.
        num_downs: Number of encoder (and decoder) stages; must be >= 1.
        dropout: Dropout probability used in the first three decoder stages.

    Raises:
        ValueError: If ``num_downs`` is smaller than 1.
    """

    def __init__(self, in_ch=3, out_ch=3, num_downs=8, dropout=0.5):
        super().__init__()

        if num_downs < 1:
            raise ValueError(f"num_downs must be >= 1, got {num_downs}")

        self.num_downs = num_downs

        base_ch = 64

        # Encoder channel progression: widths double per stage, capped at 8 * base_ch
        enc_in_chs = [in_ch] + [base_ch * min(2**i, 8) for i in range(num_downs-1)]
        enc_out_chs = [base_ch * min(2**i, 8) for i in range(num_downs)]

        # Decoder inputs: the first stage sees the bottleneck only; every later
        # stage sees the previous decoder output concatenated with a skip
        # connection of equal width, hence the factor 2.
        dec_in_chs = [enc_out_chs[::-1][0]] + [c * 2 for c in enc_out_chs[::-1][1:]]
        # Decoder outputs mirror the encoder inputs, except that the final stage
        # must emit `out_ch` channels. (Previously the final stage emitted
        # `in_ch` channels and `out_ch` was silently ignored; behaviour is
        # unchanged whenever in_ch == out_ch.)
        dec_out_chs = enc_in_chs[::-1][:-1] + [out_ch]

        # Encoder
        self.encoder = nn.ModuleList()
        for i in range(num_downs):
            use_norm = i > 0 and i < num_downs - 1 # No norm in first layer or last layer (bottleneck)
            activation = 'leaky' if i != num_downs - 1 else 'relu'
            self.encoder.append(
                Downblock(
                    in_ch=enc_in_chs[i], out_ch=enc_out_chs[i],
                    kernel_size=4, stride=2, padding=1, use_bias=False,
                    apply_norm=use_norm, use_inst_norm=True,
                    activation=activation,
                    dropout_ratio=0.0
                )
            )

        # Decoder
        self.decoder = nn.ModuleList()
        for i in range(num_downs):
            use_dropout = i < 3 # First 3 decoder layers have dropout
            apply_norm = i != num_downs - 1 # No norm in final layer
            activation = '' if i == num_downs - 1 else 'relu' # No activation in final layer
            self.decoder.append(
                Upblock(
                    in_ch=dec_in_chs[i], out_ch=dec_out_chs[i],
                    kernel_size=4, stride=2, padding=1, output_padding=0, use_bias=False,
                    apply_norm=apply_norm, use_inst_norm=True,
                    activation=activation,
                    dropout_ratio=dropout if use_dropout else 0.0,
                    is_pre_activation=True
                )
            )

        self.tanh = nn.Tanh()

    def forward(self, x):
        """Encode ``x``, decode with skip connections, and squash with tanh.

        Args:
            x: Input batch; its channel dimension (dim 1) must equal ``in_ch``.

        Returns:
            The tanh-activated output of the last decoder stage.
        """
        # Keep every encoder output so the decoder can reuse them as skips
        encoder_outputs = []

        # Encoder forward pass
        current = x
        for encoder_block in self.encoder:
            current = encoder_block(current)
            encoder_outputs.append(current)

        # Decoder forward pass with skip connections.
        # Decoder stage i is paired with encoder stage (num_stages - 2 - i): the
        # bottleneck (last encoder output) is the decoder input itself, and the
        # final decoder stage has no partner, so no concatenation happens there.
        for i, decoder_block in enumerate(self.decoder):
            current = decoder_block(current)
            skip_idx = len(encoder_outputs) - 1 - i - 1
            if skip_idx >= 0:
                current = torch.cat([current, encoder_outputs[skip_idx]], dim=1)

        # Tanh activation at the end
        output = self.tanh(current)

        return output

## Configuration


In [5]:
# Path to the trained checkpoint file that is passed to load_checkpoint()
CHECKPOINT_PATH = "/kaggle/input/final-2/cyclegan_final (1).pth"  # Update this to your checkpoint path

# Directory whose image files are listed and fed through the generator
INPUT_IMAGE_DIR = "/kaggle/input/gan-getting-started/photo_jpg"  # Update this to your image directory

# Destination path of the zip archive that collects the generated images
ZIP_FILE_NAME = "/kaggle/working/images.zip"

# Size passed to transforms.Resize before inference (should match training size)
IMAGE_SIZE = (256, 256)


In [6]:
!ls /kaggle/input/

final-1  final-2  gan-getting-started  test-checkpoint


In [7]:
!ls /kaggle/working

## Load Checkpoint and Initialize Model


In [8]:
def load_checkpoint(ckpt_path, map_location=None):
    """Load a checkpoint file with ``torch.load`` and return its contents.

    Args:
        ckpt_path: Path of the checkpoint file to read.
        map_location: Forwarded to ``torch.load`` to control which device the
            stored tensors are mapped onto.

    Returns:
        Whatever object was stored in the checkpoint file.

    NOTE(review): ``torch.load`` unpickles the file, so only load checkpoints
    from a trusted source.
    """
    loaded = torch.load(ckpt_path, map_location=map_location)
    # Report success only after the load call has returned
    print(f'[*] Loading checkpoint from {ckpt_path} succeed!')
    return loaded

# Build the generator and move it to the selected device
generator = UNetGenerator(in_ch=3, out_ch=3, num_downs=8, dropout=0.5)
generator = generator.to(device)

# Read the checkpoint, mapping stored tensors onto the same device
checkpoint = load_checkpoint(CHECKPOINT_PATH, map_location=device)

# The generator weights are expected under the 'g_ptm' key; if that key is
# absent, fall back to treating the whole checkpoint as the state dict.
if 'g_ptm' not in checkpoint:
    print("Warning: 'g_ptm' key not found in checkpoint. Trying direct load...")
    generator.load_state_dict(checkpoint)
else:
    generator.load_state_dict(checkpoint['g_ptm'])
    print("Generator weights loaded successfully!")

# Switch to evaluation mode for inference (this disables the dropout layers)
generator.eval()
print("Model ready for inference!")


[*] Loading checkpoint from /kaggle/input/final-2/cyclegan_final (1).pth succeed!
Generator weights loaded successfully!
Model ready for inference!


## Generate Drawings from All Images


In [9]:
!rm -rf /kaggle/working/*.jpg

In [10]:
# Preprocessing applied to every input image: resize to IMAGE_SIZE, convert to
# a tensor, and scale each channel from [0, 1] to [-1, 1]
# (the original notes this is the same as training).
transform = transforms.Compose([
    transforms.Resize(IMAGE_SIZE),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# Tensor -> PIL converter; built once instead of once per image
to_pil = transforms.ToPILImage()

# Write all generated images into a dedicated directory (relative to the
# current working directory) instead of scattering them into the working
# directory itself. This keeps them separate from other files and makes
# cleanup after zipping trivial.
OUTPUT_DIR = "generated_images"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Collect all image files; sorted so the processing order is deterministic
# (os.listdir returns entries in arbitrary order)
image_extensions = ('.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif')
image_files = sorted(
    f for f in os.listdir(INPUT_IMAGE_DIR)
    if f.lower().endswith(image_extensions)
)

print(f"Found {len(image_files)} image files to process")

# Process each image; paths of successfully written outputs are collected in
# generated_files for the zip step.
generated_files = []
with torch.no_grad():
    for img_file in tqdm(image_files, desc="Generating drawings"):
        try:
            # Load and preprocess the image; the context manager closes the
            # underlying file handle as soon as the tensor has been built
            img_path = os.path.join(INPUT_IMAGE_DIR, img_file)
            with Image.open(img_path) as img:
                img_tensor = transform(img.convert('RGB')).unsqueeze(0).to(device)

            # Generate drawing and drop the batch dimension
            generated = generator(img_tensor).squeeze(0).cpu()

            # Denormalize from [-1, 1] to [0, 1] and clamp to the valid range
            generated = torch.clamp(generated * 0.5 + 0.5, 0.0, 1.0)

            # Save under the same file name as the input, inside OUTPUT_DIR
            output_path = os.path.join(OUTPUT_DIR, img_file)
            to_pil(generated).save(output_path)
            generated_files.append(output_path)

        except Exception as e:
            # Best effort: report the failing file and keep going
            print(f"Error processing {img_file}: {str(e)}")
            continue

print(f"\nGenerated {len(generated_files)} drawings successfully!")


Found 7038 image files to process


Generating drawings:   0%|          | 25/7038 [00:06<31:19,  3.73it/s]


KeyboardInterrupt: 

## Zip Generated Images


In [60]:
# Pack every generated image into a single deflate-compressed archive
print(f"Creating zip file: {ZIP_FILE_NAME}")
with zipfile.ZipFile(ZIP_FILE_NAME, 'w', zipfile.ZIP_DEFLATED) as archive:
    for src_path in tqdm(generated_files, desc="Adding files to zip"):
        # Store each file flat in the archive, under its bare file name
        archive.write(src_path, os.path.basename(src_path))

# Report the archive size (in MB) and the number of files added
bytes_per_mb = 1024 * 1024
zip_size = os.path.getsize(ZIP_FILE_NAME) / bytes_per_mb
print(f"\nZip file created successfully!")
print(f"Zip file: {ZIP_FILE_NAME}")
print(f"Zip file size: {zip_size:.2f} MB")
print(f"Number of files in zip: {len(generated_files)}")


Creating zip file: /kaggle/working/images.zip


Adding files to zip: 100%|██████████| 7038/7038 [00:03<00:00, 2032.54it/s]



Zip file created successfully!
Zip file: /kaggle/working/images.zip
Zip file size: 94.86 MB
Number of files in zip: 7038


## Optional: Clean up generated image files

Uncomment the cell below if you want to delete the individual generated image files from disk after they have been added to the zip archive.


In [None]:
# Uncomment to delete the generated image files once they are in the zip archive
# for file_path in generated_files:
#     if os.path.exists(file_path):
#         os.remove(file_path)
# print(f"Removed {len(generated_files)} generated image files")
