Skip to content

Commit

Permalink
Merge f8c08c5 into d1a8b8d
Browse files Browse the repository at this point in the history
  • Loading branch information
KristianJensen committed Nov 20, 2015
2 parents d1a8b8d + f8c08c5 commit 6647beb
Show file tree
Hide file tree
Showing 6 changed files with 273 additions and 180 deletions.
264 changes: 173 additions & 91 deletions cameo/strain_design/deterministic/flux_variability_based.py
Expand Up @@ -16,7 +16,7 @@
from __future__ import absolute_import, print_function
import re

__all__ = ['DifferentialFVA', 'fseof']
__all__ = ['DifferentialFVA', 'FSEOF']

from functools import partial
from uuid import uuid4
Expand Down Expand Up @@ -49,7 +49,7 @@
from cameo import Metabolite
from cameo.parallel import SequentialView
from cameo.core.solver_based_model import Reaction
from cameo.strain_design import StrainDesignMethod
from cameo.strain_design import StrainDesignMethod, StrainDesignResult
from cameo.flux_analysis.analysis import phenotypic_phase_plane, PhenotypicPhasePlaneResult
from cameo.util import TimeMachine
import cameo
Expand Down Expand Up @@ -133,13 +133,9 @@ def __init__(self, design_space_model, objective, variables=None, reference_mode
if isinstance(objective, Reaction):
self.objective = objective.id
elif isinstance(objective, Metabolite):
try:
self.reference_model.add_demand(objective)
self.objective = self.design_space_model.add_demand(objective).id
except:
logger.debug("Demand reaction for metabolite %s already exists" % objective.id)
self.objective = self.design_space_model.reactions.get_by_id("DM_%s" % objective.id).id

self.reference_model.add_demand(objective)
self.objective = self.design_space_model.add_demand(objective).id
self.objective = objective
elif isinstance(objective, str):
self.objective = objective
else:
Expand Down Expand Up @@ -322,11 +318,12 @@ def plot(self, grid=None, width=None, height=None, title=None):
notice("Multi-dimensional plotting is not supported")
return
title = "DifferentialFVA Result" if title is None else title
points = [(elem[0][1], elem[1][1]) for elem in list(self.solutions.items)]
colors = ["red" for _ in points]
x = [elem[0][1] for elem in list(self.solutions.items)]
y = [elem[1][1] for elem in list(self.solutions.items)]
colors = ["red" for _ in x]
plotting.plot_production_envelope(self._phase_plane, objective=self.objective, key=self.variable_ids[0],
grid=grid, width=width, height=height, title=title,
points=points, points_colors=colors)
points=[x, y], points_colors=colors)

def _repr_html_(self):
def _data_frame(solution):
Expand Down Expand Up @@ -410,109 +407,180 @@ def _set_bounds(self, point):
target_reaction.lower_bound, target_reaction.upper_bound = target_bound, target_bound


def fseof(model, enforced_reaction, max_enforced_flux=0.9, granularity=10, primary_objective=None, solution_method=fba,
exclude=()):
class FSEOF(StrainDesignMethod):
"""
Performs a Flux Scanning based on Enforced Objective Flux (FSEOF) analysis.
Parameters
----------
model : SolverBasedModel
enforced_reaction : Reaction
The flux that will be enforced.
max_enforced_flux : float, optional
The maximal flux of secondary_objective that will be enforced, relative to the theoretical maximum.
granularity : int, optional (defaults to 0.9).
The number of enforced flux levels (defaults to 10).
The flux that will be enforced. Reaction object or reaction id string.
primary_objective : Reaction
The primary objective flux (defaults to model.objective).
exclude : Iterable of reactions or reaction ids that will not be included in the output.
Returns
-------
FseofResult
List of reactions that correlate with enforced flux.
References
----------
.. [1] H. S. Choi, S. Y. Lee, T. Y. Kim, and H. M. Woo, 'In silico identification of gene amplification targets
for improvement of lycopene production.,' Appl Environ Microbiol, vol. 76, no. 10, pp. 3097–3105, May 2010.
"""
ndecimals = config.ndecimals
with TimeMachine() as tm:

