Skip to content

Commit

Permalink
Implement lexicographic optimization
Browse files Browse the repository at this point in the history
- Add a LexicoSolver which makes a given subsolver solve sequentially
  several specified objectives, while adding a constraint at each step
  to avoid worsening results according to previous objectives

- Add a bunch of methods to implement (with partial default
  implementation) so that a solver can be used as a subsolver by
  LexicoSolver:
  - implements_lexico_api: returns True if the solver is lexico ready
    (ie implement subsequent methods). LexicoSolver raise a warning if
    this is not the case
  - get_model_objectives_available(): list of labels available
    corresponding to the intern objectives the solver can optimize.
    Defaults to keys of the problem's ObjectiveRegister (via new method
    problem.get_objective_names())
  - set_model_objective(): update the intern objective the subsolver
    will optimize
  - get_model_objective_value(): retrieve the value of the intern
    objective currently optimized
  - add_model_constraint(): add a constraint to the intern model
    on the given objective to avoid worsening it.

- Implement them for ortools-csat solvers on knapsack and rcpsp
  • Loading branch information
nhuet authored and g-poveda committed Jul 11, 2024
1 parent 3c23f3b commit e0733f9
Show file tree
Hide file tree
Showing 9 changed files with 734 additions and 26 deletions.
13 changes: 9 additions & 4 deletions discrete_optimization/generic_tools/do_problem.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,14 +213,17 @@ def __init__(
self.objective_handling = objective_handling
self.dict_objective_to_doc = dict_objective_to_doc

def get_objective_names(self) -> List[str]:
return sorted(self.dict_objective_to_doc)

def get_list_objective_and_default_weight(self) -> Tuple[List[str], List[float]]:
"""Flatten the list of kpi names and default weight.
Returns: list of kpi names, list of default weight for the aggregated objective function.
"""
d = [
(k, self.dict_objective_to_doc[k].default_weight)
for k in self.dict_objective_to_doc
for k in self.get_objective_names()
]
return [s[0] for s in d], [s[1] for s in d]

Expand Down Expand Up @@ -313,8 +316,7 @@ def evaluate_mobj(self, variable: Solution) -> TupleFitness:
Returns (TupleFitness): a flattened tuple fitness object representing the multi-objective criteria.
"""
obj_register = self.get_objective_register()
keys = sorted(obj_register.dict_objective_to_doc.keys())
keys = self.get_objective_names()
dict_values = self.evaluate(variable)
return TupleFitness(np.array([dict_values[k] for k in keys]), len(keys))

Expand All @@ -331,7 +333,7 @@ def evaluate_mobj_from_dict(self, dict_values: Dict[str, float]) -> TupleFitness
"""
# output of evaluate(solution) typically
keys = sorted(self.get_objective_register().dict_objective_to_doc.keys())
keys = self.get_objective_names()
return TupleFitness(np.array([dict_values[k] for k in keys]), len(keys))

@abstractmethod
Expand Down Expand Up @@ -380,6 +382,9 @@ def get_optuna_study_direction(self) -> str:
direction = "maximize"
return direction

def get_objective_names(self) -> List[str]:
return self.get_objective_register().get_objective_names()


class BaseMethodAggregating(Enum):
"""Enum class used to specify how an evaluation of a multiscenario problem should be aggregated."""
Expand Down
103 changes: 102 additions & 1 deletion discrete_optimization/generic_tools/do_solver.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from __future__ import annotations # see annotations as str

from abc import abstractmethod
from typing import Any, List, Optional, Tuple
from typing import Any, Iterable, List, Optional, Tuple

from discrete_optimization.generic_tools.callbacks.callback import Callback
from discrete_optimization.generic_tools.do_problem import (
Expand All @@ -18,6 +18,9 @@
from discrete_optimization.generic_tools.hyperparameters.hyperparametrizable import (
Hyperparametrizable,
)
from discrete_optimization.generic_tools.result_storage.multiobj_utils import (
TupleFitness,
)
from discrete_optimization.generic_tools.result_storage.result_storage import (
ResultStorage,
fitness_class,
Expand Down Expand Up @@ -99,3 +102,101 @@ def is_optimal(self) -> Optional[bool]:
"""
return None

def get_model_objectives_available(self) -> List[str]:
"""List objectives available for lexico optimization
It corresponds to the labels accepted for obj argument for
- `set_model_objective()`
- `add_model_constraint()`
- `get_model_objective_value()`
Default to `self.problem.get_objective_names()`.
Returns:
"""
return self.problem.get_objective_names()

def set_model_objective(self, obj: str) -> None:
"""Update intern model objective.
Args:
obj: a string representing the desired objective.
Should be one of `self.get_model_objectives_available()`.
Returns:
"""
...

def get_model_objective_value(self, obj: str, res: ResultStorage) -> float:
"""Get best intern model objective value found by last call to `solve()`.
The default implementation consists in using the fit of the last solution in result_storage.
This assumes:
- that the last solution is the best one for the objective considered
- that no aggregation was performed but rather that the fitness is a TupleFitness
with values in the same order as `self.problem.get_objective_names()`.
Args:
obj: a string representing the desired objective.
Should be one of `self.get_model_objectives_available()`.
res: result storage returned by last call to solve().
Returns:
"""
_, fit = res.get_best_solution_fit()
if not isinstance(fit, TupleFitness):
raise RuntimeError(
"The fitness should be a TupleFitness of the same size as `self.problem.get_objective_names()`."
)
objectives = self.problem.get_objective_names()
idx = objectives.index(obj)
return float(fit.vector_fitness[idx])

def add_model_constraint(self, obj: str, value: float) -> Iterable[Any]:
"""Add a constraint on a computed sub-objective
Args:
obj: a string representing the desired objective.
Should be one of `self.get_model_objectives_available()`.
value: the limiting value.
If the optimization direction is maximizing, this is a lower bound,
else this is an upper bound.
Returns:
the created constraints.
"""
...

def remove_model_constraint(self, constraints: Iterable[Any]) -> None:
"""Remove the intern model constraints.
Args:
constraints: constraints created with `add_model_constraint()`
Returns:
"""
...

@staticmethod
def implements_lexico_api() -> bool:
"""Tell whether this solver is implementing the api for lexicographic optimization.
Should return True only if
- `set_model_objective()`
- `add_model_constraint()`
- `get_model_objective_value()`
have been really implemented, i.e.
- calling `set_model_objective()` and `add_model_constraint()`
should actually change the next call to `solve()`,
- `get_model_objective_value()` should correspond to the intern model objective
"""
return False
120 changes: 120 additions & 0 deletions discrete_optimization/generic_tools/lexico_tools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# Copyright (c) 2024 AIRBUS and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
"""Tools for lexicographic optimization."""
import logging
from typing import Any, Iterable, List, Optional, Protocol

from discrete_optimization.generic_tools.callbacks.callback import (
Callback,
CallbackList,
)
from discrete_optimization.generic_tools.do_problem import (
ObjectiveHandling,
ParamsObjectiveFunction,
Problem,
get_default_objective_setup,
)
from discrete_optimization.generic_tools.do_solver import SolverDO
from discrete_optimization.generic_tools.result_storage.result_storage import (
ResultStorage,
)

logger = logging.getLogger(__name__)


class LexicoSolver(SolverDO):

subsolver: SolverDO

def __init__(
self,
subsolver: Optional[SolverDO],
problem: Problem,
params_objective_function: Optional[ParamsObjectiveFunction] = None,
**kwargs: Any,
):
# ensure no aggregation performed
if params_objective_function is None:
params_objective_function = get_default_objective_setup(problem)
params_objective_function.objective_handling = ObjectiveHandling.MULTI_OBJ

# SolverDO init after updating params_objective_function
super().__init__(
problem=problem,
params_objective_function=params_objective_function,
**kwargs,
)

# get subsolver (directly or from its hyperparameters to allow optuna tuning)
kwargs = self.complete_with_default_hyperparameters(kwargs)
if subsolver is None:
if kwargs["subsolver_kwargs"] is None:
subsolver_kwargs = kwargs
else:
subsolver_kwargs = kwargs["subsolver_kwargs"]
if kwargs["subsolver_cls"] is None:
if "build_default_subsolver" in kwargs:
subsolver = kwargs["build_default_subsolver"](
self.problem, **subsolver_kwargs
)
else:
raise ValueError(
"`subsolver_cls` cannot be None if `subsolver` is not specified."
)
else:
subsolver_cls = kwargs["subsolver_cls"]
subsolver = subsolver_cls(problem=self.problem, **subsolver_kwargs)
subsolver.init_model(**subsolver_kwargs)
self.subsolver = subsolver

# check compatibility with lexico optimization
if not subsolver.implements_lexico_api():
logger.warning(
"The chosen subsolver may not be implementing the api needed by LexicoSolver!"
)

def init_model(self, **kwargs: Any) -> None:
self.subsolver.init_model(**kwargs)

def solve(
self,
callbacks: Optional[List[Callback]] = None,
objectives: Optional[Iterable[str]] = None,
**kwargs: Any,
) -> ResultStorage:
# wrap all callbacks in a single one
callbacks_list = CallbackList(callbacks=callbacks)
# start of solve callback
callbacks_list.on_solve_start(solver=self)

if objectives is None:
objectives = self.subsolver.get_model_objectives_available()

res = ResultStorage(
mode_optim=self.params_objective_function.sense_function,
list_solution_fits=[],
)

for i_obj, obj in enumerate(objectives):

# log
logger.debug(f"Optimizing on {obj}")

# optimize next objective
self.subsolver.set_model_objective(obj)
res.extend(self.subsolver.solve(**kwargs))

# end of step callback: stopping?
stopping = callbacks_list.on_step_end(step=i_obj, res=res, solver=self)
if stopping:
break

# add constraint on current objective for next one
fit = self.subsolver.get_model_objective_value(obj, res)
self.subsolver.add_model_constraint(obj, fit)

# end of solve callback
callbacks_list.on_solve_end(res=res, solver=self)

return res
17 changes: 16 additions & 1 deletion discrete_optimization/generic_tools/ortools_cpsat_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
# LICENSE file in the root directory of this source tree.
import logging
from abc import abstractmethod
from typing import Any, Dict, List, Optional
from typing import Any, Dict, Iterable, List, Optional

from ortools.sat.python.cp_model import (
FEASIBLE,
INFEASIBLE,
OPTIMAL,
UNKNOWN,
Constraint,
CpModel,
CpSolver,
CpSolverSolutionCallback,
Expand Down Expand Up @@ -107,6 +108,20 @@ def solve(
callbacks_list.on_solve_end(res=res, solver=self)
return res

def remove_model_constraint(self, constraints: Iterable[Any]) -> None:
"""Remove the intern model constraints.
Args:
constraints: constraints created with `add_model_constraint()`
Returns:
"""
for cstr in constraints:
if not isinstance(cstr, Constraint):
raise RuntimeError()
cstr.proto.Clear()


class OrtoolsCallback(CpSolverSolutionCallback):
def __init__(self, do_solver: OrtoolsCPSatSolver, callback: Callback):
Expand Down
Loading

0 comments on commit e0733f9

Please sign in to comment.