In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

# Prepare the data and their labels for the CycleGan
## The data format required is pngs
## The following code assumes the original data format is npys

In [None]:
# === Imports ===
import os
import numpy as np
from PIL import Image

# === Utility Function ===
def normalize_img(arr):
    arr = arr.astype(np.float32)
    arr = (arr - arr.min()) / (arr.max() - arr.min() + 1e-8)
    return (arr * 255).astype(np.uint8)

# === User Input Required: Define Dataset Paths ===
# Please update these paths to point to your own dataset locations

# Module A (e.g., ACDC)
module_a_images_path = "/path/to/module_a/images"  # e.g., npy images
module_a_masks_path = "/path/to/module_a/masks"    # e.g., corresponding masks

# Module B (e.g., EMIDEC)
module_b_images_path = "/path/to/module_b/images"  # e.g., npy images
module_b_masks_path = "/path/to/module_b/masks"    # e.g., corresponding masks

# Output Directory (default: working directory)
output_root = "/path/to/output/directory"  # e.g., ./cyclegan_data

# === Output Subdirectories (automatically created) ===
trainA_path = os.path.join(output_root, "trainA")
trainB_path = os.path.join(output_root, "trainB")
module_a_masks_output = os.path.join(output_root, "module_a/masks")
module_b_masks_output = os.path.join(output_root, "module_b/masks")

os.makedirs(trainA_path, exist_ok=True)
os.makedirs(trainB_path, exist_ok=True)
os.makedirs(module_a_masks_output, exist_ok=True)
os.makedirs(module_b_masks_output, exist_ok=True)

# === Process Module A ===
a_count = 0
for idx, file in enumerate(sorted(os.listdir(module_a_images_path))):
    if not file.endswith(".npy"):
        continue

    arr = np.load(os.path.join(module_a_images_path, file))
    norm_arr = normalize_img(arr)
    img = Image.fromarray(norm_arr)

    output_filename = f"{idx:04d}.png"
    img.save(os.path.join(trainA_path, output_filename))

    # Adjust based on your naming convention
    ending = file.replace("merged_", "")
    mask_name = f"custom_image_{ending}"
    mask_path = os.path.join(module_a_masks_path, mask_name)

    mask = np.load(mask_path)
    mask_output_filename = f"{idx:04d}.npy"
    np.save(os.path.join(module_a_masks_output, mask_output_filename), mask)

    a_count += 1

print(f"Total images saved in trainA (Module A): {a_count}")
print(f"Total masks saved in module_a/masks (Module A): {a_count}")

# === Process Module B ===
b_count = 0
for idx, file in enumerate(sorted(os.listdir(module_b_images_path))):
    if not file.endswith(".npy"):
        continue

    arr = np.load(os.path.join(module_b_images_path, file))
    norm_arr = normalize_img(arr)
    img = Image.fromarray(norm_arr)

    output_filename = f"{idx:04d}.png"
    img.save(os.path.join(trainB_path, output_filename))

    mask_filename = file
    mask_path = os.path.join(module_b_masks_path, mask_filename)
    mask = np.load(mask_path)

    mask_output_filename = f"{idx:04d}.npy"
    np.save(os.path.join(module_b_masks_output, mask_output_filename), mask)

    b_count += 1

print(f"Total images saved in trainB (Module B): {b_count}")
print(f"Total masks saved in module_b/masks (Module B): {b_count}")


### Augment and upsample module B (emidec) >>> images and masks

In [None]:
# === Imports ===
import os
from PIL import Image, ImageOps

# === User Input Required: Define Dataset Path ===
# Please update this path to your own trainB directory
trainB_path = "/path/to/trainB"  # e.g., "./cyclegan_data/trainB"

# === Horizontal and Vertical Flipping Augmentation ===

# List original image files
image_files = sorted(os.listdir(trainB_path))
original_count = len(image_files)

augmented_count = 0
for file in image_files:
    img_path = os.path.join(trainB_path, file)
    img = Image.open(img_path)

    # Horizontal flip
    img_h = ImageOps.mirror(img)
    img_h.save(os.path.join(trainB_path, f"{file[:-4]}_h.png"))
    augmented_count += 1

    # Vertical flip
    img_v = ImageOps.flip(img)
    img_v.save(os.path.join(trainB_path, f"{file[:-4]}_v.png"))
    augmented_count += 1

print(f"Performed horizontal and vertical flips on {original_count} images, generating {augmented_count} augmented images.")

# === Rotation-Based Augmentation to Reach Target Count ===