# Convert enforced reaction to Reaction object
if not isinstance(enforced_reaction, Reaction):
enforced_reaction = model.reactions.get_by_id(enforced_reaction)
primary_objective = primary_objective or model.objective
def __init__(self, model, enforced_reaction, primary_objective=None, *args, **kwargs):
super(FSEOF, self).__init__(*args, **kwargs)
self.model = model

# Exclude list
exclude += tuple(model.exchanges)
exclude_ids = [enforced_reaction.id]
for reaction in exclude:
if isinstance(reaction, Reaction):
exclude_ids.append(reaction.id)
if isinstance(enforced_reaction, Reaction):
if enforced_reaction in model.reactions:
self.enforced_reaction = enforced_reaction
else:
exclude_ids.append(reaction)

tm(do=int, undo=partial(setattr, model, "objective", model.objective))
tm(do=int, undo=partial(setattr, enforced_reaction, "lower_bound", enforced_reaction.lower_bound))
tm(do=int, undo=partial(setattr, enforced_reaction, "upper_bound", enforced_reaction.upper_bound))
raise ValueError("The reaction "+enforced_reaction.id+" does not belong to the model")
elif isinstance(enforced_reaction, str):
if enforced_reaction in model.reactions:
self.enforced_reaction = model.reactions.get_by_id(enforced_reaction)
else:
raise ValueError("No reaction "+enforced_reaction+" found in model")
else:
raise TypeError("Enforced reaction must be a Reaction or reaction id")

# Find initial flux of enforced reaction
model.objective = primary_objective
initial_solution = solution_method(model)
initial_fluxes = initial_solution.fluxes
initial_flux = round(initial_fluxes[enforced_reaction.id], ndecimals)
if primary_objective is None:
self.primary_objective = model.objective
elif isinstance(primary_objective, Reaction):
if primary_objective in model.reactions:
self.primary_objective = primary_objective
else:
raise ValueError("The reaction "+primary_objective.id+" does not belong to the model")
elif isinstance(primary_objective, str):
if primary_objective in model.reactions:
self.primary_objective = model.reactions.get_by_id(primary_objective)
else:
raise ValueError("No reaction "+primary_objective+" found in the model")
elif isinstance(primary_objective, type(model.objective)):
self.primary_objective = primary_objective
else:
raise TypeError("Primary objective must be an Objective, Reaction or a string")

# Find theoretical maximum of enforced reaction
model.objective = enforced_reaction
max_theoretical_flux = round(solution_method(model).fluxes[enforced_reaction.id], ndecimals)
def run(self, max_enforced_flux=0.9, granularity=10, exclude=(), solution_method=fba):
"""
Performs a Flux Scanning based on Enforced Objective Flux (FSEOF) analysis.
max_flux = max_theoretical_flux * max_enforced_flux
Parameters
----------
max_enforced_flux : float, optional
The maximal flux of secondary_objective that will be enforced, relative to the theoretical maximum (defaults to 0.9).
granularity : int, optional
The number of enforced flux levels (defaults to 10).
exclude : Iterable of reactions or reaction ids that will not be included in the output.
# Calculate enforcement levels
enforcements = [initial_flux + (i + 1) * (max_flux - initial_flux) / granularity for i in range(granularity)]
Returns
-------
FseofResult
An object containing the identified reactions and the used parameters.
# FSEOF results
results = {reaction.id: [round(initial_fluxes[reaction.id], config.ndecimals)] for reaction in model.reactions}
References
----------
.. [1] H. S. Choi, S. Y. Lee, T. Y. Kim, and H. M. Woo, 'In silico identification of gene amplification targets
for improvement of lycopene production.,' Appl Environ Microbiol, vol. 76, no. 10, pp. 3097–3105, May 2010.
# Scan fluxes for different levels of enforcement
model.objective = primary_objective
for enforcement in enforcements:
enforced_reaction.lower_bound = enforcement
enforced_reaction.upper_bound = enforcement
solution = solution_method(model)
for reaction_id, flux in solution.fluxes.items():
results[reaction_id].append(round(flux, config.ndecimals))
"""
model = self.model
primary_objective = self.primary_objective
enforced_reaction = self.enforced_reaction

