In [1]:
# Mount Google Drive at /content/drive; the dataset files and model
# checkpoints used by later cells live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
# Base Drive folder of the project; model checkpoints are loaded from and
# saved to paths built by appending a file name to this prefix.
root = '/content/drive/MyDrive/140111/'

In [3]:
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow as tf
import matplotlib.pyplot as plt
import tensorflow as tf
import numpy as np
import imageio
from tensorflow.keras.initializers import he_normal
import datetime
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical

In [4]:
# Hyperparameters shared by the data pipeline and the model builders.
# Number of samples per batch produced by the tf.data pipeline.
batch_size = 64
# Channel count of the discriminator's image input.
num_channels = 1
# Height and width of the discriminator's image input.
image_size = 128
# Length of the noise vector fed to the generator.
latent_dim = 128

In [5]:
# Load the image array and the two per-sample label files from the Drive
# folder. Paths are built from `root` so the location is defined in one place.
# NOTE(review): the label files are assumed to hold one line per image in
# data.npy, in the same order -- confirm against how the files were exported.
data = np.load(root + 'data.npy')

# Read the label files through context managers so the file handles are
# closed deterministically instead of being left open until garbage collection.
with open(root + 'types.txt') as types_file:
    types = types_file.readlines()
with open(root + 'subtypes.txt') as subtypes_file:
    sub_types = subtypes_file.readlines()

In [6]:
# Saturate every value above 16 at 16, then rescale in place so that the
# ceiling maps to 1.0.
# NOTE: this mutates `data`; re-running the cell divides the array by 16 again.
above_ceiling = data > 16
data[above_ceiling] = 16
data /= 16.0

In [7]:
# Build one metadata row per sample: its position in `data` plus the
# whitespace-stripped type / subtype labels read from the text files.
df = pd.DataFrame(columns=['id','type','subtype','type_code', 'subtype_code'])
df['id'] = np.arange(data.shape[0])
for label_column, raw_lines in (('type', types), ('subtype', sub_types)):
    df[label_column] = raw_lines
    df[label_column] = df[label_column].str.strip()

# Keep only rows whose type and subtype are both known (neither is 'NA').
# The original index is preserved; later cells rely on it.
is_labelled = (df['type'] != 'NA') & (df['subtype'] != 'NA')
df = df.loc[is_labelled].copy()

# Integer-encode the surviving labels, type first and subtype second.
for label_column in ('type', 'subtype'):
    df[label_column + '_code'] = LabelEncoder().fit_transform(df[label_column])

In [8]:
# Down-sample the 'Primary Tumor' subtype to a fixed number of rows and put
# the sample back next to all other subtypes.
# NOTE(review): 726 is kept from the original cell; presumably chosen to
# balance the subtypes -- confirm.
PRIMARY_TUMOR_SAMPLES = 726
# Fixed seed so that Restart & Run All selects the same training rows each
# time; without it every run trains on a different subset.
PRIMARY_TUMOR_SEED = 42

is_primary = df.subtype == 'Primary Tumor'
primary_df = df[is_primary].sample(
    n=PRIMARY_TUMOR_SAMPLES, random_state=PRIMARY_TUMOR_SEED
)
# Non-primary rows first, sampled primary rows appended last (same ordering
# as before); the original index labels are preserved.
df = pd.concat([df[~is_primary], primary_df], axis=0)

In [9]:
# Pick the images of the retained rows and apply x * 2 - 1, which sends
# 0 -> -1 and 1 -> 1 (the value range of a tanh output).
sample_ids = df.id.values
x_train = data[sample_ids] * 2.0 - 1.0

# One-hot encode the type codes; the width of the encoding is the number of
# classes used for conditioning.
y_train = to_categorical(df.type_code.values)
num_classes = y_train.shape[1]

# Number of distinct subtype values; sizes the categorical latent code.
c_cat_dim = len(df['subtype'].unique())

In [10]:
# Create the tf.data.Dataset of (image, one-hot class label) pairs.
dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
# Use a shuffle buffer that covers the whole dataset. The rows are ordered
# (the sub-sampled 'Primary Tumor' rows were concatenated last), so a buffer
# smaller than the dataset would keep those rows out of the early batches of
# every epoch.
dataset = dataset.shuffle(buffer_size=len(x_train)).batch(batch_size)

print(f"Shape of training images: {x_train.shape}")
print(f"Shape of training labels: {y_train.shape}")

Shape of training images: (2074, 128, 128, 1)
Shape of training labels: (2074, 33)


