(1, 165, 64)
1 sequence

165 time steps (one for each base)

64-dimensional vector at each step

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers, models
from sklearn.model_selection import train_test_split

# ----------------------------
# 1. Load & Preprocess Data
# ----------------------------
base_to_idx = {'A': 0, 'T': 1, 'C': 2, 'G': 3, 'N': 4}
idx_to_base = {index: base for base, index in base_to_idx.items()}


def preprocess_sequence(seq, max_len=165):
    """Turn a DNA string into a fixed-length list of integer token ids.

    The sequence is upper-cased, cut to at most ``max_len`` characters and
    right-padded with 'N' up to ``max_len``. Each character is then looked up
    in ``base_to_idx``; characters that are not in the table map to 4 (the
    index of 'N').

    Args:
        seq: Sequence string.
        max_len: Target length of the returned list.

    Returns:
        list[int]: Token ids, one per position.
    """
    fixed_length = seq.upper()[:max_len].ljust(max_len, 'N')
    return [base_to_idx.get(symbol, 4) for symbol in fixed_length]

# Read both expression tables and stack them (second file's rows follow the first's).
df1 = pd.read_csv('/content/ecoli_mpra_expr.csv')
df2 = pd.read_csv('/content/ecoli_natural50bp_expr (1).csv')
combined_df = pd.concat([df1, df2], ignore_index=True)

# Tokenise every sequence to a fixed-length list of ints.
combined_df['tokenized'] = combined_df['seq'].map(preprocess_sequence)

# Model input: one row of token ids per sequence.
X_seq = np.array(list(combined_df['tokenized']))
# Regression target as a column vector.
y_expr = combined_df['expr'].values.reshape(-1, 1)
# Reconstruction target: same tokens with a trailing axis of size 1
# (needed for sparse_categorical_crossentropy).
y_seq = X_seq[..., np.newaxis]

# ----------------------------
# 2. Define Model
# ----------------------------
# Hyper-parameters shared by the model definition and the generator tools.
vocab_size, max_len = 5, 165
embedding_dim, latent_dim = 16, 64

# Encoder: token ids -> embeddings -> bidirectional GRU summary -> latent vector.
input_seq = layers.Input(shape=(max_len,), dtype='int32')
token_embedding = layers.Embedding(input_dim=vocab_size, output_dim=embedding_dim)
sequence_encoder = layers.Bidirectional(layers.GRU(64, return_sequences=False))
x = sequence_encoder(token_embedding(input_seq))
latent = layers.Dense(latent_dim, activation='relu', name="latent_vector")(x)

# Regression head: a single linear unit on top of the latent vector.
expr_pred = layers.Dense(1, name="expression_output")(latent)

# Decoder: feed the latent vector at every time step, then classify each step.
x = layers.RepeatVector(max_len)(latent)
x = layers.GRU(64, return_sequences=True)(x)
per_step_classifier = layers.Dense(vocab_size, activation='softmax')
decoded = layers.TimeDistributed(per_step_classifier, name="decoder_output")(x)

# Two-output model: reconstructed sequence and predicted expression.
autoencoder = models.Model(inputs=input_seq, outputs=[decoded, expr_pred])

output_losses = {'decoder_output': 'sparse_categorical_crossentropy', 'expression_output': 'mse'}
output_loss_weights = {'decoder_output': 1.0, 'expression_output': 1.0}
output_metrics = {'decoder_output': 'accuracy'}
autoencoder.compile(optimizer='adam',
                    loss=output_losses,
                    loss_weights=output_loss_weights,
                    metrics=output_metrics)

# ----------------------------
# 3. Train Model
# ----------------------------
# Keras' `validation_split` takes the *last* fraction of the arrays, before any
# shuffling. X_seq is the ordered concatenation of two CSV files, so that slice
# would come only from the end of the combined table and would not be
# representative of both sources. Hold out a shuffled 10% explicitly instead.
(X_train, X_val,
 y_seq_train, y_seq_val,
 y_expr_train, y_expr_val) = train_test_split(
    X_seq, y_seq, y_expr, test_size=0.1, random_state=42)

autoencoder.fit(X_train,
                {'decoder_output': y_seq_train, 'expression_output': y_expr_train},
                validation_data=(
                    X_val,
                    {'decoder_output': y_seq_val, 'expression_output': y_expr_val},
                ),
                batch_size=64,
                epochs=20)

# ----------------------------
# 4. Create Generator Tools
# ----------------------------
# Build stand-alone models that map a latent vector to (a) per-position base
# probabilities and (b) a predicted expression value.
#
# These MUST reuse the layers that were trained inside `autoencoder`. Creating
# new GRU/Dense layers here would give randomly initialised weights, so the
# generated sequences and "predicted expression" would be unrelated to the data.
latent_input = tf.keras.Input(shape=(latent_dim,))

# The only top-level GRU in the autoencoder is the decoder GRU (the encoder GRU
# is wrapped inside the Bidirectional layer and is not listed in `.layers`).
trained_decoder_gru = next(
    layer for layer in autoencoder.layers if isinstance(layer, layers.GRU)
)

x = layers.RepeatVector(max_len)(latent_input)  # RepeatVector has no weights
x = trained_decoder_gru(x)
decoder_output = autoencoder.get_layer("decoder_output")(x)
decoder = tf.keras.Model(latent_input, decoder_output)

# Trained linear regression head applied directly to the latent vector.
expr_head = autoencoder.get_layer("expression_output")(latent_input)
expression_model = tf.keras.Model(latent_input, expr_head)

