In [None]:
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.optimizers import Adam
import numpy as np
import pandas as pd
from os import listdir
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from PIL import Image,  ImageOps

In [None]:
# Mount Google Drive inside the Colab runtime so that the
# '/content/drive/...' paths used by the following cells resolve.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# --- Image geometry -------------------------------------------------------
# Side length used when the conditioning features are tiled into an image plane.
image_size = 128
# Number of image channels; the discriminator input uses num_channels + 1.
num_channels = 1

# --- Generator input ------------------------------------------------------
# Length of the random noise vector fed to the generator.
latent_dim = 128
# Number of conditioning features appended to the noise vector.
num_features = 8

# --- Training -------------------------------------------------------------
# Number of samples per batch in the tf.data pipeline.
batch_size = 64

In [None]:
# Directory prefix under which GANMonitor writes the generated sample PNGs.
# The trailing slash matters: file names are appended by string concatenation.
result_path = '/content/drive/MyDrive/Hossein/CancerCell/result/'

In [None]:
# Directory prefix under which GANMonitor saves 'generator.h5' and
# 'discriminator.h5'. The trailing slash matters: names are concatenated.
weight_path = '/content/drive/MyDrive/Hossein/CancerCell/weight/'

In [None]:
# Log directory handed to the TensorBoard callback (log_dir=log_path).
log_path = '/content/drive/MyDrive/Hossein/CancerCell/logs'

In [None]:
# Load the per-object measurement table. Later cells read the columns
# Metadata_Well, ImageNumber, ObjectNumber and a set of AreaShape_* /
# Intensity_* feature columns from it.
df = pd.read_csv('/content/drive/MyDrive/Hossein/CancerCell/MyExpt_IdentifySecondaryObjects.csv')

In [None]:
# Build a per-row key of the form "<Metadata_Well>_<ImageNumber>_<ObjectNumber>".
# map_fn later uses this key as the image file stem (img_path + id + '.jpg').
df['Id'] = (
    df['Metadata_Well']
    + '_' + df['ImageNumber'].astype(str)
    + '_' + df['ObjectNumber'].astype(str)
)

In [None]:
# Keep the row key plus the nine measurement columns used downstream.
# NOTE(review): nine feature columns are selected here, but num_features is 8
# and map_fn drops the last one (Intensity_MaxIntensity_DNA) - confirm intent.
data = df[[
    'Id',
    'AreaShape_Area',
    'AreaShape_BoundingBoxMinimum_Y',
    'AreaShape_Eccentricity',
    'AreaShape_HuMoment_4',
    'AreaShape_NormalizedMoment_1_1',
    'AreaShape_NormalizedMoment_1_2',
    'AreaShape_Zernike_6_2',
    'AreaShape_Zernike_8_6',
    'Intensity_MaxIntensity_DNA',
]]

In [None]:
# Directory prefix from which map_fn reads '<id>.jpg' training images.
# The trailing slash matters: file names are appended by string concatenation.
img_path = '/content/drive/MyDrive/Ravaee/CancerCell/processed_June22/'

In [None]:
# Names of all entries in the image directory (file names only, no directory part).
img_files = listdir(img_path)

In [None]:
# Display how many entries were found in img_path (cell output only).
len(img_files)

In [None]:
# Min-max scaler with default settings; it is fitted on the selected feature
# columns and then used to transform them in the cells that follow.
scaler = MinMaxScaler()

In [None]:
# Fit the scaler on the nine numeric feature columns (everything except 'Id').
# fit() returns the scaler itself, so the name keeps pointing at the same object.
scaler = scaler.fit(
    data[[
        'AreaShape_Area',
        'AreaShape_BoundingBoxMinimum_Y',
        'AreaShape_Eccentricity',
        'AreaShape_HuMoment_4',
        'AreaShape_NormalizedMoment_1_1',
        'AreaShape_NormalizedMoment_1_2',
        'AreaShape_Zernike_6_2',
        'AreaShape_Zernike_8_6',
        'Intensity_MaxIntensity_DNA',
    ]]
)