In [11]:
# Total length of the generator's concatenated input vector:
# noise + one-hot class label + categorical code.
# Printed for inspection only; the generator builder recomputes this sum
# itself rather than reading this variable.
generator_in_channels = latent_dim + num_classes + c_cat_dim
print(generator_in_channels)

168


In [12]:
def get_discriminator():
  """Build the discriminator and the Q network on one shared feature trunk.

  The class label is projected by a bias-free Dense layer to an
  image_size x image_size plane and concatenated with the image as an extra
  channel. Five stride-2 3x3 convolutions with LeakyReLU follow, then global
  max pooling. Two heads read the pooled features:

  * ``disc_out``: a single linear unit (no activation).
  * the Q head: Dense(512) -> Dense(256) with LeakyReLU and Dropout(0.2),
    ending in a softmax over ``c_cat_dim`` categories.

  Reads the notebook globals ``num_classes``, ``image_size``,
  ``num_channels`` and ``c_cat_dim``.

  Returns:
    Tuple ``(d_model, q_model)``. Both models take ``[image, class_label]``
    as inputs and share every layer up to the pooled features.
  """
  # Label branch: one-hot class label -> single-channel plane.
  label_layer = layers.Input(shape=(num_classes,))
  label_plane = layers.Dense(image_size*image_size ,use_bias=False)(label_layer)
  label_plane = layers.Reshape((image_size,image_size,1))(label_plane)

  # Image branch, stacked with the label plane along the channel axis.
  input_layer = layers.Input(shape=(image_size,image_size,num_channels))
  features = layers.Concatenate()([label_plane,input_layer])

  # Shared convolutional trunk; each block halves the spatial resolution.
  for n_filters in (64, 128, 256, 256, 512):
    features = layers.Conv2D(n_filters, (3, 3), strides=(2, 2), padding="same")(features)
    features = layers.LeakyReLU(alpha=0.2)(features)
  features = layers.GlobalMaxPooling2D()(features)

  # Discriminator head.
  disc_out = layers.Dense(1, name='disc_out')(features)
  d_model = keras.models.Model([input_layer,label_layer], disc_out, name="discriminator")

  # Q head on top of the same pooled features.
  q_hidden = features
  for n_units in (512, 256):
    q_hidden = layers.Dense(n_units , kernel_initializer='he_normal' , bias_initializer='he_normal'  )(q_hidden)
    q_hidden = layers.LeakyReLU(0.2)(q_hidden)
    q_hidden = layers.Dropout(0.2)(q_hidden)
  q_logits = layers.Dense(c_cat_dim , kernel_initializer='he_normal' , bias_initializer='he_normal'  )(q_hidden)
  q_net_out = layers.Activation('softmax')(q_logits)
  q_model = keras.models.Model([input_layer,label_layer], q_net_out, name='q_network')

  return d_model, q_model

In [13]:
def get_generator_model():
  """Build the generator.

  Inputs, in order: ``noise`` (latent_dim,), ``c_cat`` (c_cat_dim,) and
  ``class_label`` (num_classes,). They are concatenated, projected by a Dense
  layer to an 8x8 feature map and upsampled by four stride-2 transposed
  convolutions (kernel sizes 3, 3, 5, 7), each followed by batch
  normalisation and LeakyReLU. A final 7x7 convolution with one filter,
  batch normalisation and tanh produces the output image.

  Reads the notebook globals ``latent_dim``, ``c_cat_dim`` and
  ``num_classes``.

  Returns:
    A Keras model named "generator" taking ``[noise, c_cat, class_label]``.
  """
  noise = layers.Input(shape=(latent_dim,) , name='noise')
  labels = layers.Input(shape=(c_cat_dim,), name='c_cat')
  class_label = layers.Input(shape=(num_classes,), name='class_label')

  # Width of the concatenated input; also used as the channel count of the
  # initial 8x8 feature map.
  seed_channels = latent_dim+c_cat_dim+num_classes
  merged = layers.concatenate([noise,labels,class_label], axis=1)

  x = layers.Dense(8 * 8 * seed_channels, name='gen_l1')(merged)
  x = layers.LeakyReLU(alpha=0.2)(x)
  x = layers.Reshape((8, 8, seed_channels))(x)

  # Four upsampling stages: 8 -> 16 -> 32 -> 64 -> 128 pixels per side.
  for kernel_size in ((3, 3), (3, 3), (5, 5), (7, 7)):
    x = layers.Conv2DTranspose(128, kernel_size, strides=(2, 2), padding="same",use_bias=False)(x)
    x = layers.BatchNormalization()(x)
    x = layers.LeakyReLU(alpha=0.2)(x)

  # Collapse to a single channel and squash with tanh.
  x = layers.Conv2D(1, (7, 7), padding="same",use_bias=False)(x)
  x = layers.BatchNormalization()(x)
  x = layers.Activation('tanh',name='gen_out')(x)

  return keras.models.Model([noise,labels,class_label], x, name="generator")

