In [1]:
# Install cobrapy if not installed yet
try:
    import cobra
    print('Cobrapy already installed')
except:
    !pip install cobra

Cobrapy already installed


In [None]:
import numpy as np
import pandas as pd
from cobra.io import load_matlab_model, read_sbml_model
from cobra.util.array import create_stoichiometric_matrix
from cobra.util.solver import linear_reaction_coefficients
from scipy.optimize import linprog
from pathlib import Path

import warnings
warnings.filterwarnings("ignore")

In [None]:
class CMCNModel:
    """ Python representation of a metabolic model"""
    def __init__(self, S, mets, rxns, lb, ub, name=None, met_names=None, rxn_names=None, c=None, b=None,
                 genes=None, gpr_rules=None):
        """
        :param S: numpy array: Stoichiometric matrix
        :param mets: List containing metabolite IDs
        :param rxns: List containing reaction IDs
        :param lb: numpy array: Lower bounds of reactions
        :param ub: numpy array: Upper bounds of reactions
        :param name: string: Model name
        :param met_names: List containing metabolite names
        :param rxn_names: List containing reaction names
        :param c: numpy array: Coefficients of the objective function
        :param b: numpy array: rhs constants for the steady state constraints
        :param genes: List containing gene IDs
        :param gpr_rules: List containing gene-protein-reaction rules
        """
        self._check_attribute_dimensions(S, mets, rxns, lb, ub, met_names, rxn_names)
        self.name = name
        self.S = S
        self.mets = mets
        self.met_names = met_names
        self.rxns = rxns
        self.rxn_names = rxn_names
        self.genes = genes
        self.gpr_rules = gpr_rules
        self.lb = lb
        self.ub = ub

        if c is None:
            self.c = np.zeros(len(self.rxns))
        else:
            self.c = c

        if b is None:
            self.b = np.zeros(len(self.mets))
        else:
            self.b = b


    ### MODEL PROPERTY FUNCTIONS ###
    @property
    def rxn_gene_matrix(self):
        """ Creates reaction-gene matrix (rows-rxns, cols-genes) """
        # FIXME: Doesn't work if gene ID is a substring of another geneID!

        # return nothing if genes or gpr rules are missing
        if self.gpr_rules is None or self.genes is None:
            print('No gene IDs or gpr rules in model')
            return None

        mat = np.zeros((len(self.rxns), len(self.genes)))
        for i, rxn in enumerate(self.rxns):
            for j, gene in enumerate(self.genes):
                if gene in self.gpr_rules[i]:
                    mat[i, j] = 1.0
        return mat


    ### MODEL MANIPULATION ###
    def split_reversible_rxns(self):
        """ Splits reversible reactions into two irreversible reactions and updates all model attributes accordingly """
        n_mets, n_rxns = self.S.shape

        # create empty arrays
        S_new = np.zeros((n_mets, 0))   # create empty stoichiometric matrix of correct dimensions
        rxns_new = []
        lb_new = np.array([])
        ub_new = np.array([])
        rxn_names_new = [] if self.rxn_names is not None else None  # check whether rxn_names are stored
        gpr_rules_new = [] if self.gpr_rules is not None else None

        for k in range(n_rxns):
            rxn_col = np.reshape(self.S[:, k], (n_mets, 1))     # reshape dimension from (, n_mets) to (n_mets, 1)

            if self.lb[k] >= 0:     # it's irreversible
                S_new = np.hstack((S_new, rxn_col))
                rxns_new.append(self.rxns[k])
                lb_new = np.hstack((lb_new, self.lb[k]))
                ub_new = np.hstack((ub_new, self.ub[k]))
                if self.rxn_names is not None:
                    rxn_names_new.append(self.rxn_names[k])
                if self.gpr_rules is not None:
                    gpr_rules_new.append(self.gpr_rules[k])

            else:           # it's reversible
                # forward rxn
                S_new = np.hstack((S_new, rxn_col))
                rxns_new.append(f"{self.rxns[k]}_f")
                lb_new = np.hstack((lb_new, 0))
                ub_new = np.hstack((ub_new, self.ub[k]))
                if self.rxn_names is not None:
                    rxn_names_new.append(f"{self.rxn_names[k]} (forward)")
                if self.gpr_rules is not None:
                    gpr_rules_new.append(self.gpr_rules[k])

                # backward rxn
                S_new = np.hstack((S_new, -rxn_col))
                rxns_new.append(f"{self.rxns[k]}_b")
                lb_new = np.hstack((lb_new, 0))
                ub_new = np.hstack((ub_new, -self.lb[k]))
                if self.rxn_names is not None:
                    rxn_names_new.append(f"{self.rxn_names[k]} (backward)")
                if self.gpr_rules is not None:
                    gpr_rules_new.append(self.gpr_rules[k])

        # assign attributes with splitted reactions to class object
        self.S = S_new
        self.rxns = rxns_new
        self.rxn_names = rxn_names_new
        self.lb = lb_new
        self.ub = ub_new
        self.c = np.zeros(len(self.rxns))
        self.gpr_rules = gpr_rules_new


    ### MODEL OPTIMIZATION ###
    def set_objective(self, rxn):
        """ changes the objective function of the model
        :param rxn: reaction id"""
        self.c = np.zeros_like(self.c)
        objective_index = self.rxns.index(rxn)
        self.c[objective_index] = 1

    def print_objective(self):
        """ prints the objective function in the format coefficient * rxn id"""
        if any(self.c):
            objective_index = np.where(self.c != 0)[0]
            print(
                f"The objective function is:  {self.c[objective_index].item()} * {self.rxns[objective_index.item()]}")
        else:
            print("The objective function is not defined in the model")

    def flux_balance_analysis(self):
        """ solves the FBA problem for maximizing the objective function using linprog
        :return: optimization status
        :return: objective value
        :return: solution for all variables"""
        if any(self.c):
            bounds = [(lb, ub) for lb, ub in zip(self.lb, self.ub)]
            solution = linprog(c=-self.c, A_eq=self.S, b_eq=self.b, bounds=bounds)
            if solution.success:
                return solution.status, -solution.fun, solution.x
            else:
                print("No optimal solution found")
                return solution.status, None, None
        else:
            raise ValueError("The objective function is not defined in the model")

    def flux_variability_analysis(self, alpha=1):
        """ Solves the FVA problem for each model reaction.
        For this function to work you will have to implement your own FBA function from Homework 5!
        I.e. calculates the flux range of each model reaction for a given alpha.
        Alpha determines the amount of suboptimality w.r.t. optimum biomass (z*) we allow (z >= alpha * z*).

        :param alpha: float between 0 and 1
        :return: (numpy.array, numpy.array); two numpy arrays containing the minimum and maximum flux values for each reaction
        """
        # Solve FBA to determine the optimal objective
        status, z, _ = self.flux_balance_analysis()

        if status == 0:
            # fixing biomass (A_ub >= b_ub)
            A_ub = np.array([self.c])
            b_ub = np.array([alpha * z])
            # Note: you could also set the lower bound of the objective to alpha*z
            # self.lb[np.where(c)[0][0]] = alpha*z

            # run FVA
            bounds = [(lb, ub) for lb, ub in zip(self.lb, self.ub)]
            min_fluxes = []
            max_fluxes = []
            for i in range(len(self.rxns)):
                c = np.zeros(len(self.rxns))
                c[i] = 1
                min_solution = linprog(c, A_ub=-A_ub, b_ub=-b_ub, A_eq=self.S, b_eq=self.b, bounds=bounds)
                max_solution = linprog(-c, A_ub=-A_ub, b_ub=-b_ub, A_eq=self.S, b_eq=self.b, bounds=bounds)
                min_fluxes.append(min_solution.fun)
                max_fluxes.append(-max_solution.fun)
            return np.array(min_fluxes), np.array(max_fluxes)
        else:
            print(f"No optimal solution found. cobra_model.optimize() terminated with status {status}")

    ### PRIVATE METHODS ###
    def _check_attribute_dimensions(self, S, mets, rxns, lb, ub, met_names, rxn_names):
        """ Checks whether the dimensions of the passed attributes during initialization match """
        n_mets, n_rxns = S.shape
        if len(mets) != n_mets:
            raise ValueError(f"Dimensions of 'S' and 'mets' do not match: {n_mets}, {len(mets)}")
        if met_names is not None and len(met_names) != n_mets:
            raise ValueError(f"Dimensions of 'S' and 'met_names' do not match: {n_mets}, {len(met_names)}")
        if len(rxns) != n_rxns:
            raise ValueError(f"Dimensions of 'S' and 'rxns' do not match: {n_rxns}, {len(rxns)}")
        if rxn_names is not None and len(rxn_names) != n_rxns:
            raise ValueError(f"Dimensions of 'S' and 'rxn_names' do not match: {n_rxns}, {len(rxn_names)}")
        if len(lb) != n_rxns:
            raise ValueError(f"Dimensions of 'S' and 'lb' do not match: {n_rxns}, {len(lb)}")
        if len(ub) != n_rxns:
            raise ValueError(f"Dimensions of 'S' and 'ub' do not match: {n_rxns}, {len(ub)}")


    def OptReg(self, alpha, target_rxn, beta, C, epsilon, alt_solution=False):
        """ Simplified OptReg method
        Determines the optimal reaction modifications (knock-out, up- and down-regulation) in order to achieve an increased
        flux through a target reaction, while maintaining a certain amount of the optimal biomass flux.

        :param alpha: float between 0 and 1, determines the minimal fraction of the biomass flux we want to maintain.
        :param target_rxn: string, Reaction ID of the target reaction.
        :param beta: float, factor by which we want to increase the flux through the target reaction.
        :param C: float.
        :param epsilon: float.
        :param alt_solution: bool, if True, the function checks whether an alternative solution to the initial solution exists.
        :return: pandas Dataframe, contains the IDs of the modified reactions and the type of modifications
        """
        
        # Split reversible reactions
        self.split_reversible_rxns()

        n = len(self.rxns)
        target_index = self.rxns.index(target_rxn)
        bio_index = self.rxns.index('BIOMASS_Ecoli_core_w_GAM')

        # Step 1: Set lower bound of target_rxn to zero
        original_lb = self.lb[target_index]
        self.lb[target_index] = 0

        # Step 2: Calculate the optimum biomass flux z*. 'BIOMASS_Ecoli_core_w_GAM'
        self.set_objective('BIOMASS_Ecoli_core_w_GAM')
        _, z_star, v = self.flux_balance_analysis()
        if not z_star:
            raise RuntimeError("Failed to calculate optimum biomass flux")

        v_min, v_max = self.flux_variability_analysis(alpha=0)
        v_l, v_u = self.flux_variability_analysis(alpha=1)

        # Target flux from the optimum biomass flux
        w_target_rxn = v[target_index]   # initially v[target_rxn]

        # Equality constraints from S matrix
        A_eq = np.hstack([self.S, np.zeros((self.S.shape[0], 3 * self.S.shape[1]))])  # Add columns for y variables, DIM: (mets, 4*rxns)
        b_eq = self.b

        # Inequality constraints
        A_ineq = np.zeros((10*n, 4*n))
        b_ineq = np.zeros(10*n)

        for i in range(n):
            # First inequality: -v_i - epsilon * y_k_i <= - epsilon
            A_ineq[i, i] = -1
            A_ineq[i, n + i] = -epsilon
            b_ineq[i] = -epsilon

            # Second inequality: v_i <= (1 - y_k_i) * v_i^{max} + y_k_i * epsilon
            A_ineq[n + i, i] = 1
            A_ineq[n + i, n + i] = v_max[i] - epsilon
            b_ineq[n + i] = v_max[i] #self.ub[i]

            # Third inequality: -v_i - (v_min_i - v_u_i(1 - C) - v_max_i * C) * y_u_i <= -v_min_i
            A_ineq[2*n + i, i] = -1
            A_ineq[2*n + i, 2*n + i] = v_u[i] * (1 - C) + v_max[i] * C - v_min[i]
            b_ineq[2*n + i] = -v_min[i]

            # Fourth inequality: v_i - (v_max_i - v_u_i) * y_u_i <= v_u_i
            A_ineq[3*n + i, i] = 1
            A_ineq[3*n + i, 2*n + i] = - v_max[i] + v_u[i]
            b_ineq[3*n + i] = v_u[i]

            # Fifth inequality: -v_i + (v_min_i - v_l_i) * y_d_i <= -v_l_i
            A_ineq[4*n + i, i] = -1
            A_ineq[4*n + i, 3*n + i] = v_min[i] - v_l[i]
            b_ineq[4*n + i] = -v_l[i]

            # Sixth inequality: v_i - (v_l_i(1 - C) + v_min_i * C - v_max_i) * y_d_i <= v_max_i
            A_ineq[5*n + i, i] = 1
            A_ineq[5*n + i, 3*n + i] = - v_l[i] * (1 - C) - v_min[i] * C + v_max[i]
            b_ineq[5*n + i] = v_max[i]

            # Seventh inequality to guarantee at most one manipulation
            # y_k_i + y_u_i + y_d_i <= 1
            A_ineq[6*n + i, n + i] = 1
            A_ineq[6*n + i, 2*n + i] = 1
            A_ineq[6*n + i, 3*n + i] = 1
            b_ineq[6*n + i] = 1

            # for j in range(n):
            if self.rxns[i].endswith('_f'): # == self.rxns[i] + "_b":  # Explicitly match forward and backward reactions
                # print('hello')
                # Equality constraint: y_k_p = y_k_q
                eq_row = np.zeros(4 * n)
                eq_row[n + i] = 1  # y_k_p
                eq_row[n + i + 1] = -1  # -y_k_q
                A_eq = np.vstack([A_eq, eq_row])
                b_eq = np.append(b_eq, 0)
                
                # Eight Inequality constraint: y_u_p + y_u_q <= 1
                A_ineq[7 * n + i, 2 * n + i] = 1  # y_u_p
                A_ineq[7 * n + i, 2 * n + i + 1] = 1  # y_u_q
                b_ineq[7 * n + i] = 1

                # Nineth Inequality constraint: y_d_p + y_d_q <= 1
                A_ineq[8 * n + i, 3 * n + i] = 1  # y_d_p
                A_ineq[8 * n + i, 3 * n + i + 1] = 1  # y_d_q
                b_ineq[8 * n + i] = 1

        # Biomass constraint
        A_ineq = np.vstack([A_ineq, np.zeros((1, 4 * n))])  # Add a row for biomass constraint
        A_ineq[-1, bio_index] = -1                         # v_bio >= alpha * z_star -> -v_bio <= -alpha * z_star
        b_ineq = np.hstack([b_ineq, -alpha * z_star])       # Biomass flux must be >= alpha * z_star
        
        # Target constraint
        A_ineq = np.vstack([A_ineq, np.zeros((1, 4 * n))])  # Add a row for the target reaction constraint
        A_ineq[-1, target_index] = -1                      # v_target >= beta * w_target_rxn -> -v_target <= -beta * w_target_rxn
        b_ineq = np.hstack([b_ineq, -beta * w_target_rxn])  # Target flux must be >= beta * w_target_rxn
        
        # Step 4: Formulate the MILP
        c = np.hstack([np.zeros(n), np.ones(3*n)])  # Objective: Minimize sum of y variables

        # Bounds for v and y variables
        lb = np.concatenate((self.lb, np.zeros(3*n)))  # lower bounds for v and y
        ub = np.concatenate((self.ub, np.ones(3*n)))  # upper bounds for v and y
        bounds = list(zip(lb, ub))
        
        # Debugging: Print dimensions and bounds
        # print("A_eq shape:", A_eq.shape, "b_eq shape:", b_eq.shape)
        # print("A_ineq shape:", A_ineq.shape, "b_ineq shape:", b_ineq.shape)
        # print("Bounds length:", len(bounds))
        
        solution = linprog(c=c, A_ub=A_ineq, b_ub=b_ineq, A_eq=A_eq, b_eq=b_eq, bounds=bounds, method='highs', integrality =c)

        if solution is None or not solution.success:
            raise RuntimeError(f"MILP optimization failed: {solution.status if solution else 'No solution returned'}")

        # Reset the lower bound of the target reaction
        self.lb[target_index] = original_lb
        
        # Step 5: Extract the solution
        y = solution.x[n:]
        y_k = y[:n]
        y_u = y[n:2*n]
        y_d = y[2*n:]

        # Alternative solution
        # Determine which y variables are equal to 1
        y_indices = [i for i, y_i in enumerate(y) if y_i > 0.999]  # Check for binary variables equal to 1

        # If alt_solution=True, check for alternative solutions
        if alt_solution:
            if not y_indices:
                print("No reaction modifications in the initial solution; no alternative solution possible.")
                return

            # Add an integer cut constraint to the MILP
            # Sum of selected y variables <= len(y_indices) - 1
            new_row = np.zeros(4 *n)  # Constraint for binary variables
            for idx in y_indices:
                new_row[n + idx] = 1  # Select binary variables
            A_ineq = np.vstack([A_ineq, new_row])
            b_ineq = np.hstack([b_ineq, len(y_indices) - 1])

            # Solve the modified MILP
            alternative_solution = linprog(
                c=c, A_ub=A_ineq, b_ub=b_ineq, A_eq=A_eq, b_eq=b_eq, bounds=bounds, method='highs' , integrality=c
            )

            if alternative_solution.success:
                print("An alternative optimal solution exists.")
            else:
                print("No alternative optimal solution exists.")

        # Step 6: Extract the modifications
        modifications = []
        for i in range(n):
            if y_k[i] > 0.5:
                modifications.append((self.rxns[i], 'Knock-out'))
            elif y_u[i] > 0.5:
                modifications.append((self.rxns[i], 'Up-regulation'))
            elif y_d[i] > 0.5:
                modifications.append((self.rxns[i], 'Down-regulation'))

        return pd.DataFrame(modifications, columns=['Reaction', 'Modification'])


