# Helper Functions for BayesOpt

In [None]:
# for running in colab
#pip install cobra

In [None]:
# for running in colab
#pip install botorch

In [None]:
# imports
import torch
from botorch.models import SingleTaskGP, ModelListGP
from gpytorch.mlls.sum_marginal_log_likelihood import SumMarginalLogLikelihood

import random # for initial data
import numpy as np
import json
import copy # to be able to do deep copies

In [None]:
# Select the compute device: the first CUDA GPU when one is available, otherwise the CPU.
# On a multi-GPU machine change the ordinal in "cuda:0" to pick another card.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Keyword arguments reused by the `.to(**tkwargs)` calls in this file so that all
# tensors and models share one device and one dtype.
# (torch.double and torch.float64 are the same dtype in PyTorch.)
tkwargs = dict(device=device, dtype=torch.double)

## Function Definitions

In [None]:
def calc_cost_tot(costs, medium):
    """
    Computes the total cost of one medium composition as the sum over all
    components of (amount in the medium) x (cost of that component).

    PARAMETERS
    * costs - dictionary - cost for each medium component; must contain every key of medium
    * medium - dictionary - medium composition (component and amount)

    RETURNS
    * a one-element tensor holding the total cost, placed on the device/dtype given by tkwargs
    """
    running_total = 0
    for component, amount in medium.items():
        running_total = running_total + amount * costs[component]

    # wrap the scalar in a list so the result is a tensor with a single entry,
    # then move it to the device chosen at the top of the file
    as_tensor = torch.tensor([running_total], dtype=torch.double)
    return as_tensor.to(**tkwargs)

In [None]:
def generate_initial_data(
    MetModel, medium, bounds, costs, 
    n_samples = 5, opt_objective = "growth-cost", 
    biomass_objective = None, production_objective = None
    ):
    """
    Creates the initial data points needed to start Bayesian Optimisation.
    For each of the n_samples iterations it
    * draws a random medium composition within the concentration bounds
    * calculates the total cost of that composition
    * assigns the composition to the model and runs FBA
    * records the growth rate and, unless the objective is "growth-cost", the production rate

    NOTE: MetModel.medium is overwritten in every iteration; after the call the
    model holds the last random medium that was generated.

    PARAMETERS
    * MetModel - COBRApy model - the metabolic model to be evaluated
    * medium - dictionary - the medium composition of that model; only its keys are
        used, they determine which components receive a random amount
    * bounds - dictionary - (lower, upper) bound for every medium component,
        determines the search space; must contain every key of medium
    * costs - dictionary - the (monetary) cost of each component
    * n_samples - integer - how many random media compositions are to be created
    * opt_objective - string - indicates which combination of objectives is subject to optimisation
    * biomass_objective - string - the id of the biomass reaction to be optimised
    * production_objective - string - the id of the production reaction to be optimised;
        required for every opt_objective except "growth-cost"
    
    RETURNS
    * initial_para - list of dictionaries - random medium compositions
    * initial_growth - tensor - corresponding growth rates
    * initial_production - tensor - corresponding production rates
        (filled with the placeholder -1 when opt_objective is "growth-cost")
    * initial_cost - tensor - corresponding medium costs

    RAISES
    * ValueError - if opt_objective is unknown, if biomass_objective is missing, or if
        production_objective is missing although the objective involves production
    """
    
    # assert that the objective is one of the possibilities
    opt_objective_types = ['growth-cost', 'growth-production', 'production-cost', 'growth-production-cost']
    if opt_objective not in opt_objective_types:
        raise ValueError(f"Invalid objective. Expected one of: {opt_objective_types}")
    # assert that a biomass_objective has been given
    if biomass_objective is None:
        raise ValueError("Please specifiy the biomass objective.")
    # every objective except "growth-cost" reads a production flux; validate this
    # up front so the call fails before the model is touched or any FBA is run
    needs_production = (opt_objective != "growth-cost")
    if needs_production and production_objective is None:
        raise ValueError("Please specifiy the production objective.")
    
    '''INITIALISATION'''
    initial_para = []
    initial_growth = []
    initial_production = []
    initial_cost = []
    
    '''GENERATE'''
    for _ in range(n_samples):
        # generate random medium parameters within bounds
        random_medium = {}
        for key in medium:
            lower_bound, upper_bound = bounds[key]
            # randomly choose a concentration within the provided bounds
            random_medium[key] = random.uniform(lower_bound, upper_bound)
        
        # update the model's medium with the randomly generated medium
        MetModel.medium = random_medium

        # calculate total cost
        cost_tot = calc_cost_tot(costs, random_medium)
            
        '''FBA'''
        FBA_solution = MetModel.optimize() # run FBA
        # extract growth rate
        growth = FBA_solution.fluxes[biomass_objective]
        # if growth is NaN or smaller than zero, set to zero
        if (np.isnan(growth) or (growth < 0)):
            growth = 0
        
        if needs_production:
            # extract production rate
            production = FBA_solution.fluxes[production_objective]
            # if production is NaN or smaller than zero, set to zero
            if (np.isnan(production) or (production < 0)):
                production = 0
        else:
            # placeholder: production is not part of the "growth-cost" objective
            production = -1

        # store the parameters (random medium), total cost, growth and production
        initial_para.append(random_medium)
        initial_cost.append(cost_tot)
        initial_growth.append(growth)
        initial_production.append(production)
    
    return(
        initial_para,
        torch.tensor(initial_growth, dtype=torch.double).to(**tkwargs),
        torch.tensor(initial_production, dtype=torch.double).to(**tkwargs),
        torch.tensor(initial_cost, dtype=torch.double).to(**tkwargs) 
        )  

