A Simple Way of Creating Job Workflows in Go running in Processes, Containers, Tasks, Pods, or Jobs

wfl - A Simple and Pluggable Workflow Language for Go

Don't mix wfl with WFL.


Creating process, container, pod, task, or job workflows based on the raw interfaces of operating systems, Docker, Kubernetes, Cloud Foundry, and HPC job schedulers can be tedious. Lots of repetitive code is required, and every workload management system has a different API.

wfl abstracts away the underlying details of processes, containers, and workload management systems. It provides a simple, unified interface that lets you quickly define and execute a job workflow and switch between execution backends without changing the workflow itself.

wfl does not come with many features, but it is simple to use and sufficient for defining and running jobs and job workflows with inter-job dependencies.

In its simplest form a process can be started and waited for:

    wfl.NewWorkflow(wfl.NewProcessContext()).Run("convert", "image.jpg", "image.png").Wait()
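For comparison, the same start-and-wait step written directly against Go's os/exec package might look like the sketch below. It is not how wfl is implemented; it only illustrates the kind of backend-specific code that wfl hides. `runAndWait` is a hypothetical helper, `echo` stands in for `convert` so the sketch runs without ImageMagick, and a Unix-like system is assumed.

```go
package main

import (
	"fmt"
	"os"
	"os/exec"
)

// runAndWait starts a process (non-blocking) and then blocks until it
// exits, which is what Run(...).Wait() expresses in a process context.
func runAndWait(name string, args ...string) error {
	cmd := exec.Command(name, args...)
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	if err := cmd.Start(); err != nil {
		return fmt.Errorf("starting %s: %w", name, err)
	}
	return cmd.Wait() // non-nil for a non-zero exit code
}

func main() {
	if err := runAndWait("echo", "hello"); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
```

Switching this code to Docker or Kubernetes would mean rewriting it against a different API, which is the repetition the wfl contexts avoid.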

Running a job as a Docker container requires a different context (and the image must already have been pulled).

    ctx := wfl.NewDockerContextByCfg(wfl.DockerConfig{DefaultDockerImage: "golang:latest"})
    wfl.NewWorkflow(ctx).Run("sleep", "60").Wait()

Starting a Docker container that has no explicit run command but exposes ports requires more configuration, which can be provided with a JobTemplate together with the RunT() method.

    jt := drmaa2interface.JobTemplate{
        JobCategory: "swaggerapi/swagger-editor",
    }
    jt.ExtensionList = map[string]string{"exposedPorts": "80:8080/tcp"}
    
    wfl.NewJob(wfl.NewWorkflow(wfl.NewDockerContext())).RunT(jt).Wait()

Starting a Kubernetes batch job and waiting for its end is not much different.

    wfl.NewWorkflow(wfl.NewKubernetesContext()).Run("sleep", "60").Wait()

wfl aims to work for any kind of workload. It works on a Mac or a Raspberry Pi the same way as on a high-performance compute cluster. What is missing: at small scale, you will probably miss data management (moving results from one job to another), which is deliberately not implemented. At large scale, checkpoint/restart functionality and high availability (HA) of the workflow process itself are missing.

wfl works with simple primitives: context, workflow, job, and job template.

Jobs can also be processed in streams.

Getting Started

Dependencies of wfl (like drmaa2) are vendored in. The only external package that needs to be installed manually is drmaa2interface.

    go get github.com/dgruber/drmaa2interface

Context

A context defines the execution backend for the workflow. Contexts can easily be created with the New functions defined in the context.go file.

To create a context that executes the jobs of a workflow as operating system processes, use:

    wfl.NewProcessContext()

If the workflow needs to be executed in containers the DockerContext can be used:

    wfl.NewDockerContext()

If the Docker context needs a default Docker image (used by Run(), and by RunT() when the job template has no JobCategory, which holds the Docker image), NewDockerContextByCfg() can be called:

    wfl.NewDockerContextByCfg(wfl.DockerConfig{DefaultDockerImage: "golang:latest"})
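The image selection rule described above can be sketched as a small function. `resolveImage` is a hypothetical helper, not part of wfl, and returning an error when neither a JobCategory nor a default image is set is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// resolveImage picks the container image for a job: a JobCategory set in
// the job template wins, otherwise the context's default image is used.
// Hypothetical helper for illustration; not part of the wfl API.
func resolveImage(jobCategory, defaultImage string) (string, error) {
	if jobCategory != "" {
		return jobCategory, nil
	}
	if defaultImage != "" {
		return defaultImage, nil
	}
	return "", errors.New("no container image: set JobCategory or a default image")
}

func main() {
	// Run("sleep", "60"): no template, so the default image is used.
	img, _ := resolveImage("", "golang:latest")
	fmt.Println(img)

	// RunT(jt) with jt.JobCategory set: the template's image is used.
	img, _ = resolveImage("swaggerapi/swagger-editor", "golang:latest")
	fmt.Println(img)
}
```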

When you want to run the workflow as Cloud Foundry tasks, the CloudFoundryContext can be used:

    wfl.NewCloudFoundryContext()

Without a config, it uses the following environment variables to access the Cloud Foundry cloud controller API:

For submitting Kubernetes batch jobs a Kubernetes context exists.

    ctx := wfl.NewKubernetesContext()

Note that each job requires a container image, which can be specified in the JobTemplate's JobCategory. When the same container image is used throughout the whole job workflow, it makes sense to set it once in the Kubernetes config:

    ctx := wfl.NewKubernetesContextByCfg(wfl.KubernetesConfig{DefaultImage: "busybox:latest"})

Contexts for other container engines or workload managers, like DRMAA-compatible HPC schedulers, will be supported once the corresponding DRMAA2 job tracker implementation is available.

Workflow

A workflow encapsulates a set of jobs using the same backend (context). Depending on the execution backend it can be seen as a namespace.

It can be created by using:

    wf := wfl.NewWorkflow(ctx)

Errors during creation can be caught with

    wf := wfl.NewWorkflow(ctx).OnError(func(e error) { panic(e) })

or with

    if wf.HasError() {
        panic(wf.Error())
    }

Job

Jobs are the main objects in wfl. A job defines helper methods, many of which return the job object itself so that calls can be chained easily. A job can also be seen as a container and control unit for tasks.

Methods can be classified as blocking, non-blocking, job template based, function based, and error handlers.

Job submission:

  • Run() -> Starts a process or container, or submits a task, and returns immediately
  • RunT() -> Like above but with a JobTemplate as parameter
  • Resubmit() -> Run().Run().Run()...
  • RunEvery() -> Submits a task every d time.Duration
  • RunEveryT()

Job control:

  • Suspend() -> Stops a task from execution (e.g. sending SIGTSTP to the process)...
  • Resume() -> Continues a task (e.g. sending SIGCONT)...
  • Kill()

Function execution:

  • Do() -> Executes a function
  • Then() -> Waits for the end of the process and then executes a function
  • OnSuccess() -> Executes a function if the task ran successfully (exit code 0)
  • OnFailure() -> Executes a function if the task failed (exit code != 0)
  • OnError() -> Executes a function if the task could not be created

Blocker:

  • After()
  • Wait()
  • Synchronize()

Job flow control:

  • ThenRun() // wait() + run()
  • ThenRunT()
  • OnSuccessRun() // wait() + success() + run()
  • OnFailureRun()
  • Retry() // wait() + !success() + resubmit() + wait() + !success() ...
  • AnyFailed() // checks if one of the tasks in the job failed

Job status and general checks:

  • JobID() -> Returns the ID of the submitted job.
  • JobInfo() -> Returns the DRMAA2 JobInfo of the job.
  • Template()
  • State()
  • LastError()
  • Failed()
  • Success()
  • ExitStatus()

JobTemplate

JobTemplates specify the details of a job. In the simplest case, a job is specified by the application name and its arguments, as is typically done in an OS shell. In that case the Run() methods (ThenRun(), OnSuccessRun(), OnFailureRun()) can be used. If more details are required to specify the jobs, the RunT() methods need to be used. I currently use the DRMAA2 Go JobTemplate as their parameter. In most cases only RemoteCommand, Args, WorkingDirectory, JobCategory, JobEnvironment, and StageInFiles are evaluated. Functionality and semantics are up to the underlying drmaa2os job tracker.

The Template object provides helper functions for job templates and is required as a generator of job streams. For an example see here.

Examples

For examples, please have a look at the examples directory. template is a canonical example of a pre-processing job, followed by parallel execution, followed by a post-processing job.

test is a use case for testing. It compiles all examples with the local Go compiler and then again within a Docker container using the golang:latest image, and reports errors.

cloudfoundry demonstrates how a Cloud Foundry task can be created.

Creating a Workflow which is Executed as OS Processes

The allocated context defines which workload management system / job execution backend is used.

    ctx := wfl.NewProcessContext()

Different contexts can be used within a single program. This way, multi-clustering, potentially across different cloud solutions, is supported.

With a context, a workflow can be created.

    wfl.NewWorkflow(wfl.NewProcessContext())

Handling an error during workflow generation can be done by specifying a function which is only called in the case of an error.

    wfl.NewWorkflow(wfl.NewProcessContext()).OnError(func(e error) {
        panic(e)
    })

The workflow is used in order to instantiate the first job using the Run() method.

    wfl.NewWorkflow(wfl.NewProcessContext()).Run("sleep", "123")

But you can also create an initial job like this:

    job := wfl.NewJob(wfl.NewWorkflow(wfl.NewProcessContext()))

For more detailed settings (like resource limits) the DRMAA2 job template can be used as parameter for RunT().

Jobs allow the execution of workload as well as expressing dependencies.

    wfl.NewWorkflow(wfl.NewProcessContext()).Run("sleep", "2").ThenRun("sleep", "1").Wait()

The line above executes two OS processes sequentially and waits until the last job in the chain is finished.

In the following example the two sleep processes are executed in parallel. Wait() only waits for the sleep 1 job. Hence sleep 2 is still running when the Wait() call returns.

    wfl.NewWorkflow(wfl.NewProcessContext()).Run("sleep", "2").Run("sleep", "1").Wait()

Running two jobs in parallel and waiting until all of them have finished can be done with Synchronize().

    wfl.NewWorkflow(wfl.NewProcessContext()).Run("sleep", "2").Run("sleep", "1").Synchronize()
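The difference between Wait() and Synchronize() can be reproduced with os/exec: chained Run() calls correspond to starting processes without waiting, Wait() to waiting only for the last one, and Synchronize() to waiting for all of them. The helper names below are hypothetical, and a Unix-like system with `sleep` is assumed.

```go
package main

import (
	"fmt"
	"os/exec"
	"time"
)

// startAll starts every command without waiting for it, similar to
// chaining Run() calls. If a start fails, the processes started so far
// are returned together with the error.
func startAll(specs ...[]string) ([]*exec.Cmd, error) {
	cmds := make([]*exec.Cmd, 0, len(specs))
	for _, s := range specs {
		cmd := exec.Command(s[0], s[1:]...)
		if err := cmd.Start(); err != nil {
			return cmds, err
		}
		cmds = append(cmds, cmd)
	}
	return cmds, nil
}

// waitLast blocks only until the most recently started command has
// finished, similar to Wait(); earlier commands may still be running.
func waitLast(cmds []*exec.Cmd) error {
	if len(cmds) == 0 {
		return nil
	}
	return cmds[len(cmds)-1].Wait()
}

// synchronize blocks until every command has finished, similar to
// Synchronize(), and returns the first error it encounters.
func synchronize(cmds []*exec.Cmd) error {
	var first error
	for _, c := range cmds {
		if c.ProcessState != nil {
			continue // already waited for, e.g. by waitLast
		}
		if err := c.Wait(); err != nil && first == nil {
			first = err
		}
	}
	return first
}

func main() {
	begin := time.Now()
	cmds, err := startAll([]string{"sleep", "2"}, []string{"sleep", "1"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	_ = waitLast(cmds)
	fmt.Println("waitLast returned after", time.Since(begin).Round(time.Second))
	_ = synchronize(cmds)
	fmt.Println("synchronize returned after", time.Since(begin).Round(time.Second))
}
```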

Jobs can also be suspended (stopped) and resumed (continued) - if supported by the execution backend (like OS, Docker).

    wf.Run("sleep", "1").After(time.Millisecond * 100).Suspend().After(time.Millisecond * 100).Resume().Wait()

The exit status is available as well. ExitStatus() blocks until the previously submitted job is finished.

    wfl.NewWorkflow(ctx).Run("echo", "hello").ExitStatus()

To run jobs depending on the exit status, the OnFailureRun() and OnSuccessRun() methods can be used:

    wf.Run("false").OnFailureRun("true").OnSuccessRun("false")

For executing a function on a submission error OnError() can be used.

More methods can be found in the sources.

For missing functionality or bugs, please open an issue on GitHub. Contributions are welcome!