# Xopt Evaluator Basic Usage 

The `Evaluator` handles the execution of the user-provided `function` with optional `function_kwargs`, asynchronously and in parallel, with exception handling.

In [1]:
# needed for macos
import platform

# macOS uses the "spawn" start method by default; switch to "fork" here.
# NOTE(review): presumably so that functions defined in the notebook can be
# run by worker processes — confirm.
# force=True keeps this cell re-runnable: without it, a second execution raises
# RuntimeError because the start method has already been set.
if platform.system() == "Darwin":
    import multiprocessing

    multiprocessing.set_start_method("fork", force=True)


In [2]:
from xopt import Xopt, Evaluator, VOCS
from xopt.generators.random import RandomGenerator

import pandas as pd

from time import sleep
from numpy.random import randint

from typing import Dict

import numpy as np
np.random.seed(666) # for reproducibility

Define a custom function `f(inputs: Dict) -> outputs: Dict`. 

In [3]:
def f(inputs: Dict, enable_errors=True) -> Dict:
    """Toy objective used to exercise the evaluator.

    Sleeps for a randomly chosen 0.1-0.4 s to mimic a computation, then
    returns the sum of the squares of ``x`` and ``y``.

    Parameters
    ----------
    inputs : Dict
        Mapping that provides the keys ``"x"`` and ``"y"``.
    enable_errors : bool, default True
        When true, any ``x`` value above 0.8 triggers a ``ValueError``.

    Returns
    -------
    Dict
        ``{"f1": x ** 2 + y ** 2}``.

    Raises
    ------
    ValueError
        If ``enable_errors`` is true and any ``x`` exceeds 0.8.
    """
    # Pretend to work for a little while (randint's upper bound is exclusive).
    delay = randint(1, 5) * .1
    sleep(delay)

    # Fail now and then so that error handling can be demonstrated.
    if enable_errors:
        if np.any(inputs["x"] > 0.8):
            raise ValueError("x > 0.8")

    x_squared = inputs["x"] ** 2
    y_squared = inputs["y"] ** 2
    return {"f1": x_squared + y_squared}

Define variables, objectives, constraints, and other settings (VOCS)

In [4]:
vocs = VOCS(variables={"x": [0, 1], "y": [0, 1]}, objectives={"f1": "MINIMIZE"})
vocs



VOCS(variables={'x': [0.0, 1.0], 'y': [0.0, 1.0]}, constraints={}, objectives={'f1': 'MINIMIZE'}, constants={}, observables=[])

This can be used to make some random inputs for evaluating the function. 

In [None]:
in1 = vocs.random_inputs()

f(in1, enable_errors=False)

In [None]:
# With errors enabled (the default), x > 0.8 makes f raise.
# Catch the exception and print its message instead of failing the cell.
try:
    f({"x": 1, "y": 0})
except Exception as ex:
    print(f"Caught error in f: {ex}")

In [None]:
# Create Evaluator
ev = Evaluator(function=f)

In [None]:
# Single input evaluation
ev.evaluate(in1)

In [None]:
# Dataframe evaluation: ten inputs with x and y both evenly spaced over [0, 1]
in10 = pd.DataFrame(
    {column: np.linspace(0, 1, 10) for column in ("x", "y")}
)
ev.evaluate_data(in10)


In [None]:
# Dataframe submission (returns futures)
# NOTE(review): this comment previously said "futures dict", but the loop below
# calls .result() on each iterated element, i.e. it treats the return value as
# a sequence of futures — confirm against Evaluator.submit_data.
futures = ev.submit_data(in10)
for future in futures:
    print(future.result())

In [None]:
# Dataframe evaluation, vectorized
ev.vectorized = True
ev.evaluate_data(in10)


In [None]:
# Vectorized submission. This returns a single future.
ev.vectorized = True
futures = ev.submit_data(in10)
len(futures)

In [None]:
futures[0].result()

In [None]:
# Collect in a dataframe
res = futures[0].result()
# If there is an error, all outputs are spoiled.
# NOTE(review): presumably the error result then holds scalars rather than
# per-row arrays, so it is wrapped in a list to build a one-row frame — confirm.
if res['xopt_error']:
    res = [res]
pd.DataFrame(res)

# Executors

In [None]:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
MAX_WORKERS = 10

In [None]:
# Create Executor instance
executor = ProcessPoolExecutor(max_workers=MAX_WORKERS)
executor

