## How to create a custom executor

<font size="3">

_Executors_ define the low-level directions for the computation. They can specify different capabilities, _eg_, different hardware, different computation strategy, different logic, or simply different goals.

Executors are plugins; any executor-plugins which are found are imported as classes in the covalent.executor name-space.
See the how-to on [choosing an executor to be used in an electron](choosing_executors.ipynb) for details on simply choosing an executor.
    
You can write your own executor to execute Covalent electrons in any way you like, using particular environments, cloud resources, or hardware.
</font>

<font size="3">

Start by cloning the [executor template](https://github.com/AgnostiqHQ/covalent-executor-template).
</font>

In [1]:
!git clone https://github.com/AgnostiqHQ/covalent-executor-template

Cloning into 'covalent-executor-template'...
remote: Enumerating objects: 75, done.[K
remote: Counting objects: 100% (30/30), done.[K
remote: Compressing objects: 100% (16/16), done.[K
remote: Total 75 (delta 24), reused 14 (delta 14), pack-reused 45[K
Receiving objects: 100% (75/75), 49.23 KiB | 6.15 MiB/s, done.
Resolving deltas: 100% (34/34), done.


<font size="3">

Rename `covalent_executor_template` and `custom.py`
</font>

In [13]:
import os
os.chdir('covalent-executor-template')
!mv covalent_executor_template covalent_mycustom_plugin
!mv covalent_mycustom_plugin/custom.py covalent_mycustom_plugin/mycustom.py

<font size="3">

In `setup.py`, modify `plugins_list` and `setup_info` to reflect the name of your executor plugin.
</font>

In [18]:
import site
import sys

from setuptools import find_packages, setup

with open("VERSION") as f:
    version = f.read().strip()
    
with open("requirements.txt") as f:
    required = f.read().splitlines()

# Modify this to be the name of your plugin file. Here, "covalent_executor_template"
# is the name of the directory the plugin is in. "custom" is name of the module.
plugins_list = ["mycustom = covalent_mycustom_plugin.mycustom"]

setup_info = {
    # Your plugin should use the naming convention 'covalent-abcdef-plugin'
    "name": "covalent-mycustom-plugin",
    "packages": find_packages("."),
    "version": version,
    # Modify any contact information as you see fit
    "maintainer": "Agnostiq",
    "url": "https://github.com/AgnostiqHQ/covalent-executor-template",
    "download_url": f"https://github.com/AgnostiqHQ/covalent-executor-template/archive/v{version}.tar.gz",
    "license": "GNU Affero GPL v3.0",
    "author": "Agnostiq",
    "author_email": "support@agnostiq.ai",
    "description": "Covalent Custom Executor Plugin",
    "long_description": open("README.md").read(),
    "long_description_content_type": "text/markdown",
    "include_package_data": True,
    "install_requires": required,
    "classifiers": [
        "Development Status :: 3 - Alpha",
        "Environment :: Console",
        "Environment :: Plugins",
        "Intended Audience :: Developers",
        "Intended Audience :: Education",
        "Intended Audience :: Science/Research",
        "License :: Other/Proprietary License",
        "Natural Language :: English",
        "Operating System :: MacOS",
        "Operating System :: POSIX :: Linux",
        "Programming Language :: Python :: 3",
        "Programming Language :: Python :: 3 :: Only",
        "Programming Language :: Python :: 3.8",
        "Programming Language :: Python :: 3.9",
        "Topic :: Adaptive Technologies",
        "Topic :: Scientific/Engineering",
        "Topic :: Scientific/Engineering :: Interface Engine/Protocol Translator",
        "Topic :: Software Development",
        "Topic :: System :: Distributed Computing",
    ],
    "entry_points": {
        "covalent.executor.executor_plugins": plugins_list,
    },
}


Now you can write the execution logic in `mycustom.py`. You'll need to update `_EXECUTOR_PLUGIN_DEFAULTS`, `executor_plugin_name`, `CustomExecutor`, and `execute()`. As an example, here we'll add logic to execute electrons in Docker containers. 

Here is a docker image that we will use later to launch containers and run electrons.
```docker
# Dockerfile
FROM python:3.8-buster

RUN pip install cloudpickle && \
    pip install covalent

CMD ["/bin/bash"]
```

In this Dockerfile, we install the bare minimum dependencies required to execute electrons. Next we build an image `docker-executor-demo:latest` from the Dockerfile template to be used in our workflows.

```shell
docker build -f Dockerfile --tag docker-executor-demo:latest .
```

In [21]:
_EXECUTOR_PLUGIN_DEFAULTS = {
    "image": "python:3.8",   # Default docker image to be used if not specified
    "workdir": ".", # Path to the working directory on disk where files generated during execution will be stored. Defaults to current working directory
    "options": {}    # Any options supported by the docker run CLI
}

In [22]:
executor_plugin_name = 'DockerExecutor'

The `executor_plugin_name` must match the subclass name.

In [27]:
from covalent.executor import BaseExecutor
from typing import Any, Dict, List, Tuple
class DockerExecutor(BaseExecutor):
    """Docker executor plugin class
    
    Args:
        :param str image: Name of the docker image to be used for running electrons.
        :param str workdir: Path on the disk where files generated during execution will be created/stored.
        :param dict options: Python dictionary of keyword arguments of the different options supported by the `docker run` CLI.
    """

    def __init__(self,
        image: str,
        workdir: str,
        options: Dict = {},
        container_workdir: str = "/tmp/covalent",  # Workdir inside the container
        *args,
        **kwargs
    ):

        self.image = image
        self.wordir = workdir
        self.options = options
        self._container_workdir = container_workdir

        super().__init__(*args, **kwargs)

The command to be executed can be rendered as a multiline string. To this end, we create a method `_format_exec_command` in our `DockerExecutor` to facilitate generation of the `exec` command

```python
def _format_exec_command(self, func_filename: str, result_filename: str) -> str:
    """
    Create a python string that can be used to execute the task inside the container

    Args:
        :param str func_filename: Name of the pickle file from which to read the function to be executed.
        :param str result_filename: Name of the pickle file into which the serialize and write the task result

    Returns:
        :param str script: Python string that can be parsed to execute the function
    """
    return "\n".join(
        [
            "import cloudpickle as pickle",
            "",
            "result = None",
            "exception = None",
            "",
            "with open('{func_filename}', 'rb') as f:",
            "   function, args, kwargs = pickle.load(f)",
            "   try:",
            "       result = function(*args, **kwargs)",
            "   except Exception as e:",
            "       exception = e",
            "   finally:",
            "       with open('{result_filename}', 'wb') as f_out:",
            "           pickle.dump((result, exception), f_out)",
            "",
        ]
    ).format(
        func_filename = os.path.join(self._container_workdir, func_filename),
        result_filename = os.path.join(self._container_workdir, result_filename)
    )
```


With the `_format_exec_command` helper method defined, we can now implement the executor's `execute` method that contains the core logic

```python
def execute(self,
    function: TransportableObject,
    args: List,
    kwargs: List,
    call_before: List,
    call_after: List,
    dispatch_id: str,
    results_dir: str,
    node_id: int - 1
):
    """
        Execute the callable inside the respective container and return the result

        Args:
            :param Callable function: The input python function to be executed inside the container whose result is to be returned back
            :param List args: List of positional arguments required by the function
            :param Dict kwargs: Dictionary of keyword arguments required by the function
            :param List call_before: List of callables that will be invoked before the actual function execution
            :param List  call_after: List of callables to be invoked after the function has finished execution
            :param str dispatch_id: The unique identifier of the parent workflow that this electron is a part of
            :param str results_dir: The location where the results from the function execution ought to be saved. This will be mounted inside the container
            :param int node_id: ID of the node in the transport graph which is using this executor
    """

    # set the necessary variables such as filenames to be used as part of the execution.
    # Use the dispatch_id and the node_id in order to uniquely identify each file/container being
    # created during execution
    dispatch_info = DispatchInfo(dispatch_id=dispatch_id)
    result_filename = f"result-{dispatch_id}-{node_id}.pkl"
    container_name = f"container-{dispatch_id}-{node_id}"
    execute_script_name = f"dkrexec-{dispatch_id}-{node_id}.py"

    with self.get_dispatch_context(dispatch_info), tempfile.NamedTemporaryFile(
            dir=self.workdir) as f, tempfile.NamedTemporaryFile(dir=self.workdir, mode="w") as g:

            # Wrap the `function` between the call_before and call_after hooks
            new_args = [function, call_before, call_after]
            for arg in args:
                new_args.append(arg)

            pickle.dump((wrapper_fn, new_args, kwargs), f)
            f.flush()

            # Format the command to be executed inside the container
            func_filename = f"func-{dispatch_id}-{node_id}.pkl"
            shutil.copy(f.name, os.path.join(self.workdir, func_filename))

            cmd = self.format_exec_command(func_filename, result_filename)
            g.write(cmd)
            g.flush()
            shutil.copy(g.name, os.path.join(self.workdir, execute_script_name))

            # Start the container in detached mode
            subprocess.run(
                [
                    "docker",
                    "container",
                    "run",
                    "-dit",
                     "--rm",
                    "--mount",
                    f"type=bind,source={self.workdir},target={self._container_workdir}",
                    "--name",
                    container_name,
                    self.image
                ],
                check=True,
                capture_output=True,
            )

            # Execute the script/command inside the container
            subprocess.run(
                [
                    "docker",
                    "container",
                    "exec",
                    container_name,
                    "python",
                    f"{self._container_workdir}/{execute_script_name}"
                ],
                check=True,
                capture_output=True
            )

            # Assert that a result object was created
            assert os.path.exists(os.path.join(self.workdir, result_filename))

            # Read the generated result object from the result pickle file
            with open(os.path.join(self.workdir, result_filename), "rb") as f_read:
                result, exception = pickle.load(f_read)
                if exception:
                    raise exception

            # Terminate the container
            subprocess.run(
                [
                    "docker",
                    "container",
                    "stop",
                    container_name
                ],
                check=True,
                capture_output=True
            )

            # Return
            return result, exception, None
```

For convenience, here is the entire executor as part of a single executable block

```python
import os
import copy
import shutil
import cloudpickle as pickle
import tempfile
import subprocess
from pathlib import Path
from typing import List, Dict

from covalent.executor import BaseExecutor, wrapper_fn
from covalent._shared_files.util_classes import DispatchInfo
from covalent._shared_files import logger
from covalent._workflow.transport import TransportableObject

app_log = logger.app_log
log_stack_info = logger.log_stack_info

_EXECUTOR_PLUGIN_DEFAULTS = {
    "image": "python:3.8"   # Default docker image to be used if not specified
    "workdir": "." # Path to the working directory on disk where files generated during execution will be        stored. Defaults to current working directory
    "options": {}    # Any options supported by the docker run CLI
}

executor_plugin_name = 'DockerExecutor'

class DockerExecutor(BaseExecutor):
    """Docker executor plugin class
    
    Args:
        :param str image: Name of the docker image to be used for running electrons.
        :param str workdir: Path on the disk where files generated during execution will be created/stored.
        :param dict options: Python dictionary of keyword arguments of the different options supported by the `docker run` CLI.
    """

    def __init__(self,
        image: str,
        workdir: str,
        options: Dict = {}
        container_workdir: str = "/tmp/covalent",  # Workdir inside the container
        *args,
        **kwargs
    ):

        self.image = image
        self.wordir = workdir
        self.options = options
        self._container_workdir = container_workdir

        super().__init__(*args, **kwargs)

    def _format_exec_command(self, func_filename: str, result_filename: str) -> str:
    """
    Create a python string that can be used to execute the task inside the container

    Args:
        :param str func_filename: Name of the pickle file from which to read the function to be executed.
        :param str result_filename: Name of the pickle file into which the serialize and write the task result

    Returns:
        :param str script: Python string that can be parsed to execute the function
    """
        return "\n".join(
            [
                "import cloudpickle as pickle",
                "",
                "result = None",
                "exception = None",
                "",
                "with open('{func_filename}', 'rb') as f:",
                "   function, args, kwargs = pickle.load(f)",
                "   try:",
                "       result = function(*args, **kwargs)",
                "   except Exception as e:",
                "       exception = e",
                "   finally:",
                "       with open('{result_filename}', 'wb') as f_out:",
                "           pickle.dump((result, exception), f_out)",
                "",
            ]
        ).format(
            func_filename = os.path.join(self._container_workdir, func_filename),
            result_filename = os.path.join(self._container_workdir, result_filename)
        )


    def execute(self,
        function: TransportableObject,
        args: List,
        kwargs: List,
        call_before: List,
        call_after: List,
        dispatch_id: str,
        results_dir: str,
        node_id: int - 1
    ):
        """
            Execute the callable inside the respective container and return the result
    
            Args:
                :param Callable function: The input python function to be executed inside the container whose result is to be returned back
                :param List args: List of positional arguments required by the function
                :param Dict kwargs: Dictionary of keyword arguments required by the function
                :param List call_before: List of callables that will be invoked before the actual function execution
                :param List  call_after: List of callables to be invoked after the function has finished execution
                :param str dispatch_id: The unique identifier of the parent workflow that this electron is a part of
                :param str results_dir: The location where the results from the function execution ought to be saved. This will be mounted inside the container
                :param int node_id: ID of the node in the transport graph which is using this executor
        """
    
        # set the necessary variables such as filenames to be used as part of the execution.
        # Use the dispatch_id and the node_id in order to uniquely identify each file/container being
        # created during execution
        dispatch_info = DispatchInfo(dispatch_id=dispatch_id)
        result_filename = f"result-{dispatch_id}-{node_id}.pkl"
        container_name = f"container-{dispatch_id}-{node_id}"
        execute_script_name = f"dkrexec-{dispatch_id}-{node_id}.py"
    
        with self.get_dispatch_context(dispatch_info), tempfile.NamedTemporaryFile(
                dir=self.workdir) as f, tempfile.NamedTemporaryFile(dir=self.workdir, mode="w") as g:
    
                # Wrap the `function` between the call_before and call_after hooks
                new_args = [function, call_before, call_after]
                for arg in args:
                    new_args.append(arg)
    
                pickle.dump((wrapper_fn, new_args, kwargs), f)
                f.flush()
    
                # Format the command to be executed inside the container
                func_filename = f"func-{dispatch_id}-{node_id}.pkl"
                shutil.copy(f.name, os.path.join(self.workdir, func_filename))
    
                cmd = self.format_exec_command(func_filename, result_filename)
                g.write(cmd)
                g.flush()
                shutil.copy(g.name, os.path.join(self.workdir, execute_script_name))
    
                # Start the container in detached mode
                subprocess.run(
                    [
                        "docker",
                        "container",
                        "run",
                        "-dit",
                         "--rm",
                        "--mount",
                        f"type=bind,source={self.workdir},target={self._container_workdir}",
                        "--name",
                        container_name,
                        self.image
                    ],
                    check=True,
                    capture_output=True,
                )
    
                # Execute the script/command inside the container
                subprocess.run(
                    [
                        "docker",
                        "container",
                        "exec",
                        container_name,
                        "python",
                        f"{self._container_workdir}/{execute_script_name}"
                    ],
                    check=True,
                    capture_output=True
                )
    
                # Assert that a result object was created
                assert os.path.exists(os.path.join(self.workdir, result_filename))
    
                # Read the generated result object from the result pickle file
                with open(os.path.join(self.workdir, result_filename), "rb") as f_read:
                    result, exception = pickle.load(f_read)
                    if exception:
                        raise exception
    
                # Terminate the container
                subprocess.run(
                    [
                        "docker",
                        "container",
                        "stop",
                        container_name
                    ],
                    check=True,
                    capture_output=True
                )
    
        # Return
        return result, exception, None
```

This executor can now be used as part of workflows to execute electons inside Docker containers after being properly imported. Following is a quick example

```python
import os
import covalent as ct
from covalent.executor import DockerExecutor

docker = DockerExecutor(image='docker-executor-demo:latest', workdir=os.getcwd())

@ct.electron(executor=docker)
def add(x, y):
    return x + y

@ct.electron(executor=docker)
def multiply(x, y):
    return x*y


@ct.lattice
def workflow(x, y):
    r1 = add(x, y)
    return multiply(y, r1)

result = ct.dispatch_sync(workflow)(2, 3)
print(result)


Lattice Result
==============
status: COMPLETED
result: 15
inputs: {'args': [2, 3], 'kwargs': {}}
error: None

start_time: 2022-07-18 16:41:48.106224+00:00
end_time: 2022-07-18 16:41:55.053115+00:00

results_dir: results
dispatch_id: 865fb0fd-7be3-4d15-a24d-97e5327b4253

Node Outputs
------------
add(0): 5
:parameter:2(1): 2
:parameter:3(2): 3
multiply(3): 15
:parameter:3(4): 3
```