In [None]:
# Keep the row keys as a plain array before `data` is overwritten by the
# scaled feature matrix in the next cell.
Id = data['Id'].values

In [None]:
# Replace `data` with the scaled feature matrix. Column order follows the list
# below, and the 'Id' column is not part of the result.
# NOTE: this rebinds `data` from a DataFrame to an array, so the cell is not
# re-runnable without first re-running the cell that builds `data`.
data = scaler.transform(
    data[[
        'AreaShape_Area',
        'AreaShape_BoundingBoxMinimum_Y',
        'AreaShape_Eccentricity',
        'AreaShape_HuMoment_4',
        'AreaShape_NormalizedMoment_1_1',
        'AreaShape_NormalizedMoment_1_2',
        'AreaShape_Zernike_6_2',
        'AreaShape_Zernike_8_6',
        'Intensity_MaxIntensity_DNA',
    ]]
)

In [None]:
# Re-assemble a DataFrame from the row keys and the scaled feature matrix.
# Column i of `data` is given the i-th name in the list below, which is the
# same order the columns were passed to the scaler.
df = pd.DataFrame({
    'id': Id,
    **{
        column_name: data[:, column_index]
        for column_index, column_name in enumerate([
            'AreaShape_Area',
            'AreaShape_BoundingBoxMinimum_Y',
            'AreaShape_Eccentricity',
            'AreaShape_HuMoment_4',
            'AreaShape_NormalizedMoment_1_1',
            'AreaShape_NormalizedMoment_1_2',
            'AreaShape_Zernike_6_2',
            'AreaShape_Zernike_8_6',
            'Intensity_MaxIntensity_DNA',
        ])
    },
})

In [None]:
# Build the (id, feature_1, ..., feature_9) dataset consumed by map_fn.
#
# The feature columns are listed in the same order as everywhere else in the
# notebook (column selection, scaling, and the x1..x8 preview vector given to
# GANMonitor). Previously Eccentricity and BoundingBoxMinimum_Y were swapped
# here only, so the preview images were conditioned on a differently ordered
# vector than the one the generator was trained with.
# NOTE: generators trained with the old ordering must be retrained.
train_dataset = tf.data.Dataset.from_tensor_slices((
    df['id'].values,
    *(
        tf.cast(df[column_name].values, tf.float32)
        for column_name in (
            'AreaShape_Area',
            'AreaShape_BoundingBoxMinimum_Y',
            'AreaShape_Eccentricity',
            'AreaShape_HuMoment_4',
            'AreaShape_NormalizedMoment_1_1',
            'AreaShape_NormalizedMoment_1_2',
            'AreaShape_Zernike_6_2',
            'AreaShape_Zernike_8_6',
            'Intensity_MaxIntensity_DNA',
        )
    ),
))

In [None]:
def map_fn(path, l1,l2,l3,l4,l5,l6,l7,l8,l9):
    """Turn one dataset row into an (image, feature-vector) training pair.

    Args:
        path: image file stem; the file read is ``img_path + path + '.jpg'``.
        l1..l9: scalar feature values for this row.

    Returns:
        Tuple ``(image, features)`` where ``image`` is the decoded JPEG cast
        to float32 and mapped from [0, 255] to [-1, 1], and ``features`` is a
        tensor built from ``l1`` .. ``l8``.

    NOTE(review): ``l9`` is accepted but not included in the returned feature
    tensor (only eight values are used) - confirm this is intended.
    """
    jpeg_bytes = tf.io.read_file(img_path+path+'.jpg')
    pixels = tf.cast(tf.image.decode_jpeg(jpeg_bytes), tf.float32)
    # Centre on 127.5 and divide by 127.5 so pixel values land in [-1, 1].
    normalized = (pixels - 127.5) / 127.5
    conditioning = tf.convert_to_tensor([l1, l2, l3, l4, l5, l6, l7, l8])
    return normalized, conditioning