def sample_with_temperature(probs, temperature=1.0):
    """Sample one index from a probability vector after temperature scaling.

    The probabilities are converted to log-space, divided by ``temperature``
    and re-normalised with a numerically stable softmax, i.e. the sampling
    distribution is proportional to ``probs ** (1 / temperature)``.

    Args:
        probs: 1-D array-like of probabilities for one position.
        temperature: Values below 1 sharpen the distribution and values above 1
            flatten it. ``temperature <= 0`` is treated as greedy decoding
            (argmax) instead of dividing by zero.

    Returns:
        int: The sampled (or, for ``temperature <= 0``, most likely) index.
    """
    # float64 keeps the re-normalised vector summing to 1 within
    # np.random.choice's tolerance even when the model emits float32.
    probs = np.clip(np.asarray(probs, dtype=np.float64), 1e-8, 1.0)

    if temperature <= 0:
        # Limit of temperature -> 0: all mass on the most likely index.
        return int(np.argmax(probs))

    logits = np.log(probs) / temperature
    exp_logits = np.exp(logits - np.max(logits))  # subtract max for stability
    probs = exp_logits / np.sum(exp_logits)
    return int(np.random.choice(len(probs), p=probs))

def decode_probs_to_seq(probs, temperature=1.0):
    """Sample one base per position and join the bases into a string.

    Args:
        probs: Iterable of per-position probability vectors.
        temperature: Passed through to ``sample_with_temperature``.

    Returns:
        str: One character from ``idx_to_base`` per row of ``probs``.
    """
    sampled_bases = []
    for position_probs in probs:
        base_index = sample_with_temperature(position_probs, temperature)
        sampled_bases.append(idx_to_base[base_index])
    return ''.join(sampled_bases)

# ----------------------------
# 5. Generate One Sequence for User Input
# ----------------------------
def generate_sequence_for_expression(target_expr, temperature=0.8, steps=500):
    """Optimise a latent vector towards ``target_expr`` and decode one sequence.

    A random latent vector is updated with Adam (learning rate 0.05) to
    minimise the squared error between ``expression_model``'s prediction and
    ``target_expr``. The final vector is passed through ``decoder`` and one
    base is sampled per position.

    Args:
        target_expr: Desired expression value.
        temperature: Sampling temperature used when decoding.
        steps: Number of gradient steps.

    Returns:
        tuple: ``(decoded_seq, final_pred)`` - the sampled sequence string and
        the expression predicted for the optimised latent vector.
    """
    latent_z = tf.Variable(tf.random.normal((1, latent_dim)), trainable=True)
    adam = tf.keras.optimizers.Adam(learning_rate=0.05)
    last_step = steps - 1

    for step in range(steps):
        with tf.GradientTape() as tape:
            pred = expression_model(latent_z, training=False)
            loss = tf.reduce_mean(tf.square(pred - target_expr))
        gradient = tape.gradient(loss, latent_z)
        adam.apply_gradients([(gradient, latent_z)])

        # Progress report every 100 steps and on the last step; `pred` is the
        # prediction made before this step's update was applied.
        should_report = step == last_step or step % 100 == 0
        if should_report:
            print(f"Step {step}: Predicted = {pred.numpy()[0][0]:.4f} | Target = {target_expr:.4f}")

    position_probs = decoder(latent_z, training=False).numpy()[0]
    decoded_seq = decode_probs_to_seq(position_probs, temperature)
    final_pred = expression_model(latent_z, training=False).numpy()[0][0]
    return decoded_seq, final_pred

# ----------------------------
# 6. Example Use
# ----------------------------
# Read the target expression from the user, optimise a latent vector for it,
# and print the sampled sequence with its predicted expression.
raw_expression = input("Enter desired expression value (e.g., 0.8): ")
user_expr = float(raw_expression)
seq, pred = generate_sequence_for_expression(user_expr)
report = f"\n🧬 Generated Sequence: {seq}\nPredicted Expression: {pred:.4f}"
print(report)

In [None]:
import pickle

# Step 1A: Define the mappings
# Base <-> index tables used to tokenise and de-tokenise sequences.
base_to_idx = {'A': 0, 'T': 1, 'C': 2, 'G': 3, 'N': 4}
idx_to_base = {index: base for base, index in base_to_idx.items()}

# Step 1B: persist both tables together in one pickle file.
vocab_payload = {'base_to_idx': base_to_idx, 'idx_to_base': idx_to_base}
with open('vocab_mapping.pkl', 'wb') as vocab_file:
    pickle.dump(vocab_payload, vocab_file)

print("✅ Step 1 complete: Vocabulary mappings saved to vocab_mapping.pkl")


In [None]:
# Write each model to its own HDF5 file:
#   - the joint model (encoder-decoder + expression head)
#   - the decoder (for generating a sequence from z)
#   - the expression prediction model (to estimate expression from z)
models_to_save = (
    (autoencoder, "autoencoder_model.h5"),
    (decoder, "decoder_model.h5"),
    (expression_model, "expression_model.h5"),
)
for keras_model, h5_path in models_to_save:
    keras_model.save(h5_path)

print("✅ Step 2 complete: All models saved as .h5 files")




✅ Step 2 complete: All models saved as .h5 files


In [None]:
import numpy as np
import tensorflow as tf
import pickle
import re

# === Step 1: Load models ===
# Restore the three saved models in a fixed order. compile=False skips
# re-compiling, so no loss/optimizer configuration is needed here.
autoencoder, decoder, expression_model = (
    tf.keras.models.load_model(h5_path, compile=False)
    for h5_path in ("autoencoder_model.h5", "decoder_model.h5", "expression_model.h5")
)

# === Step 2: Load vocabulary ===
with open("vocab_mapping.pkl", "rb") as vocab_file:
    vocab_data = pickle.load(vocab_file)

vocab, idx_to_char = vocab_data["base_to_idx"], vocab_data["idx_to_base"]

# === Step 3: Decode one-hot to sequence ===
def decode_sequence(one_hot_seq):
    """Greedy-decode per-position scores into a sequence string.

    Takes the argmax over the last axis of ``one_hot_seq`` and maps every
    index through ``idx_to_char``; indices missing from the table become 'N'.
    """
    best_indices = np.argmax(one_hot_seq, axis=-1)
    return ''.join(idx_to_char.get(index, 'N') for index in best_indices)

