In [1]:
import tensorflow as tf
from tensorflow.keras.layers import LSTM, Dense, Bidirectional, BatchNormalization
import tensorflow as tf
from tensorflow.keras.optimizers import Adam
import numpy as np
import os
import pandas as pd
import numpy as np
sequence_length = 31

In [2]:
class LSTMGenerator(tf.keras.Model):
    def __init__(self, sequence_length, num_nucleotides, num_hidden_units=512):
        super(LSTMGenerator, self).__init__()
        self.lstm1 = Bidirectional(LSTM(num_hidden_units, return_sequences=True))
        self.bn1 = BatchNormalization()
        self.lstm2 = Bidirectional(LSTM(num_hidden_units, return_sequences=True))
        self.bn2 = BatchNormalization()
        self.dense = Dense(num_nucleotides, activation='sigmoid')

    def call(self, inputs):
        x = self.bn1(self.lstm1(inputs))
        x = self.bn2(self.lstm2(x))
        return self.dense(x)

class LSTMDiscriminator(tf.keras.Model):
    def __init__(self, num_hidden_units=512):
        super(LSTMDiscriminator, self).__init__()
        self.lstm1 = Bidirectional(LSTM(num_hidden_units, return_sequences=True))
        self.bn1 = BatchNormalization()
        self.lstm2 = Bidirectional(LSTM(num_hidden_units, return_sequences=True))
        self.bn2 = BatchNormalization()
        self.dense = Dense(1)

    def call(self, inputs):
        x = self.bn1(self.lstm1(inputs))
        x = self.bn2(self.lstm2(x))
        return self.dense(x)

In [None]:
def read_input():
  """ Read the input data """
  data_path = "Mutated_Sequence.csv"
  data = pd.read_csv(data_path)
  
  dataset = data.loc[data['labels'] == "['DUP']"]
  arr = dataset.values
  sequences = arr[:,0]
  
  sequences = [x.upper() for x in sequences]
  
  rna_vocab = {"A":0,
               "C":1,
               "G":2,
               "T":3}
  rev_rna_vocab = {v:k for k,v in rna_vocab.items()}
  return sequences


def one_hot_encode(seq, SEQ_LEN=31):
    mapping = {"A": 0, "C": 1, "G": 2, "T": 3}
    seq2 = [mapping[i] for i in seq]
    encoded = np.zeros((SEQ_LEN, 4))  # Ensure shape consistency
    for i, nucleotide in enumerate(seq2[:SEQ_LEN]):
        encoded[i, nucleotide] = 1
    return encoded

sequences = read_input()


encoded_sequences = np.asarray([one_hot_encode(x) for x in sequences], dtype=np.float32)

print(encoded_sequences.shape)

(3987, 31, 4)


In [4]:
# Create a tf.data.Dataset
batch_size = 32

dataset = tf.data.Dataset.from_tensor_slices(encoded_sequences)

dataset = dataset.shuffle(buffer_size=len(encoded_sequences)).batch(batch_size).prefetch(tf.data.experimental.AUTOTUNE)


In [5]:
# Loss functions
def generator_loss(fake_output):
    return -tf.reduce_mean(fake_output)

def discriminator_loss(real_output, fake_output):
    real_loss = -tf.reduce_mean(real_output)  # Want real samples to be classified as positive
    fake_loss = tf.reduce_mean(fake_output)   # Want fake samples to be classified as negative
    return real_loss + fake_loss


# Gradient Penalty (Fixed: Removed Clipping)
def gradient_penalty(real_data, fake_data):
    batch_size = tf.shape(real_data)[0]
    alpha = tf.random.uniform([batch_size, 1, 1], 0.0, 1.0)
    interpolated = alpha * real_data + (1 - alpha) * fake_data

    with tf.GradientTape() as gp_tape:
        gp_tape.watch(interpolated)
        interpolated_output = discriminator(interpolated)

    grads = gp_tape.gradient(interpolated_output, [interpolated])[0]
    grads_norm = tf.sqrt(tf.reduce_sum(tf.square(grads), axis=[1, 2]))

    # Corrected WGAN-GP gradient penalty
    return tf.reduce_mean((grads_norm - 1.0) ** 2)

def gc_loss(probabilities, gc_target=0.5):
    """
    Compute GC content regularization loss for generated sequences.

    Args:
        probabilities: Tensor of shape [batch_size, sequence_length, 4] (one-hot encoded)
        gc_target: Target GC content (default: 50%)

    Returns:
        GC content penalty loss
    """
    # One-hot index mapping: [A, T, C, G] → C is index 2, G is index 3
    gc_probs = probabilities[..., 1] + probabilities[..., 2]  # Sum of C and G probabilities

    # Compute mean GC content per sequence
    gc_content = tf.reduce_mean(gc_probs, axis=1)  # Shape: [batch_size]

    # Penalize deviation from the target GC content
    return tf.reduce_mean(tf.abs(gc_content - gc_target))