# Test each reaction
fseof_reactions = []
for reaction_id, fluxes in results.items():
if reaction_id not in exclude_ids and abs(fluxes[-1]) > abs(fluxes[0]) and min(fluxes) * max(fluxes) >= 0:
fseof_reactions.append(model.reactions.get_by_id(reaction_id))
ndecimals = config.ndecimals

return FseofResult(fseof_reactions, enforced_reaction, model)
# Exclude list
exclude = list(exclude) + model.exchanges
exclude_ids = [self.enforced_reaction.id]
for reaction in exclude:
if isinstance(reaction, Reaction):
exclude_ids.append(reaction.id)
else:
exclude_ids.append(reaction)

with TimeMachine() as tm:

class FseofResult(cameo.core.result.Result):
tm(do=int, undo=partial(setattr, model, "objective", model.objective))
tm(do=int, undo=partial(setattr, enforced_reaction, "lower_bound", enforced_reaction.lower_bound))
tm(do=int, undo=partial(setattr, enforced_reaction, "upper_bound", enforced_reaction.upper_bound))

# Find initial flux of enforced reaction
model.objective = primary_objective
initial_solution = solution_method(model)
initial_fluxes = initial_solution.fluxes
initial_flux = round(initial_fluxes[enforced_reaction.id], ndecimals)

# Find theoretical maximum of enforced reaction
model.objective = enforced_reaction
max_theoretical_flux = round(solution_method(model).fluxes[enforced_reaction.id], ndecimals)

max_flux = max_theoretical_flux * max_enforced_flux

# Calculate enforcement levels
enforcements = [initial_flux + (i + 1) * (max_flux - initial_flux) / granularity for i in range(granularity)]

# FSEOF results
results = {reaction.id: [round(initial_fluxes[reaction.id], config.ndecimals)] for reaction in model.reactions}

# Scan fluxes for different levels of enforcement
model.objective = primary_objective
for enforcement in enforcements:
enforced_reaction.lower_bound = enforcement
enforced_reaction.upper_bound = enforcement
solution = solution_method(model)
for reaction_id, flux in solution.fluxes.items():
results[reaction_id].append(round(flux, config.ndecimals))

# Test each reaction
fseof_reactions = []
for reaction_id, fluxes in results.items():
if reaction_id not in exclude_ids \
and max(abs(max(fluxes)), abs(min(fluxes))) > abs(fluxes[0]) \
and min(fluxes) * max(fluxes) >= 0:
fseof_reactions.append(model.reactions.get_by_id(reaction_id))

reaction_results = {rea.id: results[rea.id] for rea in fseof_reactions}
run_args = dict(max_enforced_flux=max_enforced_flux,
granularity=granularity,
solution_method=solution_method,
exclude=exclude)
return FSEOFResult(fseof_reactions, enforced_reaction, model, primary_objective, [initial_flux]+enforcements, reaction_results, run_args)


class FSEOFResult(StrainDesignResult):
"""
Object for storing a FSEOF result.
Attributes:
reactions: A list of the reactions that are found to increase with product formation.
enforced_levels: A list of the fluxes that the enforced reaction was constrained to.
data_frame: A pandas DataFrame containing the fluxes for every reaction for each enforced flux.
run_args: The arguments that the analysis was run with. To repeat do 'FSEOF.run(**FSEOFResult.run_args)'.
"""

def __init__(self, reactions, objective, model, *args, **kwargs):
super(FseofResult, self).__init__(*args, **kwargs)
def plot(self, grid=None, width=None, height=None, title=None):
pass

