In [None]:
import pandas as pd
import numpy as np
#import sys
#sys.path.append('..\\AMN_refactor')
import os
import amn
#os.getcwd()
os.chdir("c:\\Users\\tnhoang\\Documents\\refactor_AMN\\AMN_refactor")
from scripts.build_dataset import generate_dataset

Generated "medium_file", which contains condition to simulated new data from experiment dataset

In [None]:
  
file_name = 'iML1515_Paul_280.csv'
save_name = 'Paul_media.csv'


dataset_name = "iML1515_Paul_280"
generation_file = "config/dataset_generation.json"
data_dir = "data"
generate_dataset(dataset_name,generation_file,data_dir,verbose=False)

In [5]:
# E_coli of Paul 
dataset_file_simulated = "/Dataset/iML1515_Paul_280.npz"
dataset_file_experimental = "/Dataset/iML1515_Paul_EXP.npz"
objective = ["BIOMASS_Ec_iML1515_core_75p37M"]

pretrained_model_name = "AMNWt_Paul.keras"
cobra_model_file = "/Dataset/iML1515_Paul_EXP.xml"

batch_size = 7
epochs = 100 
seed = 10

essential_medium_size = 23
essential_medium_level = 15
hidden_layer_size = 500

plot_figure_regression = "figures_reservoir/reservoir_iML1515_Paul.png"
plot_figure_classification = ""

In [None]:
model_dir = "../models/"
model_file_simulated = model_dir + pretrained_model_name

# Unzip the model files
from amn.archive import unzip_folders
unzip_folders(model_dir)

import os 
os.getcwd()

LOAD DATA

In [None]:
import tensorflow as tf
from amn.model.aMNWtModel import AMNWtModel

tf.random.set_seed(seed)
data_dir = "../data"

model_simulated = AMNWtModel(dataset_file=data_dir + dataset_file_simulated, 
                   objective=objective,
                   timestep=4,
                   hidden_dim=50,
                   verbose=True,
                   )

model_simulated.train_test_split(test_size=0.1, random_state=seed)
model_simulated.preprocessing_for_specific_model()


model_experimental = AMNWtModel(dataset_file=data_dir + dataset_file_experimental, 
                   objective=objective,
                   timestep=4,
                   hidden_dim=50,
                   verbose=True,
                   )

model_experimental.train_test_split(test_size=0.1, random_state=seed)
model_experimental.preprocessing_for_specific_model()

In [None]:
# load pretrain model
from amn.model.aMNWtModel import RNNCell
from amn.tools import custom_loss

loss = custom_loss(model_simulated.S, model_simulated.P_out, model_simulated.P_in)

AMNWt_model = tf.keras.models.load_model(model_file_simulated, 
                                          custom_objects={"RNNCell":RNNCell,
                                                          "my_mse":custom_loss(model_simulated.S, 
                                                                               model_simulated.P_out,
                                                                               model_simulated.P_in)}
                                          )

print("R2 :", model_simulated.R2(model_simulated.Y_train, AMNWt_model.predict(model_simulated.X_train)))
print("Q2 :", model_simulated.R2(model_simulated.Y_test, AMNWt_model.predict(model_simulated.X_test)))

In [None]:
# add dense layer and fixed weights

from tensorflow.keras import Model, Input, layers

# Different layer of new model
AMNWt_model.trainable = False

# split intput
inputs = Input((model_experimental.X.shape[1]))
input_dense = inputs[:,essential_medium_size:]  
input_fixed = inputs[:,:essential_medium_size]

In [None]:
# train the combine block with experimental
import numpy as np
from amn.visualize import plot_regression
from sklearn.model_selection import train_test_split
# split intput
inputs = Input((model_experimental.X.shape[1]))
input_dense = inputs[:,essential_medium_size:]  
input_fixed = inputs[:,:essential_medium_size]

X = model_experimental.X * essential_medium_level
Y = np.concatenate((model_experimental.Y, np.zeros((len(model_experimental.Y),3))), axis=1)
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.5, random_state=42)

# create and train ensemble of new model
hidden_layer_sizes = [100, 200, 300, 400, 500] 
models_ensemble = []
prediction_ensemble = []

