Skip to content

Commit

Permalink
Merge pull request #17 from danielgerlag/parallel
Browse files Browse the repository at this point in the history
Parallel sequences
  • Loading branch information
danielgerlag committed Mar 25, 2018
2 parents c462c7b + 8db6060 commit 63faf7e
Show file tree
Hide file tree
Showing 16 changed files with 433 additions and 8 deletions.
2 changes: 1 addition & 1 deletion core/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "workflow-es",
"version": "2.1.0",
"version": "2.2.0",
"description": "A lightweight workflow engine for Node.js",
"main": "./build/src/index.js",
"typings": "./build/src/index.d.ts",
Expand Down
110 changes: 110 additions & 0 deletions core/spec/scenarios/parallel.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import { WorkflowHost, WorkflowBuilder, WorkflowStatus, WorkflowBase, StepBody, StepExecutionContext, ExecutionResult, WorkflowInstance, configureWorkflow, ConsoleLogger } from "../../src";
import { MemoryPersistenceProvider } from "../../src/services/memory-persistence-provider";
import { spinWaitCallback, spinWait } from "../helpers/spin-wait";

describe("parallel sequences", () => {

let workflowScope = {
step0Ticker: 0,
step1Ticker: 0,
step2Ticker: 0,
step3Ticker: 0
}

class Step0 extends StepBody {
public run(context: StepExecutionContext): Promise<ExecutionResult> {
workflowScope.step0Ticker++;
return ExecutionResult.next();
}
}

class Step1 extends StepBody {
public run(context: StepExecutionContext): Promise<ExecutionResult> {
workflowScope.step1Ticker++;
return ExecutionResult.next();
}
}

class Step2 extends StepBody {
public run(context: StepExecutionContext): Promise<ExecutionResult> {
workflowScope.step2Ticker++;
return ExecutionResult.next();
}
}

class Step3 extends StepBody {
public run(context: StepExecutionContext): Promise<ExecutionResult> {
workflowScope.step3Ticker++;
return ExecutionResult.next();
}
}

class Parallel_Workflow implements WorkflowBase<any> {
public id: string = "parallel-workflow";
public version: number = 1;

public build(builder: WorkflowBuilder<any>) {
builder
.startWith(Step0)
.parallel()
.do(branch1 => branch1
.startWith(Step1)
.waitFor("my-event", data => "0")
)
.do(branch2 => branch2
.startWith(Step2)
)
.join()
.then(Step3);
}
}

let workflowId = null;
let instance = null;
let persistence = new MemoryPersistenceProvider();
let config = configureWorkflow();
config.useLogger(new ConsoleLogger());
config.usePersistence(persistence);
let host = config.getHost();
jasmine.DEFAULT_TIMEOUT_INTERVAL = 20000;

beforeAll(async (done) => {
host.registerWorkflow(Parallel_Workflow);
await host.start();

workflowId = await host.startWorkflow("parallel-workflow", 1, null);

await spinWait(async () => {
let subs = await persistence.getSubscriptions("my-event", "0", new Date());
return (subs.length > 0);
});

expect(workflowScope.step0Ticker).toBe(1);
expect(workflowScope.step1Ticker).toBe(1);
expect(workflowScope.step2Ticker).toBe(1);
expect(workflowScope.step3Ticker).toBe(0);

await host.publishEvent("my-event", "0", "Pass", new Date());

spinWaitCallback(async () => {
instance = await persistence.getWorkflowInstance(workflowId);
return (instance.status != WorkflowStatus.Runnable);
}, done);
});

afterAll(() => {
host.stop();
});

it("should be marked as complete", function() {
expect(instance.status).toBe(WorkflowStatus.Complete);
});

it("should have taken correct execution path", function() {
expect(workflowScope.step0Ticker).toBe(1);
expect(workflowScope.step1Ticker).toBe(1);
expect(workflowScope.step2Ticker).toBe(1);
expect(workflowScope.step3Ticker).toBe(1);
});

});
1 change: 1 addition & 0 deletions core/src/fluent-builders.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from "./fluent-builders/workflow-builder";
export * from "./fluent-builders/step-builder";
export * from "./fluent-builders/parallel-step-builder";
29 changes: 29 additions & 0 deletions core/src/fluent-builders/parallel-step-builder.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { StepBody, InlineStepBody } from "../abstractions";
import { WorkflowDefinition, WorkflowStepBase, WorkflowStep, StepOutcome, StepExecutionContext, ExecutionResult, WorkflowErrorHandling } from "../models";
import { WorkflowBuilder } from "./workflow-builder";
import { StepBuilder } from "./step-builder";
import { Sequence } from "../primitives";

