In [1]:
# python version 3.10.11
import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import plotly.graph_objects as go


%config InlineBackend.figure_format = 'retina'
mpl.rcParams.update(mpl.rcParamsDefault)
mpl.rcParams['axes.linewidth'] = 1.75 #set the value globally

from ax.core.observation import ObservationFeatures
from ax.core.objective import ScalarizedObjective
from ax.core.optimization_config import OptimizationConfig
from ax.metrics.l2norm import L2NormMetric
from ax.modelbridge.factory import get_GPEI
from ax.service.ax_client import AxClient
from ax.service.utils.instantiation import ObjectiveProperties
from ax.modelbridge.generation_strategy import GenerationStrategy, GenerationStep

In [3]:
def generate_mesh(I_range, Br_range, Cl_range):
    """
    Generate a mesh of points for I, Br, and Cl that sum to 1.

    :param I_range: Tuple of (start, end, step) for I.
    :param Br_range: Tuple of (start, end, step) for Br.
    :param Cl_range: Tuple of (start, end, step) for Cl.
    :return: List of [I, Br, Cl] points that sum to 1.
    """
    mesh = []
    I_start, I_end, I_step = I_range
    Br_start, Br_end, Br_step = Br_range
    Cl_start, Cl_end, Cl_step = Cl_range

    I_values = np.linspace(I_start, I_end, round((I_end - I_start) / I_step) + 1)
    Br_values = np.linspace(Br_start, Br_end, round((Br_end - Br_start) / Br_step) + 1)
    Cl_values = np.linspace(Cl_start, Cl_end, round((Cl_end - Cl_start) / Cl_step) + 1)

    for i in I_values:
        for br in Br_values:
            for cl in Cl_values:
                if abs(br + cl + i - 1) < 1e-4:  # Add a small tolerance to deal with floating point precision
                    mesh.append([i, br, cl])

    return mesh



def make_targets(MA_values, mesh_array):
    # Create solutions
    targets = []
    for MA in MA_values:
        for i, br, cl in mesh_array:
            targets.append(make_solution(MA, i, cl))
    return targets


def plot_mesh(mesh_array, color='red', color_values=None, fig=None, symbol='circle', size =5, **marker_kwargs):
    # If no figure is provided, create a new one
    if fig is None:
        fig = go.Figure()

    # If color_values is provided, use it to color the points
    if color_values is not None:
        marker_color = color_values
        colorscale = 'Viridis_r'  # or any other colorscale you prefer
    else:
        marker_color = color
        colorscale = None

    # Add the mesh points to the figure
    fig.add_trace(go.Scatterternary(a=mesh_array[:, 1], b=mesh_array[:, 0], c=mesh_array[:, 2], mode='markers', 
                                    marker=dict(color=marker_color, size= size, colorscale=colorscale, symbol=symbol, **marker_kwargs)))

    # Set axis labels
    fig.update_ternaries(aaxis_title="Br", baxis_title="I", caxis_title="Cl")

    # Update axes ranges
    fig.update_ternaries(sum=1, baxis_min=.8)  # update for I axis

    # Change layout options
    fig.update_layout({
        'plot_bgcolor': 'white',  # make plot background white
        'paper_bgcolor': 'white',  # make paper background white
        'ternary': {
            'sum': 1.0,
            'aaxis': {'linecolor': 'black', 'linewidth': 1, 'ticks': 'outside'},
            'baxis': {'linecolor': 'black', 'linewidth': 1, 'ticks': 'outside'},
            'caxis': {'linecolor': 'black', 'linewidth': 1, 'ticks': 'outside'}
        }
    })

    return fig  # return the updated figure object


def plot_BO_mesh(mesh_array, acq_values, fig=None, symbol='circle', **marker_kwargs):
    # If no figure is provided, create a new one
    if fig is None:
        fig = go.Figure()

    # Add the mesh points to the figure
    fig.add_trace(go.Scatterternary(a=mesh_array[:, 1], b=mesh_array[:, 0], c=mesh_array[:, 2], mode='markers', 
                                    marker=dict(color=acq_values, colorscale='Viridis', symbol=symbol, **marker_kwargs)))

    # Set axis labels
    fig.update_ternaries(aaxis_title="Br", baxis_title="I", caxis_title="Cl")

    # Update axes ranges
    fig.update_ternaries(sum=1, baxis_min=.8)  # update for I axis

    # Change layout options
    fig.update_layout({
        'plot_bgcolor': 'white',  # make plot background white
        'paper_bgcolor': 'white',  # make paper background white
        'ternary': {
            'sum': 1.0,
            'aaxis': {'linecolor': 'black', 'linewidth': 1, 'ticks': 'outside'},
            'baxis': {'linecolor': 'black', 'linewidth': 1, 'ticks': 'outside'},
            'caxis': {'linecolor': 'black', 'linewidth': 1, 'ticks': 'outside'}
        }
    })

    return fig  # return the updated figure object



def enforce_minimum_distance(ax_client, num_trials, min_distance):
    new_points = []
    trial_indices = []
    while len(new_points) < num_trials:
        # Get a batch of points from the optimizer
        batch_trial_indices, _ = ax_client.get_next_trials(1)
        if not batch_trial_indices:  # If no new trials were generated, break the loop
            break
        for trial_index in batch_trial_indices:
            trial = ax_client.get_trial(trial_index)
            point = trial.arm.parameters
            point_vector = np.array(list(point.values()))  # Convert the point to a vector
            # Check if the point is far enough from all existing points
            if all(np.linalg.norm(point_vector - np.array(list(existing_point.values()))) >= min_distance for existing_point in new_points):
                new_points.append(point)
                trial_indices.append(trial_index)
    return trial_indices


def create_experiment(ax_client, search_parameters, objectives, parameter_constraints):
    ax_client.create_experiment(
        name="moo_experiment",
        parameters=search_parameters,
        objectives=objectives,
        parameter_constraints=parameter_constraints,
        overwrite_existing_experiment=True,
    )

def complete_trials(ax_client, df, standard_errors):
    for _, row in df.iterrows():
        metrics_data = {}
        for metric, standard_error in standard_errors.items():
            if pd.notnull(row[metric]):
                metrics_data[metric] = (row[metric], standard_error)
        if metrics_data:
            trial_parameters = {'I': row['I'], 'Br': row['Br']}
            _, trial_index = ax_client.attach_trial(trial_parameters)
            ax_client.complete_trial(trial_index=trial_index, raw_data=metrics_data)