# === Step 4: Decode with temperature ===
def decode_sequence_with_temperature(logits, temperature=1.0):
    """Sample a sequence string from per-position logits.

    Args:
        logits: Array/tensor with one row of unnormalised scores (e.g.
            log-probabilities) per position. Passing probabilities that are
            already softmax-normalised would flatten the distribution, because
            a softmax is applied here.
        temperature: Divides the logits before the softmax. Lower values make
            sampling more conservative. ``temperature <= 0`` is treated as
            greedy decoding (argmax per position) instead of dividing by zero.

    Returns:
        str: One character per position, looked up in ``idx_to_char`` (indices
        missing from the table become 'N').
    """
    if temperature <= 0:
        # Limit of temperature -> 0: pick the highest-scoring index per position.
        sampled_indices = np.argmax(np.asarray(logits), axis=-1)
    else:
        probs = tf.nn.softmax(logits / temperature, axis=-1).numpy()
        sampled_indices = [np.random.choice(len(p), p=p) for p in probs]
    chars = [idx_to_char.get(int(i), 'N') for i in sampled_indices]
    return ''.join(chars)

# === Step 5: Clean DNA sequence (remove non-ACGT) ===
def clean_dna_sequence(seq):
    """Return ``seq`` with every character other than A, C, G or T removed."""
    non_acgt = re.compile(r'[^ACGT]')
    return non_acgt.sub('', seq)

# === Step 6: Latent optimization for expression ===
# Size of the latent space, read from the loaded autoencoder's latent layer.
latent_dim = autoencoder.get_layer("latent_vector").output.shape[-1]

def generate_sequence_for_expression(target_expr, steps=500, lr=0.05, temps=(0.4, 0.6, 0.8, 1.0)):
    """Optimise a latent vector for ``target_expr`` and sample sequences from it.

    A random latent vector is updated with Adam to minimise the squared error
    between ``expression_model``'s prediction and ``target_expr``. The final
    vector is decoded once, and one sequence is sampled per temperature.

    Args:
        target_expr: Desired expression value.
        steps: Number of gradient steps on the latent vector.
        lr: Adam learning rate.
        temps: Iterable of sampling temperatures; one sequence is drawn per entry.

    Returns:
        list[tuple]: ``(temperature, sequence, predicted_expression)`` for every
        sampled sequence that consists only of A/C/G/T.
    """
    z = tf.Variable(tf.random.normal([1, latent_dim]), trainable=True)
    optimizer = tf.keras.optimizers.Adam(learning_rate=lr)

    for step in range(steps):
        with tf.GradientTape() as tape:
            step_pred = expression_model(z, training=False)
            loss = tf.reduce_mean(tf.square(step_pred - target_expr))
        grads = tape.gradient(loss, [z])
        optimizer.apply_gradients(zip(grads, [z]))

        if step % 100 == 0 or step == steps - 1:
            print(f"Step {step}: Predicted = {step_pred.numpy().squeeze():.4f} | Target = {target_expr:.4f}")

    # z no longer changes, so the prediction is identical for every temperature;
    # compute it once instead of once per loop iteration.
    pred_expr = float(expression_model(z, training=False).numpy().squeeze())

    # The decoder built in this notebook ends in a softmax layer, so it returns
    # probabilities, not logits. decode_sequence_with_temperature applies its
    # own softmax, so hand it log-probabilities; otherwise the distribution is
    # softmaxed twice and becomes close to uniform.
    probs = decoder(z, training=False).numpy()[0]
    log_probs = np.log(np.clip(probs, 1e-8, 1.0))

    generated_variants = []
    for temp in temps:
        seq = decode_sequence_with_temperature(log_probs, temperature=temp)
        print(f"\n🌡️ Temp {temp:.1f} | Predicted: {pred_expr:.4f} | Sequence: {seq}")
        cleaned = clean_dna_sequence(seq)
        if len(cleaned) == len(seq):  # Keep only valid ACGT sequences
            generated_variants.append((temp, seq, pred_expr))

    return generated_variants

# === Step 7: Run and save results ===
if __name__ == "__main__":
    # Ask for the target, generate variants, and write them to a text file
    # whose name contains the target value.
    raw_target = input("Enter desired expression value (e.g., 5.0): ")
    target_expression = float(raw_target)
    results = generate_sequence_for_expression(target_expression)

    output_file = f"generated_sequences_expr_{target_expression:.2f}.txt"
    variant_records = (
        f"> Temp: {temp:.1f}, Predicted: {pred:.4f}\n{seq}\n\n"
        for temp, seq, pred in results
    )
    with open(output_file, "w") as f:
        f.writelines(variant_records)

    print(f"\n✅ All sequence variants saved to: {output_file}")


Enter desired expression value (e.g., 5.0): 5
Step 0: Predicted = -0.9703 | Target = 5.0000
Step 100: Predicted = 4.9934 | Target = 5.0000
Step 200: Predicted = 5.0000 | Target = 5.0000
Step 300: Predicted = 5.0000 | Target = 5.0000
Step 400: Predicted = 5.0000 | Target = 5.0000
Step 499: Predicted = 5.0000 | Target = 5.0000

🌡️ Temp 0.4 | Predicted: 5.0000 | Sequence: TAAAAAAGAGTACGACNANACAANTTTANTANACNGATNAAAAGAAAAATAACNNCNTTAAAAAAGACATAGNAANACTGAACAACCGATAGACATGAGAATCAACTCNGNGAAACANACGAAAAAAAGAGANCANATNANACAAANANAAGCAAAACTAAAATA

🌡️ Temp 0.6 | Predicted: 5.0000 | Sequence: GGAAAAAACNGNAACCANATATCNCCACTGCCCGATTAATACNAATGCNAATCCATAAATCAAAACGAGAAANCAGNTAGAAAAGGCGNATAAAAAGAAATGCAAAAAAAATAGGAANACGAGAACNNGTGGCCNTCTAGNAGCAGNGATAGTCNGCCTTAANTA

