In [1]:
!git clone https://github.com/AvonYangXX1/AMPLify-Feedback.git
!pip install Levenshtein
import tensorflow as tf
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import layers
import numpy as np
import math
import matplotlib.pyplot as plt
import statistics
from Levenshtein import distance as lev
import pandas as pd
import seaborn as sns

import warnings
with warnings.catch_warnings():
    warnings.simplefilter(action='ignore', category=FutureWarning)

fatal: destination path 'AMPLify-Feedback' already exists and is not an empty directory.


In [None]:
# Load the saved peptide generator model from the cloned AMPLify-Feedback repository.
generator = tf.keras.models.load_model('AMPLify-Feedback/model_weights/PeptideGenerator_new.keras')

In [None]:
# Print the generator's layer-by-layer summary.
generator.summary()

In [None]:
def create_oracle():
    """Build the "MICPredictor" Keras model (architecture only, no weights loaded).

    Inputs:
        SeqInput   -- tensor of shape (190, 43) per sample.
        StateInput -- vector of length 326 per sample.

    The sequence input goes through two Conv1D layers, is flattened and
    projected by a dense layer, then concatenated with the state input and
    passed through two Dense + LayerNormalization stages before a single
    linear output unit.

    Returns:
        tf.keras.models.Model taking [SeqInput, StateInput].
    """
    keras_layers = tf.keras.layers

    seq_in = keras_layers.Input((190, 43), name="SeqInput")
    state_in = keras_layers.Input((326,), name="StateInput")

    # Sequence branch. Author notes: kernel_size=5 works well, and just two
    # convolutional layers work better.
    hidden = seq_in
    for conv_idx in range(2):
        hidden = keras_layers.Conv1D(128, 5, activation='relu', name=f"Conv1D_{conv_idx}")(hidden)
    hidden = keras_layers.Flatten(name="Flatten_0")(hidden)
    hidden = keras_layers.Dense(512, activation="relu", name="LearnSeqDense_0")(hidden)

    # Merge the sequence features with the state vector.
    hidden = keras_layers.Concatenate(axis=1, name="Concat")([hidden, state_in])

    # Two Dense -> LayerNormalization stages on the merged representation.
    for stage_idx, units in enumerate((1024, 512)):
        hidden = keras_layers.Dense(units, activation="relu", name=f"LearnConcatDense_{stage_idx}")(hidden)
        hidden = keras_layers.LayerNormalization(name=f"LayerNorm_{stage_idx}")(hidden)

    prediction = keras_layers.Dense(1, activation="linear", name="Output")(hidden)
    return tf.keras.models.Model([seq_in, state_in], prediction, name="MICPredictor")

In [None]:
# Build the oracle architecture, then restore its weights one layer at a time
# from per-layer files named layer_<index>_weights.npy, where <index> is the
# layer's position in `oracle.layers`.
oracle = create_oracle()
path = "AMPLify-Feedback/model_weights/MICPredictor"
for i, layer in enumerate(oracle.layers):
    # NOTE(review): allow_pickle=True lets np.load unpickle arbitrary Python
    # objects; only load weight files that come from a trusted source.
    weights = np.load(f"{path}/layer_{i}_weights.npy", allow_pickle=True)
    layer.set_weights(weights)

In [None]:
# Print the oracle's layer-by-layer summary.
oracle.summary()

In [None]:
# Load the saved vocabularies and build inverse lookups (integer index -> token).
# Each lookup skips the first vocabulary entry and uses '' as its OOV token.
# NOTE(review): presumably the skipped first entry is the padding/empty token
# of the vectorizer that produced these files -- confirm.
aa_vocal = np.load("AMPLify-Feedback/model_weights/SeqTV_vocal.npy")
pep_decoder = tf.keras.layers.StringLookup(vocabulary=aa_vocal[1:], invert=True, oov_token='')
species_vocal = np.load("AMPLify-Feedback/model_weights/SpeciesTV_vocal.npy")
species_decoder = tf.keras.layers.StringLookup(vocabulary=species_vocal[1:], invert=True, oov_token='')

