In [1]:
from utils import *

# Data reading

In [2]:
# Names of the label columns in the input CSV; this order is reused whenever
# a conditioning vector is built.
LABELS = [
    'microbiano',
    'bacteriano',
    'antigramneg',
    'antigrampos',
    'fungico',
    'viral',
    'cancer',
]

# Load the sample file and preview its first rows.
data_in = pd.read_csv('data/data_sample.csv')
data_in.head()

Unnamed: 0,sequence,antigramneg,antigrampos,bacteriano,cancer,fungico,microbiano,viral
0,CHKKHTVYC,1,1,1,0,1,1,0
1,GLGILFKGAAKGMSALLLLKCA,1,1,1,0,1,1,0
2,ALGIIKGEAGKGLTC,1,1,1,0,1,1,0
3,RKKKPYIIRP,1,1,1,0,1,1,0
4,NNQEGGVIGNGHR,1,1,1,0,1,1,0


In [3]:
# Input frame without the label columns.
# NOTE(review): `data` is not referenced again in the visible cells -- confirm it is still needed.
data = data_in.drop(columns=LABELS)
# Label matrix as float32, with columns ordered as in LABELS.
target = data_in[LABELS].values.astype("float32")

# Preprocessing

In [4]:
# One Hot Encoding
# `encoding` is provided by the star import from `utils`. It receives the full
# input frame (label columns included).
# NOTE(review): presumably only the `sequence` column is encoded -- confirm in utils.
data_ohe = encoding(data_in)

In [5]:
# Pair every encoded sample (cast to float32) with its label row in a tf.data.Dataset.
dataset = tf.data.Dataset.from_tensor_slices(
    (
        tf.cast(data_ohe, dtype=tf.float32),
        target,
    )
)

# Model architecture

In [6]:
# Shape of one sample as seen by the models: the two trailing axes of
# `data_ohe` plus a single channel.
MATRIX_SHAPE = (data_ohe.shape[1], data_ohe.shape[2], 1)

# Size of the noise vector
LATENT_DIM = 200

# Number of training epochs and samples per batch.
EPOCHS = 5  # reduced for a quick run; the earlier note on this line said 100
BATCH_SIZE = 256

# Derived from their sources of truth so they cannot drift out of sync.
NUM_CHANNELS = MATRIX_SHAPE[2]
NUM_CLASSES = len(LABELS)
MATRIX_SIZE = (MATRIX_SHAPE[0], MATRIX_SHAPE[1])

# The generator consumes noise + labels; the discriminator consumes the
# matrix channels + one label map per class.
GENERATOR_IN_CHANNELS = LATENT_DIM + NUM_CLASSES
DISCRIMINATOR_IN_CHANNELS = NUM_CHANNELS + NUM_CLASSES

# Build the training pipeline from the source tensors rather than from the
# previous value of `dataset`: re-running this cell would otherwise batch an
# already batched dataset.
dataset = (
    tf.data.Dataset.from_tensor_slices((tf.cast(data_ohe, dtype=tf.float32), target))
    .shuffle(buffer_size=1024)
    .batch(BATCH_SIZE)
)

In [7]:
def create_generator_model():
  """Build the generator network and print its summary.

  The network takes a flat vector of GENERATOR_IN_CHANNELS values (noise plus
  labels), views it as a one-step sequence, passes it through two
  bidirectional GRU layers and a tanh Dense layer, and reshapes the result
  to (MATRIX_SHAPE[0], MATRIX_SHAPE[1], 1).

  Returns:
    The (uncompiled) Sequential model named 'generator'.
  """
  generator = Sequential(name='generator')
  flat_units = np.prod(MATRIX_SHAPE)

  # Layers are instantiated in the order in which they are stacked.
  stack = (
      Input(shape=(GENERATOR_IN_CHANNELS,)),
      Reshape((1, GENERATOR_IN_CHANNELS)),
      Bidirectional(layers.GRU(flat_units, return_sequences=True)),
      Bidirectional(layers.GRU(flat_units)),
      Dense(flat_units, activation='tanh'),
      Reshape((MATRIX_SHAPE[0], MATRIX_SHAPE[1], 1)),
  )
  for layer in stack:
    generator.add(layer)

  generator.summary()
  return generator

# CREATE DISCRIMINATOR
def create_discriminator_model():
  """Build the discriminator network and print its summary.

  The network takes a (MATRIX_SHAPE[0], MATRIX_SHAPE[1],
  DISCRIMINATOR_IN_CHANNELS) tensor, flattens it into a one-step sequence,
  runs a bidirectional GRU over it and ends in a single sigmoid unit.

  Returns:
    The (uncompiled) Sequential model named 'discriminator'.
  """
  discriminator = Sequential(name='discriminator')
  rows, cols = MATRIX_SHAPE[0], MATRIX_SHAPE[1]

  # Layers are instantiated in the order in which they are stacked.
  stack = (
      Input(shape=(rows, cols, DISCRIMINATOR_IN_CHANNELS)),
      Reshape((1, rows * cols * DISCRIMINATOR_IN_CHANNELS)),
      Bidirectional(layers.GRU(np.prod(MATRIX_SHAPE))),
      Dense(1, activation='sigmoid'),
  )
  for layer in stack:
    discriminator.add(layer)

  discriminator.summary()
  return discriminator