In [None]:
def initialise_model(
        medium_tensors_stacked,
        growth_tensors, 
        opt_objective = "growth-cost", 
        cost_tensors = None, 
        production_tensors = None
        ):
    
    """
    Initialises the BO model from all medium compositions tried so far.
    One SingleTaskGP is fitted per objective selected by opt_objective
    (growth, production and/or cost) and the GPs are combined in a ModelListGP.

    https://botorch.org/tutorials/constrained_multi_objective_bo
    " We use a multi-output SingleTaskGP to model the two objectives with
    a homoskedastic Gaussian likelihood with an inferred noise level"


    PARAMETERS
    * medium_tensors_stacked - tensor - all previously evaluated medium compositions
    * growth_tensors - tensor - corresponding growth rates
    * opt_objective - string - indicates which combination of objectives is subject to optimisation
    * cost_tensors - tensor - corresponding medium costs; required if the objective involves cost
    * production_tensors - tensor - corresponding production rates; required if the
        objective involves production

    RETURNS
    * mll - SumMarginalLogLikelihood of the model 
    * model - ModelListGP - wraps one SingleTaskGP per objective, in the order
        named in opt_objective

    RAISES
    * ValueError - if opt_objective is unknown or a tensor required by it is None
    """

    # objective name -> (label, tensor) pairs in the column order of the training targets
    columns_by_objective = {
        'growth-cost': (
            ("growth_tensors", growth_tensors),
            ("cost_tensors", cost_tensors),
        ),
        'growth-production': (
            ("growth_tensors", growth_tensors),
            ("production_tensors", production_tensors),
        ),
        'production-cost': (
            ("production_tensors", production_tensors),
            ("cost_tensors", cost_tensors),
        ),
        'growth-production-cost': (
            ("growth_tensors", growth_tensors),
            ("production_tensors", production_tensors),
            ("cost_tensors", cost_tensors),
        ),
    }

    # assert that the objective is one of the possibilities
    opt_objective_types = list(columns_by_objective)
    if opt_objective not in opt_objective_types:
        raise ValueError(f"Invalid objective. Expected one of: {opt_objective_types}")

    columns = columns_by_objective[opt_objective]

    # fail with a clear message instead of an AttributeError on None
    missing = [label for label, tensor in columns if tensor is None]
    if missing:
        raise ValueError(
            f"Objective '{opt_objective}' requires the following arguments: {missing}"
        )

    # combine the selected objectives into a single [n, k] tensor, one column each;
    # reshape (rather than view) also accepts non-contiguous inputs
    objective_data = torch.cat(
        [tensor.reshape(-1, 1) for _, tensor in columns], dim = -1
    )

    # train one GP per objective column
    models = []
    for i in range(objective_data.shape[-1]):
        train_objective = objective_data[:, i].unsqueeze(-1) # [n, 1]
        models.append(
            SingleTaskGP(medium_tensors_stacked, train_objective).to(**tkwargs)
        ) # could add argument outcome_transform=Standardize(m=1)

    model = ModelListGP(*models)
    # likelihood of the GP
    mll = SumMarginalLogLikelihood(model.likelihood, model)
    # returns SumMarginalLogLikelihood and model
    return mll, model       