In [None]:
def generate_sequences(generator, latent_dim, num_sequences):
    """Sample random latent points and decode the generator output to strings.

    Args:
        generator: model exposing `predict(noise, verbose=0)`.
        latent_dim: number of noise dimensions per sample.
        num_sequences: number of samples to draw.

    Returns:
        Whatever `onehot2seq` returns for the generator's predictions.
    """
    # Uniform samples in [0, 1) rescaled to [-1, 1).
    latent_points = np.random.rand(num_sequences, latent_dim) * 2 - 1
    onehot_batch = generator.predict(latent_points, verbose=0)
    return onehot2seq(onehot_batch)

In [None]:
def onehot2seq(onehot):
  """Decode a batch of per-position score matrices into strings.

  Args:
    onehot: array-like or tensor indexed as (sequence, position, token).

  Returns:
    A list with one string per sequence, built by taking the argmax token at
    every position, mapping it through the module-level `pep_decoder`, and
    joining the resulting tokens.
  """
  # Nothing to decode; also avoids running argmax on an empty Python list.
  if len(onehot) == 0:
    return []
  # Decode the whole batch with one argmax and one lookup call instead of
  # dispatching a separate TensorFlow op per sequence.
  token_ids = tf.math.argmax(onehot, axis=2)
  chars_array = pep_decoder(token_ids).numpy().astype('str')
  return ["".join(chars) for chars in chars_array]

In [None]:
def coordinates_grid(num_bins):
  """Enumerate every point of a square grid over [-1, 1] x [-1, 1].

  Args:
    num_bins: number of evenly spaced values per axis.

  Returns:
    (noise, num_sequences, labels) where `labels` holds the axis values
    rounded to two decimals, `noise` lists every (x, y) pair with x varying
    slowest, and `num_sequences` is the number of rows in `noise`.
  """
  axis_values = np.linspace(-1, 1, num_bins).round(2)
  grid_points = np.array([[x_val, y_val] for x_val in axis_values for y_val in axis_values])
  return grid_points, len(grid_points), axis_values

In [None]:
def hallucination_mapping(num_bins, target):
  """Build the noise grid and a matching one-hot species matrix.

  Args:
    num_bins: number of grid values per noise axis (see `coordinates_grid`).
    target: species name; its first position in the module-level
      `species_vocal` array selects the one-hot column.

  Returns:
    (noise, num_sequences, bacteria, labels), where `bacteria` has shape
    (num_sequences, 326) with a 1 in the target's column on every row.
  """
  # Search coordinates in the noise plane
  grid_noise, grid_size, axis_labels = coordinates_grid(num_bins)
  # Look up the index of the target bacteria
  species_index = np.where(species_vocal == target)[0][0]
  species_onehot = np.zeros(shape=(grid_size, 326))
  species_onehot[:, species_index] = 1
  return grid_noise, grid_size, species_onehot, axis_labels

In [None]:
def visualization(noise, MIC, generated_sequences, num_bins, labels, num_sequences, iter, target):
    """Print the best sequence and draw/save a heatmap of MIC over the noise plane.

    Args:
        noise: array of shape (n, 2) holding the two noise coordinates per sequence.
        MIC: oracle output for each sequence, shape (n, 1) or (n,).
        generated_sequences: n decoded peptide strings.
        num_bins: number of bins per noise axis passed to `pd.cut`.
        labels: bin labels, one per bin (length `num_bins`).
        num_sequences: kept for backward compatibility; the row count is taken
            from the inputs themselves.
        iter: iteration index used in the title and the output file name.
        target: species name such as 'Genus_species'; used in the title and
            the output file name.

    Side effects:
        Prints the lowest-MIC row, writes "{target}_{iter}.png" to the working
        directory and shows the figure.
    """
    # Build the frame column by column so numeric columns stay numeric instead
    # of being round-tripped through a string array.
    noise = np.asarray(noise, dtype='float64')
    vis_data = pd.DataFrame({
        "Noise_1": noise[:, 0],
        "Noise_2": noise[:, 1],
        "Peptide": list(generated_sequences),
        "MIC": np.asarray(MIC).astype('float64').reshape(-1),
    })
    vis_data['Noise_1_bins'] = pd.cut(vis_data['Noise_1'], bins=num_bins, labels=labels)
    vis_data['Noise_2_bins'] = pd.cut(vis_data['Noise_2'], bins=num_bins, labels=labels)

    # Print the peptide with the lowest MIC and its coordinates
    top_sequence = (
        vis_data.sort_values(by=['MIC'], ascending=True)
        .head(1)
        .reset_index(drop=True)
    )
    print(top_sequence[['Peptide', 'MIC', 'Noise_1', 'Noise_2']])

    # Group the data by the bins and calculate the mean MIC.
    # observed=False keeps every bin combination in the grid and makes the
    # choice explicit instead of relying on the deprecated implicit default.
    grouped = vis_data.groupby(['Noise_1_bins', 'Noise_2_bins'], observed=False)
    grid_mic_mean = grouped['MIC'].mean().reset_index()

    # Pivot the results to create a grid that `sns.heatmap` can visualize
    grid_mic_mean_pivot = grid_mic_mean.pivot(index='Noise_1_bins', columns='Noise_2_bins', values='MIC')

    # Use at most the first two '_'-separated parts of the name; this also
    # works for a target that contains no underscore.
    species_label = " ".join(target.split('_')[:2])

    fig, ax = plt.subplots()
    # Transpose and flip rows so Noise Dim 1 runs along x and Noise Dim 2
    # increases upwards.
    sns.heatmap(grid_mic_mean_pivot.transpose().iloc[::-1],
                cmap=sns.cubehelix_palette(as_cmap=True),
                cbar_kws={'label': 'Log2 MIC'},
                ax=ax)
    ax.set(xlabel="Noise Dim 1", ylabel="Noise Dim 2",
           title=f"MIC Landscape for {species_label} at iteration {iter}")
    fig.savefig(f"{target}_{iter}.png", dpi=200, bbox_inches='tight')
    plt.show()
    # This function is called once per training iteration; release the figure
    # so they do not accumulate.
    plt.close(fig)


