# In [None]:
# -----------------------------
# 1. Import Libraries & Setup
# -----------------------------
import tensorflow as tf
import numpy as np
import pandas as pd
from collections import Counter

# -----------------------------
# 2. Data Import and Preprocessing
# -----------------------------
# For demonstration, we attempt to load a CSV file "promoters.csv".
# The CSV is expected to have at least a "sequence" column.
try:
    df = pd.read_csv('promoters.csv')
except (OSError, ValueError) as err:
    # OSError covers a missing or unreadable file; ValueError covers pandas'
    # EmptyDataError / ParserError and text-decoding failures. Any other
    # exception is a genuine bug and is allowed to propagate.
    print(f"Dataset could not be loaded ({err}). Generating simulated data.")
    df = pd.DataFrame()  # empty frame -> handled by the fallback below

# Rows without a sequence cannot be measured, padded or encoded later on,
# so drop them before deciding whether any usable data is left.
if 'sequence' in df.columns:
    df = df.dropna(subset=['sequence'])

# Single fallback path: if nothing usable was loaded (read failure, empty
# file, or no "sequence" column), simulate a small dataset of DNA sequences.
if df.empty or 'sequence' not in df.columns:
    sequences = ["TATAAAACCGG", "GGCCAATTTGGC", "TATACCGAATTA", "CCGTATAGGCCT"]
    df = pd.DataFrame({'sequence_id': range(len(sequences)), 'sequence': sequences})

# Define the DNA alphabet and helper for one-hot encoding
ALPHABET = ['A', 'C', 'G', 'T']

# Lookup table built once at import time instead of on every call. Both the
# upper- and lower-case spelling of each nucleotide map to the same column,
# so lower-case input is not silently encoded as all zeros.
_NUCLEOTIDE_INDEX = {
    symbol: column
    for column, nucleotide in enumerate(ALPHABET)
    for symbol in (nucleotide, nucleotide.lower())
}


def one_hot_encode(seq):
    """One-hot encode a DNA sequence.

    Args:
        seq: Sequence of nucleotide characters (case-insensitive).

    Returns:
        A float32 array of shape ``(len(seq), len(ALPHABET))``. Row ``i`` has
        a single 1.0 in the column of ``seq[i]``; characters outside
        ``ALPHABET`` (for example ``'N'``) leave their row all zeros.
    """
    encoded = np.zeros((len(seq), len(ALPHABET)), dtype=np.float32)
    for position, nucleotide in enumerate(seq):
        column = _NUCLEOTIDE_INDEX.get(nucleotide)
        if column is not None:
            encoded[position, column] = 1.0
    return encoded

# Every sequence is padded up to the length of the longest one.
max_length = max(len(sequence) for sequence in df['sequence'])
def pad_sequence(seq, max_len, pad_char='A'):
    """Right-pad ``seq`` with ``pad_char`` until it is ``max_len`` characters long.

    Args:
        seq: The sequence string to pad.
        max_len: Target length.
        pad_char: Single character used as filler. Defaults to ``'A'`` (the
            original behaviour); any nucleotide could be used.

    Returns:
        The padded string. A sequence that is already ``max_len`` or longer
        is returned unchanged -- it is never truncated.

    Raises:
        TypeError: If ``pad_char`` is not exactly one character long.
    """
    return seq.ljust(max_len, pad_char)
# Store a right-padded copy of every sequence in the 'padded' column.
df['padded'] = df['sequence'].apply(pad_sequence, args=(max_length,))

# Encode each padded sequence and stack the results;
# shape -> (num_samples, max_length, 4)
X = np.array(list(map(one_hot_encode, df['padded'])))
print("Dataset shape (samples, seq_length, 4):", X.shape)

# Create a tf.data.Dataset for training
batch_size = 16
# The shuffle buffer spans the whole dataset. A fixed buffer smaller than the
# dataset would only shuffle within a sliding window, leaving batches
# correlated with the on-disk order. X is already fully in memory, so a
# full-size buffer costs nothing extra.
train_dataset = (
    tf.data.Dataset.from_tensor_slices(X)
    .shuffle(buffer_size=len(X))
    .batch(batch_size)
)