def create_suggested_samples_df(ax_client, batch_trial_indices):
    suggested_samples = []
    for trial_index in batch_trial_indices:
        trial = ax_client.get_trial(trial_index)
        arm_parameters = trial.arm.parameters
        suggested_samples.append(arm_parameters)
    df_suggested = pd.DataFrame(suggested_samples)
    df_suggested['I'] = df_suggested['I'].round(8)
    df_suggested['Br'] = df_suggested['Br'].round(8)
    df_suggested['Cl'] = 1 - df_suggested['I'] - df_suggested['Br']
    df_suggested.loc[np.isclose(df_suggested['I'] + df_suggested['Br'], 1, atol=1e-8), 'Cl'] = 0
    return df_suggested

def evaluate_acquisition_function(ax_client, search_parameters, metrics, weights):
    model = ax_client.generation_strategy.model
    x_eval = np.linspace(search_parameters[0]['bounds'][0], search_parameters[0]['bounds'][1], 101)
    y_eval = np.linspace(search_parameters[1]['bounds'][0], search_parameters[1]['bounds'][1], 101)
    acq_values = np.zeros((len(x_eval), len(y_eval)))
    metric_names = [metric.name for metric in metrics]
    for i, x in enumerate(x_eval):
        for j, y in enumerate(y_eval):
            if x + y >= 0.95 and x + y <= 1:
                plot_parameters = [ObservationFeatures(parameters={'I': x, 'Br': y})]
                acq_values_dict, _ = model.predict(plot_parameters)
                scalarized_acq_value = 0
                for metric_name, weight in zip(metric_names, weights):
                    acq_value = acq_values_dict[metric_name][0]
                    scalarized_acq_value += weight * acq_value
                acq_values[i, j] = scalarized_acq_value
            else:
                acq_values[i, j] = np.nan
    return acq_values

def get_batch_trial_indices(ax_client, num_trials, min_distance=None):
    if min_distance is not None:
        return enforce_minimum_distance(ax_client, num_trials=num_trials, min_distance=min_distance)
    else:
        trial_indices, _ = ax_client.get_next_trials(num_trials)
        return trial_indices
    
    
def create_mesh_array(search_parameters, num_points=101):
    """
    Create a mesh array from the evaluation points.
    
    Args:
        search_parameters (list[dict]): The search parameters.
        num_points (int, optional): The number of points to evaluate. Defaults to 101.
    
    Returns:
        np.ndarray: The mesh array.
    """
    # Define points at which we'll evaluate the acquisition function
    x_eval = np.linspace(search_parameters[0]['bounds'][0], search_parameters[0]['bounds'][1], num_points)
    y_eval = np.linspace(search_parameters[1]['bounds'][0], search_parameters[1]['bounds'][1], num_points)

    # Create a mesh array from the evaluation points
    mesh_array = np.zeros((len(x_eval), len(y_eval), 3))
    mesh_array[:, :, 0] = x_eval[:, None]
    mesh_array[:, :, 1] = y_eval[None, :]
    mesh_array[:, :, 2] = 1 - x_eval[:, None] - y_eval[None, :]

    return mesh_array


def flatten_mesh_array(mesh_array):
    return mesh_array.reshape(-1, 3), acq_values.flatten()

def plot_acquisition_function(mesh_array_flat, acq_values_flat):
    fig = plot_BO_mesh(mesh_array_flat, acq_values_flat)
    return fig

def plot_data_points(df, df_suggested, fig):
    mesh_array_0 = df[['I', 'Br', 'Cl']].values
    mesh_array_1 = df_suggested[['I', 'Br', 'Cl']].values
    fig = plot_mesh(mesh_array_0, color_values = 'black', fig=fig, size=2)
    fig = plot_mesh(mesh_array_1, color='red',symbol = 'x', fig=fig, size = 5)
    return fig


def complete_trials_and_get_new_df(ax_client, batch_trial_indices, df):
    new_rows = []
    for trial_index in batch_trial_indices:
        trial = ax_client.get_trial(trial_index)
        parameters = trial.arm.parameters
        metric_value = np.random.uniform(low=0.1, high=.5, size=(1,))[0]  # TODO: replace with actual function
        ax_client.complete_trial(trial_index=trial_index, raw_data={'emission_energy_difference': (metric_value, 0.0)})
        parameters['emission_energy_difference'] = metric_value  # store the metric value in the parameters
        new_rows.append(parameters)

    # Append the new rows to the original DataFrame, which will then be fed back into Ax for the next batch
    df_new = pd.DataFrame(new_rows)
    df_new = pd.concat([df, df_new], ignore_index=True)
    df_new['Cl'] = 1 - df_new['I'] - df_new['Br']  # calculate Cl from I and B

    # If I + Br is close to 1 within a tolerance, set Cl to exactly 0
    df_new.loc[np.isclose(df_new['I'] + df_new['Br'], 1, atol=1e-8), 'Cl'] = 0

    return df_new


# def enforce_minimum_distance(ax_client, num_trials, min_distance, max_attempts=100):
#     new_points = []
#     trial_indices = []
#     attempts = 0
#     while len(new_points) < num_trials and attempts < max_attempts:
#         # Get a batch of points from the optimizer
#         batch_trial_indices, _ = ax_client.get_next_trials(1)
#         if not batch_trial_indices:  # If no new trials were generated, break the loop
#             break
#         for trial_index in batch_trial_indices:
#             trial = ax_client.get_trial(trial_index)
#             point = trial.arm.parameters
#             point_vector = np.array(list(point.values()))  # Convert the point to a vector
#             # Check if the point is far enough from all existing points
#             if all(np.linalg.norm(point_vector - np.array(list(existing_point.values()))) >= min_distance for existing_point in new_points):
#                 new_points.append(point)
#                 trial_indices.append(trial_index)
#         attempts += 1
#     return trial_indices



In [4]:
# Importing Initial Screening Dataset

df = pd.read_csv('filepath.csv')

In [5]:
# Create a new column 'FA_value' by extracting FA value from 'spincoat0_drop0_solutes'
df['FA'] = df['spincoat0_drop0_solutes'].str.extract('FA(\d+.\d+)').astype(float)