In [None]:
# Decode the images and assemble the feature vectors (see map_fn). Ordering is
# kept deterministic by default even with parallel calls.
train_dataset = train_dataset.map(map_fn, num_parallel_calls=tf.data.AUTOTUNE)
# Order matters here:
#   * cache() comes BEFORE shuffle(): a cache placed after shuffle() stores the
#     first epoch's shuffled order and replays exactly that order every epoch,
#     so the data would never be reshuffled.
#   * cache() comes after map(), so JPEG decoding happens only once.
train_dataset = (train_dataset
    .cache()
    .shuffle(1024)
    .batch(batch_size)
    .prefetch(tf.data.AUTOTUNE)
)

In [None]:
def get_discriminator():
  """Build the critic (discriminator) network and print its summary.

  The input is an image stacked channel-wise with one conditioning plane, so
  its shape is ``(image_size, image_size, num_channels + 1)``. The spatial size
  is taken from the module-level ``image_size`` rather than being hard-coded,
  because ``ConditionalGAN.train_step`` builds the conditioning plane at
  ``image_size`` x ``image_size``; a fixed 64x64 input did not match it.

  Returns:
      A ``tf.keras.Model`` named ``'discriminator'`` whose output is a single
      unbounded score per sample (the final Dense layer has no activation).
  """
  input_layer = layers.Input(shape=(image_size, image_size, num_channels + 1))

  # Four stride-2 convolutions, each halving the spatial resolution.
  x = input_layer
  for filters in (32, 64, 128, 256):
    x = layers.Conv2D(filters, (7, 7), padding='same', strides=2, use_bias=True)(x)
    x = layers.LeakyReLU(alpha=0.2)(x)

  x = layers.Flatten()(x)
  x = layers.Dropout(0.2)(x)
  x = layers.Dense(256, activation='relu')(x)
  x = layers.Dropout(0.2)(x)
  x = layers.Dense(1)(x)

  model = tf.keras.models.Model(input_layer, x, name='discriminator')
  model.summary()
  return model

In [None]:
def get_geneator():
  """Build the generator network and print its summary.

  The input is a vector of length ``latent_dim + num_features`` (noise
  concatenated with the conditioning features). Three stride-2 transposed
  convolutions upsample a ``(image_size // 8)``-sized base map by a factor of
  8, so the output has shape ``(image_size, image_size, num_channels)`` with
  values in [-1, 1] (tanh).

  The output size and channel count come from the module-level ``image_size``
  and ``num_channels``. The previous hard-coded 64x64x3 output could not be
  concatenated with the ``image_size`` x ``image_size`` conditioning plane
  built in ``ConditionalGAN.train_step``, nor did it match the critic's
  ``num_channels + 1`` input channels.

  Returns:
      A ``tf.keras.Model`` named ``'generator'``.

  Raises:
      ValueError: if ``image_size`` is not a multiple of 8.
  """
  if image_size % 8 != 0:
    raise ValueError(
        'image_size must be a multiple of 8, got {}'.format(image_size))

  base_size = image_size // 8          # spatial size before the 3 upsampling steps
  input_dim = latent_dim + num_features

  # shape must be a tuple; a bare int is rejected by newer Keras versions.
  input_layer = layers.Input(shape=(input_dim,))
  x = layers.Dense(base_size * base_size * input_dim)(input_layer)
  x = layers.LeakyReLU(alpha=0.2)(x)
  x = layers.Reshape((base_size, base_size, input_dim))(x)

  x = layers.Conv2DTranspose(128, (5, 5), strides=(2, 2), padding="same", use_bias=True)(x)
  x = layers.LeakyReLU(alpha=0.2)(x)
  x = layers.Conv2DTranspose(64, (5, 5), strides=(2, 2), padding="same", use_bias=True)(x)
  x = layers.LeakyReLU(alpha=0.2)(x)
  x = layers.Conv2DTranspose(num_channels, (5, 5), strides=(2, 2), padding="same", use_bias=True)(x)
  x = layers.Activation("tanh")(x)

  model = tf.keras.models.Model(input_layer, x, name='generator')
  model.summary()
  return model


