/
ExpressRequestProcessor.ts
107 lines (95 loc) · 4.03 KB
/
ExpressRequestProcessor.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import { Configuration } from "../../../configuration";
import { automationClientInstance } from "../../../globals";
import {
AutomationContextAware,
HandlerContext,
} from "../../../HandlerContext";
import { AutomationEventListener } from "../../../server/AutomationEventListener";
import { AutomationServer } from "../../../server/AutomationServer";
import { GraphClient } from "../../../spi/graph/GraphClient";
import { GraphClientFactory } from "../../../spi/graph/GraphClientFactory";
import {
Destination,
MessageClient,
MessageOptions,
RequiredMessageOptions,
} from "../../../spi/message/MessageClient";
import { AbstractRequestProcessor } from "../AbstractRequestProcessor";
import {
CommandIncoming,
EventIncoming,
isCommandIncoming,
isEventIncoming,
workspaceId,
} from "../RequestProcessor";
import { WebSocketLifecycle } from "../websocket/WebSocketLifecycle";
import {
WebSocketCommandMessageClient,
WebSocketEventMessageClient,
} from "../websocket/WebSocketMessageClient";
/**
 * RequestProcessor implementation used by the Express infrastructure to process
 * inbound events via HTTP REST apis.
 *
 * Graph and message clients are taken from the factories configured under
 * `configuration.http` when present; otherwise built-in defaults are used.
 */
export class ExpressRequestProcessor extends AbstractRequestProcessor {

    /** Default graph client factory, read from `configuration.graphql.client.factory`. */
    private readonly graphClientFactory: GraphClientFactory;

    /**
     * @param automations automation server handed to the base request processor
     * @param configuration client configuration; supplies the graph client factory
     *                      and the optional `http` factory overrides
     * @param listeners event listeners handed to the base request processor
     */
    constructor(protected automations: AutomationServer,
                protected configuration: Configuration,
                protected listeners: AutomationEventListener[] = []) {
        super(automations, listeners);
        const { factory } = this.configuration.graphql.client;
        this.graphClientFactory = factory;
    }

    /**
     * Status messages are not sent by this processor: the returned promise
     * resolves immediately and both arguments are ignored.
     */
    protected sendStatusMessage(payload: any, ctx: HandlerContext & AutomationContextAware): Promise<any> {
        return Promise.resolve();
    }

    /**
     * Create the GraphClient for an incoming request.
     *
     * @param event incoming command or event; its workspace id selects the default client
     * @param context automation context passed to a configured `http.graphClientFactory`
     * @returns the client from `http.graphClientFactory` when one is configured,
     *          otherwise one created by the default graph client factory
     */
    protected createGraphClient(event: EventIncoming | CommandIncoming,
                                context: AutomationContextAware): GraphClient {
        if (this.configuration.http.graphClientFactory) {
            return this.configuration.http.graphClientFactory(context);
        }
        return this.graphClientFactory.create(workspaceId(event), this.configuration);
    }

    /**
     * Create the MessageClient for an incoming request.
     *
     * @param event incoming command or event, forwarded to the default message client
     * @param context automation context passed to a configured `http.messageClientFactory`
     * @returns the client from `http.messageClientFactory` when one is configured,
     *          otherwise a new ExpressMessageClient
     */
    protected createMessageClient(event: EventIncoming | CommandIncoming,
                                  context: AutomationContextAware): MessageClient {
        if (this.configuration.http.messageClientFactory) {
            return this.configuration.http.messageClientFactory(context);
        }
        return new ExpressMessageClient(event, this.configuration);
    }
}
/**
 * MessageClient handed to handlers for requests processed by
 * ExpressRequestProcessor.
 *
 * When the running automation client has a websocket handler exposing a
 * `webSocketLifecycle`, all operations are forwarded to a websocket-backed
 * message client. Otherwise every operation resolves immediately without
 * doing anything.
 */
class ExpressMessageClient implements MessageClient {

    /**
     * Websocket-backed client that performs the actual work. Stays undefined
     * when no websocket lifecycle is available, or when the incoming payload
     * is neither a command nor an event.
     */
    private readonly delegate: MessageClient | undefined;

    /**
     * @param event incoming command or event this client is bound to
     * @param configuration client configuration forwarded to the websocket message client
     */
    constructor(private readonly event: EventIncoming | CommandIncoming, configuration: Configuration) {
        // Resolve the handler once instead of re-querying the global instance
        // for every check. The handler is cast to `any` to reach
        // `webSocketLifecycle`, exactly as before.
        const handler = automationClientInstance().webSocketHandler as any;
        const lifecycle: WebSocketLifecycle | undefined = handler ? handler.webSocketLifecycle : undefined;
        if (!lifecycle) {
            // No websocket connection to forward through: remain a no-op client.
            return;
        }

        if (isCommandIncoming(this.event)) {
            this.delegate = new WebSocketCommandMessageClient(this.event, lifecycle, configuration);
        } else if (isEventIncoming(this.event)) {
            this.delegate = new WebSocketEventMessageClient(this.event, lifecycle, configuration);
        }
    }

    /**
     * Forward a response to the delegate.
     *
     * @param msg message to send
     * @param options optional message options
     * @returns the delegate's result, or a resolved promise when there is no delegate
     */
    public respond(msg: any, options?: MessageOptions): Promise<any> {
        return this.delegate ? this.delegate.respond(msg, options) : Promise.resolve();
    }

    /**
     * Forward a message for the given destination(s) to the delegate.
     *
     * @param msg message to send
     * @param destinations one or more destinations
     * @param options optional message options
     * @returns the delegate's result, or a resolved promise when there is no delegate
     */
    public send(msg: any, destinations: Destination | Destination[], options?: MessageOptions): Promise<any> {
        return this.delegate ? this.delegate.send(msg, destinations, options) : Promise.resolve();
    }

    /**
     * Forward a delete request for the given destination(s) to the delegate.
     *
     * @param destinations one or more destinations
     * @param options message options identifying what to delete
     * @returns the delegate's result, or a resolved promise when there is no delegate
     */
    public delete(destinations: Destination | Destination[],
                  options: RequiredMessageOptions): Promise<void> {
        return this.delegate ? this.delegate.delete(destinations, options) : Promise.resolve();
    }
}