# Create a new column 'MA_value' by extracting MA value from 'spincoat0_drop0_solutes'
df['MA'] = df['spincoat0_drop0_solutes'].str.extract('MA(\d+.\d+)')

# Fill NA values in 'MA_value' column with 0
df['MA'] = df['MA'].fillna(0).astype(float)


In [6]:
# Find unique 'MA' values
MA_values = df['MA'].unique()

# Loop through each unique 'MA' value
for MA in MA_values:
    # Filter the dataframe for the current 'MA' value
    df_filtered = df[df['MA'] == MA]
    
    # Get the 'I', 'Br', and 'Cl' values as a NumPy array
    mesh_array = df_filtered[['I', 'Br', 'Cl']].values
    
    # Plot the ternary diagram
    fig = plot_mesh(mesh_array, color='red')
    
    # Add a title to the plot
    fig.update_layout(title_text=f'MA = {MA}')
    
    # Show the plot
    fig.show()


In [7]:
df_filtered = df[df['MA'] == 0].reset_index(drop=True)
df = df_filtered

In [8]:
bandgap_target = 1.67

df = df[df['emission_energy'] >= 1.5]
df = df.dropna(subset=['emission_energy', 'redplspec_fwhm', 'photostability_peakev_rate'])
df = df[df['I'] + df['Br'] >= 0.95].reset_index(drop=True)
df['emission_energy_difference_negative'] = -abs(bandgap_target - df['emission_energy'])
df['redplspec_fwhm_negative'] = -abs(df['redplspec_fwhm'])
df['photostability_peakev_rate_negative'] = -abs(df['photostability_peakev_rate'])
# df[['MA', 'I', 'Br', 'Cl', 'redplspec_fwhm','redplspec_intensity', 'emission_energy', 'photostability_intensity_rate', 'photostability_peakev_rate']]
df[['I', 'Br', 'Cl', 'emission_energy_difference_negative', 'redplspec_fwhm_negative', 'photostability_peakev_rate_negative']][0:3]


Unnamed: 0,I,Br,Cl,emission_energy_difference_negative,redplspec_fwhm_negative,photostability_peakev_rate_negative
0,0.9,0.075,0.025,-0.032526,-0.105141,-0.009812
1,0.833,0.117,0.05,-0.025517,-0.112115,-0.016963
2,0.867,0.133,0.0,-0.007837,-0.096869,-0.002481


## Multiobjective

In [11]:
ax_client = AxClient()

num_trials = 10

# Define your metrics
metrics = [
    L2NormMetric('emission_energy_difference_negative', param_names=['I', 'Br']),
    L2NormMetric('redplspec_fwhm_negative', param_names=['I', 'Br']),
    L2NormMetric('photostability_peakev_rate_negative', param_names=['I', 'Br']),
]

# Create a ScalarizedObjective
weights = [1, .1, 1]
objective = ScalarizedObjective(metrics=metrics, weights=weights)

# Create an ScalarizedOptimizationConfig
optimization_config = OptimizationConfig(objective=objective)

# Define a dictionary that maps each metric to its corresponding standard error
standard_errors = {
    'emission_energy_difference_negative': 0.00226,
    'redplspec_fwhm_negative': 0.00226*2,
    'photostability_peakev_rate_negative': 0.00226,
}

# Define your search space and optimization parameters
search_parameters = [
    {"name": "I", "type": "range", "bounds": [0.8, 1.0]},
    {"name": "Br", "type": "range", "bounds": [0.0, .2]},
]

# Add a sum constraint
parameter_constraints = ["I + Br >= .95", "I + Br <= 1"]

# Define the generation strategy

gs = GenerationStrategy(
    name="CustomStrategy_GPEI_ONLY",
    steps=[
        GenerationStep(
            model=get_GPEI, 
            num_trials=num_trials, 
            model_kwargs={
                # "nu": 2,
                "acquisition_function_kwargs": {"xi": .01},
            }
        )
    ],
)

# Initialize the AxClient
ax_client = AxClient(generation_strategy=gs, verbose_logging=False)

# Create the experiment
create_experiment(
    ax_client, 
    search_parameters, 
    {
        'emission_energy_difference_negative': ObjectiveProperties(minimize=True),
        'redplspec_fwhm_negative': ObjectiveProperties(minimize=True), 
        'photostability_peakev_rate_negative': ObjectiveProperties(minimize=True),
    }, 
    parameter_constraints
)

# Set the optimization config on your experiment
ax_client.experiment.optimization_config = optimization_config

# Complete trials
complete_trials(ax_client, df, standard_errors)

# Get batch_trial_indices
enforce_min_distance = False  # Set this to True if you want to enforce minimum distance
min_distance = 0.05 if enforce_min_distance else None

batch_trial_indices = get_batch_trial_indices(ax_client, num_trials=num_trials, min_distance=min_distance)

# Create a DataFrame of the suggested samples
df_suggested = create_suggested_samples_df(ax_client, batch_trial_indices)

# Evaluate the acquisition function
acq_values = evaluate_acquisition_function(ax_client, search_parameters, metrics, weights)

# df_new = complete_trials_and_get_new_df(ax_client, batch_trial_indices, df)

print(df_suggested)


Using a factory function to describe the model, so optimization state cannot be stored and optimization is not resumable if interrupted.

[INFO 07-29 17:52:30] ax.modelbridge.generation_strategy: Using model via callable function, so optimization is not resumable if interrupted.
[INFO 07-29 17:52:30] ax.service.utils.instantiation: Due to non-specification, we will use the heuristic for selecting objective thresholds.
[INFO 07-29 17:52:30] ax.service.utils.instantiation: Inferred value type of ParameterType.FLOAT for parameter I. If that is not the expected value type, you can explicity specify 'value_type' ('int', 'float', 'bool' or 'str') in parameter dict.
[INFO 07-29 17:52:30] ax.service.utils.instantiation: Inferred value type of ParameterType.FLOAT for parameter Br. If that is not the expected value type, you can explicity specify 'value_type' ('int', 'float', 'bool' or 'str') in parameter dict.
[INFO 07-29 17:52:30] ax.service.utils.instantiation: Created search space: SearchSp

          I        Br        Cl
0  0.824210  0.166654  0.009137
1  0.852098  0.147902  0.000000
2  0.826506  0.173494  0.000000
3  0.824486  0.158847  0.016667
4  0.820957  0.144770  0.034274
5  0.844629  0.155371  0.000000
6  0.843915  0.156085  0.000000
7  0.844372  0.155628  0.000000
8  1.000000  0.000000  0.000000
9  1.000000  0.000000  0.000000