In [None]:
@tf.function
def compute_gradient(noise, oracle, bacteria, generator_optimizer):
    """Run one optimisation step on the module-level `generator`.

    The generator is evaluated on `noise` (with training=True), the oracle
    scores the result together with `bacteria`, and the mean oracle output is
    used as the loss whose gradients update the generator's trainable
    variables through `generator_optimizer`.

    Args:
        noise: latent inputs; converted to a float64 tensor.
        oracle: model called as `oracle([generated_onehot, bacteria])`.
        bacteria: second oracle input, one row per noise row.
        generator_optimizer: optimizer whose `apply_gradients` is called.

    Returns:
        The generator output computed before the weight update was applied.
    """
    noise = tf.convert_to_tensor(noise, np.float64)
    trainable_vars = generator.trainable_variables
    with tf.GradientTape() as tape:
        tape.watch(noise)
        generated_onehot = generator(noise, training=True)
        oracle_scores = oracle([generated_onehot, bacteria])
        # Loss is the mean oracle prediction over the batch.
        loss = tf.reduce_mean(oracle_scores)
    grads = tape.gradient(loss, trainable_vars)
    generator_optimizer.apply_gradients(zip(grads, trainable_vars))
    return generated_onehot

In [None]:
def RL_loop(generator, oracle, aa_vocal, pep_decoder, n_bins, target, n_iter=None):
  """Run the feedback loop and collect per-iteration statistics.

  Each iteration performs one `compute_gradient` step on the fixed noise grid,
  scores the generated sequences with the oracle, records summary statistics
  and calls `visualization`.

  Args:
    generator: not referenced in this function body; kept for interface
      compatibility.
    oracle: model called as `oracle([generated_onehot, bacteria])`.
    aa_vocal: forwarded to `variability_metrics`.
    pep_decoder: forwarded to `variability_metrics`.
    n_bins: number of grid values per noise axis.
    target: species name passed to `hallucination_mapping` and `visualization`.
    n_iter: number of iterations; when None, the module-level `n_iter_max`
      is used (the original behaviour).

  Returns:
    (average_mic, min_mic, max_mic, median_mic, levenshtein): five lists with
    one entry per iteration.
  """
  if n_iter is None:
    # Backward-compatible fallback to the notebook-level setting.
    n_iter = n_iter_max

  average_mic_train, min_mic_train, max_mic_train, median_mic_train, levenstein = [], [], [], [], []
  generator_optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

  # The grid and the species one-hot depend only on (n_bins, target), so they
  # are built once instead of on every iteration.
  noise, num_sequences, bacteria, labels = hallucination_mapping(n_bins, target)

  for i in range(n_iter):
    # Sequences prediction and gradient computation
    generated_onehot = compute_gradient(noise, oracle, bacteria, generator_optimizer)
    generated_sequences = onehot2seq(generated_onehot)

    # Oracle results and statistics
    MIC = oracle([generated_onehot, bacteria])
    mic_values = np.asarray(MIC)
    mean_mic = np.mean(mic_values)
    average_mic_train.append(mean_mic)
    min_mic_train.append(np.min(mic_values))
    max_mic_train.append(np.max(mic_values))
    median_mic_train.append(np.median(mic_values))

    # The pairwise distance is quadratic in the number of sequences: compute
    # it once and reuse it for both the history and the log line.
    mean_lev = variability_metrics(generated_sequences, aa_vocal, pep_decoder)
    levenstein.append(mean_lev)

    # Visualization of the hallucination map and the top sequences
    print(f"Iter {i+1}/{n_iter}; Average MIC {mean_mic:.4f}; Levenshtein {mean_lev:.4f}")
    visualization(noise, MIC, generated_sequences, n_bins, labels, num_sequences, i, target)

  return average_mic_train, min_mic_train, max_mic_train, median_mic_train, levenstein