# Settings
# Please set your desired target count below
target_count = 2154  # e.g., desired number of images after augmentation
rotation_angles = [90, 180, 270, 45, 135, 30, 60]  # angles for rotation augmentation

# Get updated list of images after flips
image_files = sorted(os.listdir(trainB_path))
current_count = len(image_files)
augment_index = 0

print(f"Starting rotation augmentation with {current_count} images...")

while current_count < target_count:
    file = image_files[augment_index % len(image_files)]
    img_path = os.path.join(trainB_path, file)
    img = Image.open(img_path)

    for angle in rotation_angles:
        if current_count >= target_count:
            break
        rotated = img.rotate(angle)
        new_filename = f"{file[:-4]}_r{angle}_{augment_index}.png"
        rotated.save(os.path.join(trainB_path, new_filename))
        current_count += 1

    augment_index += 1

print(f"Rotation augmentation completed. Final total images: {current_count}")


In [None]:
# === Imports ===
import os
import numpy as np

# === User Input Required: Define Paths ===
# Please update these paths to your dataset directories

# Directory containing trainB images
trainB_path = "/path/to/trainB"  # e.g., "./cyclegan_data/trainB"

# Directory containing masks corresponding to trainB images
masks_path = "/path/to/module_b/masks"  # e.g., "./module_b/masks"

# === Settings ===
# Desired total number of images/masks after augmentation
target_count = 2154  # set as per your dataset requirement

# Rotation angles to apply for augmentation
rotation_angles = [90, 180, 270, 45, 135, 30, 60]

# === Get original mask filenames ===
mask_files = sorted([f for f in os.listdir(masks_path) if f.endswith(".npy")])
current_count = len(os.listdir(trainB_path))  # image count for loop control
augment_index = 0

print(f"Starting with {len(mask_files)} masks to match {current_count} images...")

# === Horizontal & Vertical Flipping Augmentation ===
for mask_file in mask_files:
    mask_path = os.path.join(masks_path, mask_file)
    mask = np.load(mask_path)

    # Flip horizontally
    mask_h = np.fliplr(mask)
    new_name_h = f"{mask_file[:-4]}_h.npy"
    np.save(os.path.join(masks_path, new_name_h), mask_h)

    # Flip vertically
    mask_v = np.flipud(mask)
    new_name_v = f"{mask_file[:-4]}_v.npy"
    np.save(os.path.join(masks_path, new_name_v), mask_v)

print("Horizontal & vertical flips done.")

# === Rotation-Based Augmentation to Reach Target Count ===

# Get updated list of mask files (original + flips)
mask_files = sorted([f for f in os.listdir(masks_path) if f.endswith(".npy")])
current_image_count = len(os.listdir(trainB_path))
augment_index = 0

print(f"Starting rotations to reach {target_count} total images...")

while current_image_count < target_count:
    mask_file = mask_files[augment_index % len(mask_files)]
    mask_path = os.path.join(masks_path, mask_file)
    mask = np.load(mask_path)

    for angle in rotation_angles:
        if current_image_count >= target_count:
            break

        # Rotate mask using np.rot90 for 90-degree multiples, else use scipy.ndimage.rotate
        if angle in [90, 180, 270]:
            k = angle // 90
            rotated = np.rot90(mask, k)
        else:
            # For non-90 rotations, use scipy.ndimage.rotate with order=0 to preserve mask labels
            from scipy.ndimage import rotate
            rotated = rotate(mask, angle, reshape=False, order=0, mode='nearest')

        new_name = f"{mask_file[:-4]}_r{angle}_{augment_index}.npy"
        np.save(os.path.join(masks_path, new_name), rotated)

        current_image_count += 1

    augment_index += 1

print(f"Done! Final mask count matches or exceeds trainB image count: {current_image_count}")


### Augment and replace A (ACDC) >>>images and masks

In [None]:
# === Imports ===
import os
import random
import numpy as np
from PIL import Image, ImageOps

# === User Input Required: Define Paths ===
# Please update these paths to your dataset directories

# Directory containing trainA images
trainA_path = "/path/to/trainA"  # e.g., "./cyclegan_data/trainA"

# Directory containing masks corresponding to trainA images
masksA_path = "/path/to/module_a/masks"  # e.g., "./module_a/masks"

# === Settings ===
rotation_angles = [90, 180, 270, 45, 135, 30, 60]
augmentation_choices = ['original', 'flip_h', 'flip_v', 'rotate']

# === Get all .png files ===
image_files = sorted([f for f in os.listdir(trainA_path) if f.endswith('.png')])