In [None]:
class GANMonitor(tf.keras.callbacks.Callback):
    """Keras callback that writes sample images and model files after each epoch.

    At the end of every epoch it generates ``num_img`` images from fresh random
    noise combined with one fixed conditioning vector, saves them as PNG files
    under ``result_path``, and saves the generator and discriminator under
    ``weight_path`` (overwriting the files from the previous epoch).

    Args:
        num_img: number of sample images to generate per epoch.
        latent_dim: length of the random noise vector.
        feature: sequence of conditioning feature values; the same vector is
            used for every sample image. ``None`` (the default) is treated as
            an empty sequence.
    """

    def __init__(self, num_img=3, latent_dim=128 , feature=None):
        # Let the Keras base class set up its own attributes before adding ours.
        super().__init__()
        self.num_img = num_img
        self.latent_dim = latent_dim
        # Avoid a shared mutable default argument.
        if feature is None:
            feature = []
        # One copy of the conditioning vector per sample image: (num_img, len(feature)).
        self.features = np.vstack([feature]*num_img)

    def on_epoch_end(self, epoch, logs=None):
        """Generate and save sample images, then save both sub-models."""
        random_latent_vectors = tf.random.normal(shape=(self.num_img, self.latent_dim))
        random_vector_features = tf.concat(
            [random_latent_vectors, self.features], axis=1
        )

        # Decode the noise (guided by features) to fake images.
        generated_images = self.model.generator(random_vector_features)
        # Undo the [-1, 1] normalisation applied to the training images.
        generated_images = (generated_images * 127.5) + 127.5

        for i in range(self.num_img):
            img = generated_images[i].numpy()
            img = tf.keras.preprocessing.image.array_to_img(img)
            img.save(result_path+"generated_img_{epoch}_{i}.png".format(i=i, epoch=epoch))

        # Same file names every epoch, so only the latest models are kept.
        self.model.generator.save(weight_path+'generator.h5')
        self.model.discriminator.save(weight_path+'discriminator.h5')