In [None]:
def variability_metrics(sequences, aa_vocal, pep_decoder):
  """Mean Levenshtein distance over all unordered pairs of sequences.

  Args:
    sequences: collection of hashable sequences (strings).
    aa_vocal: unused; kept for interface compatibility.
    pep_decoder: unused; kept for interface compatibility.

  Returns:
    The sum of pairwise distances divided by the number of pairs, or 0.0 when
    there are fewer than two sequences (no pair exists).
  """
  from collections import Counter
  from itertools import combinations

  n_sequences = len(sequences)
  if n_sequences < 2:
    # No pair to compare; avoid dividing by zero.
    return 0.0

  # Identical sequences are at distance 0, so only pairs of distinct values
  # contribute. Each distinct pair (a, b) occurs count[a] * count[b] times
  # among all pairs, which gives the same total with far fewer distance calls
  # when sequences repeat.
  counts = Counter(sequences)
  total_lev = sum(lev(a, b) * counts[a] * counts[b] for a, b in combinations(counts, 2))
  n_pairs = n_sequences * (n_sequences - 1) // 2
  return total_lev / n_pairs

In [None]:
def display(parameters):
  """Plot every series in `parameters` against its iteration index.

  Args:
    parameters: dict mapping a legend label to a sequence of values. The
      first key is also used as the stem of the saved PNG file name.

  Side effects:
    Saves "<first key>.png" in the working directory and shows the figure.
  """
  for series_name, series_values in parameters.items():
    plt.plot(range(len(series_values)), series_values, linestyle='-', label=series_name)
  plt.legend(loc="upper right")
  plt.xlabel("Iteration index")
  plt.ylabel("Value")
  plt.title("Performances with the feedback loop")
  first_key = list(parameters)[0]
  plt.savefig(f"{first_key}.png", dpi=200, bbox_inches='tight')
  plt.show()

In [None]:
# Experiment configuration.
# latent_dim: number of noise dimensions used when sampling with generate_sequences.
latent_dim = 2
# n_iter_max: number of feedback iterations; RL_loop reads this as a global.
n_iter_max = 70
# n_bins: grid values per noise axis (the grid holds n_bins * n_bins points).
n_bins = 101
# target: species name looked up in species_vocal.
target = 'Bacillus_subtilis'

# Run the feedback loop, plot the collected statistics, then sample 10 sequences
# from the updated generator (shown as the cell's output).
average_mic_train, min_mic_train, max_mic_train, median_mic_train, levenstein = RL_loop(generator, oracle, aa_vocal, pep_decoder, n_bins, target)
display({"Average MIC" : average_mic_train, "Minimum MIC" : min_mic_train, "Maximum MIC" : max_mic_train, "Median MIC" : median_mic_train})
display({"Average Levenstein distance within the prediction" : levenstein})
generate_sequences(generator, latent_dim, 10)

In [None]:
# Save the updated generator to the current working directory.
generator.save("PeptideGenerator_new.keras")
# Alternative destination inside a checked-out repository copy (disabled):
#generator.save("/content/AMPLify-Feedback-main/model_weights/PeptideGenerator_new.keras")