def __init__(self, reactions, enforced_reaction, model, primary_objective, enforced_levels, reaction_results, run_args, *args, **kwargs):
super(FSEOFResult, self).__init__(*args, **kwargs)
self._reactions = reactions
self._objective = objective
self._enforced_reaction = enforced_reaction
self._model = model
self._primary_objective = primary_objective
self._run_args = run_args
self._enforced_levels = enforced_levels
self._reaction_results = reaction_results

def __len__(self):
return len(self.reactions)

# TODO: Make an iterator that returns designs from the different enforced levels
def __iter__(self):
return iter(self.reactions)

def __eq__(self, other):
return isinstance(other,
self.__class__) and self.objective == other.objective and self.reactions == other.reactions
return isinstance(other, self.__class__) and \
self.enforced_reaction == other.enforced_reaction and self.reactions == other.reactions

@property
def reactions(self):
Expand All @@ -523,27 +591,41 @@ def model(self):
return self._model

@property
def objective(self):
return self._objective
def enforced_reaction(self):
return self._enforced_reaction

@property
def primary_objective(self):
return self._primary_objective

@property
def run_args(self):
return self._run_args

@property
def enforced_levels(self):
return self._enforced_levels

def _repr_html_(self):
template = """
<table>
<tr>
<td><b>Enforced objective</b></td>
<td>%(objective)s</td>
</tr>
<tr>
<td><b>Reactions</b></td>
<td>%(reactions)s</td>
<tr>
</table>"""
return template % {'objective': self.objective.nice_id,
'reactions': "<br>".join(reaction.id for reaction in self.reactions)}
<strong>Model:</strong> %(model)s</br>
<strong>Enforced objective:</strong> %(objective)s</br>
<strong>Primary objective:</strong> %(primary)s</br>
<br>
<strong>Reaction fluxes</strong><br><br>
%(df)s
"""
return template % {'objective': str(self.enforced_reaction),
'reactions': "<br>".join(reaction.id for reaction in self.reactions),
'model': self.model.id,
'primary': str(self._primary_objective),
'df': self.data_frame._repr_html_()}

@property
def data_frame(self):
return pandas.DataFrame((r.id for r in self.reactions), columns=["Reaction id"])
df = pandas.DataFrame(self._reaction_results).transpose()
df.columns = (str(enf) for enf in self._enforced_levels)
return df


# if __name__ == '__main__':
Expand Down
16 changes: 7 additions & 9 deletions cameo/strain_design/deterministic/linear_programming.py
Expand Up @@ -206,26 +206,24 @@ def run(self, k, target, max_results=1, *args, **kwargs):
knockouts = set(reac for y, reac in self._y_vars.items() if round(y.primal, 3) == 0)
assert len(knockouts) <= k

fluxes = solution.fluxes
production = solution.f

knockout_list.append(knockouts)
fluxes_list.append(fluxes)
production_list.append(production)

if len(knockouts) < k:
self._number_of_knockouts_constraint.lb = self._number_of_knockouts_constraint.ub - len(knockouts)
fluxes_list.append(solution.fluxes)
production_list.append(solution.f)

# Add an integer cut
y_vars_to_cut = [y for y in self._y_vars if round(y.primal, 3) == 0]
integer_cut = self._model.solver.interface.Constraint(Add(*y_vars_to_cut),
lb=1,
name="integer_cut_"+str(count))

if len(knockouts) < k:
self._number_of_knockouts_constraint.lb = self._number_of_knockouts_constraint.ub - len(knockouts)

tm(do=partial(self._model.solver.add, integer_cut),
undo=partial(self._model.solver.remove, integer_cut))
count += 1

return OptKnockResult(knockout_list, fluxes_list, production_list, target)
return OptKnockResult(knockout_list, fluxes_list, production_list, target)


class RobustKnock(StrainDesignMethod):
Expand Down

0 comments on commit 6647beb

Please sign in to comment.