In [1]:
def convert_to_dict(candidate_tensor, keys):
    """
    Converts the tensor representation of a medium back to a dictionary

    PARAMETERS
    * candidate_tensor - tensor - values of the medium composition stored in a tensor;
        singleton dimensions (e.g. a leading batch dimension of size 1) are removed
    * keys - list - keys corresponding to all possible medium components,
        in the same order as the values in candidate_tensor

    RETURNS
    * candidate_dict - dictionary - a dictionary containing medium components as keys and their amount as values
    """
    
    # Squeeze the tensor to remove extra dimensions if necessary
    candidate_values = candidate_tensor.squeeze()

    # A medium with a single component squeezes down to a 0-d tensor, whose
    # .tolist() is a bare number that cannot be zipped; restore one dimension.
    if candidate_values.dim() == 0:
        candidate_values = candidate_values.unsqueeze(0)

    # Pair the keys with the corresponding values from the tensor
    return dict(zip(keys, candidate_values.tolist()))

In [None]:
def convert_normalise_media(bounds, medium_list):
    """
    Converts a dictionary of bounds for medium component amounts into a 2D tensor
    with the lower bounds in the first row and the upper bounds in the second row.
    Converts a list of medium compositions (each a dictionary) into tensors.
    0-1-Normalises each medium composition using the bounds.

    PARAMETERS
    * bounds - dictionary - (lower, upper) bound for each medium component
    * medium_list - a list of medium compositions in dictionary form - all medium compositions to be
    transformed and normalised;
    -> both have to have identical keys; the component order of the returned
       tensors follows the key order of bounds

    RETURNS
    * bounds_tensors_stacked - tensor - [2, x] tensor representation of bounds
    * medium_tensors_normalised - list of tensors - normalised tensor representation
        of each entry of medium_list

    RAISES
    * ValueError - if a medium composition does not have the same keys as bounds
    """
    # `normalize` is not among the imports at the top of this file, so bring it
    # into scope here (assumed to be botorch's bounds-based normaliser)
    from botorch.utils.transforms import normalize

    # convert bounds from dictionary to tensor
    bounds_tensor = torch.tensor(list(bounds.values()), dtype=torch.double).to(**tkwargs) # [x, 2]
    
    # Stack the lower and upper bounds to match the expected format
    bounds_tensors_stacked = torch.stack([bounds_tensor[:, 0], bounds_tensor[:, 1]], dim=0) # [2, x]

    # normalise medium composition
    medium_tensors_normalised = [] # initialise empty list
    for medium_m in medium_list:
        if medium_m.keys() != bounds.keys():
            raise ValueError("Each medium composition must have the same keys as bounds.")

        # read the amounts in the key order of bounds so that every value is
        # normalised against its own bounds even if the dictionaries are ordered differently
        ordered_amounts = [medium_m[key] for key in bounds]
        medium_m_tensor = torch.tensor(ordered_amounts, dtype=torch.double).to(**tkwargs) # [x]

        # normalise medium composition and collect it
        medium_tensors_normalised.append(normalize(medium_m_tensor, bounds_tensors_stacked))
    
    return(bounds_tensors_stacked, medium_tensors_normalised)

In [None]:
def normalise_1Dtensors(tensors):
    """
    Min-max normalises a 1-dimensional tensor to the range [0, 1].
    If all entries are equal (max == min) the shifted tensor is returned,
    i.e. all zeros, which avoids a division by zero.
    """
    lowest = tensors.min(dim=0, keepdim=True).values
    highest = tensors.max(dim=0, keepdim=True).values
    shifted = tensors - lowest

    # guard clause: constant input has zero range
    if (highest == lowest):
        return shifted

    return shifted / (highest - lowest)

In [None]:
def JSON_serialize_objective(objective):
    """
    Turns a cobrapy model objective into a plain dictionary so that it can be
    written to a json file.

    The dictionary has three entries:
    * "reactions" - the .name of every term in the objective expression
    * "coefficients" - the matching coefficients as floats
    * "direction" - "max" if the objective direction is "max", otherwise "min"
    """
    direction = "min"
    if objective.direction == "max":
        direction = "max"

    # one pass over the expression terms; names and coefficients stay aligned by position
    terms = list(objective.expression.as_coefficients_dict().items())
    names = [term.name for term, _ in terms]
    weights = [float(weight) for _, weight in terms]  # floats are JSON-serializable

    return {
        "reactions": names,
        "coefficients": weights,
        "direction": direction
    }

