# Helper Functions for BayesOpt

In [None]:
# for running in colab
#pip install cobra

In [None]:
# for running in colab
#pip install botorch

In [None]:
# imports
import torch
from botorch.models import SingleTaskGP, ModelListGP
from gpytorch.mlls.sum_marginal_log_likelihood import SumMarginalLogLikelihood

import random # for initial data
import numpy as np

In [None]:
# Select the compute device: the first CUDA GPU ("cuda:0") when CUDA is
# available, otherwise the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Keyword arguments reused for every tensor/model so that all of them live on
# the same device and use double precision (torch.double is torch.float64).
tkwargs = {'device': device, 'dtype': torch.double}

## Function Definitions

In [None]:
def calc_cost_tot(costs, medium):
    """
    Computes the total cost of one medium composition as the sum over all
    components of (amount in the medium) * (cost of that component).

    PARAMETERS
    * costs - dictionary - cost for each medium component; must contain every key of medium
    * medium - dictionary - medium composition (component and amount)

    RETURNS
    * a one-element tensor holding the total cost, created with the device and dtype from tkwargs
    """

    total = 0
    for component, amount in medium.items():
        total += amount * costs[component]

    # wrap the scalar in a list so the result is a 1-element tensor, then move it
    # to the device chosen at module level
    return torch.tensor([total], dtype=torch.double).to(**tkwargs)

In [None]:
"""
Example call:

initial_para, initial_growth, initial_production, initial_cost = generate_initial_data(
        MetModel, medium, bounds, costs, n_samples = n_start,
        opt_objective = objective,
        biomass_objective = biomass_objective, 
        production_objective = production_objective
        )
"""
def _sample_random_medium(medium, bounds):
    """Draws one random medium: for every key of medium a uniform value within bounds[key]."""
    random_medium = {}
    for key in medium:
        lower_bound, upper_bound = bounds[key]
        random_medium[key] = random.uniform(lower_bound, upper_bound)
    return random_medium


def _non_negative_rate(rate):
    """Returns 0 for a NaN or negative rate, otherwise the rate unchanged."""
    if np.isnan(rate) or rate < 0:
        return 0
    return rate


def generate_initial_data(
    MetModel, medium, bounds, costs, n_samples = 5, 
    opt_objective = "growth-cost", 
    biomass_objective = None, production_objective = None
    ):
    """
    Creates initial data points needed to start Bayesian Optimisation
    * randomly creates media compositions within the concentration boundaries
    * for each medium composition calculates the total cost (stored negated)
    * for each medium composition assigns it to the model and runs FBA to obtain
      the growth rate (and, if requested, the production rate)
    * stores all results in lists and converts the numeric ones to tensors

    For opt_objective "growth-cost" the model is solved with slim_optimize()
    using whatever objective MetModel currently has (biomass_objective is not
    applied in this mode) and no production rates are collected. For the other
    two modes MetModel.objective is set to biomass_objective and optimize() is
    used so that individual fluxes can be read by reaction id.

    NOTE: MetModel.medium (and, in the production modes, MetModel.objective)
    is overwritten by this function.

    PARAMETERS
    * MetModel - COBRApy model - the metabolic model to be evaluated
    * medium - dictionary - the medium composition of that model;
        only its keys are used, to decide which components are sampled
    * bounds - dictionary - (lower, upper) bounds for every medium component,
        determines the search space
    * costs - dictionary - the (monetary) cost of each component
    * n_samples - integer - how many random media compositions are to be created
    * opt_objective - string - which combination of objectives is subject to optimisation;
        one of 'growth-cost', 'growth-production', 'growth-production-cost'
    * biomass_objective - string - the id of the biomass reaction to be optimised
        (required unless opt_objective is 'growth-cost')
    * production_objective - string - the id of the production reaction to be read out
        (required unless opt_objective is 'growth-cost')

    RETURNS
    * initial_para - list of dictionaries - random medium compositions
    * initial_growth - tensor - corresponding growth rates (NaN/negative replaced by 0)
    * initial_production - tensor - corresponding production rates (NaN/negative
        replaced by 0); empty for 'growth-cost'
    * initial_cost - tensor - corresponding NEGATIVE medium costs

    RAISES
    * ValueError - if opt_objective is not one of the supported values, or if a
        production mode is requested without biomass_objective / production_objective
    """

    # make sure the objective is one of the possibilities
    opt_objective_types = ['growth-cost', 'growth-production', 'growth-production-cost']
    if opt_objective not in opt_objective_types:
        # %s-style formatting of the list; the former "%o" (octal) conversion
        # raised a TypeError here instead of the intended ValueError
        raise ValueError(f"Invalid objective. Expected one of: {opt_objective_types}")

    # every mode except "growth-cost" needs the production rate
    with_production = opt_objective != "growth-cost"
    if with_production:
        if production_objective is None:
            raise ValueError("Please specifiy the production objective.")
        if biomass_objective is None:
            raise ValueError("Please specifiy the biomass objective.")

    initial_para = []
    initial_growth = []
    initial_production = []
    initial_cost = []

    for _ in range(n_samples):
        # generate random medium parameters within bounds and hand them to the model
        random_medium = _sample_random_medium(medium, bounds)
        MetModel.medium = random_medium

        # BoTorch assumes maximisation, so we maximise the negative of the costs
        cost_tot = -calc_cost_tot(costs, random_medium)

        if with_production:
            # full optimize() is needed to read out individual fluxes by id
            MetModel.objective = biomass_objective
            FBA_solution = MetModel.optimize()
            growth = _non_negative_rate(FBA_solution.fluxes[biomass_objective])
            production = _non_negative_rate(FBA_solution.fluxes[production_objective])
            initial_production.append(production)
        else:
            # slim_optimize() only returns the objective value and runs faster
            growth = _non_negative_rate(MetModel.slim_optimize())

        initial_para.append(random_medium)
        initial_growth.append(growth)
        initial_cost.append(cost_tot)

    return(
        initial_para,
        torch.tensor(initial_growth, dtype=torch.double).to(**tkwargs),
        torch.tensor(initial_production, dtype=torch.double).to(**tkwargs),
        torch.tensor(initial_cost, dtype=torch.double).to(**tkwargs) 
        )
     