print(f"Original number of samples in trainA: {len(image_files)}")

# === Temp directories to save augmented data ===
temp_images = trainA_path + "_temp"
temp_masks = masksA_path + "_temp"

os.makedirs(temp_images, exist_ok=True)
os.makedirs(temp_masks, exist_ok=True)

# === Process each pair ===
for idx, file in enumerate(image_files):
    # Load image
    img_path = os.path.join(trainA_path, file)
    img = Image.open(img_path)

    # Load matching mask (.npy)
    mask_stem = file[:-4]  # remove .png extension
    mask_path = os.path.join(masksA_path, f"{mask_stem}.npy")
    mask = np.load(mask_path)

    # Randomly choose augmentation type
    choice = random.choice(augmentation_choices)

    # === Apply augmentation to image ===
    if choice == 'flip_h':
        img_aug = ImageOps.mirror(img)
    elif choice == 'flip_v':
        img_aug = ImageOps.flip(img)
    elif choice == 'rotate':
        angle = random.choice(rotation_angles)
        img_aug = img.rotate(angle)
    else:
        img_aug = img  # original (no augmentation)

    # === Apply the same augmentation to mask ===
    mask_aug = mask.copy()
    if choice == 'flip_h':
        mask_aug = np.fliplr(mask_aug)
    elif choice == 'flip_v':
        mask_aug = np.flipud(mask_aug)
    elif choice == 'rotate':
        if angle % 90 == 0:
            k = angle // 90
            mask_aug = np.rot90(mask_aug, k=k)
        else:
            # For non-90-degree angles, use PIL for rotation with nearest interpolation
            mask_pil = Image.fromarray(mask_aug)
            mask_pil = mask_pil.rotate(angle, resample=Image.NEAREST)
            mask_aug = np.array(mask_pil)

    # === Save augmented image and mask ===
    img_aug.save(os.path.join(temp_images, f"{mask_stem}.png"))
    np.save(os.path.join(temp_masks, f"{mask_stem}.npy"), mask_aug)

# === Replace old folders with augmented data ===
import shutil
shutil.rmtree(trainA_path)
shutil.rmtree(masksA_path)

os.rename(temp_images, trainA_path)
os.rename(temp_masks, masksA_path)

print(f"✅ Augmentation done & replaced: {len(image_files)} pairs")


In [None]:
import os

def count_files_in_folder(folder_path):
    return sum(1 for entry in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, entry)))

# Example usage
folder_path = # path
file_count = count_files_in_folder(folder_path)
print(f"Number of files in '{folder_path}': {file_count}")

folder_path = # path
file_count = count_files_in_folder(folder_path)
print(f"Number of files in '{folder_path}': {file_count}")


# Install CycleGan and apply cutsom weighted loss by applying masks as weights

In [None]:
!git clone https://github.com/junyanz/pytorch-CycleGAN-and-pix2pix
%cd pytorch-CycleGAN-and-pix2pix
!pip install -r requirements.txt


In [None]:
# Path to the file you want to overwrite
#example
file_path = "...../pytorch-CycleGAN-and-pix2pix/models/cycle_gan_model.py"