In [None]:
def JSON_serialize_store_results(
        BayesOpt_result, 
        basename = "basename"
        ):
    """
    Writes the output of BayesOpt_MOBO (a dictionary) to the file "<basename>.json".

    The input dictionary is left untouched; a deep copy is converted instead:
    * the "... tensors" entries for growth rate, cost and production are turned
      into lists and stored under the corresponding "... list" keys
    * "is pareto" is turned into a list
    * "model objective" is converted with JSON_serialize_objective
    """
    serialisable = copy.deepcopy(BayesOpt_result) # work on a copy so the caller's results stay unaffected

    # tensors are not JSON-serializable: convert to lists and rename the keys
    renames = (
        ("growth rate tensors", "growth rate list"),
        ("cost tensors", "cost list"),
        ("production tensors", "production list"),
    )
    for tensor_key, list_key in renames:
        serialisable[list_key] = serialisable.pop(tensor_key).tolist()
    serialisable["is pareto"] = serialisable["is pareto"].tolist()

    # convert objective to dictionary
    serialisable["model objective"] = JSON_serialize_objective(serialisable["model objective"])

    # convert into json and save as file
    target = basename + ".json"
    with open(target, 'w') as f:
        json.dump(serialisable, f, indent=2) # indent for readability

In [None]:
def JSON_deserialize_objective(objective_data, model):
    """
    Rebuilds a cobra model objective from its serialized dictionary form.
    
    PARAMETERS
    * objective_data - dictionary - serialized objective with the entries
        "reactions", "coefficients" and "direction"
    * model - cobra model - the cobra model that contains the reactions
    
    RETURNS
    * an Objective created via model.problem.Objective from the rebuilt expression
    """
    stored_ids = objective_data["reactions"]
    stored_coefficients = objective_data["coefficients"]

    # Only the first half of each list is used.
    # NOTE(review): presumably the serialized expression lists forward variables
    # first and their reverse counterparts second - confirm against the serializer.
    kept_ids = stored_ids[:len(stored_ids) // 2]
    kept_coefficients = stored_coefficients[:len(stored_coefficients) // 2]

    # accumulate coefficient * flux expression over the kept reactions
    objective_expr = 0
    for rxn_id, weight in zip(kept_ids, kept_coefficients):
        term = weight * model.reactions.get_by_id(rxn_id).flux_expression
        objective_expr = objective_expr + term

    # anything other than "max" is treated as minimisation
    if objective_data["direction"] == "max":
        direction = 'max'
    else:
        direction = "min"

    return model.problem.Objective(objective_expr, direction=direction)

In [None]:
def JSON_deserialize_load_results(json_file, MetModel):
    """
    Takes a JSON-serialized output file and converts it back into the original results dictionary format.

    PARAMETERS
    * json_file - string - name of a json file storing the results of a BayesOpt run
    * MetModel - cobra model - metabolic model used to create those data

    RETURNS
    * results_json_comp - dictionary - original format of the BayesOpt results; can be used for plotting.
        The growth rate, cost and production tensors are rebuilt as torch.double
        CPU tensors; "is pareto" keeps the dtype inferred from the stored list.
    """
    with open(json_file, 'r') as f:
        results_json_comp = json.load(f)
    
    # Convert lists back into tensors.
    # dtype is given explicitly: torch.tensor() would otherwise infer float32 from
    # Python floats and lose the double precision used elsewhere in this file.
    renames = (
        ("growth rate list", "growth rate tensors"),
        ("cost list", "cost tensors"),
        ("production list", "production tensors"),
    )
    for list_key, tensor_key in renames:
        results_json_comp[tensor_key] = torch.tensor(
            results_json_comp.pop(list_key), dtype=torch.double
        )
    results_json_comp["is pareto"] = torch.tensor(results_json_comp["is pareto"])
    
    # Convert objective back to its original format
    results_json_comp["model objective"] = JSON_deserialize_objective(
        results_json_comp["model objective"], MetModel)
    
    return results_json_comp