🌡️ Temp 0.8 | Predicted: 5.0000 | Sequence: NACACTTGCCTAGNGAAAGACCGGTCGCGATTNNGTANCAAGGTAATTGANTNCAGCANCNNCAAGNCNCNTGACANTTTGATNGGNACATAGATACGATAANTAAAGTAAGGTTCTNTTAGAGNTNNGCTAGNATNTTNGATNAAGGCTNGCGNACATTAAGNT

🌡️ Temp 1.0 | Predicted: 5.0000 | Seque

In [None]:
import pandas as pd

In [None]:
# import numpy as np
# import tensorflow as tf
# import pickle
# import re
# import os

# # --- Global Definitions (Assumes models and vocab files are available) ---
# # It's generally good practice to pass models as arguments or ensure they are loaded once
# # when the application starts, rather than repeatedly. For this example, we keep
# # them loaded globally as per your original script's structure.

# # Path to your model and vocabulary files
# # Ensure these files are in the same directory as your script or provide full paths.
# AUTOENCODER_MODEL_PATH = "autoencoder_model.h5"
# DECODER_MODEL_PATH = "decoder_model.h5"
# EXPRESSION_MODEL_PATH = "expression_model.h5"
# VOCAB_MAPPING_PATH = "vocab_mapping.pkl"

# # --- Function to load models ---
# def load_models():
#     """Loads the pre-trained Keras models."""
#     try:
#         autoencoder = tf.keras.models.load_model(AUTOENCODER_MODEL_PATH, compile=False)
#         decoder = tf.keras.models.load_model(DECODER_MODEL_PATH, compile=False)
#         expression_model = tf.keras.models.load_model(EXPRESSION_MODEL_PATH, compile=False)
#         return autoencoder, decoder, expression_model
#     except Exception as e:
#         print(f"Error loading models: {e}")
#         print("Please ensure 'autoencoder_model.h5', 'decoder_model.h5', and 'expression_model.h5' are in the correct directory.")
#         return None, None, None

# # --- Function to load vocabulary ---
# def load_vocabulary():
#     """Loads the vocabulary mapping from a pickle file."""
#     try:
#         with open(VOCAB_MAPPING_PATH, "rb") as f:
#             vocab_data = pickle.load(f)
#         vocab = vocab_data["base_to_idx"]
#         idx_to_char = vocab_data["idx_to_base"]
#         return vocab, idx_to_char
#     except Exception as e:
#         print(f"Error loading vocabulary: {e}")
#         print("Please ensure 'vocab_mapping.pkl' is in the correct directory.")
#         return None, None

# # --- Helper Functions for Sequence Manipulation ---

# def decode_sequence(one_hot_seq, idx_to_char_map):
#     """
#     Decodes a one-hot encoded sequence back into a character string.

#     Args:
#         one_hot_seq (np.array): A NumPy array representing the one-hot encoded sequence.
#         idx_to_char_map (dict): A dictionary mapping index to character.

#     Returns:
#         str: The decoded sequence string.
#     """
#     indices = np.argmax(one_hot_seq, axis=-1)
#     chars = [idx_to_char_map.get(i, 'N') for i in indices] # Use .get with default 'N' for safety
#     return ''.join(chars)

# def decode_sequence_with_temperature(logits, idx_to_char_map, temperature=1.0):
#     """
#     Decodes a sequence from logits using a specified temperature for sampling.

#     Args:
#         logits (tf.Tensor or np.array): Raw prediction scores from the decoder.
#         idx_to_char_map (dict): A dictionary mapping index to character.
#         temperature (float): Controls the randomness of sampling. Higher values
#                              (e.g., 1.0+) lead to more diverse sequences, lower values
#                              (e.g., 0.1-0.5) lead to more conservative sequences.

#     Returns:
#         str: The sampled sequence string.
#     """
#     # Ensure logits are a TensorFlow tensor for softmax
#     if not isinstance(logits, tf.Tensor):
#         logits = tf.convert_to_tensor(logits, dtype=tf.float32)

#     # Apply temperature and softmax to get probabilities
#     # Adding a small epsilon to temperature to prevent division by zero if temp is 0.
#     probs = tf.nn.softmax(logits / (temperature + 1e-8), axis=-1).numpy()

#     # Sample indices based on probabilities
#     sampled_indices = [np.random.choice(len(p), p=p) for p in probs]

#     # Convert indices back to characters
#     chars = [idx_to_char_map.get(i, 'N') for i in sampled_indices]
#     return ''.join(chars)

# def clean_dna_sequence(seq):
#     """
#     Cleans a DNA sequence by removing any characters that are not A, C, G, or T.

#     Args:
#         seq (str): The input sequence string.

#     Returns:
#         str: The cleaned sequence containing only ACGT characters.
#     """
#     return re.sub(r'[^ACGT]', '', seq)

# # --- Core Logic: Latent Optimization and Sequence Generation ---

# def generate_sequence_for_expression(
#     target_expr,
#     autoencoder_model, # Passed as argument
#     decoder_model,     # Passed as argument
#     expression_model_ref, # Passed as argument (renamed to avoid conflict with imported name)
#     idx_to_char_map,   # Passed as argument
#     steps=500,
#     lr=0.05,
#     temps=[0.4, 0.6, 0.8, 1.0]
# ):
#     """
#     Optimizes a latent vector to match a target expression value and
#     then generates diverse DNA sequences using a decoder.

#     Args:
#         target_expr (float): The desired expression value to optimize for.
#         autoencoder_model (tf.keras.Model): The loaded autoencoder model (used to get latent_dim).
#         decoder_model (tf.keras.Model): The loaded decoder model.
#         expression_model_ref (tf.keras.Model): The loaded expression prediction model.
#         idx_to_char_map (dict): Dictionary mapping integer indices back to characters (e.g., 'A', 'C', 'G', 'T').
#         steps (int): Number of optimization steps for the latent vector.
#         lr (float): Learning rate for the Adam optimizer.
#         temps (list): List of temperature values for diverse sequence generation.