# Your new content as a single multiline string (replace this with your actual content)
new_content = """
# Put your full new file content here
import os
import numpy as np
import torch
import itertools
from util.image_pool import ImagePool
from .base_model import BaseModel
from . import networks


class CycleGANModel(BaseModel):
    @staticmethod
    def modify_commandline_options(parser, is_train=True):
        parser.set_defaults(no_dropout=True)
        if is_train:
            parser.add_argument('--lambda_A', type=float, default=10.0)
            parser.add_argument('--lambda_B', type=float, default=10.0)
            parser.add_argument('--lambda_identity', type=float, default=0.5)
        return parser

    def __init__(self, opt):
        BaseModel.__init__(self, opt)
        self.loss_names = ['D_A', 'G_A', 'cycle_A', 'idt_A',
                           'D_B', 'G_B', 'cycle_B', 'idt_B']
        visual_names_A = ['real_A', 'fake_B', 'rec_A']
        visual_names_B = ['real_B', 'fake_A', 'rec_B']
        if self.isTrain and self.opt.lambda_identity > 0.0:
            visual_names_A.append('idt_B')
            visual_names_B.append('idt_A')
        self.visual_names = visual_names_A + visual_names_B

        if self.isTrain:
            self.model_names = ['G_A', 'G_B', 'D_A', 'D_B']
        else:
            self.model_names = ['G_A', 'G_B']

        self.netG_A = networks.define_G(opt.input_nc, opt.output_nc, opt.ngf,
                                        opt.netG, opt.norm, not opt.no_dropout,
                                        opt.init_type, opt.init_gain, self.gpu_ids)
        self.netG_B = networks.define_G(opt.output_nc, opt.input_nc, opt.ngf,
                                        opt.netG, opt.norm, not opt.no_dropout,
                                        opt.init_type, opt.init_gain, self.gpu_ids)

        if self.isTrain:
            self.netD_A = networks.define_D(opt.output_nc, opt.ndf, opt.netD,
                                            opt.n_layers_D, opt.norm,
                                            opt.init_type, opt.init_gain, self.gpu_ids)
            self.netD_B = networks.define_D(opt.input_nc, opt.ndf, opt.netD,
                                            opt.n_layers_D, opt.norm,
                                            opt.init_type, opt.init_gain, self.gpu_ids)

            if opt.lambda_identity > 0.0:
                assert (opt.input_nc == opt.output_nc)

            self.fake_A_pool = ImagePool(opt.pool_size)
            self.fake_B_pool = ImagePool(opt.pool_size)

            self.criterionGAN = networks.GANLoss(opt.gan_mode).to(self.device)
            self.criterionCycle = torch.nn.L1Loss()
            self.criterionIdt = torch.nn.L1Loss()

            self.optimizer_G = torch.optim.Adam(
                itertools.chain(self.netG_A.parameters(), self.netG_B.parameters()),
                lr=opt.lr, betas=(opt.beta1, 0.999)
            )
            self.optimizer_D = torch.optim.Adam(
                itertools.chain(self.netD_A.parameters(), self.netD_B.parameters()),
                lr=opt.lr, betas=(opt.beta1, 0.999)
            )
            self.optimizers.append(self.optimizer_G)
            self.optimizers.append(self.optimizer_D)

    def weighted_cycle_loss(self, pred, target, mask=None):
        
        loss = torch.abs(pred - target)
        if mask is not None:
            mask = mask.to(loss.device).float()
            loss = loss * mask
        return loss.mean()

    def set_input(self, input):
        AtoB = self.opt.direction == 'AtoB'
        self.real_A = input['A' if AtoB else 'B'].to(self.device)
        self.real_B = input['B' if AtoB else 'A'].to(self.device)
        self.image_paths = input['A_paths' if AtoB else 'B_paths']

        if AtoB:
            # Load Module A mask
            mask_dir = "/kaggle/working/module_a/masks"
            masks = []
            for path in self.image_paths:
                filename = os.path.basename(path)
                filename_stem = os.path.splitext(filename)[0]
                mask_path = os.path.join(mask_dir, f"{filename_stem}.npy")
                if os.path.exists(mask_path):
                    mask = np.load(mask_path)
                else:
                    # fallback: all ones means no weighting
                    mask = np.ones_like(self.real_A[0].cpu().numpy()[0])
                mask_tensor = torch.from_numpy(mask).unsqueeze(0).unsqueeze(0).float()
                masks.append(mask_tensor)
            self.real_A_mask = torch.cat(masks, dim=0).to(self.device)
            self.real_B_mask = None
        else:
            # Load Module B mask
            mask_dir = "/kaggle/working/module_b/masks"
            masks = []
            for path in self.image_paths:
                filename = os.path.basename(path)
                filename_stem = os.path.splitext(filename)[0]
                mask_path = os.path.join(mask_dir, f"{filename_stem}.npy")
                if os.path.exists(mask_path):
                    mask = np.load(mask_path)
                else:
                    mask = np.ones_like(self.real_B[0].cpu().numpy()[0])
                mask_tensor = torch.from_numpy(mask).unsqueeze(0).unsqueeze(0).float()
                masks.append(mask_tensor)
            self.real_B_mask = torch.cat(masks, dim=0).to(self.device)
            self.real_A_mask = None

    def forward(self):
       
        self.fake_B = self.netG_A(self.real_A)
        self.rec_A = self.netG_B(self.fake_B)
        self.fake_A = self.netG_B(self.real_B)
        self.rec_B = self.netG_A(self.fake_A)

    def backward_D_basic(self, netD, real, fake):
   
        pred_real = netD(real)
        loss_D_real = self.criterionGAN(pred_real, True)
        pred_fake = netD(fake.detach())
        loss_D_fake = self.criterionGAN(pred_fake, False)
        loss_D = (loss_D_real + loss_D_fake) * 0.5
        loss_D.backward()
        return loss_D

    def backward_D_A(self):
        fake_B = self.fake_B_pool.query(self.fake_B)
        self.loss_D_A = self.backward_D_basic(self.netD_A, self.real_B, fake_B)

    def backward_D_B(self):
        fake_A = self.fake_A_pool.query(self.fake_A)
        self.loss_D_B = self.backward_D_basic(self.netD_B, self.real_A, fake_A)

    def backward_G(self):
        lambda_idt = self.opt.lambda_identity
        lambda_A = self.opt.lambda_A
        lambda_B = self.opt.lambda_B

        if lambda_idt > 0:
            self.idt_A = self.netG_A(self.real_B)
            self.loss_idt_A = self.criterionIdt(self.idt_A, self.real_B) * lambda_B * lambda_idt
            self.idt_B = self.netG_B(self.real_A)
            self.loss_idt_B = self.criterionIdt(self.idt_B, self.real_A) * lambda_A * lambda_idt
        else:
            self.loss_idt_A = 0
            self.loss_idt_B = 0

        self.loss_G_A = self.criterionGAN(self.netD_A(self.fake_B), True)
        self.loss_G_B = self.criterionGAN(self.netD_B(self.fake_A), True)

        # Mask-weighted cycle losses
        self.loss_cycle_A = self.weighted_cycle_loss(self.rec_A, self.real_A, self.real_A_mask) * lambda_A
        self.loss_cycle_B = self.weighted_cycle_loss(self.rec_B, self.real_B, self.real_B_mask) * lambda_B

        self.loss_G = self.loss_G_A + self.loss_G_B + \
                      self.loss_cycle_A + self.loss_cycle_B + \
                      self.loss_idt_A + self.loss_idt_B
        self.loss_G.backward()

    def optimize_parameters(self):
        self.forward()
        self.set_requires_grad([self.netD_A, self.netD_B], False)
        self.optimizer_G.zero_grad()
        self.backward_G()
        self.optimizer_G.step()

        self.set_requires_grad([self.netD_A, self.netD_B], True)
        self.optimizer_D.zero_grad()
        self.backward_D_A()
        self.backward_D_B()
        self.optimizer_D.step()

# ... rest of your new code ...
"""

