# BOFire Reaction Optimization Example

This notebook follows the example set out in https://github.com/experimental-design/bofire/blob/main/tutorials/basic_examples/Reaction_Optimization_Example.ipynb

In [None]:
# python imports we'll need in this notebook
from pprint import pprint as pp
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import time
import os

## Setting up the optimization problem as a Reaction Domain


In [None]:
from bofire.data_models.domain.api import Domain
from bofire.data_models.domain.api import Inputs, Outputs
from bofire.data_models.features.api import (
    ContinuousInput,
    ContinuousOutput,
    CategoricalInput,
    CategoricalDescriptorInput,
)  # we won't need all of those.

In [None]:
# We wish the temperature of the reaction to be between 30 and 110 °C
temperature_feature = ContinuousInput(
    key="Temperature", bounds=[30.0, 110.0], unit="°C"
)

# Catalyst Loading
catalyst_loading_feature = ContinuousInput(
    key="Catalyst Loading", bounds=[0.5, 2], unit="%"
)

# Residence Time
residence_time_feature = ContinuousInput(
    key="Residence Time", bounds=[1 * 60, 10 * 60], unit="minutes"
)

# Catalyst choice
catalyst_feature = CategoricalInput(
    key="Catalyst",
    categories=[
        "P1-L1",
        "P2-L1",
        "P1-L2",
        "P1-L3",
        "P1-L4",
        "P1-L5",
        "P1-L6",
        "P1-L7",
    ],
)

# gather all individual features
input_features = Inputs(
    features=[
        temperature_feature,
        catalyst_loading_feature,
        residence_time_feature,
        catalyst_feature,
    ]
)

In [None]:
# outputs: we wish to maximize the Yield
# import Maximize Objective to tell the optimizer you wish to optimize
from bofire.data_models.objectives.api import MaximizeObjective

objective = MaximizeObjective(
    w=1.0,
)
yield_feature = ContinuousOutput(key="Yield", objective=objective)
# create an output feature
output_features = Outputs(features=[yield_feature])

In [None]:
objective

In [None]:
# we now have
print("input_features:", input_features)
print("output_features:", output_features)

In [None]:
# The domain is now the object that holds the entire optimization problem / problem definition.
domain = Domain(
    inputs=input_features,
    outputs=output_features,
)

In [None]:
# you can now have a pretty printout of your domain via
(domain.inputs + domain.outputs).get_reps_df()

In [None]:
# and you can access your domain features via
for feature_key in (
    domain.inputs.get_keys()
):  # this will get all the feature names and loop over them
    input_feature = domain.inputs.get_by_key(
        feature_key
    )  # we can extract the individual feature object by asking for it by name
    print(feature_key, "|", input_feature)

In [None]:
# as well as the output features as
# and you can access your domain features via
for feature_key in (
    domain.outputs.get_keys()
):  # this will get all the feature names and loop over them
    output_feature = domain.outputs.get_by_key(
        feature_key
    )  # we can extract the individual feature object by asking for it by name
    print(feature_key, " | ", output_feature.__repr__())

In [None]:
(domain.inputs + domain.outputs).get_reps_df()

## Import a toy Reaction to play around with

In [None]:
import summit
import numpy as np

name_map = {
    "Catalyst Loading": "catalyst_loading",
    "Residence Time": "t_res",
    "Temperature": "temperature",
    "Catalyst": "catalyst",
    "Yield": "yld",
    "TON": "ton",
}
candidates = pd.DataFrame(
    {
        "Catalyst Loading": [0.498],
        "Residence Time": [600],
        "Temperature": [30],
        "Catalyst": ["P1-L3"],
    }
).rename(columns=name_map)
emulator = summit.get_pretrained_reizman_suzuki_emulator(case=1)
conditions = summit.DataSet.from_df(candidates)
results = emulator.run_experiments(conditions, rtn_std=True).rename(
    columns=dict(zip(name_map.values(), name_map.keys())),
)
experiments = pd.DataFrame(
    {
        "Catalyst Loading": results["Catalyst Loading"],
        "Residence Time": results["Residence Time"],
        "Temperature": results["Temperature"],
        "Catalyst": results["Catalyst"],
        "Yield": results["Yield"],
        "valid_Yield": 1,
        "TON": results["TON"],
        "valid_TON": 1,
    }
)

In [None]:
experiments

## Strategy setup

In [None]:
from bofire.data_models.strategies.api import SoboStrategy
from bofire.data_models.acquisition_functions.api import qEI

import bofire.strategies.api as strategies

In [None]:
# a single objective BO strategy

qExpectedImprovement = qEI()
sobo_strategy_data_model = SoboStrategy(
    domain=domain,
    acquisition_function=qExpectedImprovement,
)

# map the strategy data model to the actual strategy that has functionality
sobo_strategy = strategies.map(sobo_strategy_data_model)