#     Returns:
#         list: A list of tuples, where each tuple contains (temperature, generated_sequence, predicted_expression).
#     """
#     # Get latent dimension from the autoencoder's latent layer
#     # Assuming 'latent_vector' is the name of the latent layer in your autoencoder
#     latent_dim = autoencoder_model.get_layer("latent_vector").output.shape[-1]

#     # Initialize the latent vector 'z' randomly
#     z = tf.Variable(tf.random.normal([1, latent_dim]), trainable=True)
#     optimizer = tf.keras.optimizers.Adam(learning_rate=lr)

#     print(f"Starting latent optimization for target expression: {target_expr:.4f}")
#     for step in range(steps):
#         with tf.GradientTape() as tape:
#             # Predict expression for the current latent vector 'z'
#             pred_expr = expression_model_ref(z, training=False)
#             # Calculate mean squared error loss between predicted and target expression
#             loss = tf.reduce_mean(tf.square(pred_expr - target_expr))

#         # Compute gradients of the loss with respect to 'z'
#         grads = tape.gradient(loss, [z])
#         # Apply gradients to update 'z'
#         optimizer.apply_gradients(zip(grads, [z]))

#         # Print progress periodically
#         if step % 100 == 0 or step == steps - 1:
#             print(f"Step {step}: Predicted = {pred_expr.numpy().squeeze():.4f} | Target = {target_expr:.4f} | Loss = {loss.numpy():.6f}")

#     print("\nOptimization complete. Generating sequences...")
#     # Get logits from the decoder using the optimized latent vector 'z'
#     logits = decoder_model(z, training=False).numpy().squeeze()

#     generated_variants = []
#     # Generate sequences at different temperatures
#     for temp in temps:
#         # Decode sequence using the specific temperature
#         seq = decode_sequence_with_temperature(logits, idx_to_char_map, temperature=temp)
#         # Re-predict expression for the optimized 'z' (this value will be consistent across temps)
#         final_pred_expr = expression_model_ref(z, training=False).numpy().squeeze()

#         print(f"\n🌡️ Temp {temp:.1f} | Predicted: {final_pred_expr:.4f} | Sequence: {seq}")

#         # Clean the generated sequence and validate if it's purely ACGT
#         cleaned = clean_dna_sequence(seq)
#         if len(cleaned) == len(seq):  # Keep only valid ACGT sequences
#             generated_variants.append((temp, seq, final_pred_expr))
#         else:
#             print(f"  Warning: Sequence contained non-ACGT characters and was not added.")

#     return generated_variants

# # --- Main execution function ---

# def run_dna_sequence_tool(target_expression_value: float, output_filename: str = None):
#     """
#     Main function to run the DNA sequence generation tool.

#     Args:
#         target_expression_value (float): The desired expression value.
#         output_filename (str, optional): The name of the file to save results.
#                                          If None, a default name is generated.
#     """
#     print("--- Initializing DNA Sequence Generation Tool ---")

#     # Load models
#     autoencoder, decoder, expression_model_loaded = load_models()
#     if autoencoder is None or decoder is None or expression_model_loaded is None:
#         print("Exiting due to model loading error.")
#         return

#     # Load vocabulary
#     vocab, idx_to_char = load_vocabulary()
#     if vocab is None or idx_to_char is None:
#         print("Exiting due to vocabulary loading error.")
#         return

#     # Generate sequences
#     results = generate_sequence_for_expression(
#         target_expr=target_expression_value,
#         autoencoder_model=autoencoder,
#         decoder_model=decoder,
#         expression_model_ref=expression_model_loaded,
#         idx_to_char_map=idx_to_char,
#         steps=500, # Default steps
#         lr=0.05,   # Default learning rate
#         temps=[0.4, 0.6, 0.8, 1.0] # Default temperatures
#     )

#     # Determine output file name
#     if output_filename is None:
#         output_file = f"generated_sequences_expr_{target_expression_value:.2f}.txt"
#     else:
#         output_file = output_filename

#     # Save results to file
#     try:
#         with open(output_file, "w") as f:
#             for temp, seq, pred in results:
#                 f.write(f"> Temp: {temp:.1f}, Predicted: {pred:.4f}\n{seq}\n\n")
#         print(f"\n✅ All valid sequence variants saved to: {os.path.abspath(output_file)}")
#     except Exception as e:
#         print(f"Error saving results to file {output_file}: {e}")

# # Example Usage (if you were to run this as a standalone script)
# if __name__ == "__main__":
#     # This block shows how you would typically call the main function.
#     # In a real interactive environment, you might get this from user input or another part of your application.

#     # Simulate user input for demonstration
#     # You might replace this with a more robust input method in a web app, for example.
#     try:
#         desired_expression = float(input("Enter desired expression value (e.g., 5.0): "))
#         run_dna_sequence_tool(desired_expression)
#     except ValueError:
#         print("Invalid input. Please enter a numerical value for expression.")
#     except Exception as e:
#         print(f"An unexpected error occurred: {e}")

Enter desired expression value (e.g., 5.0): 7.04
--- Initializing DNA Sequence Generation Tool ---
Starting latent optimization for target expression: 7.0400
Step 0: Predicted = 1.0916 | Target = 7.0400 | Loss = 35.383472
Step 100: Predicted = 7.0346 | Target = 7.0400 | Loss = 0.000029
Step 200: Predicted = 7.0400 | Target = 7.0400 | Loss = 0.000000
Step 300: Predicted = 7.0400 | Target = 7.0400 | Loss = 0.000000
Step 400: Predicted = 7.0400 | Target = 7.0400 | Loss = 0.000000
Step 499: Predicted = 7.0400 | Target = 7.0400 | Loss = 0.000000

Optimization complete. Generating sequences...