In [14]:
# Resume from the checkpoints written by the GANMonitor callback.
# To train from scratch instead, build fresh models with the two commented
# lines below and skip the load_model calls.
# NOTE(review): get_discriminator() returns two models that share their
# convolutional trunk; loading them from two separate files presumably
# yields two independent copies of those weights -- confirm whether the
# sharing is meant to survive a reload.
# discriminator, q_network = get_discriminator()
# discriminator.summary()
discriminator = tf.keras.models.load_model(root + 'new model Discriminator.h5')
q_network = tf.keras.models.load_model(root + 'new model q network.h5')



In [15]:
# Resume the generator from its checkpoint. To train from scratch instead,
# build a fresh model with the commented lines below.
# g_model = get_generator_model()
# g_model.summary()
g_model = tf.keras.models.load_model(root + 'new model Generator.h5')



In [16]:
class ConditionalInFOGAN(keras.Model):
    """Conditional InfoGAN whose discriminator is trained as a WGAN-GP critic.

    Per training step the discriminator is updated ``d_steps`` times with the
    critic loss plus a gradient penalty, then the generator and the Q network
    are updated once. The generator is fed a noise vector, a randomly drawn
    one-hot categorical code and the one-hot class labels of the real batch;
    the Q network is trained to recover that categorical code from the
    generated image.

    Args:
        discriminator: model mapping ``[image, class_label]`` to a critic score.
        generator: model mapping ``[noise, categorical_code, class_label]``
            to an image.
        q_network: model mapping ``[image, class_label]`` to a prediction of
            the categorical code (compared with the code by ``q_loss_fn``).
        latent_dim: length of the noise vector.
    """

    def __init__(self, discriminator, generator,q_network, latent_dim):
        super(ConditionalInFOGAN, self).__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.q_network = q_network
        self.latent_dim = latent_dim
        self.gen_loss_tracker = keras.metrics.Mean(name="generator_loss")
        self.disc_loss_tracker = keras.metrics.Mean(name="discriminator_loss")
        self.q_loss_tracker = keras.metrics.Mean(name="q_loss")
        # Taken from the notebook-level `batch_size`; train_step derives the
        # actual batch size from the incoming tensors instead.
        self.batch_size = batch_size
        # Number of discriminator updates per generator update.
        self.d_steps = 5
        # Multiplier applied to the gradient penalty in the discriminator loss.
        self.gp_weight = 100

    @property
    def metrics(self):
        """Trackers exposed to Keras.

        All three trackers are listed, including the Q-loss tracker, so that
        every value reported by ``train_step`` is handled the same way by the
        framework.
        """
        return [self.gen_loss_tracker, self.disc_loss_tracker, self.q_loss_tracker]

    def compile(self, d_optimizer, g_optimizer,q_optimizer, d_loss_fn, g_loss_fn,q_loss_fn):
        """Store one optimizer and one loss function per sub-network.

        Args:
            d_optimizer, g_optimizer, q_optimizer: optimizers for the
                discriminator, the generator and the Q network.
            d_loss_fn: called as ``d_loss_fn(real_img=..., fake_img=...)``
                with the discriminator outputs.
            g_loss_fn: called with the discriminator output on fake images.
            q_loss_fn: called as ``q_loss_fn(cat_labels, q_output)``.
        """
        super(ConditionalInFOGAN, self).compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.q_optimizer = q_optimizer
        self.d_loss_fn = d_loss_fn
        self.g_loss_fn = g_loss_fn
        self.q_loss_fn = q_loss_fn

    def gradient_penalty(self, batch_size, real_images, fake_images , class_label):
        """Calculate the WGAN-GP gradient penalty.

        The penalty is evaluated on images interpolated between the real and
        the fake batch and is added to the discriminator loss.

        Args:
            batch_size: number of samples in the batch.
            real_images: batch of real images.
            fake_images: batch of generated images, same shape as real_images.
            class_label: one-hot class labels passed to the discriminator.

        Returns:
            Scalar tensor: mean over the batch of ``(||grad|| - 1) ** 2``.
        """
        # Interpolation coefficient in [0, 1) so that each sample lies on the
        # segment between its real and fake image. A normal distribution
        # would also produce points outside that segment.
        alpha = tf.random.uniform([batch_size, 1, 1, 1], 0.0, 1.0)
        diff = fake_images - real_images
        interpolated = real_images + alpha * diff

        with tf.GradientTape() as gp_tape:
            gp_tape.watch(interpolated)
            # 1. Get the discriminator output for the interpolated images.
            pred = self.discriminator([interpolated,class_label], training=True)

        # 2. Calculate the gradients w.r.t. the interpolated images.
        grads = gp_tape.gradient(pred, [interpolated])[0]
        # 3. Calculate the per-sample norm of the gradients.
        norm = tf.sqrt(tf.reduce_sum(tf.square(grads), axis=[1, 2, 3]))
        gp = tf.reduce_mean((norm - 1.0) ** 2)
        return gp

    def _sample_cat_labels(self, batch_size):
        """Draw one uniformly random one-hot categorical code per sample."""
        indx = tf.random.uniform(shape=(batch_size,), minval=0, maxval=c_cat_dim, dtype=tf.int32)
        return tf.one_hot(indx , c_cat_dim)

    def train_step(self, data):
        """Run ``d_steps`` discriminator updates, then one generator/Q update.

        Args:
            data: tuple ``(real_images, one_hot_class_labels)``.

        Returns:
            Dict with the running means ``g_loss``, ``d_loss`` and ``q_loss``.
        """
        # Unpack the data.
        real_images, one_hot_class_labels = data
        batch_size = tf.shape(real_images)[0]

        for _ in range(self.d_steps):
            # Sample noise and a categorical code, then decode them (guided
            # by the class labels) into fake images. This happens outside the
            # tape because only discriminator gradients are needed here.
            # NOTE(review): the generator is called without an explicit
            # `training` argument, as before; whether BatchNormalization then
            # uses batch statistics depends on the Keras default -- confirm.
            random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
            cat_labels = self._sample_cat_labels(batch_size)
            generated_images = self.generator([random_latent_vectors, cat_labels,one_hot_class_labels ])

            # Train the discriminator.
            with tf.GradientTape() as tape:
                self.discriminator.trainable = True

                fake_logits = self.discriminator([generated_images,one_hot_class_labels], training=True)
                real_logits = self.discriminator([real_images,one_hot_class_labels], training=True)

                d_cost = self.d_loss_fn(real_img=real_logits, fake_img=fake_logits)
                gp = self.gradient_penalty(batch_size, real_images, generated_images,one_hot_class_labels)
                d_loss = d_cost + gp * self.gp_weight

            # Update the discriminator weights from the discriminator loss.
            d_gradient = tape.gradient(d_loss, self.discriminator.trainable_variables)
            self.d_optimizer.apply_gradients(
                zip(d_gradient, self.discriminator.trainable_variables)
            )

        # Fresh noise and categorical code for the generator / Q update.
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
        cat_labels = self._sample_cat_labels(batch_size)

        # Train the generator and the Q network (the discriminator weights
        # are not passed to any optimizer in this phase).
        with tf.GradientTape() as g_tape, tf.GradientTape() as qn_tape:
            self.discriminator.trainable = False
            g_tape.watch(self.generator.trainable_variables)
            qn_tape.watch(self.q_network.trainable_variables)

            fake_images = self.generator([random_latent_vectors,cat_labels,one_hot_class_labels])
            gen_img_logits = self.discriminator([fake_images,one_hot_class_labels], training=True)

            # The Q network must see the images generated from `cat_labels`
            # inside the tape. Feeding it the stale batch from the
            # discriminator loop pairs images with codes they were not
            # generated from and gives the generator no gradient from the
            # categorical loss.
            cat_output = self.q_network([fake_images,one_hot_class_labels], training=True)
            cat_loss = self.q_loss_fn(cat_labels , cat_output)

            g_loss = self.g_loss_fn(gen_img_logits) + cat_loss

        # Update the generator weights from the combined generator loss.
        gen_gradient = g_tape.gradient(g_loss, self.generator.trainable_variables)
        self.g_optimizer.apply_gradients(
            zip(gen_gradient, self.generator.trainable_variables)
        )

        # Update the Q network weights from the categorical loss alone.
        qn_gradient = qn_tape.gradient(cat_loss , self.q_network.trainable_variables)
        self.q_optimizer.apply_gradients(
            zip(qn_gradient , self.q_network.trainable_variables))

        # Monitor loss. `d_loss` is the value from the last discriminator step.
        self.gen_loss_tracker.update_state(g_loss)
        self.disc_loss_tracker.update_state(d_loss)
        self.q_loss_tracker.update_state(cat_loss)
        return {
            "g_loss": self.gen_loss_tracker.result(),
            "d_loss": self.disc_loss_tracker.result(),
            "q_loss": self.q_loss_tracker.result()
        }

