# Interleaving Simulation and Steering
We use a batch strategy [in our previous example](./0_molecular-design-with-parsl.ipynb) that, while simple to implement, leads to under-utilization.
The core problem of a batch strategy is that only one type of task (simulation, training, or inference) runs at any one time.
This serial structure creates several points in the workflow where workers sit idle, either because there are not enough tasks to fill them (e.g., only one model to train)
or because of tail-down losses while we wait for the last tasks from a batch to complete before starting the next type.
This example shows how to increase parallelism by using [Colmena](https://colmena.readthedocs.io/en/latest/) to run multiple kinds of tasks concurrently.
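
To make the tail-down losses concrete, the sketch below compares worker utilization for a batch schedule against one that starts a new task as soon as any worker is free. The task durations and the helper names (`batch_makespan`, `streaming_makespan`) are made up for illustration; they are not measurements from this workflow, and the sketch models only one type of task.

```python
import heapq

def batch_makespan(durations, batch_size):
    """Run tasks in batches; each batch lasts as long as its slowest task.

    Assumes batch_size is no larger than the number of workers,
    so every task in a batch runs at the same time.
    """
    total = 0.0
    for start in range(0, len(durations), batch_size):
        total += max(durations[start:start + batch_size])
    return total

def streaming_makespan(durations, n_workers):
    """Start the next task as soon as any worker becomes free."""
    free_at = [0.0] * n_workers  # Time at which each worker becomes idle
    heapq.heapify(free_at)
    for d in durations:
        start = heapq.heappop(free_at)  # Earliest available worker
        heapq.heappush(free_at, start + d)
    return max(free_at)

durations = [1, 1, 1, 5, 1, 1, 1, 5]  # Hypothetical runtimes, arbitrary units
n_workers = 4
busy = sum(durations)  # Total worker-time spent doing useful work

batch_time = batch_makespan(durations, batch_size=4)
stream_time = streaming_makespan(durations, n_workers)
batch_util = busy / (n_workers * batch_time)
stream_util = busy / (n_workers * stream_time)
print(f'batch: {batch_util:.0%} utilized, streaming: {stream_util:.0%} utilized')
```

With these toy numbers, three of the four workers wait on the slow task at the end of each batch, which is the idle time that the streaming schedule recovers.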

> **NOTE**: Make sure to start Redis before running this notebook if it is not already running on your system.
```bash
redis-server redis.conf &> redis.log
```

In [1]:
%matplotlib inline
from matplotlib import pyplot as plt
from ipywidgets import widgets
from colmena.models import Result
from colmena.task_server.parsl import ParslTaskServer
from colmena.redis.queue import make_queue_pairs
from colmena.thinker.resources import ResourceCounter
from colmena.thinker import BaseThinker, event_responder, task_submitter, result_processor
from parsl.executors import HighThroughputExecutor
from parsl.config import Config
from random import shuffle
from time import perf_counter
from threading import Lock, Event
from typing import List
from chemfunctions import compute_vertical, train_model, run_model
from tqdm.notebook import tqdm
import pandas as pd
import numpy as np
import logging
import os

Configuration

In [2]:
n_workers = min(4, os.cpu_count())

Log the Colmena output to disk

In [3]:
handlers = [logging.FileHandler('colmena.log', mode='w')]
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                    level=logging.INFO, handlers=handlers)

## Load in the Data
We're going to use the same problem as the previous example.

In [4]:
search_space = pd.read_csv('data/QM9-search.tsv', sep=r'\s+')  # Our search space of molecules (whitespace-delimited)

In [5]:
initial_count: int = 8  # Number of calculations to run at first

In [6]:
search_count: int = 64   # Number of molecules to evaluate in total

In [7]:
batch_size: int = 4  # Number of molecules to evaluate in each batch of simulations

## Building a Colmena Application
Colmena applications have three parts: a _Task Server_ that manages execution of computations at the direction of a _Thinker_ through a _Task Queue_.

### Creating Task Queue
A task queue is responsible for conveying requests to perform a computation to a Task Server, and then supplying results back to the Thinker.
Creating a task queue requires defining the connection information for Redis and the names of the topics used to separate different kinds of tasks.

In [8]:
client_queues, server_queues = make_queue_pairs(hostname='localhost', topics=['simulate', 'train', 'infer'], serialization_method='pickle')

### Defining Task Server
The Task Server requires a task queue to communicate through, a list of methods, and a set of computational resources to run them on. (See [Colmena Docs](https://colmena.readthedocs.io/en/latest/how-to.html#configuring-a-task-server))

The computation resources are defined using Parsl's definitions. We'll use the same one as the previous example.

In [9]:
config = Config(
    executors=[HighThroughputExecutor(
        max_workers=n_workers,  # Allows a maximum of `n_workers` workers
        cpu_affinity='block' # Prevents workers from using the same cores
    )]
)

We supply a list of Python functions to define the methods and also give the constructor a link to the queues.

In [10]:
task_server = ParslTaskServer(
    methods=[compute_vertical, train_model, run_model],
    queues=server_queues,
    config=config,
)

The task server runs in the background. 
> *NOTE*: You must kill it before exiting the notebook by sending a kill signal. We do that in the last cell of this notebook.

In [11]:
task_server.start()

The server runs tasks as they are requested through one Redis queue and sends the results back on a different one.
The client queue object provides a `send_inputs` and `get_result` method to perform these operations.

In [12]:
%%time
client_queues.send_inputs('C', method='compute_vertical')
result = client_queues.get_result()

CPU times: user 3.61 ms, sys: 0 ns, total: 3.61 ms
Wall time: 3.42 s


Both accept a "topic" option that allows for multiplexing.

In [13]:
client_queues.send_inputs('C', method='compute_vertical', topic='simulate')

# Show that we do not pull results on other topics
result = client_queues.get_result(topic='infer', timeout=15)
assert result is None  # None means a timeout occurred

# Pull from the correct queue
result = client_queues.get_result(topic='simulate')
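
Conceptually, topics behave like separate result queues that share one connection. The sketch below mimics that behavior with the standard library only. `TopicQueues` is a hypothetical stand-in, not Colmena's implementation, and it skips Redis and serialization entirely.

```python
from queue import Queue, Empty
from typing import Any, Dict, List, Optional

class TopicQueues:
    """Toy stand-in for a multiplexed result queue (not Colmena's implementation)."""

    def __init__(self, topics: List[str]):
        # One independent FIFO queue per topic, plus a default topic
        self._queues: Dict[str, Queue] = {t: Queue() for t in ['default', *topics]}

    def put_result(self, value: Any, topic: str = 'default') -> None:
        """Place a result on the queue for a single topic."""
        self._queues[topic].put(value)

    def get_result(self, topic: str = 'default', timeout: Optional[float] = None) -> Any:
        """Return the next result on a topic, or None if the timeout elapses."""
        try:
            return self._queues[topic].get(timeout=timeout)
        except Empty:
            return None

queues = TopicQueues(['simulate', 'train', 'infer'])
queues.put_result('result for C', topic='simulate')

# A result on "simulate" is invisible to a reader of "infer"
missed = queues.get_result(topic='infer', timeout=0.1)
found = queues.get_result(topic='simulate', timeout=0.1)
```

Keeping one queue per topic is what lets different agents wait on their own kind of result without having to filter out everyone else's.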

## Building a Thinker
The Thinker part of a Colmena application coordinates what tasks are run by the Task Server.

Thinker applications are built using a collection of threads ("agents") that cooperate to perform some task. 
For example, you can have an agent that records a simulation being completed and launches a second agent that manages retraining the models.

Below, we walk through how to build a thinker application through progressively more complex examples.

### Example 1: Simulating molecules in a predefined list
A steering policy in Colmena is defined through a [Thinker](https://colmena.readthedocs.io/en/latest/how-to.html#creating-a-thinker-application) class. 

The Thinker class has methods that can run as parallel threads and share information with each other via class attributes,
which always include a ["resource counter"](https://colmena.readthedocs.io/en/latest/thinker.html#resource-counter) used to signal when resources are free.
We use the `ResourceCounter` in this example to only submit as many tasks as we have workers so that we submit tasks based on the most up-to-date guidance from machine learning.
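
The throttling idea can be sketched with the standard library: a semaphore plays the role of a pool of worker slots, a submitter blocks until a slot is free, and a receiver releases the slot when a result arrives. This is an analogy built on `threading` and `concurrent.futures`, not Colmena's `ResourceCounter`, and `fake_simulation` is a placeholder task.

```python
from concurrent.futures import ThreadPoolExecutor
from threading import Lock, Semaphore
import time

n_slots = 2
slots = Semaphore(n_slots)  # Stand-in for a pool of free workers
lock = Lock()
state = {'running': 0, 'max_running': 0}  # Tasks in flight now, and the peak
results = []

def fake_simulation(x):
    """Placeholder task that records how many tasks run at the same time."""
    with lock:
        state['running'] += 1
        state['max_running'] = max(state['max_running'], state['running'])
    time.sleep(0.01)
    with lock:
        state['running'] -= 1
    return x * x

def on_done(future):
    """Receiver: store the result, then free the slot for the submitter."""
    with lock:
        results.append(future.result())
    slots.release()

# The executor has more threads than slots, so the semaphore is the only throttle
with ThreadPoolExecutor(max_workers=8) as pool:
    for x in range(6):
        slots.acquire()  # Submitter blocks here until a slot is free
        pool.submit(fake_simulation, x).add_done_callback(on_done)

print(f'Peak tasks in flight: {state["max_running"]}')
```

Because each task is chosen only at the moment a slot opens, the choice can reflect whatever ranking is current at that time rather than one fixed when the whole batch was queued.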

We denote which functions in a Thinker are agents using decorators for the methods (e.g., `@agent`).
Each of the functions marked with these decorators will be started as a thread when `.run` or `.start` is called.

Colmena provides [many kinds of decorators for common types of agents](https://colmena.readthedocs.io/en/latest/thinker.html#special-purpose-agents). 
In this demo, we use three of them:

- `task_submitter` runs when resources are available.
- `result_processor` runs when a task from a certain topic completes.
- `event_responder` runs when an [`Event`](https://docs.python.org/3/library/threading.html#event-objects) is set.

A simple example for a Thinker is one that submits a new calculation from a list when another completes.

In [14]:
class RandomThinker(BaseThinker):
    """A thinker which evaluates molecules in a random order."""
    
    def __init__(self, queues, n_to_evaluate: int, n_parallel: int, 
                 molecule_list: List[str]):
        """Initialize the thinker
        
        Args:
            queues: Client side of queues
            n_to_evaluate: Number of molecules to evaluate
            n_parallel: Number of computations to run in parallel
            molecule_list: List of SMILES strings
        """
        super().__init__(
            queues, 
            # Establishes pools of resources for each kind of task
            #  We'll only use the "simulation" pool
            ResourceCounter(n_parallel, ['simulate', 'train', 'infer'])
        )
        
        # Store the user settings
        self.molecule_list = set(molecule_list)
        self.n_to_evaluate = n_to_evaluate
        
        # Create a database of evaluated molecules
        self.database = dict()
        
        # Create a record of completed calculations
        self.simulation_results = []
        
        # Create a priority list of molecules, starting with them ordered randomly
        self.priority_list = list(self.molecule_list)
        shuffle(self.priority_list)
        self.priority_list_lock = Lock()  # Ensures two agents cannot use the list at the same time
        
        # Create trackers for how many tasks have been started and how many have succeeded
        self.rec_progbar = tqdm(total=n_to_evaluate, desc='started')
        self.sent_progbar = tqdm(total=n_to_evaluate, desc='successful')
        
        # Assign all of the resources over to simulation
        self.rec.reallocate(None, 'simulate', n_parallel)

    @task_submitter(task_type='simulate', n_slots=1)
    def submit_calc(self):
        """Submit a calculation when resources are available"""
        
        with self.priority_list_lock:
            next_mol = self.priority_list.pop()  # Get the next best molecule
        
        # Send it to the task server to run
        self.queues.send_inputs(next_mol, method='compute_vertical')
        self.rec_progbar.update(1)
        
    @result_processor
    def receive_calc(self, result: Result):
        """Store the output of a run if it is successful"""
        
        # Mark that the resources are now free
        self.rec.release('simulate', 1)
        
        # Store the result if successful
        if result.success:
            # Store the result in a database
            self.database[result.args[0]] = result.value
            
            # Mark that we've received a result
            self.sent_progbar.update(1)
            
            # If we've got all of the simulations complete, stop
            if len(self.database) >= self.n_to_evaluate:
                self.done.set()
            
        # Store the result object for later processing
        self.simulation_results.append(result)

We instantiate a copy of this thinker with the settings we want and then call `run` to start it working.

In [15]:
random_thinker = RandomThinker(client_queues, search_count, n_workers, search_space['smiles'].values)
random_thinker.run()

started:   0%|          | 0/64 [00:00<?, ?it/s]

successful:   0%|          | 0/64 [00:00<?, ?it/s]

Watch how the thinker only starts new calculations after another one finishes. 
The ability to throttle will be important when we don't know which calculations to submit next until others have finished.

> The thinker will receive more than the requested number of calculations, as we stop submitting only after enough have completed and wait until all submitted tasks complete.

### Example 2: Batch optimization with slight overlap between simulation and ML
We implement the same batch optimization as the Parsl example but with a slight addition: we start the ML tasks *while the simulations are still running*.

The overlap between the simulation and ML tasks requires some conceptual changes in how we've approached workflows:
1. *Concurrency in workflow planning*. Our tasks which manage simulation and ML act in parallel, which we manage with separate Colmena agents.
1. *Multiplexed communication with task server*. The result queue now contains different types of task, which we separate with "topic" flags.
1. *Workflow events change behavior*. Our machine learning tasks are launched only after enough data are acquired, which we trigger using the [Python threading library](https://docs.python.org/3/library/threading.html).
    - Specifically, we use Events and use them to start agents using the [`event_responder` agent](https://colmena.readthedocs.io/en/latest/thinker.html#event-responder-agents)
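
The event-driven pattern in the last item can be sketched with `threading.Event` alone. One thread records results and sets an event once enough new data have arrived, and a second thread waits on that event before it "retrains". The function names and the batch size are illustrative and are not taken from `thinkers.py`. To keep the sketch deterministic, the recorder waits for each retraining to finish, whereas a real thinker would typically keep recording results in the meantime.

```python
from threading import Event, Lock, Thread

batch_size = 4
database = []              # Completed "simulations"
db_lock = Lock()
start_training = Event()   # Set when enough new data are available
training_done = Event()    # Set when a retraining has finished
done = Event()             # Set when the campaign is over
training_sizes = []        # Database size seen at each retraining

def record_results(n_total):
    """Simulation agent: store results and trigger training after each batch."""
    for i in range(n_total):
        with db_lock:
            database.append(i)
            ready = len(database) % batch_size == 0
        if ready:
            training_done.clear()
            start_training.set()   # Wake the training agent
            training_done.wait()   # Simplification: pause until it finishes
    done.set()
    start_training.set()  # Wake the training agent one last time so it can exit

def retrain_when_ready():
    """Training agent: sleeps until the event is set, then 'retrains'."""
    while True:
        start_training.wait()
        start_training.clear()
        if done.is_set():
            break
        with db_lock:
            training_sizes.append(len(database))
        training_done.set()

threads = [Thread(target=record_results, args=(8,)), Thread(target=retrain_when_ready)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f'Retrained at database sizes: {training_sizes}')
```

The training thread consumes no CPU while it waits, so an agent like this can live for the whole campaign and act only when the workflow reaches the right state.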

The example class, `BatchedThinker`, in [`thinkers.py`](./thinkers.py) shows one way of realizing the desired policy.

> *WARNING*: We acknowledge that programming an object and debugging multi-threaded code is best done in an IDE. We present the completed result in a notebook for parsimony, but recommend using a better programming environment for creating complex software.

In [16]:
from thinkers import BatchedThinker

In [17]:
output = widgets.Output()
display(output)
batched_thinker = BatchedThinker(
    queues=client_queues,
    n_to_evaluate=search_count,
    n_parallel=n_workers,
    initial_count=initial_count,
    batch_size=batch_size,
    molecule_list=search_space['smiles'].values,
    dashboard=output
)
batched_thinker.run()

Output()

started:   0%|          | 0/64 [00:00<?, ?it/s]

successful:   0%|          | 0/64 [00:00<?, ?it/s]

Starting training. Database size: 8...Training complete...Inference done. Elapsed time: 3.71s
Starting training. Database size: 12...Training complete...Inference done. Elapsed time: 3.23s
Starting training. Database size: 16...Training complete...Inference done. Elapsed time: 2.80s
Starting training. Database size: 20...Training complete...Inference done. Elapsed time: 2.45s
Starting training. Database size: 24...Training complete...Inference done. Elapsed time: 3.74s
Starting training. Database size: 28...Training complete...Inference done. Elapsed time: 3.34s
Starting training. Database size: 32...Training complete...Inference done. Elapsed time: 3.17s
Starting training. Database size: 36...Training complete...Inference done. Elapsed time: 3.17s
Starting training. Database size: 40...Training complete...Inference done. Elapsed time: 3.88s
Starting training. Database size: 44...Training complete...Inference done. Elapsed time: 3.32s
Starting training. Database size: 48...Training com

You can observe a few things about the policy from the notebook:
1. Task submission pauses while training is ongoing, then resumes rapidly once inference is complete.
1. The thinker continues to record successful simulations while training is still underway.

There is more detailed runtime information in the [`colmena.log`](./colmena.log). You can see the status messages as different agents start up and coordinate with each other.

## Wrap up Parsl
Once complete, we send a "kill" signal to shut down the task server. The task server will clean up any computational resources being used, then exit.

In [18]:
client_queues.send_kill_signal()
task_server.join()
print(f'Process exited with {task_server.exitcode} code')

Process exited with 0 code


## Take-home Points
This example scratches the surface of the kind of steering policies you can write with Colmena.
Colmena is a new code, so the collection of example policies is still growing, but it includes:

1. Automatically re-allocating nodes between different tasks to maximize the effect of machine learning. ([paper](https://arxiv.org/pdf/2110.02827.pdf), [code](https://github.com/exalearn/electrolyte-design/tree/master/colmena/ip-single-fidelity), [data](https://doi.org/10.18126/bnfu-uk7f))
1. Coordinating tasks between multiple compute sites. (*Paper under review*, [code](https://github.com/exalearn/multi-site-campaigns))

## Save Results for Later
Let's save the run traces for later analysis.

In [19]:
with open('run-data/random-results.json', 'w') as fp:
    for result in random_thinker.simulation_results:
        print(result.json(), file=fp)

In [20]:
with open('run-data/batched-results.json', 'w') as fp:
    for result in batched_thinker.simulation_results:
        print(result.json(), file=fp)
    for result in batched_thinker.learning_results:
        # Write the learning results w/o the inputs and outputs
        #  because they are not JSON-serializable
        print(result.json(exclude={'inputs', 'value'}), file=fp)
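
Each file written above holds one JSON document per line, so it can be read back line by line. The sketch below uses only the `json` module and a small stand-in file. The record fields (`method`, `success`) and the file name are placeholders, so check the keys in your own files before relying on them.

```python
import json
import os
import tempfile

# Write a small stand-in file in the same one-record-per-line layout
records = [
    {'method': 'compute_vertical', 'success': True},
    {'method': 'compute_vertical', 'success': False},
    {'method': 'train_model', 'success': True},
]
path = os.path.join(tempfile.mkdtemp(), 'example-results.json')
with open(path, 'w') as fp:
    for record in records:
        print(json.dumps(record), file=fp)

# Read it back: parse each non-empty line as its own JSON document
with open(path) as fp:
    loaded = [json.loads(line) for line in fp if line.strip()]

n_success = sum(r['success'] for r in loaded)
print(f'Loaded {len(loaded)} records, {n_success} successful')
```

Parsing line by line matters here because the file as a whole is not a single valid JSON document, so a single `json.load` call on the whole file would fail.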