-
Notifications
You must be signed in to change notification settings - Fork 4
/
EventStreamer.ts
157 lines (137 loc) · 3.98 KB
/
EventStreamer.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
/**
* @module @ddes/event-streaming
*/
import {
Commit,
EventWithMetadata,
StorePoller,
StorePollerParams,
} from '@ddes/core'
import * as debug from 'debug'
import {IncomingMessage} from 'http'
import {get} from 'lodash'
import {Server as WebSocketServer} from 'ws'
import {FilterSet} from './types'
export default class EventStreamer {
public wss: WebSocketServer
protected debug: debug.IDebugger
protected chronologicalGroups: string[]
protected storePollers: StorePoller[] = []
constructor(
params: StorePollerParams & {
chronologicalGroups?: string[]
port: number
authenticateClient?: (info: {
origin: string
req: IncomingMessage
secure: boolean
}) => boolean
}
) {
const {
authenticateClient: verifyClient,
port = 80,
chronologicalGroups = ['default'],
...storePollerParams
} = params
this.wss = new WebSocketServer({verifyClient, port})
this.wss.on('connection', this.onClientConnected.bind(this))
this.chronologicalGroups = chronologicalGroups
for (const chronologicalGroup of chronologicalGroups) {
this.storePollers.push(
new StorePoller({
...storePollerParams,
processCommit: this.processCommit.bind(this),
})
)
}
this.debug = debug('DDES.EventStreamer.Server')
}
public close() {
if (this.wss) {
this.wss.close()
}
for (const storePoller of this.storePollers) {
storePoller.stop()
}
}
public async processCommit(commit: Commit) {
const {
events,
aggregateType,
aggregateKey,
aggregateVersion,
timestamp,
sortKey,
} = commit
for (const [commitEventIndex, event] of Object.entries(events)) {
this.publishEventToSubscribers({
...event,
aggregateType,
aggregateKey,
aggregateVersion,
timestamp,
commitEventIndex: parseInt(commitEventIndex, 10),
})
}
}
public publishEventToSubscribers(eventWithMetadata: EventWithMetadata) {
clients: for (const client of this.wss.clients) {
if (!(client as any).filterSets) {
continue // skip clients that have not sent filtersets yet
}
const {filterSets}: {filterSets: FilterSet[]} = client as any
let clientShouldReceiveEvent = false
filtersets: for (const filterSet of filterSets) {
for (const [filterKey, filterValue] of Object.entries(filterSet)) {
const eventValue = get(eventWithMetadata, filterKey)
if (Array.isArray(filterValue)) {
if (!filterValue.includes(eventValue)) {
continue filtersets
}
} else if (typeof filterValue === 'object' && filterValue.regexp) {
if (
!(
typeof eventValue === 'string' &&
eventValue.match(filterValue.regexp)
)
) {
continue filtersets
}
} else {
if (eventValue !== filterValue) {
continue filtersets
}
}
}
clientShouldReceiveEvent = true
}
if (client.OPEN && clientShouldReceiveEvent) {
client.send(JSON.stringify(eventWithMetadata))
}
}
}
protected onClientConnected(client: any) {
const clientAddress = client._socket.remoteAddress
this.debug(`new client (${clientAddress})`)
client.filterSets = []
client.on('message', (json: string) => {
this.debug(`filter sets for ${clientAddress} set to ${json}`)
client.filterSets = JSON.parse(json)
})
client.on('close', this.onClientDisconnected.bind(this))
if (this.wss.clients.size === 1) {
for (const storePoller of this.storePollers) {
storePoller.sortKeyCursor = new Date()
storePoller.start()
}
}
}
protected onClientDisconnected() {
if (this.wss.clients.size === 0) {
for (const storePoller of this.storePollers) {
storePoller.stop()
}
}
}
}