In [1]:
import os
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras import layers, Model

# Step 1: Load the dataset from Kaggle
!pip install kagglehub
import kagglehub

# Download dataset
path = kagglehub.dataset_download("andrewmvd/lung-and-colon-cancer-histopathological-images")



In [1]:
import os
import numpy as np
import tensorflow as tf
from PIL import Image
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.optimizers import Adam

# Paths to dataset directories
PATH_TO_LUNG_N = '/kaggle/input/lung-and-colon-cancer-histopathological-images/lung_colon_image_set/lung_image_sets/lung_n'
PATH_TO_LUNG_SCC = '/kaggle/input/lung-and-colon-cancer-histopathological-images/lung_colon_image_set/lung_image_sets/lung_scc'

# Define image size and batch size
IMG_SIZE = (128, 128)
BATCH_SIZE = 32

# Constants for loss weights
LAMBDA_CYCLE = 10.0
LAMBDA_IDENTITY = 5.0

# Define loss object with from_logits=True
loss_obj = tf.keras.losses.BinaryCrossentropy(from_logits=True)

# Discriminator loss with updated function
def discriminator_loss(real, generated):
    real_loss = loss_obj(tf.ones_like(real), real)
    generated_loss = loss_obj(tf.zeros_like(generated), generated)
    total_disc_loss = real_loss + generated_loss
    return total_disc_loss * 0.5

# Generator loss function
def generator_loss(generated):
    return loss_obj(tf.ones_like(generated), generated)

# Cycle consistency loss
def calc_cycle_loss(real_image, cycled_image):
    return LAMBDA_CYCLE * tf.reduce_mean(tf.abs(real_image - cycled_image))

# Identity loss function
def identity_loss(real_image, same_image):
    return LAMBDA_IDENTITY * tf.reduce_mean(tf.abs(real_image - same_image))

# Function to load and preprocess images
def preprocess_image(img_path):
    img = Image.open(img_path).convert("RGB")
    img = img.resize(IMG_SIZE)
    img = img_to_array(img) / 127.5 - 1
    return img

# Dataset generator function with added noise in discriminator
def load_data(path_n, path_scc, batch_size=BATCH_SIZE):
    lung_n_images = [os.path.join(path_n, fname) for fname in os.listdir(path_n)]
    lung_scc_images = [os.path.join(path_scc, fname) for fname in os.listdir(path_scc)]

    def generator():
        for img_n, img_scc in zip(lung_n_images, lung_scc_images):
            yield preprocess_image(img_n), preprocess_image(img_scc)

    dataset = tf.data.Dataset.from_generator(generator, output_signature=(
        tf.TensorSpec(shape=(128, 128, 3), dtype=tf.float32),
        tf.TensorSpec(shape=(128, 128, 3), dtype=tf.float32),
    ))
    return dataset.shuffle(buffer_size=1000).repeat().batch(batch_size)

# Initialize dataset
dataset = load_data(PATH_TO_LUNG_N, PATH_TO_LUNG_SCC)
steps_per_epoch = 500 
    