In [12]:
# Create a mesh array from the evaluation points
mesh_array = create_mesh_array(search_parameters)

# Flatten the mesh array and acquisition function values for plotting
mesh_array_flat, acq_values_flat = flatten_mesh_array(mesh_array)

# Plot the acquisition function on a ternary plot
fig = plot_acquisition_function(mesh_array_flat, acq_values_flat)

# Plot the data points
fig = plot_data_points(df, df_suggested, fig)

fig.show()

### Extra Scratch-Work

In [None]:
# def plot_mesh(mesh_array, fig=None, **marker_kwargs):
#     # If no figure is provided, create a new one
#     if fig is None:
#         fig = go.Figure()

#     # Add the mesh points to the figure
#     fig.add_trace(go.Scatterternary(a=mesh_array[:, 1], b=mesh_array[:, 0], c=mesh_array[:, 2], mode='markers', marker=marker_kwargs))

#     # Set axis labels
#     fig.update_ternaries(aaxis_title="Br", baxis_title="I", caxis_title="Cl")

#     # Update axes ranges
#     fig.update_ternaries(sum=1, baxis_min=.8)  # update for I axis

#     # Change layout options
#     fig.update_layout({
#         'plot_bgcolor': 'white',  # make plot background white
#         'paper_bgcolor': 'white',  # make paper background white
#         'ternary': {
#             'sum': 1.0,
#             'aaxis': {'linecolor': 'black', 'linewidth': 1, 'ticks': 'outside'},
#             'baxis': {'linecolor': 'black', 'linewidth': 1, 'ticks': 'outside'},
#             'caxis': {'linecolor': 'black', 'linewidth': 1, 'ticks': 'outside'}
#         }
#     })

#     return fig  # return the updated figure object

In [None]:
# from ax.core.metric import Metric
# from ax.core.objective import Objective
# from ax.core.optimization_config import OptimizationConfig
# from ax.core.outcome_constraint import OutcomeConstraint
# from ax.core.parameter import RangeParameter, ParameterType
# from ax.core.search_space import SearchSpace
# from ax.core.experiment import Experiment
# from ax.core.data import Data
# from ax.core.trial import Trial
# from ax.modelbridge.factory import Models

# # Define your custom metrics
# class EmissionEnergyDifference(Metric):
#     def fetch_trial_data(self, trial): 
#         records = []
#         for arm_name, arm in trial.arms_by_name.items():
#             params = arm.parameters
#             records.append({
#                 "arm_name": arm_name,
#                 "metric_name": self.name,
#                 "mean": params['emission_energy_difference'],
#                 "sem": 0.0,
#                 "trial_index": trial.index,
#             })
#         return Data(df=pd.DataFrame.from_records(records))

# class RedPLSpecIntensity(Metric):
#     def fetch_trial_data(self, trial): 
#         records = []
#         for arm_name, arm in trial.arms_by_name.items():
#             params = arm.parameters
#             records.append({
#                 "arm_name": arm_name,
#                 "metric_name": self.name,
#                 "mean": params['redplspec_intensity'],
#                 "sem": 0.0,
#                 "trial_index": trial.index,
#             })
#         return Data(df=pd.DataFrame.from_records(records))

# # Define your search space and optimization parameters
# search_space = SearchSpace(
#     parameters=[
#         RangeParameter("MA", ParameterType.FLOAT, lower=0.0, upper=0.1),
#         RangeParameter("I", ParameterType.FLOAT, lower=0.6, upper=1.0),
#         RangeParameter("Br", ParameterType.FLOAT, lower=0.0, upper=0.2),
#     ],
# )

# # Define your objectives
# objective1 = Objective(metric=EmissionEnergyDifference(), minimize=True)
# objective2 = Objective(metric=RedPLSpecIntensity(), minimize=True)

# # Define your optimization configuration
# optimization_config = OptimizationConfig(
#     objective=objective1,
#     outcome_constraints=[OutcomeConstraint("photostability_intensity_rate", op="<=", bound=0.5)],
# )

# # Create an experiment
# exp = Experiment(
#     name="test_experiment",
#     search_space=search_space,
#     optimization_config=optimization_config,
# )

# # Add your existing data to the experiment
# for _, row in df.iterrows():
#     parameters = {
#         'MA': row['MA'],
#         'I': row['I'],
#         'Br': row['Br'],
#         'emission_energy_difference': abs(row['emission_energy'] - 1.67),
#         'redplspec_intensity': row['redplspec_intensity'],
#     }
#     trial = Trial(experiment=exp, generator_run=Models.SOBOL(search_space=exp.search_space).gen(1))
#     trial.mark_running(no_runner_required=True).mark_completed()
#     data = Data.from_evaluations(
#         evaluations={
#             'emission_energy_difference': abs(row['emission_energy'] - 1.67),
#             'redplspec_intensity': row['redplspec_intensity'],
#             'photostability_intensity_rate': row['photostability_intensity_rate'],
#             'photostability_peakev_rate': row['photostability_peakev_rate']
#         },
#         trial_index=trial.index
#     )
#     exp.attach_data(data)

# # Now you can proceed with optimization as usual
# num_trials = 10
# for _ in range(num_trials):
#     trial = exp.new_trial(generator_run=Models.SOBOL(search_space=exp.search_space).gen(1))
#     trial.mark_running(no_runner_required=True).mark_completed()
#     data = Data.from_evaluations(
#         evaluations={
#             'emission_energy_difference': np.random.rand(),
#             'redplspec_intensity': np.random.rand(),
#             'photostability_intensity_rate': np.random.rand(),
#             'photostability_peakev_rate': np.random.rand()
#         },
#         trial_index=trial.index
#     )
#     exp.attach_data(data)

# # Convert the list of dictionaries to a DataFrame
# df_trials = pd.DataFrame([trial.arm.parameters for trial in exp.trials.values()])


In [None]:
# from ax.core.objective import Objective
# from ax.core.optimization_config import OptimizationConfig
# from ax.core.outcome_constraint import OutcomeConstraint
# from ax.core.search_space import SearchSpace
# from ax.core.experiment import Experiment
# from ax.core.parameter import RangeParameter, ParameterType
# # from ax.metrics.l2norm import L2Norm
# from ax.modelbridge.factory import get_sobol, get_GPEI