In [17]:
def discriminator_loss(real_img, fake_img):
    """Critic loss: mean score on fake images minus mean score on real images.

    Args:
        real_img: discriminator outputs for the real batch.
        fake_img: discriminator outputs for the generated batch.

    Returns:
        Scalar tensor ``mean(fake_img) - mean(real_img)``.
    """
    mean_fake_score = tf.reduce_mean(fake_img)
    mean_real_score = tf.reduce_mean(real_img)
    return mean_fake_score - mean_real_score


# Loss function for the generator.
def generator_loss(fake_img):
    """Generator loss: the negated mean discriminator score on fake images.

    Args:
        fake_img: discriminator outputs for the generated batch.

    Returns:
        Scalar tensor ``-mean(fake_img)``.
    """
    mean_fake_score = tf.reduce_mean(fake_img)
    return -mean_fake_score

In [18]:
class GANMonitor(tf.keras.callbacks.Callback):
    """Plot four generated samples after every epoch and checkpoint the models.

    Args:
        latent_dim: length of the noise vector expected by the generator.
    """

    def __init__(self, latent_dim=128):
        # Initialise the Callback base class so its own attributes exist.
        super(GANMonitor, self).__init__()
        self.latent_dim = latent_dim

    def on_epoch_end(self, epoch, logs=None):
        """Show a 2x2 grid of generated images; save all models every 20 epochs.

        Args:
            epoch: zero-based epoch index supplied by Keras.
            logs: metric dict supplied by Keras (unused).
        """
        # Sample noise with the dimension this callback was configured with,
        # not the notebook-level global of the same name.
        _noise = tf.random.normal(shape=(4, self.latent_dim))

        # One sample for each of the class labels 0..3 ...
        _label = keras.utils.to_categorical([0,1,2,3], num_classes)
        _label = tf.cast(_label, tf.float32)

        # ... paired with the categorical codes 0..3.
        cat_label = keras.utils.to_categorical([0,1,2,3], c_cat_dim)
        cat_label = tf.cast(cat_label, tf.float32)

        # Run inference with the generator and map its output from [-1, 1]
        # to [0, 255] for display.
        fake_images = self.model.generator.predict([_noise,cat_label,_label])
        fake_images = fake_images * 0.5 + 0.5
        fake_images *= 255.0
        converted_images = fake_images.astype(np.uint8)

        for i in range(4):
            plt.subplot(2,2,i+1)
            plt.imshow(converted_images[i][:,:,0],cmap='gray')
        plt.show()

        # Checkpoint every 20th epoch. Epoch 0 also matches, so the files are
        # overwritten at the end of the first epoch.
        if epoch % 20 == 0 :
            self.model.generator.save(root + 'new model Generator.h5')
            self.model.discriminator.save(root + 'new model Discriminator.h5')
            self.model.q_network.save(root + 'new model q network.h5')