🌡️ Temp 0.4 | Predicted: 7.0400 | Sequence: GCTTNTNGGTTTGCNCGNGGATGGTGATTCNCNGGNNNTGACCTGTTCGGCGCCTTGAATCTCNNAATAGTCTGACNTGCNTTTNNCAGGAGATNCCAATTTAGNGGAATCTAAAACNTANNCCACTGATTCCATGGGTATNGNGGCNGNNCNTATCTTGGCTNT

🌡️ Temp 0.6 | Predicted: 7.0400 | Sequence: GAGCAGATATTTNNGTNGGGANCCNTATNTCGAGATAATGTTTNCAGGCNGGATCGTTTATTNTNGAGAGGTGCANCGGTAANACAGTCGNGGTGCGCANGTAGCCTATCCGCACTTCAACGATNATCTGGNTCNTNACAGTGGGCATN

In [None]:
# === Updated Autoencoder Training with Transformer Encoder ===

import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers, models
from sklearn.model_selection import train_test_split

# ----------------------------
# 1. Load & Preprocess Data
# ----------------------------
base_to_idx = {'A': 0, 'T': 1, 'C': 2, 'G': 3, 'N': 4}
idx_to_base = dict((index, base) for base, index in base_to_idx.items())

def preprocess_sequence(seq, max_len=165):
    """Encode a DNA string as exactly ``max_len`` integer tokens.

    The input is upper-cased, truncated to ``max_len`` characters and padded
    on the right with 'N'. Characters not present in ``base_to_idx`` are
    encoded as 4, which is the index of 'N'.
    """
    trimmed = seq.upper()[:max_len]
    padded = trimmed + 'N' * (max_len - len(trimmed))
    tokens = []
    for symbol in padded:
        tokens.append(base_to_idx.get(symbol, 4))
    return tokens

# Read both expression tables and stack them (second file's rows follow the first's).
df1 = pd.read_csv('/content/ecoli_mpra_expr.csv')
df2 = pd.read_csv('/content/ecoli_natural50bp_expr (1).csv')
combined_df = pd.concat([df1, df2], ignore_index=True)

# Tokenise every sequence to a fixed-length list of ints.
combined_df['tokenized'] = combined_df['seq'].map(preprocess_sequence)

# Model input (token ids), regression target (column vector), and the
# reconstruction target (token ids with a trailing axis of size 1).
X_seq = np.array(list(combined_df['tokenized']))
y_expr = combined_df['expr'].values.reshape(-1, 1)
y_seq = X_seq[..., np.newaxis]