# # Define your search space
# search_space = SearchSpace(
#     parameters=[
#         RangeParameter("MA", ParameterType.FLOAT, lower=0.0, upper=0.1),
#         RangeParameter("I", ParameterType.FLOAT, lower=0.6, upper=1.0),
#         RangeParameter("Br", ParameterType.FLOAT, lower=0.0, upper=0.2),
#     ],
# )

# # Define your metrics
# class EmissionEnergyDifference(Objective):
#     def __init__(self, name: str):
#         super().__init__(name)

# class RedPLSpecIntensity(Objective):
#     def __init__(self, name: str):
#         super().__init__(name)

# emission_energy_difference = EmissionEnergyDifference(name="emission_energy_difference")
# redplspec_intensity = RedPLSpecIntensity(name="redplspec_intensity")

# # Define your scalarized objective
# objective = ScalarizedObjective(metrics=[emission_energy_difference, redplspec_intensity])

# # Define your optimization configuration
# optimization_config = OptimizationConfig(
#     objective=objective,
#     outcome_constraints=[OutcomeConstraint(metric=PhotostabilityIntensityRate(name='photostability_intensity_rate'), op="<=", bound=0.5)],
# )

# # Create an experiment
# exp = Experiment(
#     name="test_experiment",
#     search_space=search_space,
#     optimization_config=optimization_config,
# )

# # Generate some initial random points
# sobol = get_sobol(exp.search_space)
# for _ in range(5):
#     exp.new_trial(generator_run=sobol.gen(1))

# # Optimize
# for _ in range(10):
#     gpei = get_GPEI(experiment=exp, data=exp.fetch_data())
#     generator_run = gpei.gen(1)
#     exp.new_trial(generator_run=generator_run)


In [None]:
# from ax.core.data import Data
# from ax.core.experiment import Experiment
# from ax.core.metric import Metric
# from ax.core.objective import Objective
# from ax.core.optimization_config import OptimizationConfig
# from ax.core.outcome_constraint import OutcomeConstraint
# from ax.core.parameter import RangeParameter, ParameterType
# from ax.core.search_space import SearchSpace
# from ax.modelbridge.factory import get_sobol, get_GPEI

# # Define your custom metrics
# class EmissionEnergyDifference(Metric):
#     def fetch_trial_data(self, trial):
#         records = []
#         for arm_name, arm in trial.arms_by_name.items():
#             # Here you would get the data for this arm. This is just a placeholder.
#             df_row = df[df['arm_name'] == arm_name]
#             records.append({
#                 "arm_name": arm_name,
#                 "metric_name": self.name,
#                 "mean": df_row['emission_energy_difference'].values[0],
#                 "sem": 0.0,
#                 "trial_index": trial.index,
#             })
#         return Data(df=pd.DataFrame.from_records(records))

# class RedPLSpecIntensity(Metric):
#     def fetch_trial_data(self, trial):
#         records = []
#         for arm_name, arm in trial.arms_by_name.items():
#             # Here you would get the data for this arm. This is just a placeholder.
#             df_row = df[df['arm_name'] == arm_name]
#             records.append({
#                 "arm_name": arm_name,
#                 "metric_name": self.name,
#                 "mean": df_row['redplspec_intensity'].values[0],
#                 "sem": 0.0,
#                 "trial_index": trial.index,
#             })
#         return Data(df=pd.DataFrame.from_records(records))

# # Define your search space
# search_space = SearchSpace(
#     parameters=[
#         RangeParameter("MA", ParameterType.FLOAT, lower=0.0, upper=0.1),
#         RangeParameter("I", ParameterType.FLOAT, lower=0.6, upper=1.0),
#         RangeParameter("Br", ParameterType.FLOAT, lower=0.0, upper=0.2),
#     ],
# )

# # Define your objectives
# emission_energy_difference = EmissionEnergyDifference(name="emission_energy_difference")
# redplspec_intensity = RedPLSpecIntensity(name="redplspec_intensity")

# objective = Objective(metric=emission_energy_difference, minimize=False)

# # Define your optimization configuration
# optimization_config = OptimizationConfig(
#     objective=objective,
#     outcome_constraints=[OutcomeConstraint(metric=redplspec_intensity, op="<=", bound=0.5)],
# )

# # Create an experiment
# exp = Experiment(
#     name="test_experiment",
#     search_space=search_space,
#     optimization_config=optimization_config,
# )

# # Generate some initial random points
# sobol = get_sobol(exp.search_space)
# for _ in range(5):
#     exp.new_trial(generator_run=sobol.gen(1))

# # Optimize
# for _ in range(10):
#     gpei = get_GPEI(experiment=exp, data=exp.fetch_data())
#     generator_run = gpei.gen(1)
#     exp.new_trial(generator_run=generator_run)


In [None]:
# from ax.plot.contour import plot_contour
# from ax.plot.trace import optimization_trace_single_method
# from ax.service.managed_loop import optimize
# from ax.metrics.branin import BraninMetric
# from ax.utils.measurement.synthetic_functions import branin
# from ax.utils.notebook.plotting import render, init_notebook_plotting

# # Initialize notebook plotting
# init_notebook_plotting()

# # Your code here...

# # After the optimization process
# # Plot model predictions
# render(plot_contour(model=ax_client.generation_strategy.model, param_x='I', param_y='Br', metric_name='emission_energy_difference'))

# # Plot acquisition function
# # render(plot_contour(model=ax_client.generation_strategy.model, param_x='I', param_y='Br', metric_name='emission_energy_difference')


# # Plot the acquisition function
# # plt.contourf(x_eval, y_eval, acq_values)
# # plt.colorbar()
# # plt.xlabel('I')
# # plt.ylabel('Br')
# # plt.title('Acquisition Function for emission_energy_difference')
# # plt.show()


In [None]:
# from ax.core.objective import MultiObjective
# from ax.core.optimization_config import MultiObjectiveOptimizationConfig
# from ax.service.ax_client import AxClient
# from ax.service.utils.instantiation import ObjectiveProperties
# from ax.modelbridge.generation_strategy import GenerationStrategy, GenerationStep
# from ax import Models

# from ax.metrics.l2norm import L2NormMetric
# from ax.modelbridge.factory import get_sobol, get_GPEI, get_MOO_EHVI

