In [17]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Conv2D, Conv2DTranspose, ReLU, LeakyReLU, BatchNormalization, Input, Concatenate
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt
import numpy as np


In [18]:
def downsample(filters, size, apply_batchnorm=True):
    """Build an encoder block that halves the spatial resolution.

    The block is a stride-2, bias-free ``Conv2D`` (weights drawn from
    N(0, 0.02)), optionally followed by batch normalisation, and finished
    with a ``LeakyReLU`` activation.

    Args:
        filters: Number of output channels of the convolution.
        size: Kernel size of the convolution.
        apply_batchnorm: Insert a ``BatchNormalization`` layer after the
            convolution when true.

    Returns:
        A ``keras.Sequential`` model holding the layers in order.
    """
    stages = [
        Conv2D(filters, size, strides=2, padding='same', use_bias=False,
               kernel_initializer=tf.random_normal_initializer(0., 0.02)),
    ]
    if apply_batchnorm:
        stages.append(BatchNormalization())
    stages.append(LeakyReLU())
    return keras.Sequential(stages)

def upsample(filters, size, apply_dropout=False):
    """Build a decoder block that doubles the spatial resolution.

    The block is a stride-2, bias-free ``Conv2DTranspose`` (weights drawn
    from N(0, 0.02)) followed by batch normalisation, an optional 50%
    dropout, and a ``ReLU`` activation.

    Args:
        filters: Number of output channels of the transposed convolution.
        size: Kernel size of the transposed convolution.
        apply_dropout: Insert ``Dropout(0.5)`` after the batch
            normalisation when true.

    Returns:
        A ``keras.Sequential`` model holding the layers in order.
    """
    stages = [
        Conv2DTranspose(filters, size, strides=2, padding='same', use_bias=False,
                        kernel_initializer=tf.random_normal_initializer(0., 0.02)),
        BatchNormalization(),
    ]
    if apply_dropout:
        stages.append(keras.layers.Dropout(0.5))
    stages.append(ReLU())
    return keras.Sequential(stages)

def Generator():
    """Build the encoder-decoder generator with skip connections.

    The network takes a 256x256x3 image, downsamples it eight times to a
    1x1 bottleneck, then upsamples it back, concatenating each decoder
    output with the encoder activation of the same resolution. A final
    transposed convolution with ``tanh`` produces a 256x256x3 image.

    Returns:
        A ``Model`` mapping ``(bs, 256, 256, 3)`` to ``(bs, 256, 256, 3)``.
    """
    inputs = Input(shape=[256, 256, 3])

    # Encoder widths, one entry per stride-2 stage:
    # 256 -> 128 -> 64 -> 32 -> 16 -> 8 -> 4 -> 2 -> 1 pixels per side.
    # Only the very first stage skips batch normalisation.
    encoder_filters = [64, 128, 256, 512, 512, 512, 512, 512]
    down_stack = [
        downsample(n_filters, 4, apply_batchnorm=(level > 0))
        for level, n_filters in enumerate(encoder_filters)
    ]

    # Decoder (filters, use_dropout), one entry per stride-2 stage:
    # 1 -> 2 -> 4 -> 8 -> 16 -> 32 -> 64 -> 128 pixels per side.
    # After each stage the matching skip is concatenated, so the tensor fed
    # to the next stage has filters + skip channels (e.g. 512 + 512 = 1024).
    decoder_spec = [
        (512, True),
        (512, True),
        (512, True),
        (512, False),
        (256, False),
        (128, False),
        (64, False),
    ]
    up_stack = [
        upsample(n_filters, 4, apply_dropout=use_dropout)
        for n_filters, use_dropout in decoder_spec
    ]

    initializer = tf.random_normal_initializer(0., 0.02)
    # Final stage: 128 -> 256 pixels per side, 3 channels squashed to [-1, 1].
    last = Conv2DTranspose(3, 4, strides=2, padding='same',
                           kernel_initializer=initializer, activation='tanh')

    # Run the encoder and remember every intermediate activation.
    x = inputs
    skips = []
    for down in down_stack:
        x = down(x)
        skips.append(x)

    # The bottleneck itself is not a skip; walk the rest deepest-first so each
    # decoder stage is paired with the encoder output of equal resolution.
    for up, skip in zip(up_stack, skips[-2::-1]):
        upsampled = up(x)
        x = Concatenate()([upsampled, skip])

    return Model(inputs=inputs, outputs=last(x))

# One generator per translation direction:
#   G_A2B maps domain A (real) to domain B (synthetic),
#   G_B2A maps domain B (synthetic) back to domain A (real).
G_A2B, G_B2A = Generator(), Generator()