# Overwrite the file with new content
with open(file_path, "w") as f:
    f.write(new_content)

print(f"✅ Successfully overwritten the file: {file_path}")


# Train

In [None]:
!python /kaggle/working/pytorch-CycleGAN-and-pix2pix/train.py \  ##replace with the path for ypu train.py
  --dataroot /kaggle/working/cyclegan_data \   ##replace with the path for your prepared data root
  --name emidec2acdc \
  --model cycle_gan \
  --gpu_ids 0 \
  --input_nc 1 --output_nc 1 \
  --preprocess scale_width_and_crop \
  --crop_size 128 \
  --load_size 128 \
  --batch_size 4


# Inference and visualise predictions

In [None]:
!python test.py \   ##replace with the path for ypu test.py
  --dataroot /kaggle/working/cyclegan_data \  ##replace with the path for your prepared data root
  --name emidec2acdc \
  --model cycle_gan \
  --direction AtoB \
  --input_nc 1 --output_nc 1 \
  --preprocess scale_width_and_crop \
  --load_size 128 --crop_size 128 \   
  --num_test 999999  ## replace with the number of samples needed (default 50)

In [None]:
import os
import matplotlib.pyplot as plt
from PIL import Image

# Path to test results
results_path = './results/emidec2acdc/test_latest/images'  ## replace with the actual path

# Get all translated image names (ending with _fake_B.png)
fake_images = sorted([f for f in os.listdir(results_path) if f.endswith('_fake_B.png')])

# Display a few
num_examples = 150
plt.figure(figsize=(15, num_examples * 3))

for i, fake_name in enumerate(fake_images[:num_examples]):
    real_name = fake_name.replace('fake_B', 'real_A')

    real_img = Image.open(os.path.join(results_path, real_name)).convert('L')
    fake_img = Image.open(os.path.join(results_path, fake_name)).convert('L')

    # Real
    plt.subplot(num_examples, 2, 2*i+1)
    plt.imshow(real_img, cmap='gray')
    plt.title(f'Real A: {real_name}')
    plt.axis('off')

    # Fake
    plt.subplot(num_examples, 2, 2*i+2)
    plt.imshow(fake_img, cmap='gray')
    plt.title(f'Fake B: {fake_name}')
    plt.axis('off')

plt.tight_layout()
plt.show()