# import numpy as np
# import pandas as pd

# ax_client = AxClient()

# bandgap_target = 1.67

# df = df[df['I'] + df['Br'] >= 0.95].reset_index(drop=True)
# df['emission_energy_difference'] = -abs(bandgap_target - df['emission_energy'])
# df['redplspec_fwhm'] = -abs(df['redplspec_fwhm'])

# num_trials = 5

# gs = GenerationStrategy(
#     name="CustomStrategy_GPEI_ONLY",
#     steps=[
#         GenerationStep(model=get_MOO_EHVI, num_trials=num_trials),
#     ],
# )
# ax_client = AxClient(generation_strategy=gs, verbose_logging=False)

# search_parameters = [
#     {"name": "I", "type": "range", "bounds": [0.8, 1.0]},
#     {"name": "Br", "type": "range", "bounds": [0.0, .2]},
# ]

# parameter_constraints = ["I + Br >= .95", "I + Br <= 1"]

# ax_client.create_experiment(
#     name="moo_experiment",
#     parameters=search_parameters,
#     objectives={
#         'redplspec_fwhm': ObjectiveProperties(minimize=True), 
#         'emission_energy_difference': ObjectiveProperties(minimize=True),
#     },
#     parameter_constraints=parameter_constraints,
#     overwrite_existing_experiment=True
# )

# metrics = [
#     L2NormMetric('emission_energy_difference', param_names=['I', 'Br']),
#     L2NormMetric('redplspec_fwhm', param_names=['I', 'Br']),
# ]

# objective = MultiObjective(metrics=metrics)

# optimization_config = MultiObjectiveOptimizationConfig(
#     objective=objective,
# )

# ax_client.experiment.optimization_config = optimization_config

# standard_error_ee = 0.001
# standard_error_fwhm = 0.002

# for _, row in df.iterrows():
#     trial_metrics = {}

#     if pd.notnull(row['emission_energy_difference']):
#         trial_metrics['emission_energy_difference'] = (row['emission_energy_difference'], standard_error_ee)
#     if pd.notnull(row['redplspec_fwhm']):
#         trial_metrics['redplspec_fwhm'] = (row['redplspec_fwhm'], standard_error_fwhm)

#     if trial_metrics:  # Only proceed if there's at least one metric
#         trial_parameters = {
#             'I': row['I'],
#             'Br': row['Br'],
#         }
#         _, trial_index = ax_client.attach_trial(trial_parameters)
#         ax_client.complete_trial(trial_index=trial_index, raw_data=trial_metrics)

# # Compute the Pareto frontier
# metric_names = [metric.name for metric in metrics]
# objectives = ax_client.experiment.optimization_config.objective.objectives
# frontier = compute_posterior_pareto_frontier(
#     experiment=ax_client.experiment,
#     data=ax_client.experiment.fetch_data(),
#     primary_objective=objectives[1].metric,
#     secondary_objective=objectives[0].metric,
#     absolute_metrics=metric_names,
#     num_points=20,
# )
# render(plot_pareto_frontier(frontier, CI_level=0.5)) 



In [None]:
# #summarize the statistics of df

# df.describe()

In [None]:
# from ax.core.objective import MultiObjective
# from ax.core.optimization_config import MultiObjectiveOptimizationConfig
# from ax.service.ax_client import AxClient
# from ax.service.utils.instantiation import ObjectiveProperties
# from ax.service.ax_client import AxClient
# from ax.modelbridge.generation_strategy import GenerationStrategy, GenerationStep
# from ax import Models

# from ax.core.objective import ScalarizedObjective
# from ax.core.optimization_config import OptimizationConfig
# from ax.core.outcome_constraint import OutcomeConstraint
# from ax.core.types import ComparisonOp
# from ax.metrics.l2norm import L2NormMetric
# from ax.modelbridge.factory import get_sobol, get_GPEI, get_MOO_EHVI

# from ax.modelbridge.registry import Models, ModelRegistryBase

# from scipy.stats import norm

# import numpy as np
# import pandas as pd

# ax_client = AxClient()


# bandgap_target = 1.67

# df = df[df['I'] + df['Br'] >= 0.95].reset_index(drop=True)
# #since multiple objectives,it defaults to maximize, so need to define as negative goal earlier
# df['emission_energy_difference_negative'] = -abs(bandgap_target - df['emission_energy'])
# df['redplspec_fwhm_negative'] = -abs(df['redplspec_fwhm'])
# df['photostability_peakev_rate_negative'] = -abs(df['photostability_peakev_rate'])


# num_trials = 5
# # steps = 2


# gs = GenerationStrategy(
#     name="CustomStrategy_GPEI_ONLY",
#     steps=[
#         GenerationStep(
#             model=get_GPEI, 
#             num_trials=num_trials, 
#             model_kwargs={
#                 "nu": 2,
#                 "acquisition_function_kwargs": {"xi": 0.01},
#                 # "modelbridge_kwargs": {"transforms": norm(loc=0, scale=0.001)}
#             }
#         )
#     ],
# )

# ax_client = AxClient(generation_strategy=gs, verbose_logging=False)

# # Define your search space and optimization parameters
# search_parameters = [
#     {"name": "I", "type": "range", "bounds": [0.8, 1.0]},
#     {"name": "Br", "type": "range", "bounds": [0.0, .2]},
# ]

# # Add a sum constraint
# parameter_constraints = ["I + Br >= .95", "I + Br <= 1"]

# status_quo = {"I": 0.9, "Br": 0.1}  # Optional parameter

# ax_client.create_experiment(
#     name="moo_experiment",
#     parameters=search_parameters,
#     objectives={
#         'emission_energy_difference_negative': ObjectiveProperties(minimize=True),
#         'redplspec_fwhm_negative': ObjectiveProperties(minimize=True), 
#         'photostability_peakev_rate_negative': ObjectiveProperties(minimize=True),
#     },
#     parameter_constraints=parameter_constraints,
#     # status_quo=status_quo,
#     overwrite_existing_experiment=True

    
# )

# # Define your metrics
# metrics = [
#     L2NormMetric('emission_energy_difference_negative', param_names=['I', 'Br']),
#     L2NormMetric('redplspec_fwhm_negative', param_names=['I', 'Br']),
#     L2NormMetric('photostability_peakev_rate_negative', param_names=['I', 'Br']),
    
    
# ]