# ----------------------------
# 2. Transformer Encoder Block
# ----------------------------
class TransformerEncoderBlock(tf.keras.layers.Layer):
    """Single Transformer encoder block: self-attention + feed-forward network.

    Each sub-layer is followed by dropout, a residual connection and layer
    normalisation.

    Args:
        embed_dim: Size of the last axis of the input; also used as the
            per-head ``key_dim`` of the attention layer and as the output size
            of the feed-forward network.
        num_heads: Number of attention heads.
        ff_dim: Hidden size of the feed-forward network.
        rate: Dropout rate applied after each sub-layer.
        **kwargs: Standard Keras layer arguments (``name``, ``dtype``, ...).
            Accepting them is required to rebuild the layer from a saved config.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        super().__init__(**kwargs)
        # Keep the constructor arguments so get_config() can serialise them.
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.rate = rate

        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = tf.keras.Sequential([
            layers.Dense(ff_dim, activation='relu'),
            layers.Dense(embed_dim),
        ])
        self.norm1 = layers.LayerNormalization(epsilon=1e-6)
        self.norm2 = layers.LayerNormalization(epsilon=1e-6)
        self.drop1 = layers.Dropout(rate)
        self.drop2 = layers.Dropout(rate)

    def call(self, inputs, training=None):
        """Apply self-attention and the feed-forward network to ``inputs``."""
        attn_output = self.att(inputs, inputs)
        out1 = self.norm1(inputs + self.drop1(attn_output, training=training))
        ffn_output = self.ffn(out1)
        return self.norm2(out1 + self.drop2(ffn_output, training=training))

    def get_config(self):
        """Return the constructor arguments so a saved model can rebuild this layer."""
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "ff_dim": self.ff_dim,
            "rate": self.rate,
        })
        return config

# ----------------------------
# 3. Define Model
# ----------------------------
# Hyper-parameters shared by the model definition and the saved sub-models.
vocab_size, max_len = 5, 165
embedding_dim, latent_dim = 16, 64

# Encoder: token ids -> embeddings -> Transformer block -> mean over positions -> latent vector.
input_seq = layers.Input(shape=(max_len,), dtype='int32')
token_embedding = layers.Embedding(input_dim=vocab_size, output_dim=embedding_dim)
attention_block = TransformerEncoderBlock(embed_dim=embedding_dim, num_heads=4, ff_dim=latent_dim)
x = attention_block(token_embedding(input_seq))
x = layers.GlobalAveragePooling1D()(x)
latent = layers.Dense(latent_dim, activation='relu', name="latent_vector")(x)

# Regression head: a single linear unit on top of the latent vector.
expr_pred = layers.Dense(1, name="expression_output")(latent)

# Decoder: feed the latent vector at every time step, then classify each step.
x = layers.RepeatVector(max_len)(latent)
x = layers.GRU(64, return_sequences=True)(x)
per_step_classifier = layers.Dense(vocab_size, activation='softmax')
decoded = layers.TimeDistributed(per_step_classifier, name="decoder_output")(x)

# Two-output model: reconstructed sequence and predicted expression.
autoencoder = models.Model(inputs=input_seq, outputs=[decoded, expr_pred])

output_losses = {'decoder_output': 'sparse_categorical_crossentropy', 'expression_output': 'mse'}
output_loss_weights = {'decoder_output': 1.0, 'expression_output': 1.0}
output_metrics = {'decoder_output': 'accuracy'}
autoencoder.compile(optimizer='adam',
                    loss=output_losses,
                    loss_weights=output_loss_weights,
                    metrics=output_metrics)

# ----------------------------
# 4. Train Model
# ----------------------------
# Keras' `validation_split` takes the *last* fraction of the arrays, before any
# shuffling. X_seq is the ordered concatenation of two CSV files, so that slice
# would come only from the end of the combined table and would not be
# representative of both sources. Hold out a shuffled 10% explicitly instead.
(X_train, X_val,
 y_seq_train, y_seq_val,
 y_expr_train, y_expr_val) = train_test_split(
    X_seq, y_seq, y_expr, test_size=0.1, random_state=42)

autoencoder.fit(X_train,
                {'decoder_output': y_seq_train, 'expression_output': y_expr_train},
                validation_data=(
                    X_val,
                    {'decoder_output': y_seq_val, 'expression_output': y_expr_val},
                ),
                batch_size=64,
                epochs=20)

# ----------------------------
# 5. Save Model & Vocabulary
# ----------------------------
autoencoder.save("autoencoder_model.h5")
# Encoder sub-model: token ids -> latent vector (shares the trained layers).
encoder = models.Model(autoencoder.input, autoencoder.get_layer("latent_vector").output)

# The stand-alone decoder and expression models MUST reuse the layers trained
# inside `autoencoder`. Creating new GRU/Dense layers here would save randomly
# initialised weights to disk, unrelated to the training above.
latent_input = tf.keras.Input(shape=(latent_dim,))

# The only GRU among the autoencoder's top-level layers is the decoder GRU.
trained_decoder_gru = next(
    layer for layer in autoencoder.layers if isinstance(layer, layers.GRU)
)

x = layers.RepeatVector(max_len)(latent_input)  # RepeatVector has no weights
x = trained_decoder_gru(x)
decoder_output = autoencoder.get_layer("decoder_output")(x)
decoder = tf.keras.Model(latent_input, decoder_output)
decoder.save("decoder_model.h5")

# Trained linear regression head applied directly to the latent vector.
expr_head = autoencoder.get_layer("expression_output")(latent_input)
expression_model = tf.keras.Model(latent_input, expr_head)
expression_model.save("expression_model.h5")

# Persist the base <-> index tables next to the models.
import pickle
vocab_mapping = {"base_to_idx": base_to_idx, "idx_to_base": idx_to_base}
with open("vocab_mapping.pkl", "wb") as f:
    pickle.dump(vocab_mapping, f)


Epoch 1/20
[1m364/364[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m33s[0m 46ms/step - decoder_output_accuracy: 0.4710 - decoder_output_loss: 1.1386 - expression_output_loss: 10.2037 - loss: 11.3423 - val_decoder_output_accuracy: 0.7755 - val_decoder_output_loss: 0.4464 - val_expression_output_loss: 4.7048 - val_loss: 5.1384
Epoch 2/20
[1m364/364[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m34s[0m 39ms/step - decoder_output_accuracy: 0.5163 - decoder_output_loss: 0.9721 - expression_output_loss: 5.5762 - loss: 6.5483 - val_decoder_output_accuracy: 0.7665 - val_decoder_output_loss: 0.4731 - val_expression_output_loss: 4.5914 - val_loss: 5.0523
Epoch 3/20
[1m364/364[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m20s[0m 39ms/step - decoder_output_accuracy: 0.5255 - decoder_output_loss: 0.9594 - expression_output_loss: 5.5322 - loss: 6.4916 - val_decoder_output_accuracy: 0.7797 - val_decoder_output_loss: 0.4326 - val_expression_output_loss: 4.5777 - val_loss: 4.9990
Epoch 4/20
[1



In [None]:
import numpy as np
import tensorflow as tf
import pickle
import re
from tensorflow.keras import layers

# === Transformer Block for Loading ===
class TransformerEncoderBlock(tf.keras.layers.Layer):
    """Single Transformer encoder block: self-attention + feed-forward network.

    Defined here so that ``load_model`` can rebuild the custom layer stored in
    the saved autoencoder. Each sub-layer is followed by dropout, a residual
    connection and layer normalisation.

    Args:
        embed_dim: Size of the last axis of the input; also used as the
            per-head ``key_dim`` of the attention layer and as the output size
            of the feed-forward network.
        num_heads: Number of attention heads.
        ff_dim: Hidden size of the feed-forward network.
        rate: Dropout rate applied after each sub-layer.
        **kwargs: Standard Keras layer arguments (``name``, ``dtype``, ...).
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        super().__init__(**kwargs)
        # Keep the constructor arguments so get_config() can serialise them.
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.rate = rate

        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = tf.keras.Sequential([
            layers.Dense(ff_dim, activation='relu'),
            layers.Dense(embed_dim),
        ])
        self.norm1 = layers.LayerNormalization(epsilon=1e-6)
        self.norm2 = layers.LayerNormalization(epsilon=1e-6)
        self.drop1 = layers.Dropout(rate)
        self.drop2 = layers.Dropout(rate)

    def call(self, inputs, training=False):
        """Apply self-attention and the feed-forward network to ``inputs``."""
        attn_output = self.att(inputs, inputs)
        out1 = self.norm1(inputs + self.drop1(attn_output, training=training))
        ffn_output = self.ffn(out1)
        return self.norm2(out1 + self.drop2(ffn_output, training=training))

    def get_config(self):
        """Return the constructor arguments so the layer round-trips through save/load."""
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "ff_dim": self.ff_dim,
            "rate": self.rate,
        })
        return config

# === Load Models and Vocab ===
# The autoencoder contains the custom Transformer layer, so its class must be
# supplied when loading; the other two models need no custom objects.
custom_layers = {'TransformerEncoderBlock': TransformerEncoderBlock}
autoencoder = tf.keras.models.load_model(
    "autoencoder_model (1).h5", custom_objects=custom_layers, compile=False
)
decoder, expression_model = (
    tf.keras.models.load_model(h5_path, compile=False)
    for h5_path in ("decoder_model (1).h5", "expression_model (1).h5")
)