export class ParallelStepBuilder<TData, TStepBody extends StepBody> {

private workflowBuilder: WorkflowBuilder<TData>;
private referenceBuilder: StepBuilder<Sequence, TData>;
private step: WorkflowStep<TStepBody>;

constructor(workflowBuilder: WorkflowBuilder<TData>, step: WorkflowStep<TStepBody>, refBuilder: StepBuilder<Sequence, TData>) {
this.workflowBuilder = workflowBuilder;
this.step = step;
this.referenceBuilder = refBuilder;
}

public do(builder: (then: WorkflowBuilder<TData>) => void): ParallelStepBuilder<TData, TStepBody> {
let lastStep = this.workflowBuilder.lastStep();
builder(this.workflowBuilder);
this.step.children.push(lastStep + 1); //TODO: make more elegant
return this;
}

public join(): StepBuilder<Sequence, TData> {
return this.referenceBuilder;
}
}
1 change: 0 additions & 1 deletion core/src/fluent-builders/return-step-builder.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { StepBody, InlineStepBody } from "../abstractions";
import { WorkflowDefinition, WorkflowStepBase, WorkflowStep, StepOutcome, StepExecutionContext, ExecutionResult, WorkflowErrorHandling } from "../models";
import { WaitFor, Foreach, While, If, Delay, Schedule } from "../primitives";
import { WorkflowBuilder } from "./workflow-builder";
import { StepBuilder } from "./step-builder";

Expand Down
16 changes: 15 additions & 1 deletion core/src/fluent-builders/step-builder.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { StepBody, InlineStepBody } from "../abstractions";
import { WorkflowDefinition, WorkflowStepBase, WorkflowStep, StepOutcome, StepExecutionContext, ExecutionResult, WorkflowErrorHandling } from "../models";
import { WaitFor, Foreach, While, If, Delay, Schedule } from "../primitives";
import { WaitFor, Foreach, While, If, Delay, Schedule, Sequence } from "../primitives";
import { WorkflowBuilder } from "./workflow-builder";
import { ReturnStepBuilder } from "./return-step-builder";
import { OutcomeBuilder } from "./outcome-builder";
import { ParallelStepBuilder } from "./parallel-step-builder";