# # Create a ScalarizedObjective
# weights = [1, .1, 1]  #TODO # (these should be the same length as your metrics)
# objective = ScalarizedObjective(metrics=metrics, weights=weights)

# # Create an ScalarizedOptimizationConfig
# optimization_config = OptimizationConfig(
#     objective=objective,
# )



# # # Create a MultiObjective
# # objective = MultiObjective(metrics=metrics)

# # # Create a MultiObjectiveOptimizationConfig
# # optimization_config = MultiObjectiveOptimizationConfig(
# #     objective=objective,
# # )

# # # Create an OptimizationConfig for use with constrains
# # optimization_config = OptimizationConfig(
# #     objective=objective,
# #     outcome_constraints=[OutcomeConstraint(L2NormMetric('emission_energy_difference', param_names=['I', 'Br']), op = ComparisonOp.LEQ, bound=0.01, relative=False)],
# # )

# # Set the optimization config on your experiment
# ax_client.experiment.optimization_config = optimization_config

# # Define a dictionary that maps each metric to its corresponding standard error
# standard_errors = {
#     'emission_energy_difference_negative': 0.0008,
#     'redplspec_fwhm_negative': 0.00016,
#     'photostability_peakev_rate_negative': 0.0008,
# }

# # Loop over each row in the DataFrame
# for _, row in df.iterrows():
#     metrics = {}

#     # Loop over each metric and its corresponding standard error
#     for metric, standard_error in standard_errors.items():
#         # Check if the metric is not null
#         if pd.notnull(row[metric]):
#             # Add the metric to the metrics dictionary
#             metrics[metric] = (row[metric], standard_error)

#     # Only proceed if there's at least one metric
#     if metrics:
#         trial_parameters = {
#             'I': row['I'],
#             'Br': row['Br'],
#         }
#         _, trial_index = ax_client.attach_trial(trial_parameters)
#         ax_client.complete_trial(trial_index=trial_index, raw_data=metrics)



# # Now you can proceed with optimization as usual
# batch_trial_indices, _ = ax_client.get_next_trials(num_trials)

# # Create a DataFrame of the suggested samples
# suggested_samples = []
# for trial_index in batch_trial_indices:
#     trial = ax_client.get_trial(trial_index)
#     arm_parameters = trial.arm.parameters
#     suggested_samples.append(arm_parameters)
# df_suggested = pd.DataFrame(suggested_samples)
# df_suggested['I'] = df_suggested['I'].round(8)
# df_suggested['Br'] = df_suggested['Br'].round(8)
# df_suggested['Cl'] = 1 - df_suggested['I'] - df_suggested['Br']
# df_suggested.loc[np.isclose(df_suggested['I'] + df_suggested['Br'], 1, atol=1e-8), 'Cl'] = 0



# # batch_trial_indices, _ = ax_client.get_next_trials(num_trials)

# # Get the model from the generation strategy
# model = ax_client.generation_strategy.model

# # Define points at which we'll evaluate the acquisition function
# x_eval = np.linspace(search_parameters[0]['bounds'][0], search_parameters[0]['bounds'][1], 101)
# y_eval = np.linspace(search_parameters[1]['bounds'][0], search_parameters[1]['bounds'][1], 101)

# # Initialize arrays to store acquisition function values
# acq_values = np.zeros((len(x_eval), len(y_eval)))

# # Evaluate the acquisition function at each point
# for i, x in enumerate(x_eval):
#     for j, y in enumerate(y_eval):
#         # Check if the point satisfies the constraint
#         if x + y >= 0.95 and x + y <= 1:
#             plot_parameters = [ObservationFeatures(parameters={'I': x, 'Br': y})]
#             # Compute the acquisition function value
#             acq_values_dict, _ = model.predict(plot_parameters)
#             # Initialize scalarized_acq_value
#             scalarized_acq_value = 0
#             # Loop over each metric
#             for metric, weight in zip(metrics, weights):
#                 # Get the acquisition value for this metric
#                 acq_value = acq_values_dict[metric][0]
#                 # Add the weighted acquisition value to the scalarized acquisition value
#                 scalarized_acq_value += weight * acq_value
#             # Store the scalarized acquisition value
#             acq_values[i, j] = scalarized_acq_value
#         else:
#             acq_values[i, j] = np.nan  # Assign NaN to points that don't satisfy the constraint







In [None]:
# from ax.core.objective import ScalarizedObjective
# from ax.core.optimization_config import OptimizationConfig
# from ax.metrics.l2norm import L2NormMetric
# from ax.modelbridge.factory import get_GPEI
# from ax.service.ax_client import AxClient
# from ax.service.utils.instantiation import ObjectiveProperties
# from ax.modelbridge.generation_strategy import GenerationStrategy, GenerationStep
# from scipy.stats import norm
# import numpy as np
# import pandas as pd


# ax_client = AxClient()

# bandgap_target = 1.67

# df = df[df['I'] + df['Br'] >= 0.95].reset_index(drop=True)
# #since multiple objectives,it defaults to maximize, so need to define as negative goal earlier
# df['emission_energy_difference_negative'] = -abs(bandgap_target - df['emission_energy'])
# df['redplspec_fwhm_negative'] = -abs(df['redplspec_fwhm'])
# df['photostability_peakev_rate_negative'] = -abs(df['photostability_peakev_rate'])

# # Define your metrics
# metrics = [
#     L2NormMetric('emission_energy_difference_negative', param_names=['I', 'Br']),
#     L2NormMetric('redplspec_fwhm_negative', param_names=['I', 'Br']),
#     L2NormMetric('photostability_peakev_rate_negative', param_names=['I', 'Br']),
#     # L2NormMetric('photostability_intensity_rate', param_names=['I', 'Br']),
# ]

# # Create a ScalarizedObjective
# weights = [1, .1, 1]  #TODO # (these should be the same length as your metrics)
# objective = ScalarizedObjective(metrics=metrics, weights=weights)

# # Create an ScalarizedOptimizationConfig
# optimization_config = OptimizationConfig(
#     objective=objective,
# )

# # # Create a MultiObjective
# # objective = MultiObjective(metrics=metrics)

# # # Create a MultiObjectiveOptimizationConfig
# # optimization_config = MultiObjectiveOptimizationConfig(
# #     objective=objective,
# # )

