In [2]:
!pip install -q torch torchvision matplotlib
!pip install -q wandb


[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m363.4/363.4 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m0:00:01[0m00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m664.8/664.8 MB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m0:00:01[0m00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m211.5/211.5 MB[0m [31m7.9 MB/s[0m eta [36m0:00:00[0m0:00:01[0m00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m56.3/56.3 MB[0m [31m30.8 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m127.9/127.9 MB[0m [31m13.2 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m207.5/207.5 MB[0m [31m8.1 MB/s[0m eta [36m0:00:00[0m0:00:01[0m00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m21.1/21.1 MB[0m [31m77.4 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[?25h[31mERROR: pip's 

In [3]:
import os
from PIL import Image
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms


In [4]:
train_path = "/kaggle/input/person-face-sketches/train"
print("Contents of /train:", os.listdir(train_path))

test_path = "/kaggle/input/person-face-sketches/test"
print("Contents of /test:", os.listdir(test_path))


Contents of /train: ['photos', 'sketches']
Contents of /test: ['photos', 'sketches']


In [5]:
class FaceSketchDataset(Dataset):
    def __init__(self, base_path, folders=["train", "test", "val"], transform=None):
        self.photo_paths = []
        self.sketch_paths = []
        self.transform = transform

        for folder in folders:
            photo_dir = os.path.join(base_path, folder, "photos")
            sketch_dir = os.path.join(base_path, folder, "sketches")
            photo_files = sorted(os.listdir(photo_dir))
            sketch_files = sorted(os.listdir(sketch_dir))
            for p, s in zip(photo_files, sketch_files):
                self.photo_paths.append(os.path.join(photo_dir, p))
                self.sketch_paths.append(os.path.join(sketch_dir, s))

    def __len__(self):
        return len(self.photo_paths)

    def __getitem__(self, idx):
        photo = Image.open(self.photo_paths[idx]).convert("RGB")
        sketch = Image.open(self.sketch_paths[idx]).convert("RGB")

        if self.transform:
            photo = self.transform(photo)
            sketch = self.transform(sketch)

        return {"photo": photo, "sketch": sketch}


In [6]:
transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
    transforms.Normalize([0.5]*3, [0.5]*3)
])

dataset = FaceSketchDataset(
    base_path="/kaggle/input/person-face-sketches",
    folders=["train", "test", "val"],
    transform=transform
)

dataloader = DataLoader(dataset, batch_size=4, shuffle=True, num_workers=2, pin_memory=True)


In [7]:
class ResidualBlock(nn.Module):
    def __init__(self, channels):
        super(ResidualBlock, self).__init__()
        self.block = nn.Sequential(
            nn.ReflectionPad2d(1),
            nn.Conv2d(channels, channels, 3),
            nn.InstanceNorm2d(channels),
            nn.ReLU(inplace=True),
            nn.ReflectionPad2d(1),
            nn.Conv2d(channels, channels, 3),
            nn.InstanceNorm2d(channels)
        )

    def forward(self, x):
        return x + self.block(x)

class Generator(nn.Module):
    def __init__(self, in_channels=3, out_channels=3, n_residuals=3):
        super(Generator, self).__init__()
        model = [
            nn.ReflectionPad2d(3),
            nn.Conv2d(in_channels, 64, 7),
            nn.InstanceNorm2d(64),
            nn.ReLU(inplace=True)
        ]

        curr_dim = 64
        for _ in range(2):
            model += [
                nn.Conv2d(curr_dim, curr_dim * 2, 3, 2, 1),
                nn.InstanceNorm2d(curr_dim * 2),
                nn.ReLU(inplace=True)
            ]
            curr_dim *= 2

        for _ in range(n_residuals):
            model += [ResidualBlock(curr_dim)]

        for _ in range(2):
            model += [
                nn.ConvTranspose2d(curr_dim, curr_dim // 2, 3, 2, 1, output_padding=1),
                nn.InstanceNorm2d(curr_dim // 2),
                nn.ReLU(inplace=True)
            ]
            curr_dim //= 2

        model += [
            nn.ReflectionPad2d(3),
            nn.Conv2d(curr_dim, out_channels, 7),
            nn.Tanh()
        ]

        self.model = nn.Sequential(*model)

    def forward(self, x):
        return self.model(x)


In [8]:
class Discriminator(nn.Module):
    def __init__(self, in_channels=3):
        super(Discriminator, self).__init__()
        def block(in_filters, out_filters, normalize=True):
            layers = [nn.Conv2d(in_filters, out_filters, 4, 2, 1)]
            if normalize:
                layers.append(nn.InstanceNorm2d(out_filters))
            layers.append(nn.LeakyReLU(0.2, inplace=True))
            return layers

        self.model = nn.Sequential(
            *block(in_channels, 64, normalize=False),
            *block(64, 128),
            *block(128, 256),
            *block(256, 512),
            nn.Conv2d(512, 1, 4, 1, 1)
        )

    def forward(self, x):
        return self.model(x)


In [9]:
from torchvision.utils import save_image
from tqdm import tqdm
import torch.nn as nn
import torch
import os



device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# =============================
# Multi-GPU Model Initialization
# =============================
G_AB = nn.DataParallel(Generator()).to(device)
G_BA = nn.DataParallel(Generator()).to(device)
D_A = nn.DataParallel(Discriminator()).to(device)
D_B = nn.DataParallel(Discriminator()).to(device)

# =============================
# Loss Functions & Optimizers
# =============================
adversarial_loss = nn.MSELoss()
cycle_loss = nn.L1Loss()
identity_loss = nn.L1Loss()

lr = 0.0002
beta1 = 0.5
beta2 = 0.999

optimizer_G = torch.optim.Adam(
    list(G_AB.parameters()) + list(G_BA.parameters()),
    lr=lr, betas=(beta1, beta2)
)
optimizer_D_A = torch.optim.Adam(D_A.parameters(), lr=lr, betas=(beta1, beta2))
optimizer_D_B = torch.optim.Adam(D_B.parameters(), lr=lr, betas=(beta1, beta2))

# =============================
# Folder Setup
# =============================
os.makedirs("generated_images", exist_ok=True)
os.makedirs("saved_models", exist_ok=True)

# =============================
# Helper to Denormalize Images
# =============================
def denormalize(tensor):
    return tensor * 0.5 + 0.5

# =============================
# Resume from Checkpoint Logic
# =============================
resume = True  # 🔄 Toggle resume
checkpoint_path = "/kaggle/input/cyclegan-checkpoint/checkpoint.pth"
start_epoch = 1
num_epochs = 5  # total epochs you want to train

if resume and os.path.exists(checkpoint_path):
    print(f"🔁 Resuming training from {checkpoint_path}")
    checkpoint = torch.load(checkpoint_path)

    G_AB.load_state_dict(checkpoint['G_AB'])
    G_BA.load_state_dict(checkpoint['G_BA'])
    D_A.load_state_dict(checkpoint['D_A'])
    D_B.load_state_dict(checkpoint['D_B'])

    optimizer_G.load_state_dict(checkpoint['opt_G'])
    optimizer_D_A.load_state_dict(checkpoint['opt_D_A'])
    optimizer_D_B.load_state_dict(checkpoint['opt_D_B'])

    start_epoch = checkpoint['epoch'] + 1
else:
    print("🆕 Starting training from scratch.")

# =============================
# Confirm GPU Usage
# =============================
print("🚀 Using GPUs:")
print("GPUs used by G_AB:", G_AB.device_ids)
print("GPUs used by D_A:", D_A.device_ids)

# =============================
# Training Loop
# =============================
real_label = 1.0
fake_label = 0.0

for epoch in range(start_epoch, num_epochs + 1):
    G_losses, D_A_losses, D_B_losses = [], [], []
    print(f"🌟 Epoch {epoch}/{num_epochs}")
    loop = tqdm(enumerate(dataloader), total=len(dataloader), desc=f"Epoch {epoch}")

    for i, batch in loop:
        real_A = batch["photo"].to(device)
        real_B = batch["sketch"].to(device)

        #### -------- Train Generators -------- ####
        optimizer_G.zero_grad()

        same_B = G_AB(real_B)
        loss_identity_B = identity_loss(same_B, real_B) * 5.0

        same_A = G_BA(real_A)
        loss_identity_A = identity_loss(same_A, real_A) * 5.0

        fake_B = G_AB(real_A)
        pred_fake_B = D_B(fake_B)
        loss_GAN_AB = adversarial_loss(pred_fake_B, torch.ones_like(pred_fake_B).to(device))

        fake_A = G_BA(real_B)
        pred_fake_A = D_A(fake_A)
        loss_GAN_BA = adversarial_loss(pred_fake_A, torch.ones_like(pred_fake_A).to(device))

        recov_A = G_BA(fake_B)
        loss_cycle_A = cycle_loss(recov_A, real_A) * 10.0

        recov_B = G_AB(fake_A)
        loss_cycle_B = cycle_loss(recov_B, real_B) * 10.0

        # Total Generator Loss
        loss_G = (loss_identity_A + loss_identity_B +
                  loss_GAN_AB + loss_GAN_BA +
                  loss_cycle_A + loss_cycle_B)
        loss_G.backward()
        optimizer_G.step()

        #### -------- Train Discriminator A -------- ####
        optimizer_D_A.zero_grad()
        pred_real_A = D_A(real_A)
        loss_real_A = adversarial_loss(pred_real_A, torch.ones_like(pred_real_A).to(device))

        pred_fake_A = D_A(fake_A.detach())
        loss_fake_A = adversarial_loss(pred_fake_A, torch.zeros_like(pred_fake_A).to(device))

        loss_D_A = 0.5 * (loss_real_A + loss_fake_A)
        loss_D_A.backward()
        optimizer_D_A.step()

        #### -------- Train Discriminator B -------- ####
        optimizer_D_B.zero_grad()
        pred_real_B = D_B(real_B)
        loss_real_B = adversarial_loss(pred_real_B, torch.ones_like(pred_real_B).to(device))

        pred_fake_B = D_B(fake_B.detach())
        loss_fake_B = adversarial_loss(pred_fake_B, torch.zeros_like(pred_fake_B).to(device))

        loss_D_B = 0.5 * (loss_real_B + loss_fake_B)
        loss_D_B.backward()
        optimizer_D_B.step()

        G_losses.append(loss_G.item())
        D_A_losses.append(loss_D_A.item())
        D_B_losses.append(loss_D_B.item())

        loop.set_postfix({
            "Loss_G": f"{loss_G.item():.4f}",
            "Loss_D_A": f"{loss_D_A.item():.4f}",
            "Loss_D_B": f"{loss_D_B.item():.4f}"
        })

    #### Save Output Images ####
    save_image(denormalize(fake_B), f"generated_images/epoch{epoch}_fake_sketch.png")
    save_image(denormalize(fake_A), f"generated_images/epoch{epoch}_fake_photo.png")

    #### Save Checkpoint ####
    checkpoint = {
        'epoch': epoch,
        'G_AB': G_AB.state_dict(),
        'G_BA': G_BA.state_dict(),
        'D_A': D_A.state_dict(),
        'D_B': D_B.state_dict(),
        'opt_G': optimizer_G.state_dict(),
        'opt_D_A': optimizer_D_A.state_dict(),
        'opt_D_B': optimizer_D_B.state_dict()
    }
    torch.save(checkpoint, "saved_models/checkpoint_latest.pth")

    print(f"\n✅ Epoch {epoch} completed ➤ "
          f"Generator: {sum(G_losses)/len(G_losses):.4f}, "
          f"D_A: {sum(D_A_losses)/len(D_A_losses):.4f}, "
          f"D_B: {sum(D_B_losses)/len(D_B_losses):.4f}\n")


🆕 Starting training from scratch.
🚀 Using GPUs:
GPUs used by G_AB: [0, 1]
GPUs used by D_A: [0, 1]
🌟 Epoch 1/5


Epoch 1: 100%|██████████| 5584/5584 [27:07<00:00,  3.43it/s, Loss_G=3.3378, Loss_D_A=0.1282, Loss_D_B=0.0957]



✅ Epoch 1 completed ➤ Generator: 4.1273, D_A: 0.1704, D_B: 0.1393

🌟 Epoch 2/5


Epoch 2: 100%|██████████| 5584/5584 [27:02<00:00,  3.44it/s, Loss_G=5.3969, Loss_D_A=0.0996, Loss_D_B=0.0402]



✅ Epoch 2 completed ➤ Generator: 3.6981, D_A: 0.1435, D_B: 0.0819

🌟 Epoch 3/5


Epoch 3: 100%|██████████| 5584/5584 [27:02<00:00,  3.44it/s, Loss_G=3.7132, Loss_D_A=0.0881, Loss_D_B=0.0042]



✅ Epoch 3 completed ➤ Generator: 3.5758, D_A: 0.1313, D_B: 0.0590

🌟 Epoch 4/5


Epoch 4: 100%|██████████| 5584/5584 [27:01<00:00,  3.44it/s, Loss_G=2.5749, Loss_D_A=0.1358, Loss_D_B=0.1482]



✅ Epoch 4 completed ➤ Generator: 3.5170, D_A: 0.1232, D_B: 0.0420

🌟 Epoch 5/5


Epoch 5: 100%|██████████| 5584/5584 [27:02<00:00,  3.44it/s, Loss_G=2.9121, Loss_D_A=0.1079, Loss_D_B=0.0287]



✅ Epoch 5 completed ➤ Generator: 3.4390, D_A: 0.1180, D_B: 0.0366



In [11]:
torch.save(G_AB.module.state_dict(), "G_AB_trained.pth")
torch.save(G_BA.module.state_dict(), "G_BA_trained.pth")


In [14]:
import torch
from torchvision import transforms
from torchvision.utils import save_image
from PIL import Image

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load generator safely (weights only)
generator = Generator().to(device)
generator.load_state_dict(
    torch.load("G_AB_trained.pth", map_location=device, weights_only=True)
)
generator.eval()

# Same transforms used during training
transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
    transforms.Normalize([0.5]*3, [0.5]*3)
])

# Convert image
def generate_sketch(input_image_path, output_image_path):
    image = Image.open(input_image_path).convert("RGB")
    image = transform(image).unsqueeze(0).to(device)
    with torch.no_grad():
        output = generator(image)
    output = output.squeeze().cpu() * 0.5 + 0.5
    save_image(output, output_image_path)


In [16]:
from flask import Flask, request, send_file
from io import BytesIO

app = Flask(__name__)

@app.route("/convert", methods=["POST"])
def convert_image():
    file = request.files['file']
    img = Image.open(file).convert("RGB")
    img = transform(img).unsqueeze(0).to(device)
    with torch.no_grad():
        output = generator(img)
    output = output.squeeze().cpu() * 0.5 + 0.5

    buffer = BytesIO()
    transforms.ToPILImage()(output).save(buffer, format="PNG")
    buffer.seek(0)
    return send_file(buffer, mimetype='image/png')

if __name__ == "__main__":
    app.run()


 * Serving Flask app '__main__'
 * Debug mode: off