In [19]:
# Per-epoch monitor: plots generated samples and periodically saves the models.
callback = GANMonitor(latent_dim=latent_dim)

In [20]:
# The three sub-networks use separate Adam optimizers with identical settings.
adam_settings = dict(learning_rate=0.0003, beta_1=0.5, beta_2=0.9)
d_optimizer = keras.optimizers.Adam(**adam_settings)
g_optimizer = keras.optimizers.Adam(**adam_settings)
q_optimizer = keras.optimizers.Adam(**adam_settings)

# Wrap the three networks in the combined training model.
cond_infogan = ConditionalInFOGAN(
    discriminator=discriminator,
    generator=g_model,
    q_network=q_network,
    latent_dim=latent_dim,
)

# Attach optimizers and losses: the critic losses defined above for the
# discriminator and generator, categorical cross-entropy for the Q network.
cond_infogan.compile(
    d_optimizer=d_optimizer,
    g_optimizer=g_optimizer,
    q_optimizer=q_optimizer,
    d_loss_fn=discriminator_loss,
    g_loss_fn=generator_loss,
    q_loss_fn=keras.losses.CategoricalCrossentropy(),
)


In [None]:
# Train for 500 epochs; the GANMonitor callback plots samples after every
# epoch and saves the three models every 20 epochs.
cond_infogan.fit(dataset, epochs=500 , callbacks=[callback])
# cond_gan.fit(dataset, epochs=20)

Output hidden; open in https://colab.research.google.com to view.