-
Notifications
You must be signed in to change notification settings - Fork 0
/
workflow-client.ts
97 lines (73 loc) · 3.99 KB
/
workflow-client.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import { SWF } from 'aws-sdk';
import { Observable, Observer } from 'rxjs/Rx';
import {
ActivityPollParameters, DecisionTask, ActivityTask, DecisionPollParameters,
WorkflowStartParameters, Run, RegisterWorkflowTypeInput,
} from './aws.types';
import { injectable, inject } from 'inversify';
import { AWSClientProvider } from './aws-client-provider';
import { AWS_ADAPTER } from '../symbols';
import { RegisterActivityTypeInput, RespondActivityTaskCompletedInput, RespondActivityTaskFailedInput, RespondDecisionTaskCompletedInput } from 'aws-sdk/clients/swf';
import { LOGGER, Logger } from '../core/logging/logger';
/**
 * Observable-based client contract for the Amazon SWF operations declared below.
 *
 * Every method takes the request parameters for a single operation and
 * returns an Observable carrying that operation's result.
 */
export interface WorkflowClient {
  /** Polls for an activity task; the observable carries the resulting `ActivityTask`. */
  pollForActivityTask(params: ActivityPollParameters): Observable<ActivityTask>;

  /** Polls for a decision task; the observable carries the resulting `DecisionTask`. */
  pollForDecisionTask(params: DecisionPollParameters): Observable<DecisionTask>;

  /** Starts a workflow execution; the observable carries the resulting `Run`. */
  startWorkflow(params: WorkflowStartParameters): Observable<Run>;

  /** Registers a workflow type. */
  registerWorkflowType(params: RegisterWorkflowTypeInput): Observable<RegisterWorkflowTypeInput>;

  /** Registers an activity type. */
  registerActivityType(params: RegisterActivityTypeInput): Observable<RegisterActivityTypeInput>;

  /** Reports a completed decision task. */
  respondDecisionTaskCompleted(params: RespondDecisionTaskCompletedInput): Observable<{}>;

  /** Reports a completed activity task. */
  respondActivityTaskCompleted(params: RespondActivityTaskCompletedInput): Observable<{}>;

  /** Reports a failed activity task. */
  respondActivityTaskFailed(params: RespondActivityTaskFailedInput): Observable<{}>;
}
/**
 * {@link WorkflowClient} implementation that delegates every operation to the
 * native aws-sdk `SWF` client obtained from the injected `AWSClientProvider`.
 *
 * Each public operation binds the matching SWF client method and hands it to
 * {@link BaseWorkflowClient.fromSwfFunction}, which adapts the callback-style
 * call to an Observable.
 */
@injectable()
export class BaseWorkflowClient implements WorkflowClient {
  private swfClient: SWF;

  /**
   * @param awsAdapter provider queried once, at construction, for the native SWF client
   * @param logger logger used for debug tracing of every request
   */
  constructor(@inject(AWS_ADAPTER) private awsAdapter: AWSClientProvider, @inject(LOGGER) private logger: Logger) {
    this.swfClient = awsAdapter.getNativeSWFClient();
  }

  /** Delegates to `SWF.pollForActivityTask`. */
  pollForActivityTask(params: ActivityPollParameters): Observable<ActivityTask> {
    return this.fromSwfFunction<ActivityTask>(this.swfClient.pollForActivityTask.bind(this.swfClient), params);
  }

  /** Delegates to `SWF.pollForDecisionTask`. */
  pollForDecisionTask(params: DecisionPollParameters): Observable<DecisionTask> {
    return this.fromSwfFunction<DecisionTask>(this.swfClient.pollForDecisionTask.bind(this.swfClient), params);
  }

  /** Delegates to `SWF.startWorkflowExecution`. */
  startWorkflow(params: WorkflowStartParameters): Observable<Run> {
    return this.fromSwfFunction<Run>(this.swfClient.startWorkflowExecution.bind(this.swfClient), params);
  }

  /** Delegates to `SWF.registerWorkflowType`. */
  registerWorkflowType(params: RegisterWorkflowTypeInput): Observable<RegisterWorkflowTypeInput> {
    return this.fromSwfFunction<RegisterWorkflowTypeInput>(this.swfClient.registerWorkflowType.bind(this.swfClient), params);
  }

  /** Delegates to `SWF.registerActivityType`. */
  registerActivityType(params: RegisterActivityTypeInput): Observable<RegisterActivityTypeInput> {
    return this.fromSwfFunction<RegisterActivityTypeInput>(this.swfClient.registerActivityType.bind(this.swfClient), params);
  }

  /** Delegates to `SWF.respondDecisionTaskCompleted`. */
  respondDecisionTaskCompleted(params: RespondDecisionTaskCompletedInput): Observable<{}> {
    return this.fromSwfFunction<{}>(this.swfClient.respondDecisionTaskCompleted.bind(this.swfClient), params);
  }

  /** Delegates to `SWF.respondActivityTaskCompleted`. */
  respondActivityTaskCompleted(params: RespondActivityTaskCompletedInput): Observable<{}> {
    return this.fromSwfFunction<{}>(this.swfClient.respondActivityTaskCompleted.bind(this.swfClient), params);
  }

  /** Delegates to `SWF.respondActivityTaskFailed`. */
  respondActivityTaskFailed(params: RespondActivityTaskFailedInput): Observable<{}> {
    return this.fromSwfFunction<{}>(this.swfClient.respondActivityTaskFailed.bind(this.swfClient), params);
  }

  /**
   * Adapts a callback-style function `(params, cb)` to an Observable.
   *
   * `fnc` is invoked when the returned observable is subscribed to. If the
   * callback reports an error, the observable errors with it; otherwise the
   * observable emits `data` once and completes. An exception thrown
   * synchronously by `fnc` is also forwarded as an observable error. Each
   * step is traced through the logger at debug level.
   *
   * No teardown is returned from the subscribe function, so unsubscribing
   * does not cancel a call that is already in flight.
   *
   * The callback payload type is tied to the method's own `T` (the parameter
   * type deliberately declares no type parameter of its own), so the element
   * type of the returned observable matches what `fnc` delivers.
   *
   * @param fnc callback-style function to invoke; `fnc.name` is used only for log messages
   * @param params request parameters passed through to `fnc` unchanged
   * @returns observable carrying the single callback result, or its error
   */
  public fromSwfFunction<T>(fnc: (params: any, cb: (error: any, data: T) => void) => any, params: any): Observable<T> {
    return Observable.create((obs: Observer<T>) => {
      this.logger.debug('AWS request: %s, with %j params in progress', fnc.name, params);
      const handler = (error: any, data: T) => {
        if (error) {
          this.logger.debug('AWS request error: %s, error: %s', fnc.name, error);
          obs.error(error);
        } else {
          this.logger.debug('AWS request completed: %s, with %j result', fnc.name, data);
          obs.next(data);
          obs.complete();
        }
      };
      try {
        fnc(params, handler);
      } catch (e) {
        // A synchronous throw from fnc is surfaced through the observable
        // instead of escaping from subscribe().
        this.logger.debug('Internal error "%s", error: "%s"', fnc.name, e);
        obs.error(e);
      }
    });
  }
}