In [19]:
def Discriminator():
    """Build the convolutional discriminator.

    A 256x256x3 image is reduced by three stride-2 blocks, then passed
    through two zero-padded stride-1 convolutions. The last convolution has
    no activation, so the model outputs a ``(bs, 30, 30, 1)`` map of raw
    scores (logits), one per image region.

    Returns:
        A ``Model`` mapping ``(bs, 256, 256, 3)`` to ``(bs, 30, 30, 1)``.
    """
    initializer = tf.random_normal_initializer(0., 0.02)
    inp = Input(shape=[256, 256, 3], name='input_image')

    # Three strided stages: 256 -> 128 -> 64 -> 32 pixels per side, with
    # 64, 128 and 256 channels. Only the first one skips batch normalisation.
    features = inp
    for n_filters, use_batchnorm in ((64, False), (128, True), (256, True)):
        features = downsample(n_filters, 4, use_batchnorm)(features)

    # Pad to 34x34, then a 'valid' 4x4 convolution brings it to 31x31x512.
    padded = keras.layers.ZeroPadding2D()(features)
    hidden = Conv2D(512, 4, strides=1, kernel_initializer=initializer,
                    use_bias=False)(padded)
    hidden = BatchNormalization()(hidden)
    hidden = LeakyReLU()(hidden)

    # Pad to 33x33, then a 'valid' 4x4 convolution yields the 30x30x1 score map.
    padded = keras.layers.ZeroPadding2D()(hidden)
    scores = Conv2D(1, 4, strides=1, kernel_initializer=initializer)(padded)

    return Model(inputs=inp, outputs=scores)

# One discriminator per image domain:
#   D_A judges images of the real domain A,
#   D_B judges images of the synthetic domain B.
D_A, D_B = Discriminator(), Discriminator()


In [20]:
# Shared adversarial criterion. from_logits=True because the discriminator's
# final Conv2D has no activation, so its outputs are raw scores, not probabilities.
loss_obj = keras.losses.BinaryCrossentropy(from_logits=True)

def discriminator_loss(real, generated):
    """Adversarial loss for a discriminator.

    Args:
        real: Discriminator output for real images (target label 1).
        generated: Discriminator output for generated images (target label 0).

    Returns:
        Half the sum of the two binary cross-entropy terms.
    """
    loss_on_real = loss_obj(tf.ones_like(real), real)
    loss_on_generated = loss_obj(tf.zeros_like(generated), generated)
    return (loss_on_real + loss_on_generated) * 0.5

def generator_loss(generated):
    """Adversarial loss for a generator.

    Args:
        generated: Discriminator output for the generator's images.

    Returns:
        Binary cross-entropy of ``generated`` against an all-ones target,
        i.e. the generator is rewarded when its images are scored as real.
    """
    real_label = tf.ones_like(generated)
    return loss_obj(real_label, generated)

def cycle_loss(real_image, cycled_image, lambda_cycle=10):
    """Cycle-consistency loss: weighted mean absolute error.

    Args:
        real_image: The original image.
        cycled_image: The image after a round trip through both generators.
        lambda_cycle: Weight applied to the mean absolute difference.

    Returns:
        ``lambda_cycle`` times the mean of ``|real_image - cycled_image|``.
    """
    absolute_error = tf.abs(real_image - cycled_image)
    return lambda_cycle * tf.reduce_mean(absolute_error)

def identity_loss(real_image, same_image, lambda_identity=5):
    """Identity-mapping loss: weighted mean absolute error.

    Args:
        real_image: An image from a generator's target domain.
        same_image: That generator's output when fed ``real_image``.
        lambda_identity: Weight applied to the mean absolute difference.

    Returns:
        ``lambda_identity`` times the mean of ``|real_image - same_image|``.
    """
    absolute_error = tf.abs(real_image - same_image)
    return lambda_identity * tf.reduce_mean(absolute_error)


In [21]:
# Adam optimizers (learning rate 2e-4, beta_1 = 0.5): one instance for the
# generators and one for the discriminators.
generator_optimizer = Adam(learning_rate=2e-4, beta_1=0.5)
discriminator_optimizer = Adam(learning_rate=2e-4, beta_1=0.5)