In [None]:
def initialise_model(
        medium_tensors_stacked,
        growth_tensors, 
        opt_objective = "growth-cost", 
        cost_tensors = None, 
        production_tensors = None
        ):
    
    """
    Builds the surrogate model for Bayesian Optimisation from all media tried so far.

    The selected objectives are put side by side as columns (order: growth,
    production, cost - whichever apply). One single-output SingleTaskGP is
    created per column with the media as training inputs, and the GPs are
    combined into a ModelListGP. The models are only constructed here; no
    fitting is performed in this function.

    Related tutorial: https://botorch.org/tutorials/constrained_multi_objective_bo

    PARAMETERS
    * medium_tensors_stacked - tensor - all previously evaluated medium compositions
    * growth_tensors - tensor - corresponding growth rates
    * opt_objective - string - which combination of objectives is subject to optimisation;
        one of 'growth-cost', 'growth-production', 'growth-production-cost'
    * cost_tensors - tensor - corresponding medium costs
        (required for 'growth-cost' and 'growth-production-cost')
    * production_tensors - tensor - corresponding production rates
        (required for 'growth-production' and 'growth-production-cost')

    RETURNS
    * mll - SumMarginalLogLikelihood built from the model and its likelihood
    * model - ModelListGP - holds one SingleTaskGP per objective

    RAISES
    * ValueError - if opt_objective is not supported or a tensor required by the
        chosen opt_objective was not provided
    """

    # make sure the objective is one of the possibilities
    opt_objective_types = ['growth-cost', 'growth-production', 'growth-production-cost']
    if opt_objective not in opt_objective_types:
        # the former "%o" (octal) conversion raised a TypeError on the list
        # instead of the intended ValueError
        raise ValueError(f"Invalid objective. Expected one of: {opt_objective_types}")

    uses_production = opt_objective in ("growth-production", "growth-production-cost")
    uses_cost = opt_objective in ("growth-cost", "growth-production-cost")

    # fail early with a clear message instead of an AttributeError on None
    if uses_production and production_tensors is None:
        raise ValueError(
            f"production_tensors must be provided for opt_objective '{opt_objective}'."
        )
    if uses_cost and cost_tensors is None:
        raise ValueError(
            f"cost_tensors must be provided for opt_objective '{opt_objective}'."
        )

    # collect the objectives in the fixed order growth, production, cost
    objective_columns = [growth_tensors]
    if uses_production:
        objective_columns.append(production_tensors)
    if uses_cost:
        objective_columns.append(cost_tensors)

    # reshape each objective to a column and combine them into a single tensor
    objective_data = torch.cat(
        [column.view(-1, 1) for column in objective_columns], dim = -1
    )

    models = []
    # one GP per objective column (two or three, depending on opt_objective)
    for i in range(objective_data.shape[-1]):
        train_objective = objective_data[:, i]

        # SingleTaskGP gets the targets with an explicit trailing output dimension
        models.append(
            SingleTaskGP(medium_tensors_stacked, train_objective.unsqueeze(-1)).to(**tkwargs)
        ) # could add argument outcome_transform=Standardize(m=1)

    model = ModelListGP(*models)
    # marginal log likelihood summed over the GPs in the list
    mll = SumMarginalLogLikelihood(model.likelihood, model)
    return mll, model
        