In [None]:
sobo_strategy.tell(experiments)



Since a BO strategy requries an underlying regression model for predictions, it requires a certain amount of initial experiments for it to be able to build such a model.

In order to obtain initial experiments, one way is to (pseudo)randomly sample candidate points in the reaction domain. This can e.g. be done by the RandomStrategy.


In [None]:
# a random strategy
from bofire.data_models.strategies.api import (
    RandomStrategy as RandomStrategyModel,
)

random_strategy_model = RandomStrategyModel(domain=domain)
# we have to provide the strategy with our optimization problem so it knows where to sample from.
random_strategy = strategies.map(random_strategy_model)

In [None]:
domain

In [None]:
# let's ask for five random sets of conditions
candidates = random_strategy.ask(5)



you can have a look at the candidates


In [None]:
candidates

In [None]:
import util

In [None]:
experiments = util.evaluate_candidates(candidates)

In [None]:
experiments



This info can now be given to the bo strategy so it can use it to fit the underlying regression model it utilizes via the strategy.tell() method.


In [None]:
t1 = time.time()
sobo_strategy.tell(experiments, replace=True, retrain=True)
print(f"fit took {(time.time()-t1):.2f} seconds")



Using this data we can now get a proposal for a next point to evaluate via the sobo_strategy.ask(1) method.


In [None]:
t1 = time.time()
new_candidate = sobo_strategy.ask(1)
print(f"SOBO step took {(time.time()-t1):.2f} seconds")

This ask call now takes way longer, since first a GP model is fitted to the data, and the acquisition function EI is optimized to obtain the new proposed candidiates. Note that the predictied yield and standard deviation, as well as desirability function value (the underlying value the optimizer sees) are provided in the new_candidate dataframe.

In [None]:
new_candidate

## Optimization Loop

With this strategy.ask() and strategy.tell() we can now do our optimization loop, where after each new proposal, the conditions obtained from ask are evaluated and added to the known datapoints via tell. This requires to refit the underling model in each step.

In [None]:
experimental_budget = 10
i = 0
done = False

while not done:
    i += 1
    t1 = time.time()
    # ask for a new experiment
    new_candidate = sobo_strategy.ask(1)
    new_experiment = util.evaluate_candidates(new_candidate)
    sobo_strategy.tell(new_experiment)
    print(f"Iteration took {(time.time()-t1):.2f} seconds")
    # inform the strategy about the new experiment
    # experiments = pd.concat([experiments,new_experiment],ignore_index=True)
    if i > experimental_budget:
        done = True

## Investigating Results

In [None]:
# you have access to the experiments here
sobo_strategy.experiments

In [None]:
# quick plot of yield vs. Iteration
sobo_strategy.experiments["Yield"].plot()

## Multi-Objective Optimization

In [None]:
from bofire.data_models.objectives.api import MinimizeObjective
from bofire.data_models.strategies.api import MoboStrategy
from bofire.data_models.acquisition_functions.api import qEHVI

max_objective = MaximizeObjective(w=1.0)
min_objective = MinimizeObjective(w=1.0, bounds=[0, 200])

yield_feature = ContinuousOutput(key="Yield", objective=max_objective)
ton_feature = ContinuousOutput(key="TON", objective=min_objective)
# create an output feature
output_features = Outputs(features=[yield_feature, ton_feature])
domain = Domain(
    inputs=input_features,
    outputs=output_features,
)
# a multi objective BO strategy

qExpectedImprovement = qEHVI()
mobo_strategy_data_model = MoboStrategy(
    domain=domain,
    acquisition_function=qExpectedImprovement,
)

# map the strategy data model to the actual strategy that has functionality
mobo_strategy = strategies.map(mobo_strategy_data_model)

In [None]:
# a random strategy
from bofire.data_models.strategies.api import (
    RandomStrategy as RandomStrategyModel,
)

random_strategy_model = RandomStrategyModel(domain=domain)
# we have to provide the strategy with our optimization problem so it knows where to sample from.
random_strategy = strategies.map(random_strategy_model)
candidates = random_strategy.ask(5)
experiments = util.evaluate_candidates(candidates)
mobo_strategy.tell(experiments, replace=True, retrain=True)

In [None]:
experimental_budget = 10
i = 0
done = False

while not done:
    i += 1
    t1 = time.time()
    # ask for a new experiment
    new_candidate = mobo_strategy.ask(1)
    new_experiment = util.evaluate_candidates(new_candidate)
    mobo_strategy.tell(new_experiment)
    print(f"Iteration took {(time.time()-t1):.2f} seconds")
    # inform the strategy about the new experiment
    # experiments = pd.concat([experiments,new_experiment],ignore_index=True)
    if i > experimental_budget:
        done = True

In [None]:
mobo_strategy.experiments["Yield"].plot()

In [None]:
mobo_strategy.experiments["TON"].plot()