In [None]:
# Dask (Optional)
# from dask.distributed import Client
# import logging
# client = Client( silence_logs=logging.ERROR)
# executor = client.get_executor()
# client

In [None]:
# This calls `executor.map`
ev = Evaluator(function=f, executor=executor, max_workers=MAX_WORKERS)

In [None]:
# This will run in parallel
ev.evaluate_data(in10)

# Evaluator in the Xopt object

In [5]:
# Build an Xopt object from a random generator, an Evaluator wrapping f, and the VOCS
X = Xopt(generator=RandomGenerator(vocs=vocs), evaluator=Evaluator(function=f),
         vocs=vocs)
# NOTE(review): presumably non-strict mode records evaluation errors in the
# data instead of raising them — confirm.
X.options.strict = False

# Submit to the evaluator some new inputs
X.submit_data(vocs.random_inputs(4))

# Unevaluated inputs are collected in a dataframe
# (underscore-prefixed, i.e. internal attribute; shown for illustration)
X._input_data

Unnamed: 0,x,y
1,0.299563,0.048542
2,0.155813,0.987297
3,0.323486,0.586412
4,0.272142,0.951187


In [6]:
# Internal futures dictionary
X._futures

{1: <Future at 0x1491db34c70 state=finished returned dict>,
 2: <Future at 0x1491db501f0 state=finished returned dict>,
 3: <Future at 0x1491db50310 state=finished returned dict>,
 4: <Future at 0x1491db50250 state=finished returned dict>}

In [7]:
# Collect all finished futures and update dataframe
X.process_futures()
X.data

Unnamed: 0,x,y,f1,xopt_runtime,xopt_error
1,0.299563,0.048542,0.092094,0.305222,False
2,0.155813,0.987297,0.999033,0.200008,False
3,0.323486,0.586412,0.448522,0.404915,False
4,0.272142,0.951187,0.978818,0.106974,False


In [8]:
# Futures are now cleared out
X._futures

{}

In [9]:
# This is the internal counter
X._ix_last

4

In [10]:
# This causes immediate evaluation
X.evaluate_data(vocs.random_inputs(4))

Unnamed: 0,x,y,f1,xopt_runtime,xopt_error,xopt_error_str
5,0.799752,0.706772,1.139131,0.113785,False,
6,0.255846,0.225521,0.116317,0.104937,False,
7,0.807108,0.994891,,0.310399,True,"Traceback (most recent call last):\n File ""C:..."
8,0.299155,0.887142,0.876515,0.311601,False,


In [11]:
# Single generation step
X.step()
X.data

Unnamed: 0,x,y,f1,xopt_runtime,xopt_error,xopt_error_str
1,0.299563,0.048542,0.092094,0.305222,False,
2,0.155813,0.987297,0.999033,0.200008,False,
3,0.323486,0.586412,0.448522,0.404915,False,
4,0.272142,0.951187,0.978818,0.106974,False,
5,0.799752,0.706772,1.139131,0.113785,False,
6,0.255846,0.225521,0.116317,0.104937,False,
7,0.807108,0.994891,,0.310399,True,"Traceback (most recent call last):\n File ""C:..."
8,0.299155,0.887142,0.876515,0.311601,False,
9,0.976764,0.272679,,0.111411,True,"Traceback (most recent call last):\n File ""C:..."


In [None]:
# Usage with a parallel executor. 
X2 = Xopt(
    generator=RandomGenerator(vocs=vocs),
    evaluator=Evaluator(function=f, executor=executor, max_workers=MAX_WORKERS),
    vocs=vocs,
)
X2.options.asynch = True
X2.options.strict = False

In [None]:
X2.step()

In [None]:
for _ in range(20):
    X2.step()

len(X2.data)

In [None]:
# Scatter plot of the evaluated inputs; the trailing semicolon keeps the
# Axes repr from being echoed beneath the figure.
X2.data.plot.scatter("x", "y");

In [None]:
# Asynchronous, Vectorized
# NOTE: this rebinds X2, replacing the asynchronous non-vectorized instance
# created earlier in the notebook.
X2 = Xopt(
    generator=RandomGenerator(vocs=vocs),
    evaluator=Evaluator(function=f, executor=executor, max_workers=MAX_WORKERS),
    vocs=vocs,
)
X2.options.asynch = True
X2.evaluator.vectorized = True
X2.options.strict = False

# This takes fewer steps to achieve a similar number of evaluations
for _ in range(3):
    X2.step()

# Number of rows collected so far
len(X2.data)