In [None]:
def cobra_to_CMCNModel(model):
    """ Converts a cobrapy model into a CMCNModel object.
    :param model: cobrapy model
    :return: CMCNModel object """
    S = create_stoichiometric_matrix(model)
    name = model.name
    mets = [met.id for met in model.metabolites]
    met_names = [met.name for met in model.metabolites]
    rxns = [rxn.id for rxn in model.reactions]
    rxn_names = [rxn.name for rxn in model.reactions]
    genes = [gene.id for gene in model.genes]
    gpr_rules = [rxn.gene_reaction_rule for rxn in model.reactions]
    lb = np.array([rxn.lower_bound for rxn in model.reactions])
    ub = np.array([rxn.upper_bound for rxn in model.reactions])
    c = np.zeros(len(rxns))
    for key, value in linear_reaction_coefficients(model).items():
        c[rxns.index(key.id)] = value
    b = np.zeros(len(mets))
    model_cmcn = CMCNModel(S=S, mets=mets, rxns=rxns, lb=lb, ub=ub, c=c, b=b,
                           name=name, met_names=met_names, rxn_names=rxn_names,
                           genes=genes, gpr_rules=gpr_rules)
    return model_cmcn

In [None]:
model_cobra = load_matlab_model('e_coli_core.mat')
model_cmcn = cobra_to_CMCNModel(model_cobra)
model_cmcn.lb[model_cmcn.rxns.index('ATPM')] = 0
model_cmcn.lb[model_cmcn.rxns.index('FUM')] = 0

In [None]:
# model_cmcn.set_objective('BIOMASS_Ecoli_core_w_GAM')
model_cmcn.print_objective()

In [None]:
epsilon = 2 * 0.000001
alpha = 0.9
beta = 1.7
C = 0.001

try:
    data = model_cmcn.OptReg(alpha, 'FUM', beta, C, epsilon, alt_solution=True)
    print(data)
except RuntimeError as e:
    print(e)

In [None]:
data['Modification'].value_counts()