# Training Step
@tf.function
def train_step(real_A, real_B):
    """Run one CycleGAN optimisation step on a pair of unpaired batches.

    Both generators and both discriminators are updated once.

    Args:
        real_A: Batch of images from domain A; it is fed directly to
            ``G_A2B`` and ``D_A``, so it must match their input shape.
        real_B: Batch of images from domain B, fed to ``G_B2A`` and ``D_B``.

    Returns:
        A tuple ``(total_G_A2B_loss, total_G_B2A_loss, D_A_loss, D_B_loss)``
        of scalar loss tensors computed before the weight update.
    """
    # persistent=True because four separate gradient queries are made below.
    with tf.GradientTape(persistent=True) as tape:
        # Generate fake images and translate them back (cycle A -> B -> A, B -> A -> B)
        fake_B = G_A2B(real_A, training=True)
        cycled_A = G_B2A(fake_B, training=True)

        fake_A = G_B2A(real_B, training=True)
        cycled_B = G_A2B(fake_A, training=True)

        # Identity mapping: a generator fed an image already in its target domain
        same_A = G_B2A(real_A, training=True)
        same_B = G_A2B(real_B, training=True)

        # Discriminator predictions
        disc_real_A = D_A(real_A, training=True)
        disc_fake_A = D_A(fake_A, training=True)

        disc_real_B = D_B(real_B, training=True)
        disc_fake_B = D_B(fake_B, training=True)

        # Generator (adversarial) losses
        G_A2B_loss = generator_loss(disc_fake_B)
        G_B2A_loss = generator_loss(disc_fake_A)

        # Cycle-consistency losses
        total_cycle_loss = cycle_loss(real_A, cycled_A) + cycle_loss(real_B, cycled_B)

        # Identity losses
        total_identity_loss = identity_loss(real_A, same_A) + identity_loss(real_B, same_B)

        # Total generator loss
        total_G_A2B_loss = G_A2B_loss + total_cycle_loss + total_identity_loss
        total_G_B2A_loss = G_B2A_loss + total_cycle_loss + total_identity_loss

        # Discriminator losses
        D_A_loss = discriminator_loss(disc_real_A, disc_fake_A)
        D_B_loss = discriminator_loss(disc_real_B, disc_fake_B)

    # Calculate gradients
    G_A2B_gradients = tape.gradient(total_G_A2B_loss, G_A2B.trainable_variables)
    G_B2A_gradients = tape.gradient(total_G_B2A_loss, G_B2A.trainable_variables)

    D_A_gradients = tape.gradient(D_A_loss, D_A.trainable_variables)
    D_B_gradients = tape.gradient(D_B_loss, D_B.trainable_variables)

    # A persistent tape holds its resources until released; all gradients are computed.
    del tape

    # Apply gradients.
    # Each optimizer is shared by two models, so both models' updates go through
    # ONE apply_gradients call. Calling it once per model hands the optimizer a
    # second, unseen variable set: current Keras optimizers build their state for
    # the first set only and reject the second, and optimizers that do accept it
    # advance the Adam step counter twice per training step.
    generator_optimizer.apply_gradients(zip(
        list(G_A2B_gradients) + list(G_B2A_gradients),
        list(G_A2B.trainable_variables) + list(G_B2A.trainable_variables),
    ))

    discriminator_optimizer.apply_gradients(zip(
        list(D_A_gradients) + list(D_B_gradients),
        list(D_A.trainable_variables) + list(D_B.trainable_variables),
    ))

    return total_G_A2B_loss, total_G_B2A_loss, D_A_loss, D_B_loss


In [None]:
# Example of loading data (customize for your dataset)
def load_image(file_path):
    """Load one JPEG file as a 256x256 three-channel float image in [-1, 1].

    Args:
        file_path: Path (string or scalar string tensor) of a JPEG file.

    Returns:
        A float tensor of shape ``(256, 256, 3)`` with values in ``[-1, 1]``.
    """
    raw_bytes = tf.io.read_file(file_path)
    # Force three channels: without `channels`, a grayscale JPEG decodes to a
    # single channel, which does not fit the (256, 256, 3) model input.
    image = tf.image.decode_jpeg(raw_bytes, channels=3)
    image = tf.image.resize(image, [256, 256])
    # Map pixel values from [0, 255] to [-1, 1], the range of the generators' tanh output.
    return (image / 127.5) - 1

# Input pipelines: one tf.data dataset per domain, decoded by load_image and
# served in batches of a single image.
# NOTE(review): the placeholder entries below look like directories rather than
# image files — replace them with actual file paths for the brain tumor images.
real_A_paths = ["/content/image_dataset/1"]
real_B_paths = ["/content/image_dataset/3"]

dataset_A = tf.data.Dataset.from_tensor_slices(real_A_paths)
dataset_A = dataset_A.map(load_image).batch(1)

dataset_B = tf.data.Dataset.from_tensor_slices(real_B_paths)
dataset_B = dataset_B.map(load_image).batch(1)

# Training loop. The two datasets are walked in lockstep, so every epoch ends
# as soon as the shorter one runs out. Only the generator losses are printed.
EPOCHS = 100
for epoch in range(EPOCHS):
    for real_A, real_B in zip(dataset_A, dataset_B):
        G_A2B_loss, G_B2A_loss, D_A_loss, D_B_loss = train_step(real_A, real_B)
        print(f'Epoch {epoch} - Generator Loss: {G_A2B_loss.numpy()}, {G_B2A_loss.numpy()}')