# Index -> base table used to turn sampled indices back into characters.
with open("vocab_mapping (1).pkl", "rb") as vocab_file:
    vocab_data = pickle.load(vocab_file)
idx_to_char = vocab_data["idx_to_base"]

# === Helper Functions ===
def decode_sequence_with_temperature(logits, temperature=1.0):
    """Sample a sequence string from per-position logits.

    Args:
        logits: Array/tensor with one row of unnormalised scores (e.g.
            log-probabilities) per position. Passing probabilities that are
            already softmax-normalised would flatten the distribution, because
            a softmax is applied here.
        temperature: Divides the logits before the softmax. Lower values make
            sampling more conservative. ``temperature <= 0`` is treated as
            greedy decoding (argmax per position) instead of dividing by zero.

    Returns:
        str: One character per position, looked up in ``idx_to_char`` (indices
        missing from the table become 'N').
    """
    if temperature <= 0:
        # Limit of temperature -> 0: pick the highest-scoring index per position.
        sampled_indices = np.argmax(np.asarray(logits), axis=-1)
    else:
        probs = tf.nn.softmax(logits / temperature, axis=-1).numpy()
        sampled_indices = [np.random.choice(len(p), p=p) for p in probs]
    return ''.join([idx_to_char.get(int(i), 'N') for i in sampled_indices])

def clean_dna_sequence(seq):
    """Strip every character that is not one of A, C, G, T from ``seq``."""
    stripped, _removed_count = re.subn(r'[^ACGT]', '', seq)
    return stripped

# === Generation Function ===
# Size of the latent space, read from the loaded autoencoder's latent layer.
latent_dim = autoencoder.get_layer("latent_vector").output.shape[-1]

def run_promoter_generator(target_expr, steps=500, lr=0.05, temps=(0.4, 0.6, 0.8, 1.0)):
    """Optimise a latent vector for ``target_expr`` and sample sequences from it.

    A random latent vector is updated with Adam to minimise the squared error
    between ``expression_model``'s prediction and ``target_expr``. The final
    vector is decoded once, and one sequence is sampled per temperature.

    Args:
        target_expr: Desired expression value.
        steps: Number of gradient steps on the latent vector.
        lr: Adam learning rate.
        temps: Iterable of sampling temperatures; one sequence is drawn per entry.

    Returns:
        list[dict]: One dict per temperature with keys ``"temperature"``,
        ``"predicted_expression"`` (rounded to 4 decimals) and ``"sequence"``.
    """
    z = tf.Variable(tf.random.normal([1, latent_dim]), trainable=True)
    optimizer = tf.keras.optimizers.Adam(learning_rate=lr)

    for step in range(steps):
        with tf.GradientTape() as tape:
            step_pred = expression_model(z, training=False)
            loss = tf.reduce_mean(tf.square(step_pred - target_expr))
        grads = tape.gradient(loss, [z])
        optimizer.apply_gradients(zip(grads, [z]))

    # z no longer changes, so the prediction is identical for every temperature;
    # compute it once instead of once per loop iteration.
    pred_expr = round(float(expression_model(z, training=False).numpy().squeeze()), 4)

    # The decoder is built with a softmax output layer in the training cell, so
    # it returns probabilities, not logits. decode_sequence_with_temperature
    # applies its own softmax, so hand it log-probabilities; otherwise the
    # distribution is softmaxed twice and becomes close to uniform.
    probs = decoder(z, training=False).numpy()[0]
    log_probs = np.log(np.clip(probs, 1e-8, 1.0))

    results = []
    for temp in temps:
        seq = decode_sequence_with_temperature(log_probs, temperature=temp)
        results.append({
            "temperature": temp,
            "predicted_expression": pred_expr,
            "sequence": seq,
        })

    return results

# === Example Usage ===
if __name__ == "__main__":
    # Read the target from the user and print the generated variants.
    raw_target = input("Enter desired expression value: ")
    target = float(raw_target)
    result = run_promoter_generator(target_expr=target)
    print("result:", result)


Enter desired expression value: 80
result: [{'temperature': 0.4, 'predicted_expression': 79.9556, 'sequence': 'ANCCAACACACCNCGGAAANTCCCACACTAAANGNTNGTATCTTNCACATGANCNNCATGTCAATCAAAAACAGCGCCGACCGAAACCCACACAAAGGAACCAGNCGAATCTCCGCAAACTCCTANTGCACCATCCTTCCCNACTGCGCCACNTANCGCTCACTA'}, {'temperature': 0.6, 'predicted_expression': 79.9556, 'sequence': 'GCATCGGCAATNANGANTAATNANCACTNTCAGGCGACCCTCTCGGTACGGGGCAACACCGTAATACAGGCCCCTCAACTACGTTAAGGATTTNATACGTTACCNGACGAAGANTACAATNTTTGTGTCGCTCGAAGTACANACACGACCCAACGCCTCTCAAAA'}, {'temperature': 0.8, 'predicted_expression': 79.9556, 'sequence': 'CATNCTCNNTCCCATCGCGTATNAGCTCCAAGTACCCCNNCNGCTAGNCGCATGCCANGGGANACNGCNTGNAGTGNGNGNCNCGNNAGNCNNACCACACCGCNTTAATACCCTGNAANNNNAACCAAANTGCTCTCCNTTTANACNTGCTTAACTTANCNNATG'}, {'temperature': 1.0, 'predicted_expression': 79.9556, 'sequence': 'NCNCCANTTAAAANANGNNGNAACTAAATTNCATAGCGGCNNNTNTATGCGNANGTCAGCACGCCGANTAAAGACAGCTTCNTTCGATANGAGCTGATTNNGAATCGTNANCGGANCAAACACAAGCNGTTCCNANTCCTNTNTNGCCTTATTCANANACATACC'}]