# # Define a dictionary that maps each metric to its corresponding standard error
# standard_errors = {
#     'emission_energy_difference_negative': 0.0008,
#     'redplspec_fwhm_negative': 0.00016,
#     'photostability_peakev_rate_negative': 0.0008,
#     # 'photostability_intensity_rate': 0.0008,
# }

# # Define your search space and optimization parameters
# search_parameters = [
#     {"name": "I", "type": "range", "bounds": [0.8, 1.0]},
#     {"name": "Br", "type": "range", "bounds": [0.0, .2]},
# ]

# # Add a sum constraint
# parameter_constraints = ["I + Br >= .95", "I + Br <= 1"]

# # Define the generation strategy
# num_trials =45 #TODO
# gs = GenerationStrategy(
#     name="CustomStrategy_GPEI_ONLY",
#     steps=[
#         GenerationStep(
#             model=get_GPEI, 
#             num_trials=num_trials, 
#             model_kwargs={
#                 "nu": 2, #TODO controls the smoothness of the GP
#                 "acquisition_function_kwargs": {"xi": .01}, #TODO adjusts the tradeoff between exploration and exploitation (smaller xi means more exploitation)
#             }
#         )
#     ],
# )

# # Initialize the AxClient
# ax_client = AxClient(generation_strategy=gs, verbose_logging=False)

# # Create the experiment
# ax_client.create_experiment(
#     name="moo_experiment",
#     parameters=search_parameters,
#     objectives={
#         'emission_energy_difference_negative': ObjectiveProperties(minimize=True),
#         'redplspec_fwhm_negative': ObjectiveProperties(minimize=True), 
#         'photostability_peakev_rate_negative': ObjectiveProperties(minimize=True),
#         # 'photostability_intensity_rate': ObjectiveProperties(minimize=False),
#     },
#     parameter_constraints=parameter_constraints,
#     overwrite_existing_experiment=True,
# )

# # Set the optimization config on your experiment
# ax_client.experiment.optimization_config = optimization_config


# # Loop over each row in the DataFrame
# for _, row in df.iterrows():
#     metrics_data = {}

#     # Loop over each metric and its corresponding standard error
#     for metric, standard_error in standard_errors.items():
#         # Check if the metric is not null
#         if pd.notnull(row[metric]):
#             # Add the metric to the metrics dictionary
#             metrics_data[metric] = (row[metric], standard_error)

#     # Only proceed if there's at least one metric
#     if metrics_data:
#         trial_parameters = {
#             'I': row['I'],
#             'Br': row['Br'],
#         }
#         _, trial_index = ax_client.attach_trial(trial_parameters)
#         ax_client.complete_trial(trial_index=trial_index, raw_data=metrics_data)









# # Enforce the minimum distance constraint
# batch_trial_indices = enforce_minimum_distance(ax_client, num_trials=10, min_distance=0.0001)


# # Now proceed with optimization as usual
# # batch_trial_indices, _ = ax_client.get_next_trials(num_trials)

# # Create a DataFrame of the suggested samples
# suggested_samples = []
# for trial_index in batch_trial_indices:
#     trial = ax_client.get_trial(trial_index)
#     arm_parameters = trial.arm.parameters
#     suggested_samples.append(arm_parameters)
# df_suggested = pd.DataFrame(suggested_samples)
# df_suggested['I'] = df_suggested['I'].round(8)
# df_suggested['Br'] = df_suggested['Br'].round(8)
# df_suggested['Cl'] = 1 - df_suggested['I'] - df_suggested['Br']
# df_suggested.loc[np.isclose(df_suggested['I'] + df_suggested['Br'], 1, atol=1e-8), 'Cl'] = 0

# # Get the model from the generation strategy
# model = ax_client.generation_strategy.model

# # Define points at which we'll evaluate the acquisition function
# x_eval = np.linspace(search_parameters[0]['bounds'][0], search_parameters[0]['bounds'][1], 101)
# y_eval = np.linspace(search_parameters[1]['bounds'][0], search_parameters[1]['bounds'][1], 101)

# # Initialize arrays to store acquisition function values
# acq_values = np.zeros((len(x_eval), len(y_eval)))

# # Create a list of metric names
# metric_names = [metric.name for metric in metrics]

# # Evaluate the acquisition function at each point
# for i, x in enumerate(x_eval):
#     for j, y in enumerate(y_eval):
#         # Check if the point satisfies the constraint
#         if x + y >= 0.95 and x + y <= 1:
#             plot_parameters = [ObservationFeatures(parameters={'I': x, 'Br': y})]
#             # Compute the acquisition function value
#             acq_values_dict, _ = model.predict(plot_parameters)
#             # Initialize scalarized_acq_value
#             scalarized_acq_value = 0
#             # Loop over each metric name
#             for metric_name, weight in zip(metric_names, weights):
#                 # Get the acquisition value for this metric
#                 acq_value = acq_values_dict[metric_name][0]
#                 # Add the weighted acquisition value to the scalarized acquisition value
#                 scalarized_acq_value += weight * acq_value
#             # Store the scalarized acquisition value
#             acq_values[i, j] = scalarized_acq_value
#         else:
#             acq_values[i, j] = np.nan  # Assign NaN to points that don't satisfy the constraint




# # Evaluate your function with the suggested parameters and complete the trials
# # new_rows = []
# # for trial_index in batch_trial_indices:
# #     trial = ax_client.get_trial(trial_index)
# #     parameters = trial.arm.parameters
# #     metric_value = np.random.uniform(low=0.1, high=.5, size=(1,))[0]  # build sample and measure via pascal #TODO 
# #     ax_client.complete_trial(trial_index=trial_index, raw_data={'emission_energy_difference': (metric_value, 0.0)})
# #     parameters['emission_energy_difference'] = metric_value  # store the metric value in the parameters
# #     new_rows.append(parameters)

# # # Append the new rows to the original DataFrame, which will then be fed back into Ax for the next batch
# # df_new = pd.DataFrame(new_rows)
# # df_new = pd.concat([df, df_new], ignore_index=True)
# # df_new['Cl'] = 1 - df_new['I'] - df_new['Br']  # calculate Cl from I and B

# # # If I + Br is close to 1 within a tolerance, set Cl to exactly 0
# # df_new.loc[np.isclose(df_new['I'] + df_new['Br'], 1, atol=1e-8), 'Cl'] = 0