In [1]:
def convert_to_dict(candidate_tensor, keys):
    """
    Converts the tensor representation of a medium back to a dictionary

    All size-1 dimensions of the tensor are removed first, so a candidate of
    shape [1, d] is handled like one of shape [d]. Keys and values are paired
    in order; pairing stops at the shorter of the two.

    PARAMETERS
    * candidate_tensor - tensor - values of the medium composition stored in a tensor
    * keys - list - keys corresponding to all possible medium components

    RETURNS
    * candidate_dict - dictionary - a dictionary containing medium components as keys and their amount as values
    """

    # Squeeze the tensor to remove extra dimensions if necessary
    candidate_values = candidate_tensor.squeeze()

    # A medium with a single component squeezes down to a 0-dim tensor, whose
    # tolist() is a bare number that cannot be zipped; restore one dimension.
    if candidate_values.dim() == 0:
        candidate_values = candidate_values.unsqueeze(0)

    # Pair the keys with the corresponding values from the tensor
    return dict(zip(keys, candidate_values.tolist()))

In [None]:
def convert_normalise_media(bounds, medium_list):
    """
    Converts the bounds and a list of media from dictionaries to tensors and
    normalises every medium with respect to the bounds.

    PARAMETERS
    * bounds - dictionary - (lower, upper) bounds for every medium component
    * medium_list - list of dictionaries - medium compositions (component and amount)

    RETURNS
    * bounds_tensors_stacked - tensor - row 0 holds all lower bounds, row 1 all upper bounds
    * medium_tensors_normalised - list of tensors - one normalised tensor per medium,
        in the order of medium_list
    """
    # normalize is not imported at the top of this file, so it is imported here
    # to keep the function usable on its own
    from botorch.utils.transforms import normalize

    # convert bounds from dictionary to tensor
    bounds_tensor = torch.tensor(list(bounds.values()), dtype=torch.double).to(**tkwargs) # [x, 2]

    # Stack the lower and upper bounds to match the format expected by normalize: [2, x]
    bounds_tensors_stacked = torch.stack([bounds_tensor[:, 0], bounds_tensor[:, 1]], dim=0)

    medium_tensors_normalised = []
    for medium_m in medium_list:
        # transform current medium to tensor
        # NOTE(review): values are taken in each dictionary's own order, which
        # presumably matches the key order of bounds - confirm against callers
        medium_m_tensor = torch.tensor(list(medium_m.values()), dtype=torch.double).to(**tkwargs) # [x]

        # normalise medium composition and collect it
        medium_tensors_normalised.append(normalize(medium_m_tensor, bounds_tensors_stacked))

    return(bounds_tensors_stacked, medium_tensors_normalised)
    

In [None]:
def normalise_1Dtensors(tensors):
    """
    Min-max normalises a 1D tensor along dimension 0.

    PARAMETERS
    * tensors - tensor - 1D tensor of values to be rescaled

    RETURNS
    * tensor - (tensors - min) / (max - min); if all values are equal
        (max equals min) only the minimum is subtracted, i.e. all zeros
    """
    lowest = tensors.min(dim=0, keepdim=True).values
    highest = tensors.max(dim=0, keepdim=True).values
    shifted = tensors - lowest

    # constant input: dividing would be 0/0, so return the shifted values as they are
    if (highest == lowest):
        return shifted

    return shifted / (highest - lowest)