export class StepBuilder<TStepBody extends StepBody, TData> {

Expand Down Expand Up @@ -179,6 +180,19 @@ export class StepBuilder<TStepBody extends StepBody, TData> {
return stepBuilder;
}

public parallel(): ParallelStepBuilder<TData, Sequence> {
var newStep = new WorkflowStep<Sequence>();
newStep.body = Sequence;
this.workflowBuilder.addStep(newStep);
var newBuilder = new StepBuilder<Sequence, TData>(this.workflowBuilder, newStep);
let stepBuilder = new ParallelStepBuilder<TData, Sequence>(this.workflowBuilder, newStep, newBuilder);
let outcome = new StepOutcome();
outcome.nextStep = newStep.id;
this.step.outcomes.push(outcome);

return stepBuilder;
}

public schedule(interval: (data :TData) => number): ReturnStepBuilder<TData, Schedule, TStepBody> {
let newStep = new WorkflowStep<Schedule>();
newStep.body = Schedule;
Expand Down
5 changes: 5 additions & 0 deletions core/src/fluent-builders/workflow-builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,9 @@ export class WorkflowBuilder<TData> {
public getUpstreamSteps(id: number): Array<WorkflowStepBase> {
return this.steps.filter(step => step.outcomes.filter(outcome => outcome.nextStep == id).length > 0);
}

public lastStep(): number {
let last = this.steps.reduce((prev, current) => prev.id > current.id ? prev : current);
return last.id;
}
}
3 changes: 2 additions & 1 deletion core/src/primitives.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ export * from "./primitives/while";
export * from "./primitives/if";
export * from "./primitives/delay";
export * from "./primitives/schedule";
export * from "./primitives/waitFor";
export * from "./primitives/waitFor";
export * from "./primitives/sequence";
29 changes: 29 additions & 0 deletions core/src/primitives/sequence.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { ExecutionResult, StepExecutionContext, ContainerData } from "../models";
import { StepBody } from "../abstractions";
import { ContainerStepBody } from "./container-step-body";

export class Sequence extends ContainerStepBody {

public run(context: StepExecutionContext): Promise<ExecutionResult> {

if (!context.persistenceData) {
let containerData = new ContainerData();
containerData.childrenActive = true;
return ExecutionResult.branch([null], containerData);
}

if ((context.persistenceData as ContainerData).childrenActive) {
let complete: boolean = true;

for(let childId of context.pointer.children)
complete = complete && this.isBranchComplete(context.workflow.executionPointers, childId);

if (complete)
return ExecutionResult.next();
else
return ExecutionResult.persist(context.persistenceData);
}

return ExecutionResult.persist(context.persistenceData);
}
}
31 changes: 31 additions & 0 deletions es2017-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,37 @@ build(builder) {
}
```

#### Parallel Sequences

Run several sequences of steps in parallel

```javascript
class Parallel_Workflow {

build(builder) {
builder
.startWith(SayHello)
.parallel()
.do(branch1 => branch1
.startWith(DoSomething)
.then(WaitForSomething)
.then(DoSomethingElse)
)
.do(branch2 => branch2
.startWith(DoSomething)
.then(DoSomethingElse)
)
.do(branch3 => branch3
.startWith(DoSomething)
.then(DoSomethingElse)
)
.join()
.then(SayGoodbye);
}
}
```


### Host

The workflow host is the service responsible for executing workflows. It does this by polling the persistence provider for workflow instances that are ready to run, executes them and then passes them back to the persistence provider to by stored for the next time they are run. It is also responsible for publishing events to any workflows that may be waiting on one.
Expand Down
6 changes: 3 additions & 3 deletions release-notes/2.1.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Workflow ES 2.1

* Fixed typescript 2.4 issue
### Fixed typescript 2.4 issue

* Delay step
### Delay step

Put the workflow to sleep for a specifed number of milliseconds.

Expand All @@ -15,7 +15,7 @@ build(builder) {
}
```

* Schedule step
### Schedule step

Schedule a sequence of steps to execution asynchronously in the future.

Expand Down
32 changes: 32 additions & 0 deletions release-notes/2.2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Workflow ES 2.2

### Parallel Sequences

Run several sequences of steps in parallel

```javascript
class Parallel_Workflow {

build(builder) {
builder
.startWith(SayHello)
.parallel()
.do(branch1 => branch1
.startWith(PrintMessage)
.input((step, data) => step.message = "Running in branch 1")
.delay(data => 5000)
.then(DoSomething)
)
.do(branch2 => branch2
.startWith(PrintMessage)
.input((step, data) => step.message = "Running in branch 2")
)
.do(branch3 => branch3
.startWith(PrintMessage)
.input((step, data) => step.message = "Running in branch 3")
)
.join()
.then(SayGoodbye);
}
}
```

0 comments on commit 63faf7e

Please sign in to comment.