#ensemble 
for hidden_size in hidden_layer_sizes:
    layer_1 = layers.Dense(hidden_size,
                     activation='relu')
    layer_2 = layers.Dense(model_experimental.X.shape[1] - essential_medium_size,
                     activation='relu')
    
    x = layer_2(layer_1(input_dense))
    y = AMNWt_model(tf.concat([input_fixed, x],1))
    
    new_model = Model(inputs=inputs, 
                    outputs=y)  

    new_model.compile(optimizer='adam',
                loss=loss,
                metrics=None)
    history = new_model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size, verbose=0)
    models_ensemble.append(new_model)

    # Use it to predict all fluxes
    pred_all = new_model.predict(X_test)
    pred_growth_rate = tf.linalg.matmul(pred_all[:,:model_simulated.S.shape[1]], 
                        tf.transpose(np.float32(model_simulated.P_out))) 
    
    prediction_ensemble.append([pred_growth_rate])
    R_2 = model_simulated.R2(y_test, pred_all) # pred_all or pred_growth_rate
    print("R² = ", R_2)

# Results
true_growth_rate = y_test[:,0]
pred_gr_mean = np.mean(np.array(prediction_ensemble), axis = 1)
pred_gr_std = np.std(np.array(prediction_ensemble), axis = 1)


plot_regression(pred=pred_gr_mean,
                true=true_growth_rate,
                pred_label="",
                true_label="",
                title="",
                saving_file=None,
                show=True)

In [None]:
# MC-Dropout
n_predict = 50

dropout_layer = layers.Dropout(0.2)  # Adding dropout layer with dropout rate of 0.2
x = layer_1(input_dense)
x = dropout_layer(x, training = True)  # Applying dropout after the first hidden layer
x = layer_2(x)

y = AMNWt_model(tf.concat([input_fixed, x],1))

mc_model = Model(inputs=inputs, 
                outputs=y)  

mc_model.compile(optimizer='adam',
            loss=loss,
            metrics=None)

history = mc_model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size, verbose=0)

mc_prediction = []
# Use it to predict all fluxes
for i in range(n_predict):
    pred = mc_model.predict(X_test)
    pred_growth_rate_mc = tf.linalg.matmul(pred[:,:model_simulated.S.shape[1]], 
                    tf.transpose(np.float32(model_simulated.P_out))) 
    mc_prediction.append([pred_growth_rate_mc])

pred_mc_mean = np.mean(np.array(mc_prediction), axis = 1)
pred_mc_std = np.std(np.array(mc_prediction), axis = 1)

R_2 = model_simulated.R2(y_test, pred_mc_mean)
print("R² = ", R_2)


plot_regression(pred=pred_mc_mean,
                true=true_growth_rate,
                pred_label="",
                true_label="",
                title="",
                saving_file=None,
                show=True)

In [None]:
# plot std from mcdrop-out compare to emsemble

plot_regression(pred=pred_mc_mean,
                true=pred_gr_mean,
                pred_label="Monte-Carlo Dropout STD",
                true_label="Ensemble STD",
                title="",
                saving_file=None,
                show=True)

Confirm with cobra

In [None]:
# Split the input
X_dense = X[:,essential_medium_size:]  
X_fixed = X[:,:essential_medium_size]

# Use the dense layers to get the input fluxes for cobra
x_output_dense = layer_2(layer_1(X_dense))
V_in = tf.concat([X_fixed, x_output_dense],1)


import cobra
from sklearn.metrics import r2_score
from amn.run_cobra import run_cobra



cobra_model = cobra.io.read_sbml_model(data_dir + cobra_model_file)

pred_cobra = []
for i in range(V_in.shape[0]):
    # Initialize all the reaction to 0
    inf = {r.id: 0 for r in cobra_model.reactions}
    # Add all the non-nul inputs for the entry i
    for j in range(V_in.shape[1]):
        inf[model_simulated.medium[j]] = float(V_in[i,j]) 
    result = run_cobra(cobra_model,objective,inf)

    pred_cobra.append(result[1])


In [None]:
# Results
R_2 = r2_score(true_growth_rate, pred_cobra)
print("R² = ", R_2)
plot_regression(pred=pred_cobra,
                true=true_growth_rate,
                pred_label="",
                true_label="",
                title="",
                saving_file=plot_figure_regression,
                show=True)