# Molecular design ML-in-the-loop workflow with Parsl

This notebook demonstrates an increasingly commmon ML-in-the-loop molecular design application. We use ML to guide the choice of simulations to perform. 
The objective of this application is to identify which molecules have the largest ionization energies (IE, the amount of energy required to remove an electron). 

IE can be computed using various simulation packages (here we use [xTB](https://xtb-docs.readthedocs.io/en/latest/contents.html) ); however, execution of these simulations is expensive, and thus, given a finite compute budget, we must carefully select which molecules to explore. We use machine learning to predict high IE molecules based on previous computations (a process often called [active learning](https://pubs.acs.org/doi/abs/10.1021/acs.chemmater.0c00768)). We iteratively retrain the machine learning model to improve the accuracy of predictions. The resulting ML-in-the-loop workflow proceeds as follows. 

![workflow](./figures/workflow.svg)

In this notebook, we use Globus Compute to execute functions (simulation, model training, and inference) in parallel on remote computers. We show how Globus Compute's use of (i.e., [`concurrent.futures`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)) allows applications to be easily written that dynamically respond to the completion of asynchronous tasks.

In [None]:
def hello():
    return "hello"
    
future = gce.submit(hello)

In [None]:
future.result()

In [None]:

from chemfunctions import compute_vertical
from concurrent.futures import as_completed
from tqdm.notebook import tqdm
from time import monotonic
from random import sample
from pathlib import Path
import numpy as np
import os

In [None]:
%matplotlib inline
from matplotlib import pyplot as plt
import pandas as pd
from concurrent.futures import as_completed
from matplotlib import pyplot as plt
from tqdm.notebook import tqdm
from time import monotonic


## Make an initial dataset

We need data to train our ML models. We'll do that by selecting a set of molecules at random from our search space, performing some simulations on those molecules, and training on the results.

Below, we define two functions: 
`compute_vertical` that computes the "vertical ionization energy" of a molecule (a measure of how much energy it takes to strip an electron off the molecule). `compute_vertical` takes a string representation of a molecule in [SMILES format](https://en.wikipedia.org/wiki/Simplified_molecular-input_line-entry_system) as input and returns the ionization energy as a float. Under the hood, it is running [xTB](https://xtb-docs.readthedocs.io/en/latest/contents.html) to perform a series of quantum chemistry computations.

In preparation for running remotely with Globus Compute, we include all import statements within the function.


In [None]:
from typing import List

def compute_vertical(mol_string: str) -> float:
    """Run the ionization potential computation

    Args:
        xyz: XYZ coordinates for the molecule to evaluate
    Returns:
        Ionization energy in Ha
    """
    from rdkit import Chem, DataStructs
    from rdkit.Chem import AllChem

    from io import StringIO
    from ase.optimize import LBFGSLineSearch
    from xtb.ase.calculator import XTB
    from ase.io import read
    import numpy as np
   
    # Generate 3D coordinates for the molecule
    mol = Chem.MolFromSmiles(mol_string)
    if mol is None:
        raise ValueError(f'Parse failure for {mol_string}')
    mol = Chem.AddHs(mol)
    AllChem.EmbedMolecule(mol, randomSeed=1)
    AllChem.MMFFOptimizeMolecule(mol)

    # Save geometry as 3D coordinates
    xyz = f"{mol.GetNumAtoms()}\n"
    xyz += mol_string + "\n"
    conf = mol.GetConformer()
    for i, a in enumerate(mol.GetAtoms()):
        s = a.GetSymbol()
        c = conf.GetAtomPosition(i)
        xyz += f"{s} {c[0]} {c[1]} {c[2]}\n"
        
    # Make the XTB calculator
    calc = XTB(accuracy=0.05)
    
    # Parse the molecule
    atoms = read(StringIO(xyz), format='xyz')

    # Compute the neutral geometry
    # Uses QCEngine (https://github.com/MolSSI/QCEngine) to handle interfaces to XTB
    atoms.calc = calc
    dyn = LBFGSLineSearch(atoms, logfile=None)
    dyn.run(fmax=0.02, steps=250)
    
    neutral_energy = atoms.get_potential_energy()

    # Compute the energy of the relaxed geometry in charged form
    charges = np.ones((len(atoms),)) * (1 / len(atoms))
    atoms.set_initial_charges(charges)
    charged_energy = atoms.get_potential_energy()
    
    return charged_energy - neutral_energy


First, we can run these functions locally to compute the ionization potential .

In [None]:
ie = compute_vertical('O') #  Run water as a demonstration (O is the SMILES for water)

print(f"The ionization energy of O is {ie} eV")

### Run the simulation on a remote computer using Globus Compute

We now aim to run our simulation on a remote computer using Globus Compute. First we need to choose a specific endpoint for execution and create the Globus Compute Executor. 



In [None]:
from globus_compute_sdk import Client, Executor
from globus_compute_sdk.serialize import CombinedCode

compute_endpoint = 'a7de978c-6c0f-476d-a3f1-bed6309b7c2b' 

gc = Client(code_serialization_strategy=CombinedCode())
gce = Executor(endpoint_id=compute_endpoint, client=gc)

Now we can use Globus Compute to submit the ``compute_vertical'' function for execution. 

Because we never call `.result()`, this code does not wait for any results to be ready. Instead, Parsl is running the computations in the background. Parsl manages sending work to each worker process, collecting results, and feeding new work to workers as new tasks are submitted.

In [None]:
ie_future = gce.submit(compute_vertical, 'O')

In [None]:
ie_future.result()

# Running many simulations

We now need to run a number of simulations. We define configuration parameters for the ML-in-the-loop app, specifically the search space of molecules (selected randomly from the QM9 database) and parameters controlling the optimization algorithm (the number of initial simulations, total moleucles to be evaluated, and the number of molecules to be evaluated in a batch).

In [None]:
search_space = pd.read_csv('data/QM9-search.tsv', sep='\s+')  # Our search space of molecules

initial_count = 2

We use a standard Python loop to submit a set of simulations for execution. As above, each invocation returns a `Future` immediately, so this code should finish within a few milliseconds.

In [None]:
smiles = search_space.sample(initial_count)['smiles']
smiles = ['C', 'O', 'CO']
ie_futures = [gce.submit(compute_vertical, s) for s in smiles]
smiles_futures = dict(zip(ie_futures,smiles)) # Mapping from future to smiles
print(f'Submitted {len(ie_futures)} calculations')

The futures produced by Globus Compute are based on Python's [native "Future"](https://docs.python.org/3/library/concurrent.futures.html#future-objects) object,
so we can use Python's utility functions to work with them. 
We use `as_completed` to take an iterable (in this case a list) of futures and to yeild as each future completes.  Thus, we progress and handle each simulation as it completes


In [None]:
while len(ie_futures) > 0: 
    # First, get the next completed computation from the list
    future = next(as_completed(ie_futures))
    
    # Remove it from the list of still-running tasks
    ie_futures.remove(future)

    print(future.result())

You may see that some functions fail (this is common with stochastic simulations). To overcome these errors, we can write a loop that runs a new computation if previous ones fail. 

We use, `Future.exception()` rather than the similar `Future.result()`. `Future.exception()` behaves similarly in that it will block until the relevant task is completed, but rather than return the result, it returns any exception that was raised during execution (or `None` if not). In this case, if the future returns an exception we simply pick a new molecule and re-execute the simulation.

In [None]:
train_data = []
while len(ie_futures) > 0: 
    # First, get the next completed computation from the list
    future = next(as_completed(ie_futures))
    
    # Remove it from the list of still-running tasks
    ie_futures.remove(future)
    
    # Get the input 
    smiles = smiles_futures[future]  #future.task_def['args'][0]
    
    # Check if the run completed successfully
    if future.exception() is not None:
        # If it failed, pick a new SMILES string at random and submit it    
        print(f'Computation for {smiles} failed, submitting a replacement computation')
        smiles = search_space.sample(1).iloc[0]['smiles'] # pick one molecule
        new_future = gce.submit(compute_vertical, smiles) # launch a new simulation
        smiles_futures[future] = smiles
        ie_futures.append(new_future) # store the Future so we can keep track of it
    else:
        # If it succeeded, store the result
        print(f'Computation for {smiles} succeeded')
        train_data.append({
            'smiles': smiles,
            'ie': future.result(),
            'batch': 0,
            'time': monotonic()
        })

We now have an initial set of training data. We load this training data into a pandas `DataFrame` containing the randomly samples molecules alongside the simulated ionization energy (`ie`). In addition, the code above has stored some metadata (`batch` and `time`) which we will use later.

In [None]:
train_data = pd.DataFrame(train_data)
train_data

## Train a machine learning model to screen candidate molecules
Our next step is to create a machine learning model to estimate the outcome of new computations (i.e., ionization energy) and use it to rapidly scan the search space.

To start, let's make a function that uses our prior simulations to train a model. We are going to use RDKit and scikit-learn to train a nearest-neighbor model that uses Morgan fingerprints to define similarity (see [notes from a UChicago AI course](https://github.com/WardLT/applied-ai-for-materials/blob/main/molecular-property-prediction/chemoinformatics/2_ml-with-fingerprints.ipynb) for more detail). In short, the function trains a model that first populates a list of certain substructures (Morgan fingerprints, specifically) and then trains a model which predicts the IE of a new molecule by averaging those with the most similar substructures.

In [None]:
def train_model(train_data):
    """Train a machine learning model using Morgan Fingerprints.
    
    Args:
        train_data: Dataframe with a 'smiles' and 'ie' column
            that contains molecule structure and property, respectfully.
    Returns:
        A trained model
    """
    # Imports for python functions run remotely must be defined inside the function
    from chemfunctions import MorganFingerprintTransformer
    from sklearn.neighbors import KNeighborsRegressor
    from sklearn.pipeline import Pipeline
    
    
    model = Pipeline([
        ('fingerprint', MorganFingerprintTransformer()),
        ('knn', KNeighborsRegressor(n_neighbors=3, weights='distance', metric='jaccard', n_jobs=-1))  # n_jobs = -1 lets the model run all available processors
    ])
    
    return model.fit(train_data['smiles'], train_data['ie'])

Now let's execute the function and run it asynchronously with Parsl

In [None]:
train_future = gce.submit(train_model, train_data)


In [None]:
model = train_future.result()

In [None]:
def run_model(model, smiles):
    """Run a model on a list of smiles strings
    
    Args:
        model: Trained model that takes SMILES strings as inputs
        smiles: List of molecules to evaluate
    Returns:
        A dataframe with the molecules and their predicted outputs
    """
    import pandas as pd
    pred_y = model.predict(smiles)
    return pd.DataFrame({'smiles': smiles, 'ie': pred_y})

inference = gce.submit(model, 'O')

In [None]:
inference.result()

Now we can chop up the search space into chunks, and invoke `run_model`  once for each chunk of the search space.

Note: we pass `train_future` (the future created from the training function above) as input to `run_model`. Globus Compute will wait for the training to be complete (i.e., the future to be resolved) before executing `run_model`.

In [None]:
# Chunk the search space into smaller pieces, so that each can run in parallel
chunks = np.array_split(search_space['smiles'], 64)
inference_futures = [run_model(train_future, chunk) for chunk in chunks]

Now we can wait for all the inferences to complete and concatenate the results into a single dataframe. 

In [None]:
dfs = []
for r in inference_futures:
    dfs.append(r)
predictions = pd.concat(dfs, ignore_index=True)

#### Results

After completing the inference process we now have predicted IE values for all molecules in our search space. We can print out the best five molecules, according to the trained model:

In [None]:
predictions.sort_values('ie', ascending=False).head(5)

We have now created a workflow that is able to train a model and use it to identify molecules that are likely to be good next choices for simulations. 

## Model-in-the-Loop Workflow
We are going to build an application that uses a machine learning model to pick a batch of simulations, runs the simulations in parallel, and then uses the data to retrain the model before repeating the loop.

Our application uses `train_model`, `run_model`, and `combine_inferences` as above, but after running an iteration it picks the predicted best molecules and runs the `compute_vertical_app` to run the xTB simulation.  The workflow then repeatedly retrains the model using these results until a fixed number of molecule simulations have been trained. 

Make the search space a list so that we can remove completed molecules more easily

In [None]:
with tqdm(total=search_count) as prog_bar: # setup a graphical progress bar
    # Mark when we started
    start_time = monotonic()
    
    # Submit with some random guesses
    train_data = []
    init_mols = search_space.sample(initial_count)['smiles']
    sim_futures = [compute_vertical_app(mol) for mol in init_mols]
    already_ran = set()
    
    # Loop until you finish populating the initial set
    while len(sim_futures) > 0: 
        # First, get the next completed computation from the list
        future = next(as_completed(sim_futures))

        # Remove it from the list of still-running tasks
        sim_futures.remove(future)

        # Get the input 
        smiles = future.task_def['args'][0]
        already_ran.add(smiles)

        # Check if the run completed successfully
        if future.exception() is not None:
            # If it failed, pick a new SMILES string at random and submit it    
            smiles = search_space.sample(1).iloc[0]['smiles'] # pick one molecule
            new_future = compute_vertical_app(smiles) # launch a simulation in Parsl
            sim_futures.append(new_future) # store the Future so we can keep track of it
        else:
            # If it succeeded, store the result
            prog_bar.update(1)
            train_data.append({
                'smiles': smiles,
                'ie': future.result(),
                'batch': 0,
                'time': monotonic() - start_time
            })
    
    
    
    # Create the initial training set as a 
    train_data = pd.DataFrame(train_data)
    
    # Loop until complete
    batch = 1
    while len(train_data) < search_count:
        
        # Train and predict as show in the previous section.
        train_future = train_model(train_data)
        inference_futures = [run_model(train_future, chunk) for chunk in np.array_split(search_space['smiles'], 64)]
        predictions = combine_inferences(inputs=inference_futures).result()

        # Sort the predictions in descending order, and submit new molecules from them
        predictions.sort_values('ie', ascending=False, inplace=True)
        sim_futures = []
        for smiles in predictions['smiles']:
            if smiles not in already_ran:
                sim_futures.append(compute_vertical_app(smiles))
                already_ran.add(smiles)
                if len(sim_futures) >= batch_size:
                    break

        # Wait for every task in the current batch to complete, and store successful results
        new_results = []
        for future in as_completed(sim_futures):
            if future.exception() is None:
                prog_bar.update(1)
                new_results.append({
                    'smiles': future.task_def['args'][0],
                    'ie': future.result(),
                    'batch': batch, 
                    'time': monotonic() - start_time
                })
                
        # Update the training data and repeat
        batch += 1
        train_data = pd.concat((train_data, pd.DataFrame(new_results)), ignore_index=True)

We can plot the training data against the time of simulation, showing that the model is finding better molecules over time. 

In [None]:
fig, ax = plt.subplots(figsize=(4.5, 3.))

ax.scatter(train_data['time'], train_data['ie'])
ax.step(train_data['time'], train_data['ie'].cummax(), 'k--')

ax.set_xlabel('Walltime (s)')
ax.set_ylabel('Ion. Energy (Ha)')

fig.tight_layout()

Save that data for comparison with another application later

In [None]:
Path('run-data').mkdir(exist_ok=True)
train_data.to_csv('run-data/parsl-results.csv', index=False)