In [None]:
class ConditionalGAN(tf.keras.Model):
    """Conditional Wasserstein GAN with gradient penalty (WGAN-GP).

    The generator receives random noise concatenated with a feature vector.
    The discriminator (critic) receives an image stacked channel-wise with a
    plane built by tiling the same feature vector. ``train_step`` reads the
    module-level ``image_size`` and ``num_features`` to build that plane.

    Args:
        discriminator: critic model taking image + conditioning-plane input.
        generator: model mapping ``[noise, features]`` vectors to images.
        latent_dim: length of the random noise vector.
        batch_size: stored on the instance; ``train_step`` itself uses the
            dynamic batch size of the incoming data.
    """

    def __init__(self, discriminator, generator, latent_dim,batch_size):
        super(ConditionalGAN, self).__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim
        self.gen_loss_tracker = tf.keras.metrics.Mean(name="generator_loss")
        self.disc_loss_tracker = tf.keras.metrics.Mean(name="discriminator_loss")
        self.batch_size = batch_size
        self.d_steps = 3      # critic updates per generator update
        self.gp_weight = 10   # multiplier applied to the gradient penalty

    @property
    def metrics(self):
        """Metric objects tracked by this model."""
        return [self.gen_loss_tracker, self.disc_loss_tracker]

    def compile(self, d_optimizer, g_optimizer, d_loss_fn,g_loss_fn):
        """Store the optimizers and loss functions used by ``train_step``.

        Args:
            d_optimizer: optimizer for the discriminator variables.
            g_optimizer: optimizer for the generator variables.
            d_loss_fn: called as ``d_loss_fn(real_img=..., fake_img=...)`` with
                the critic outputs for real and fake inputs.
            g_loss_fn: called with the critic outputs for generated inputs.
        """
        super(ConditionalGAN, self).compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.d_loss_fn = d_loss_fn
        self.g_loss_fn = g_loss_fn

    def gradient_penalty(self, batch_size, real_images, fake_images):
        """Calculate the gradient penalty.

        The penalty is computed on random interpolations between the real and
        fake discriminator inputs and is added to the discriminator loss.

        Args:
            batch_size: number of samples in the batch (may be a tensor).
            real_images: discriminator inputs built from real images.
            fake_images: discriminator inputs built from generated images.

        Returns:
            Scalar tensor: mean over the batch of ``(||grad|| - 1) ** 2``.
        """
        # The interpolation coefficient must lie in [0, 1] so that the sampled
        # points fall on the segment between real and fake inputs. A normal
        # distribution would place many of them outside that segment.
        alpha = tf.random.uniform([batch_size, 1, 1, 1], 0.0, 1.0)
        diff = fake_images - real_images
        interpolated = real_images + alpha * diff

        with tf.GradientTape() as gp_tape:
            gp_tape.watch(interpolated)
            # 1. Get the discriminator output for this interpolated input.
            pred = self.discriminator(interpolated, training=True)

        # 2. Calculate the gradients w.r.t. the interpolated input.
        grads = gp_tape.gradient(pred, [interpolated])[0]
        # 3. Calculate the per-sample norm of the gradients.
        norm = tf.sqrt(tf.reduce_sum(tf.square(grads), axis=[1, 2, 3]))
        gp = tf.reduce_mean((norm - 1.0) ** 2)
        return gp

    def train_step(self, data):
        """Run ``d_steps`` critic updates followed by one generator update.

        Args:
            data: tuple ``(real_images, features)`` as produced by the dataset.

        Returns:
            Dict with keys ``"d_loss"`` (loss of the last critic update) and
            ``"g_loss"``.
        """
        # Unpack the data.
        real_images, features = data
        batch_size = tf.shape(real_images)[0]

        # Build the conditioning plane once; it does not change between critic
        # steps. Each feature value is repeated `pad` times and the result is
        # reshaped to (batch, image_size, image_size, 1). This requires
        # image_size * image_size to be divisible by num_features.
        pad = image_size * image_size // num_features
        image_of_features = tf.repeat(features, repeats=[pad], axis=1)
        image_of_features = tf.reshape(
            image_of_features, (-1, image_size, image_size, 1)
        )
        real_image_and_features = tf.concat([real_images, image_of_features], -1)

        # Train the discriminator (critic).
        for _ in range(self.d_steps):
            # Fresh noise for every critic step.
            random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
            random_vector_features = tf.concat(
                [random_latent_vectors, features], axis=1
            )

            with tf.GradientTape() as tape:
                self.discriminator.trainable = True
                # Generate fake images from the latent vector
                fake_images = self.generator(random_vector_features, training=True)
                fake_image_and_features = tf.concat([fake_images, image_of_features], -1)

                # Get the logits for the fake and the real inputs
                fake_logits = self.discriminator(fake_image_and_features, training=True)
                real_logits = self.discriminator(real_image_and_features, training=True)

                # Calculate the discriminator loss using the fake and real logits
                d_cost = self.d_loss_fn(real_img=real_logits, fake_img=fake_logits)
                # Calculate the gradient penalty
                gp = self.gradient_penalty(
                    batch_size, real_image_and_features, fake_image_and_features
                )
                # Add the gradient penalty to the original discriminator loss
                d_loss = d_cost + gp * self.gp_weight

            # Get the gradients w.r.t. the discriminator loss
            d_gradient = tape.gradient(d_loss, self.discriminator.trainable_variables)
            # Update the weights of the discriminator using the discriminator optimizer
            self.d_optimizer.apply_gradients(
                zip(d_gradient, self.discriminator.trainable_variables)
            )

        # Train the generator
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
        random_vector_features = tf.concat(
            [random_latent_vectors, features], axis=1
        )

        with tf.GradientTape() as g_tape:
            self.discriminator.trainable = False

            # Generate fake images using the generator
            generated_images = self.generator(random_vector_features, training=True)
            # Get the discriminator logits for fake images
            fake_image_and_features = tf.concat([generated_images, image_of_features], -1)
            gen_img_logits = self.discriminator(fake_image_and_features, training=True)

            # Calculate the generator loss
            g_loss = self.g_loss_fn(gen_img_logits)

        # Only generator variables are differentiated and updated here.
        gen_gradient = g_tape.gradient(g_loss, self.generator.trainable_variables)
        self.g_optimizer.apply_gradients(
            zip(gen_gradient, self.generator.trainable_variables)
        )

        # Monitor loss.
        self.gen_loss_tracker.update_state(g_loss)
        self.disc_loss_tracker.update_state(d_loss)
        return {"d_loss": d_loss, "g_loss": g_loss}