# -----------------------------
# 3. Define GAN Model (Generator & Discriminator)
# -----------------------------
latent_dim = 100             # Dimensionality of noise vector input
seq_length = max_length      # Length of the (padded) promoter sequences
num_classes = len(ALPHABET)  # Number of nucleotides, derived from ALPHABET (A, C, G, T)

# Generator: maps latent noise to a (seq_length, 4) probability distribution per position.
def build_generator():
    """Build the generator network.

    Returns:
        A ``tf.keras.Sequential`` model taking noise of shape
        ``(batch, latent_dim)`` and returning an array of shape
        ``(batch, seq_length, num_classes)`` whose last axis is a softmax
        distribution over the nucleotides at each position.
    """
    model = tf.keras.Sequential([
        # Explicit Input instead of the deprecated `input_dim` layer kwarg.
        tf.keras.Input(shape=(latent_dim,)),
        tf.keras.layers.Dense(128, activation='relu'),
        # Linear logits: a ReLU here would clamp every logit to >= 0, so the
        # softmax could never push a nucleotide's probability far down, and
        # dead units would force uniform outputs.
        tf.keras.layers.Dense(seq_length * num_classes),
        tf.keras.layers.Reshape((seq_length, num_classes)),
        # Softmax acts on the last axis, so each position forms a categorical
        # distribution.
        tf.keras.layers.Activation('softmax')
    ], name="Generator")
    return model

# Discriminator: classifies a (seq_length, 4) sequence as real or fake.
def build_discriminator():
    """Build the discriminator network.

    Returns:
        A ``tf.keras.Sequential`` model taking input of shape
        ``(batch, seq_length, num_classes)`` and returning a sigmoid
        probability of shape ``(batch, 1)``; the training step labels real
        sequences 1 and generated ones 0.
    """
    model = tf.keras.Sequential([
        # Explicit Input instead of the deprecated `input_shape` layer kwarg.
        tf.keras.Input(shape=(seq_length, num_classes)),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ], name="Discriminator")
    return model

# Instantiate the two adversaries (generator first, then discriminator).
generator, discriminator = build_generator(), build_discriminator()

# -----------------------------
# 4. Define Losses and Optimizers
# -----------------------------
# Binary cross-entropy on probabilities: the discriminator ends in a sigmoid,
# so its outputs are not logits.
cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=False)

# One Adam optimizer per network, both with learning rate 1e-4.
generator_optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
discriminator_optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)

# -----------------------------
# 5. Training Step Definition
# -----------------------------
@tf.function
def train_step(real_sequences):
    """Run one discriminator update followed by one generator update.

    Args:
        real_sequences: Batch of real one-hot encoded sequences; its first
            dimension is taken as the batch size.

    Returns:
        Tuple ``(disc_loss, gen_loss)`` with the losses computed in this step.
    """
    n_samples = tf.shape(real_sequences)[0]
    latent = tf.random.normal([n_samples, latent_dim])

    # Fakes for the discriminator update are produced outside any tape.
    fakes_for_disc = generator(latent, training=True)

    ones = tf.ones((n_samples, 1))    # target label for "real"
    zeros = tf.zeros((n_samples, 1))  # target label for "fake"

    # Discriminator update: real batch -> 1, generated batch -> 0.
    with tf.GradientTape() as d_tape:
        pred_real = discriminator(real_sequences, training=True)
        pred_fake = discriminator(fakes_for_disc, training=True)
        disc_loss = cross_entropy(ones, pred_real) + cross_entropy(zeros, pred_fake)
    d_vars = discriminator.trainable_variables
    d_grads = d_tape.gradient(disc_loss, d_vars)
    discriminator_optimizer.apply_gradients(zip(d_grads, d_vars))

    # Generator update: regenerate from the same noise under the tape and
    # score the result with the just-updated discriminator against "real".
    with tf.GradientTape() as g_tape:
        fakes_for_gen = generator(latent, training=True)
        pred_fake = discriminator(fakes_for_gen, training=True)
        gen_loss = cross_entropy(ones, pred_fake)
    g_vars = generator.trainable_variables
    g_grads = g_tape.gradient(gen_loss, g_vars)
    generator_optimizer.apply_gradients(zip(g_grads, g_vars))

    return disc_loss, gen_loss