class ConditionalGAN(keras.Model):
    """Conditional GAN wrapping a generator and a discriminator.

    The generator receives a noise vector concatenated with a label vector;
    the discriminator receives a matrix concatenated (on the last axis) with
    one label map per class. `train_step` performs one discriminator update
    followed by one generator update. `generate_class` and
    `generate_class_random` sample sequences from the generator.

    The class relies on the notebook-level names `LABELS`, `MATRIX_SIZE`,
    `ohe` and `escalon_matrix`.

    Args:
        discriminator: Model scoring matrix+label tensors.
        generator: Model mapping (noise + labels) vectors to matrices.
        latent_dim: Length of the noise vector.
        num_classes: Number of label entries per sample.
    """

    def __init__(self, discriminator, generator, latent_dim, num_classes):
        super(ConditionalGAN, self).__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim
        self.num_classes = num_classes
        # Running means of the two losses, reported by `train_step`.
        self.gen_loss_tracker = keras.metrics.Mean(name="generator_loss")
        self.disc_loss_tracker = keras.metrics.Mean(name="discriminator_loss")

    @property
    def metrics(self):
        """Return the generator and discriminator loss trackers."""
        return [self.gen_loss_tracker, self.disc_loss_tracker]

    def compile(self, d_optimizer, g_optimizer, loss_fn):
        """Store the two optimizers and the loss function used by `train_step`.

        Args:
            d_optimizer: Optimizer applied to the discriminator weights.
            g_optimizer: Optimizer applied to the generator weights.
            loss_fn: Callable `loss_fn(labels, predictions)`.
        """
        super(ConditionalGAN, self).compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.loss_fn = loss_fn

    @staticmethod
    def _decode_matrix(matrix):
        """Convert one generated 2-D matrix into a sequence string.

        The matrix is passed through `escalon_matrix` and
        `ohe.inverse_transform`; symbols are appended until the first '_'.
        """
        decoded_rows = ohe.inverse_transform(escalon_matrix(matrix))
        symbols = []
        for row in decoded_rows:
            symbol = str(row[0])
            if symbol == '_':
                # Everything from the first '_' onwards is discarded.
                break
            symbols.append(symbol)
        return "".join(symbols)

    def _generate_sequences(self, conditional_vectors):
        """Generate one sequence per row of `conditional_vectors`.

        Args:
            conditional_vectors: 2-D array with one label vector per row.

        Returns:
            DataFrame with a single "sequence" column.
        """
        conditional_vectors = np.asarray(conditional_vectors)
        n_gens = conditional_vectors.shape[0]

        sequences = []
        if n_gens > 0:
            # One batched predict call instead of one call per sample.
            noise = np.random.normal(0, 1, (n_gens, self.latent_dim))
            generator_input = np.concatenate((noise, conditional_vectors), axis=1)
            gen_imgs = self.generator.predict(generator_input)
            sequences = [self._decode_matrix(matrix) for matrix in gen_imgs[:, :, :, 0]]
        return pd.DataFrame({"sequence": sequences})

    def generate_class(self, n_gens, classes):
        """Generate `n_gens` sequences conditioned on the given classes.

        Args:
            n_gens: Number of sequences to generate; values <= 0 yield an
                empty frame.
            classes: Container tested with `label in classes` for every
                entry of `LABELS`; matching labels are set to 1, others to 0.

        Returns:
            DataFrame with a single "sequence" column.
        """
        conditional_vector = np.array(
            [1 if (label in classes) else 0 for label in LABELS]
        )
        conditional_vectors = np.tile(conditional_vector, (max(n_gens, 0), 1))
        return self._generate_sequences(conditional_vectors)

    def generate_class_random(self, n_gens):
        """Generate `n_gens` sequences with random 0/1 label vectors.

        Args:
            n_gens: Number of sequences to generate; values <= 0 yield an
                empty frame.

        Returns:
            DataFrame with a single "sequence" column.
        """
        conditional_vectors = np.random.randint(2, size=(max(n_gens, 0), len(LABELS)))
        return self._generate_sequences(conditional_vectors)

    def train_step(self, data):
        """Run one discriminator update and one generator update.

        Args:
            data: Tuple `(real_matrix, one_hot_labels)` for the batch.

        Returns:
            Dict with the running means under "g_loss" and "d_loss".
        """
        # Unpack the data.
        real_matrix, one_hot_labels = data

        # Expand the labels into maps that can be concatenated with the
        # matrices on the last axis. This is for the discriminator.
        # NOTE(review): tf.repeat without `axis` flattens its input, so after
        # the reshape the last axis is not a clean per-class channel. Each
        # sample still only carries its own labels. Left as is so training
        # behaviour is unchanged.
        matrix_one_hot_labels = one_hot_labels[:, :, None, None]
        matrix_one_hot_labels = tf.repeat(
            matrix_one_hot_labels, repeats=[MATRIX_SIZE[0] * MATRIX_SIZE[1]]
        )
        matrix_one_hot_labels = tf.reshape(
            matrix_one_hot_labels, (-1, MATRIX_SIZE[0], MATRIX_SIZE[1], self.num_classes)
        )

        # Sample random points in the latent space and concatenate the labels.
        # This is for the generator.
        batch_size = tf.shape(real_matrix)[0]
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
        random_vector_labels = tf.concat(
            [random_latent_vectors, one_hot_labels], axis=1
        )

        # Decode the noise (guided by labels) to fake matrices.
        generated_matrix = self.generator(random_vector_labels)

        # Attach the label maps to both fake and real matrices, then stack
        # them: fakes first, reals second.
        fake_matrix_and_labels = tf.concat([generated_matrix, matrix_one_hot_labels], -1)
        real_matrix_and_labels = tf.concat([real_matrix, matrix_one_hot_labels], -1)
        combined_matrixs = tf.concat(
            [fake_matrix_and_labels, real_matrix_and_labels], axis=0
        )

        # Discriminator targets in the same order: 1 for fake, 0 for real.
        labels = tf.concat(
            [tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0
        )

        # Train the discriminator.
        with tf.GradientTape() as tape:
            predictions = self.discriminator(combined_matrixs)
            d_loss = self.loss_fn(labels, predictions)
        grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
        self.d_optimizer.apply_gradients(
            zip(grads, self.discriminator.trainable_weights)
        )

        # Sample fresh random points in the latent space.
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
        random_vector_labels = tf.concat(
            [random_latent_vectors, one_hot_labels], axis=1
        )

        # Targets claiming every generated matrix is real (0 means real here).
        misleading_labels = tf.zeros((batch_size, 1))

        # Train the generator (note that we should *not* update the weights
        # of the discriminator)!
        with tf.GradientTape() as tape:
            fake_matrixs = self.generator(random_vector_labels)
            fake_matrix_and_labels = tf.concat([fake_matrixs, matrix_one_hot_labels], -1)
            predictions = self.discriminator(fake_matrix_and_labels)
            g_loss = self.loss_fn(misleading_labels, predictions)
        grads = tape.gradient(g_loss, self.generator.trainable_weights)
        self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))

        # Monitor loss.
        self.gen_loss_tracker.update_state(g_loss)
        self.disc_loss_tracker.update_state(d_loss)
        return {
            "g_loss": self.gen_loss_tracker.result(),
            "d_loss": self.disc_loss_tracker.result(),
        }