@tf.keras.utils.register_keras_serializable()
class SelfAttention(tf.keras.layers.Layer):
    def __init__(self, channels, trainable=True, **kwargs):
        super(SelfAttention, self).__init__(trainable=trainable, **kwargs)
        self.channels = channels
        self.gamma = tf.Variable(1.0, trainable=trainable, dtype=tf.float32)  # Set gamma to float32

    def build(self, input_shape):
        self.theta = tf.keras.layers.Conv2D(self.channels // 8, kernel_size=1)
        self.phi = tf.keras.layers.Conv2D(self.channels // 8, kernel_size=1)
        self.g = tf.keras.layers.Conv2D(self.channels // 2, kernel_size=1)
        self.o = tf.keras.layers.Conv2D(self.channels, kernel_size=1)
        super(SelfAttention, self).build(input_shape)

    def call(self, x):
        batch_size, height, width, channels = tf.shape(x)[0], tf.shape(x)[1], tf.shape(x)[2], tf.shape(x)[3]
        
        theta = tf.reshape(self.theta(x), [batch_size, -1, self.channels // 8])
        phi = tf.reshape(self.phi(x), [batch_size, -1, self.channels // 8])
        attention_map = tf.nn.softmax(tf.matmul(theta, phi, transpose_b=True))

        g = tf.reshape(self.g(x), [batch_size, -1, self.channels // 2])
        attention_out = tf.matmul(attention_map, g)
        attention_out = tf.reshape(attention_out, [batch_size, height, width, self.channels // 2])
        attention_out = self.o(attention_out)
        
        # Cast attention_out and x to float32 before adding
        return self.gamma * tf.cast(attention_out, tf.float32) + tf.cast(x, tf.float32)



# Generator Network with ResNet and Self-Attention blocks
def ResNetGenerator(input_shape, filters=64, num_blocks=9):
    inputs = tf.keras.layers.Input(shape=input_shape)

    x = tf.keras.layers.Conv2D(filters, kernel_size=7, padding='same')(inputs)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.ReLU()(x)

    for _ in range(2):
        filters *= 2
        x = tf.keras.layers.Conv2D(filters, kernel_size=3, strides=2, padding='same')(x)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.ReLU()(x)

    for _ in range(num_blocks):
        residual = x
        x = tf.keras.layers.Conv2D(filters, kernel_size=3, padding='same')(x)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.ReLU()(x)
        x = tf.keras.layers.Conv2D(filters, kernel_size=3, padding='same')(x)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.add([x, residual])

        if _ == num_blocks // 2:
            x = SelfAttention(filters)(x)

    for _ in range(2):
        filters //= 2
        x = tf.keras.layers.Conv2DTranspose(filters, kernel_size=3, strides=2, padding='same')(x)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.ReLU()(x)

    x = tf.keras.layers.Conv2D(3, kernel_size=7, padding='same', activation='tanh')(x)
    return tf.keras.Model(inputs, x)

# Discriminator Network with PatchGAN
def PatchGANDiscriminator(input_shape, filters=64):
    inputs = tf.keras.layers.Input(shape=input_shape)
    x = tf.keras.layers.Conv2D(filters, kernel_size=4, strides=2, padding='same')(inputs)
    x = tf.keras.layers.LeakyReLU(0.2)(x)

    for i in range(3):
        filters *= 2
        stride = 1 if i == 2 else 2
        x = tf.keras.layers.Conv2D(filters, kernel_size=4, strides=stride, padding='same')(x)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.LeakyReLU(0.2)(x)

    x = tf.keras.layers.Conv2D(1, kernel_size=4, padding='same')(x)
    return tf.keras.Model(inputs, x)
    
class CycleGAN(tf.keras.Model):
    def __init__(self, input_shape):
        super(CycleGAN, self).__init__()
        self.gen_G = ResNetGenerator(input_shape)
        self.gen_F = ResNetGenerator(input_shape)
        self.disc_X = PatchGANDiscriminator(input_shape)
        self.disc_Y = PatchGANDiscriminator(input_shape)

    def compile(self, gen_G_opt, gen_F_opt, disc_X_opt, disc_Y_opt):
        super(CycleGAN, self).compile()
        self.gen_G_opt = gen_G_opt
        self.gen_F_opt = gen_F_opt
        self.disc_X_opt = disc_X_opt
        self.disc_Y_opt = disc_Y_opt

    @tf.function
    def train_step(self, batch_data):
        real_x, real_y = batch_data

        with tf.GradientTape(persistent=True) as tape:
            # Generator forward pass
            fake_y = self.gen_G(real_x, training=True)
            cycle_x = self.gen_F(fake_y, training=True)
            fake_x = self.gen_F(real_y, training=True)
            cycle_y = self.gen_G(fake_x, training=True)
            
            same_x = self.gen_F(real_x, training=True)
            same_y = self.gen_G(real_y, training=True)
            
            # Add noise to real and fake images for discriminator inputs
            noise_factor = 0.05
            real_x_noisy = real_x + noise_factor * tf.random.normal(shape=tf.shape(real_x))
            fake_x_noisy = fake_x + noise_factor * tf.random.normal(shape=tf.shape(fake_x))
            real_y_noisy = real_y + noise_factor * tf.random.normal(shape=tf.shape(real_y))
            fake_y_noisy = fake_y + noise_factor * tf.random.normal(shape=tf.shape(fake_y))

            # Discriminator forward pass
            disc_real_x = self.disc_X(real_x_noisy, training=True)
            disc_fake_x = self.disc_X(fake_x_noisy, training=True)
            disc_real_y = self.disc_Y(real_y_noisy, training=True)
            disc_fake_y = self.disc_Y(fake_y_noisy, training=True)

            # Calculate generator losses
            gen_G_loss = generator_loss(disc_fake_y)
            gen_F_loss = generator_loss(disc_fake_x)
            cycle_loss_G = calc_cycle_loss(real_x, cycle_x)
            cycle_loss_F = calc_cycle_loss(real_y, cycle_y)
            id_loss_G = identity_loss(real_y, same_y)
            id_loss_F = identity_loss(real_x, same_x)

            total_gen_G_loss = gen_G_loss + cycle_loss_G + id_loss_G
            total_gen_F_loss = gen_F_loss + cycle_loss_F + id_loss_F

            # Calculate discriminator losses with updated discriminator_loss
            disc_X_loss = discriminator_loss(disc_real_x, disc_fake_x)
            disc_Y_loss = discriminator_loss(disc_real_y, disc_fake_y)

        # Apply gradients to optimizers
        gradients_G = tape.gradient(total_gen_G_loss, self.gen_G.trainable_variables)
        gradients_F = tape.gradient(total_gen_F_loss, self.gen_F.trainable_variables)
        gradients_disc_X = tape.gradient(disc_X_loss, self.disc_X.trainable_variables)
        gradients_disc_Y = tape.gradient(disc_Y_loss, self.disc_Y.trainable_variables)

        self.gen_G_opt.apply_gradients(zip(gradients_G, self.gen_G.trainable_variables))
        self.gen_F_opt.apply_gradients(zip(gradients_F, self.gen_F.trainable_variables))
        self.disc_X_opt.apply_gradients(zip(gradients_disc_X, self.disc_X.trainable_variables))
        self.disc_Y_opt.apply_gradients(zip(gradients_disc_Y, self.disc_Y.trainable_variables))

        # Return a dictionary of losses to monitor
        return {
            "G_loss": total_gen_G_loss,
            "F_loss": total_gen_F_loss,
            "D_X_loss": disc_X_loss,
            "D_Y_loss": disc_Y_loss
        }


# Compile and initialize the model
input_shape = (128, 128, 3)
cycle_gan_model = CycleGAN(input_shape)
cycle_gan_model.compile(
    gen_G_opt=Adam(learning_rate=0.0001, beta_1=0.5),
    gen_F_opt=Adam(learning_rate=0.0001, beta_1=0.5),
    disc_X_opt=Adam(learning_rate=0.0001, beta_1=0.5),
    disc_Y_opt=Adam(learning_rate=0.0001, beta_1=0.5)
)
    
import numpy as np
import matplotlib.pyplot as plt
import os

import os
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image

def save_generated_images(epoch, gen_G, gen_F, real_x, real_y, save_dir="generated_images", max_samples=3):
    fake_y = gen_G(real_x, training=False)
    fake_x = gen_F(real_y, training=False)
    cycle_x = gen_F(fake_y, training=False)
    cycle_y = gen_G(fake_x, training=False)

    # Create directory to save images if it doesn't exist
    os.makedirs(save_dir, exist_ok=True)

    # Limit the number of samples to display
    num_samples = min(max_samples, len(real_x))

    # Create a plot with 3 columns: input, generated, and reconstructed
    fig, axes = plt.subplots(num_samples, 3, figsize=(15, 5 * num_samples))

    for i in range(num_samples):
        # Convert tensors to NumPy arrays and rescale
        real_img = (real_x[i].numpy() * 0.5 + 0.5) * 255.0
        fake_img = (fake_y[i].numpy() * 0.5 + 0.5) * 255.0
        cycle_img = (cycle_x[i].numpy() * 0.5 + 0.5) * 255.0

        # Convert to uint8 and display in respective columns
        axes[i, 0].imshow(real_img.astype(np.uint8))
        axes[i, 0].set_title("Input")
        axes[i, 1].imshow(fake_img.astype(np.uint8))
        axes[i, 1].set_title("Generated")
        axes[i, 2].imshow(cycle_img.astype(np.uint8))
        axes[i, 2].set_title("Reconstructed")

        for ax in axes[i, :]:
            ax.axis("off")

    # Save the entire plot as a single image
    plt.savefig(f"{save_dir}/epoch_{epoch}_generated_images.png")
    plt.close(fig)




# Training loop
for epoch in range(100):
    # Initialize lists to track losses
    g_losses = []
    f_losses = []
    d_x_losses = []
    d_y_losses = []
    
    for i, batch_data in enumerate(dataset.take(steps_per_epoch)):
        real_x, real_y = batch_data  # Unpack the batch data
        losses = cycle_gan_model.train_step(batch_data)
        
        # Collect losses for each batch
        g_losses.append(losses['G_loss'])
        f_losses.append(losses['F_loss'])
        d_x_losses.append(losses['D_X_loss'])
        d_y_losses.append(losses['D_Y_loss'])
        

    # Calculate average losses for the epoch
    avg_g_loss = np.mean(g_losses)
    avg_f_loss = np.mean(f_losses)
    avg_d_x_loss = np.mean(d_x_losses)
    avg_d_y_loss = np.mean(d_y_losses)
    
    # Print average losses for the epoch
    print(f"End of Epoch {epoch+1} - Avg G_loss: {avg_g_loss:.4f}, Avg F_loss: {avg_f_loss:.4f}, "
          f"Avg D_X_loss: {avg_d_x_loss:.4f}, Avg D_Y_loss: {avg_d_y_loss:.4f}")
    
    # Save generated images at the end of each epoch
    save_generated_images(epoch, cycle_gan_model.gen_G, cycle_gan_model.gen_F, real_x, real_y)


End of Epoch 1 - Avg G_loss: 6.1132, Avg F_loss: 4.8143, Avg D_X_loss: 0.3940, Avg D_Y_loss: 0.2021
End of Epoch 2 - Avg G_loss: 5.4207, Avg F_loss: 4.3434, Avg D_X_loss: 0.3200, Avg D_Y_loss: 0.1659
End of Epoch 3 - Avg G_loss: 4.7377, Avg F_loss: 4.1709, Avg D_X_loss: 0.3578, Avg D_Y_loss: 0.3082
End of Epoch 4 - Avg G_loss: 3.6742, Avg F_loss: 4.0650, Avg D_X_loss: 0.3787, Avg D_Y_loss: 0.4276
End of Epoch 5 - Avg G_loss: 3.4199, Avg F_loss: 4.1650, Avg D_X_loss: 0.3683, Avg D_Y_loss: 0.4716
End of Epoch 6 - Avg G_loss: 3.0962, Avg F_loss: 3.6546, Avg D_X_loss: 0.4149, Avg D_Y_loss: 0.5237
End of Epoch 7 - Avg G_loss: 3.1455, Avg F_loss: 3.7822, Avg D_X_loss: 0.3879, Avg D_Y_loss: 0.5158
End of Epoch 8 - Avg G_loss: 2.6389, Avg F_loss: 3.4023, Avg D_X_loss: 0.4429, Avg D_Y_loss: 0.6106
End of Epoch 9 - Avg G_loss: 2.4543, Avg F_loss: 3.2223, Avg D_X_loss: 0.4581, Avg D_Y_loss: 0.6323
End of Epoch 10 - Avg G_loss: 2.3894, Avg F_loss: 3.1901, Avg D_X_loss: 0.4561, Avg D_Y_loss: 0.6343

KeyboardInterrupt: 

In [2]:
# Directory to save models
save_dir = "saved_models"
os.makedirs(save_dir, exist_ok=True)

# Save each model in .keras format
cycle_gan_model.gen_G.save(os.path.join(save_dir, "generator_G_02.keras"))
cycle_gan_model.gen_F.save(os.path.join(save_dir, "generator_F_02.keras"))
cycle_gan_model.disc_X.save(os.path.join(save_dir, "discriminator_X_02.keras"))
cycle_gan_model.disc_Y.save(os.path.join(save_dir, "discriminator_Y_02.keras"))

print("Models saved successfully in .keras format.")


Models saved successfully in .keras format.


In [22]:
!pip install torch_fidelity

  pid, fd = os.forkpty()


[0m[31mERROR: Could not find a version that satisfies the requirement torch_fidelity (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for torch_fidelity[0m[31m
[0m

In [21]:
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.models import load_model
from skimage.metrics import structural_similarity as ssim
from torch_fidelity import calculate_metrics
import torch

# Load trained CycleGAN models
def load_cycle_gan_models():
    gen_G = load_model("/kaggle/working/saved_models/generator_G_02.keras", compile=False)
    gen_F = load_model("/kaggle/working/saved_models/generator_F_02.keras", compile=False)
    return gen_G, gen_F

# Convert images from numpy arrays to PyTorch tensors
def numpy_to_tensor(images):
    images = torch.tensor(images).permute(0, 3, 1, 2)  # Change from NHWC to NCHW
    images = images / 255.0 * 2 - 1  # Normalize to [-1, 1]
    return images

# Calculate FID score using torch-fidelity
def calculate_fid_torch_fidelity(real_images, generated_images):
    # Convert images to the right format
    real_images_tensor = numpy_to_tensor(real_images)
    generated_images_tensor = numpy_to_tensor(generated_images)
    
    # Calculate FID using torch-fidelity
    metrics = calculate_metrics(
        input1={'torch': real_images_tensor},
        input2={'torch': generated_images_tensor},
        fid=True,  # Calculate only FID
        verbose=False
    )
    return metrics['frechet_inception_distance']

# SSIM calculation for 128x128 images or smaller
def calculate_ssim(input_images, reconstructed_images, win_size=3):
    ssim_scores = []
    for input_img, recon_img in zip(input_images, reconstructed_images):
        win_size = min(win_size, min(input_img.shape[:2]))  
        try:
            score = ssim(input_img, recon_img, multichannel=True, data_range=input_img.max() - input_img.min(), win_size=win_size)
            ssim_scores.append(score)
        except ValueError as e:
            print(f"Error calculating SSIM for image: {e}")
            ssim_scores.append(np.nan)  
    return np.mean(ssim_scores) * 100 if len(ssim_scores) > 0 else np.nan

# Generate and display images, calculate FID and SSIM scores
def generate_and_display_images(gen_G, gen_F, dataset, num_samples=20):
    input_images, reconstructed_images = [], []
    fid_scores = []

    for batch in dataset.take(num_samples):
        real_x, _ = batch
        fake_y = gen_G(real_x, training=False)
        cycle_x = gen_F(fake_y, training=False)

        input_images.extend(real_x.numpy())
        reconstructed_images.extend(cycle_x.numpy())

    input_images = np.array(input_images)
    reconstructed_images = np.array(reconstructed_images)

    # Display input, generated, and reconstructed images
    fig, axes = plt.subplots(num_samples, 3, figsize=(15, 5 * num_samples))

    for i in range(num_samples):
        real_img = (input_images[i] * 0.5 + 0.5) * 255.0
        cycle_img = (reconstructed_images[i] * 0.5 + 0.5) * 255.0

        axes[i, 0].imshow(real_img.astype(np.uint8))
        axes[i, 0].set_title("Input Image")
        axes[i, 2].imshow(cycle_img.astype(np.uint8))
        axes[i, 2].set_title("Reconstructed Image")

        for ax in axes[i, :]:
            ax.axis("off")

        ssim_score = calculate_ssim([real_img.astype(np.uint8)], [cycle_img.astype(np.uint8)], win_size=3)
        fid_score = calculate_fid_torch_fidelity(real_img[None, ...], cycle_img[None, ...])
        fid_scores.append(fid_score)

        print(f"Image {i+1}: SSIM = {ssim_score:.4f}, FID = {fid_score:.4f}")

    plt.tight_layout()
    plt.show()

    # Display the average FID score for the dataset
    avg_fid_score = np.mean(fid_scores)
    print(f"Average FID for the dataset: {avg_fid_score:.4f}")

# Load models and initialize dataset
gen_G, gen_F = load_cycle_gan_models()
dataset = load_data(PATH_TO_LUNG_N, PATH_TO_LUNG_SCC).take(20)

generate_and_display_images(gen_G, gen_F, dataset)


ModuleNotFoundError: No module named 'torch_fidelity'