# Convert probabilities to nucleotide sequences
def probabilities_to_nucleotides(probabilities):
    nucleotides = ['A', 'T', 'C', 'G']
    sequences = []

    for prob_matrix in probabilities:
        prob_matrix = np.array(prob_matrix)
        row_sums = np.sum(prob_matrix, axis=1, keepdims=True)
        row_sums[row_sums == 0] = 1  # Avoid division by zero
        prob_matrix = prob_matrix / row_sums

        sequence = ''.join([nucleotides[np.random.choice(len(nucleotides), p=pos)] for pos in prob_matrix])
        sequences.append(sequence)

    return sequences

In [6]:

@tf.function
def train_step(real_sequences):
    batch_size = tf.shape(real_sequences)[0]
    noise = tf.random.normal([batch_size, sequence_length, num_nucleotides])

    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        generated_sequences = generator(noise)
        real_output = discriminator(real_sequences)
        fake_output = discriminator(generated_sequences)

        gen_loss = generator_loss(fake_output)
        disc_loss = discriminator_loss(real_output, fake_output)
        gp = gradient_penalty(real_sequences, generated_sequences)
        disc_loss += lambda_gp * gp

    gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
    gradients_of_discriminator = disc_tape.gradient(disc_loss, discriminator.trainable_variables)

    gen_optimizer.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))
    disc_optimizer.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))

    return gen_loss, disc_loss, generated_sequences

In [None]:
import tensorflow as tf
import os
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.optimizers.schedules import ExponentialDecay

# Hyperparameters
sequence_length = 31
num_nucleotides = 4
num_hidden_units = 512
batch_size = 32  # Increased for better stability
epochs = 150
lambda_gp = 10  # Reduced from 10 to 5 for better balance


# Model and Optimizers
generator = LSTMGenerator(sequence_length, num_nucleotides, num_hidden_units)
discriminator = LSTMDiscriminator(num_hidden_units)
gen_optimizer = Adam(learning_rate=1e-5, beta_1=0.5, beta_2=0.9)
disc_optimizer = Adam(learning_rate=1e-5, beta_1=0.5, beta_2=0.9)

# Training Loop
def train():
    generator = LSTMGenerator(sequence_length, num_nucleotides, num_hidden_units)
    discriminator = LSTMDiscriminator(num_hidden_units)
    
    gen_optimizer = Adam(learning_rate=1e-5, beta_1=0.5, beta_2=0.9)
    disc_optimizer = Adam(learning_rate=1e-5, beta_1=0.5, beta_2=0.9)

    checkpoint_dir = './training_checkpoints-dup'
    checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
    
    checkpoint = tf.train.Checkpoint(generator_optimizer=gen_optimizer,
                                     discriminator_optimizer=disc_optimizer,
                                     generator=generator,
                                     discriminator=discriminator)

    manager = tf.train.CheckpointManager(checkpoint, checkpoint_dir, max_to_keep=5)

    if manager.latest_checkpoint:
        checkpoint.restore(manager.latest_checkpoint)
        print(f"Restored from {manager.latest_checkpoint}")
    else:
        print("Initializing from scratch.")

    gen_loss_list = []
    disc_loss_list = []
    generated_samples = []
    real_samples = []

    start_epoch = int(manager.latest_checkpoint.split('-')[-1]) if manager.latest_checkpoint else 0
    print("Latest checkpoint ", manager.latest_checkpoint)
    print(start_epoch)

    for epoch in range(start_epoch, epochs):
        total_gen_loss = 0.0
        total_disc_loss = 0.0
        num_batches = 0

        for real_sequences_batch in dataset:
            real_sequences_batch = tf.convert_to_tensor(real_sequences_batch, dtype=tf.float32)
            gen_loss, disc_loss, generated_sequences = train_step(real_sequences_batch)

            total_gen_loss += gen_loss
            total_disc_loss += disc_loss
            num_batches += 1

        avg_gen_loss = total_gen_loss / num_batches
        avg_disc_loss = total_disc_loss / num_batches
        gen_loss_list.append(avg_gen_loss)
        disc_loss_list.append(avg_disc_loss)

        generated_probabilities = generated_sequences.numpy()
        nucleotide_sequences = probabilities_to_nucleotides(generated_probabilities)
        real = probabilities_to_nucleotides(real_sequences_batch.numpy())
        generated_samples.append(nucleotide_sequences)
        real_samples.append(real)

        print(f'Epoch {epoch + 1}, Generator Loss: {avg_gen_loss:.4f}, Discriminator Loss: {avg_disc_loss:.4f}')

        if (epoch + 1) % 1 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)
            print(f'Saved checkpoint for epoch {epoch + 1}')


        # Save the generated samples to a text file
        with open('real_samples-dup.txt', 'a') as f:
            for samples in real_samples:
                for sample in samples:
                    f.write(f'{sample}\n')

        with open('generated_samples-dup.txt', 'a') as f:
            for samples in generated_samples:
                for sample in samples:
                    f.write(f'{sample}\n')

train()


Restored from ./training_checkpoints-dup\ckpt-5
Latest checkpoint  ./training_checkpoints-dup\ckpt-5
5
Epoch 6, Generator Loss: 0.0784, Discriminator Loss: 0.0185
Saved checkpoint for epoch 6
Epoch 7, Generator Loss: 0.1199, Discriminator Loss: -0.0341
Saved checkpoint for epoch 7
