In [None]:
from dflow import upload_artifact, Workflow, Step
from dflow.python import PythonOPTemplate, OP, OPIO, OPIOSign, Artifact
from dflow.plugins.dispatcher import DispatcherExecutor
from pathlib import Path

**Duplicate OP**

This OP duplicate input message and content of input file by a specified number of times as output message and output file, respectively.
- input:
    - "msg": the input message
    - "num": the number of times
    - "foo": the input file
- output:
    - "msg": the output message
    - "bar": the output file

In [None]:
class Duplicate(OP):
    def __init__(self):
        pass

    @classmethod
    def get_input_sign(cls):
        return OPIOSign({
            "msg": str,
            "num": int,
            "foo": Artifact(Path),
        })

    @classmethod
    def get_output_sign(cls):
        return OPIOSign({
            "msg": str,
            "bar": Artifact(Path),
        })

    @OP.exec_sign_check
    def execute(
            self,
            op_in: OPIO,
    ) -> OPIO:
        with open(op_in["foo"], "r") as f:
            content = f.read()
        with open("bar.txt", "w") as f:
            f.write(content * op_in["num"])

        op_out = OPIO({
            "msg": op_in["msg"] * op_in["num"],
            "bar": Path("bar.txt"),
        })
        return op_out

[DPDispatcher](https://github.com/deepmodeling/dpdispatcher) is a python package used to generate HPC scheduler systems (Slurm/PBS/LSF) or Bohrium jobs input scripts and submit these scripts and poke until they finish. Dflow provides `DispatcherExecutor` plugin to invoke dispatcher as executor to complete a certain step

For SSH authentication, one can use password, specify path of private key file locally, or upload authorized private key to each node (or equivalently add each node to the authorized host list). In this example, password is used.

For configuring extra machine, resources or task parameters for dispatcher, use `DispatcherExecutor(..., machine_dict=m, resources_dict=r, task_dict=t)`.

In [None]:
dispatcher_executor = DispatcherExecutor(
    host="your-cluster-address",
    username="your-login-username",
    password="your-login-password",
)

with open("foo.txt", "w") as f:
    f.write("Hello world!")

step = Step(
    "duplicate",
    PythonOPTemplate(Duplicate),
    parameters={"msg": "Hello", "num": 2},
    artifacts={"foo": upload_artifact("foo.txt")},
    executor=dispatcher_executor,
)

wf = Workflow(name="slurm")
wf.add(step)
wf.submit()