# -----------------------------
# 6. Training Loop with Checkpoint Saving
# -----------------------------
EPOCHS = 50
checkpoint_generator = "generator_checkpoint.h5"
checkpoint_discriminator = "discriminator_checkpoint.h5"

for epoch in range(1, EPOCHS + 1):
    print(f"Epoch {epoch}/{EPOCHS}")
    # Start from NaN so an epoch that yields no batches reports NaN instead
    # of raising NameError (first epoch) or re-printing stale losses.
    d_loss = g_loss = float("nan")
    for real_batch in train_dataset:
        d_loss, g_loss = train_step(real_batch)

    # The reported values are the losses of the last batch of the epoch.
    # Convert to Python floats first: applying a ':.4f' format spec directly
    # to a tensor is not supported by every TensorFlow release.
    print(f"Epoch {epoch} => Discriminator Loss: {float(d_loss):.4f}, Generator Loss: {float(g_loss):.4f}")

    # Save model checkpoints every 10 epochs, overwriting the previous state.
    if epoch % 10 == 0:
        generator.save(checkpoint_generator, overwrite=True)
        discriminator.save(checkpoint_discriminator, overwrite=True)
        print(f"Checkpoint saved at epoch {epoch}")

# -----------------------------
# 7. Synthetic Data Generation
# -----------------------------
def generate_synthetic_sequences(num_samples):
    """Sample ``num_samples`` synthetic sequences from the generator.

    Args:
        num_samples: Number of noise vectors to draw, i.e. number of
            sequences to generate.

    Returns:
        A list of ``num_samples`` strings. Each string takes, at every
        position, the nucleotide with the highest generated probability.
    """
    noise = tf.random.normal([num_samples, latent_dim])
    generated = generator(noise, training=False)
    # Convert probabilistic outputs to discrete indices via argmax.
    generated_indices = tf.argmax(generated, axis=-1).numpy()

    # Decode through ALPHABET -- the same table the one-hot encoder is built
    # from -- so the index -> nucleotide mapping cannot drift from the encoding.
    return [
        ''.join(ALPHABET[index] for index in indices)
        for indices in generated_indices
    ]

# Draw five synthetic sequences and show them one per line.
synthetic_data = generate_synthetic_sequences(5)
print("Synthetic Sequences Generated:")
print("\n".join(synthetic_data))

# -----------------------------
# 8. Evaluation: k-mer Distribution Comparison
# -----------------------------
def get_kmers(sequence, k=3):
    """Return every overlapping k-mer of ``sequence``, left to right.

    Args:
        sequence: The string to slice.
        k: Length of each k-mer; must be at least 1.

    Returns:
        A list of the ``len(sequence) - k + 1`` substrings of length ``k``;
        empty when the sequence is shorter than ``k``.

    Raises:
        ValueError: If ``k`` is smaller than 1. Slicing with such a ``k``
            would otherwise yield empty or truncated strings, not k-mers.
    """
    if k < 1:
        raise ValueError(f"k must be a positive integer, got {k!r}")
    return [sequence[start:start + k] for start in range(len(sequence) - k + 1)]

# Tally 3-mer occurrences over the real sequences (in their padded form) ...
real_sequences = df['padded'].tolist()
real_kmer_counts = Counter(
    kmer for real_seq in real_sequences for kmer in get_kmers(real_seq, k=3)
)

# ... and over the generated sequences.
synthetic_kmer_counts = Counter(
    kmer for synthetic_seq in synthetic_data for kmer in get_kmers(synthetic_seq, k=3)
)

# Print both tallies as plain dicts for a side-by-side look.
for heading, counts in (
    ("\nReal Data 3-mer Distribution:", real_kmer_counts),
    ("\nSynthetic Data 3-mer Distribution:", synthetic_kmer_counts),
):
    print(heading)
    print(dict(counts))

# Further evaluation could include statistical tests such as Chi-square to compare these distributions.