# Model training

In [8]:
# Build both sub-models (discriminator first, then generator) and wrap them
# in the conditional GAN.
cond_gan = ConditionalGAN(
    discriminator=create_discriminator_model(),
    generator=create_generator_model(),
    latent_dim=LATENT_DIM,
    num_classes=NUM_CLASSES,
)

# Both networks use Adam with the same learning rate; the loss expects
# probabilities (from_logits=False).
cond_gan.compile(
    d_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    g_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    loss_fn=keras.losses.BinaryCrossentropy(from_logits=False),
)

# Optional TensorBoard logging (disabled):
#TENSORBOARD_LOG_DIR = "data/logs/tensorboard/"+EXPERIMENT_NAME+"_" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
#tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=TENSORBOARD_LOG_DIR, histogram_freq=1)

# Train; pass callbacks=[tensorboard_callback] to enable the logging above.
cond_gan.fit(dataset, epochs=EPOCHS)


Model: "discriminator"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 reshape (Reshape)           (None, 1, 5880)           0         
                                                                 
 bidirectional (Bidirectiona  (None, 1470)             29180970  
 l)                                                              
                                                                 
 dense (Dense)               (None, 1)                 1471      
                                                                 
Total params: 29,182,441
Trainable params: 29,182,441
Non-trainable params: 0
_________________________________________________________________
Model: "generator"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 reshape_1 (Reshape)         (None, 1, 207)            0         
                      

<keras.callbacks.History at 0x217e629c9a0>

# Generate Sample

In [9]:
# Generate 10 sequences with randomly drawn 0/1 label vectors and display the resulting frame.
cond_gan.generate_class_random(10)

Unnamed: 0,sequence
0,GRFKPLKNVRIGLC
1,IK
2,KFKPLYFRYGSYVVKPAQ
3,GFEDLPAGKVEWLWMMIPAAQKAMGD
4,IFR
5,DALTLGLFGVGQ
6,MKH
7,MSLKLWLRVQKLMCAW
8,IPKKLWYRWS
9,CK