In [None]:
# Show the image belonging to the first row. The file is read from `img_path`,
# i.e. the same directory map_fn uses for training, instead of a second
# hard-coded directory that pointed somewhere else.
path = df.iloc[0,:]['id']
img = Image.open(img_path + path + '.jpg')
display(img)

In [None]:
# Scaled feature values of the first row, in DataFrame column order. They form
# the fixed conditioning vector passed to GANMonitor for the preview images.
x1, x2, x3, x4, x5, x6, x7, x8 = df.iloc[0, :][[
    'AreaShape_Area',
    'AreaShape_BoundingBoxMinimum_Y',
    'AreaShape_Eccentricity',
    'AreaShape_HuMoment_4',
    'AreaShape_NormalizedMoment_1_1',
    'AreaShape_NormalizedMoment_1_2',
    'AreaShape_Zernike_6_2',
    'AreaShape_Zernike_8_6',
]]

In [None]:
def discriminator_loss(real_img, fake_img):
    """Critic loss: mean score on fake inputs minus mean score on real inputs.

    Args:
        real_img: critic outputs for real inputs.
        fake_img: critic outputs for generated inputs.

    Returns:
        Scalar tensor ``mean(fake_img) - mean(real_img)``.
    """
    mean_real_score = tf.reduce_mean(real_img)
    mean_fake_score = tf.reduce_mean(fake_img)
    return mean_fake_score - mean_real_score


# Loss function for the generator.
def generator_loss(fake_img):
    """Generator loss: the negated mean critic score on generated inputs.

    Args:
        fake_img: critic outputs for generated inputs.

    Returns:
        Scalar tensor ``-mean(fake_img)``.
    """
    mean_fake_score = tf.reduce_mean(fake_img)
    return -mean_fake_score

In [None]:
# Preview/checkpoint callback, conditioned on the first row's feature vector.
callback = GANMonitor(
    num_img=3,
    latent_dim=latent_dim,
    feature=[x1, x2, x3, x4, x5, x6, x7, x8],
)
# `keras` is never imported as a top-level name in this notebook; go through
# the already-imported `tf` to avoid a NameError.
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_path)

In [None]:
# Assemble the conditional GAN from freshly built sub-models. The
# discriminator is built first, then the generator; each prints its summary.
cond_gan = ConditionalGAN(
    discriminator=get_discriminator(),
    generator=get_geneator(),
    latent_dim=latent_dim,
    batch_size=batch_size,
)

# Both networks use Adam with identical settings.
cond_gan.compile(
    d_optimizer=Adam(learning_rate=1e-5, beta_1=0.5, beta_2=0.9),
    g_optimizer=Adam(learning_rate=1e-5, beta_1=0.5, beta_2=0.9),
    d_loss_fn=discriminator_loss,
    g_loss_fn=generator_loss,
)

# Train; `hist` keeps the per-epoch 'd_loss' / 'g_loss' values for plotting.
hist = cond_gan.fit(
    train_dataset,
    epochs=3000,
    callbacks=[callback, tensorboard_callback],
)


In [None]:
# Plot the per-epoch generator and discriminator losses recorded by fit().
plt.figure(figsize=(7, 7))
for loss_key in ('g_loss', 'd_loss'):
    plt.plot(hist.epoch, hist.history[loss_key])
plt.legend(['Generator loss', 'Discriminator loss'])
plt.show()