-
Notifications
You must be signed in to change notification settings - Fork 9
/
eventsourcing-aggregate.ts
320 lines (289 loc) · 11.6 KB
/
eventsourcing-aggregate.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
/*
* Copyright 2023 Fraktalio D.O.O. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "
* AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific
* language governing permissions and limitations under the License.
*/
/* eslint-disable functional/no-classes,@typescript-eslint/no-explicit-any,functional/no-loop-statements */
import { IDecider } from '../domain/decider';
import { ISaga } from '../domain/saga';
/**
* Event repository interface
*
* @typeParam C - Command
* @typeParam E - Event
* @typeParam V - Version
* @typeParam CM - Command Metadata
* @typeParam EM - Event Metadata
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
export interface IEventRepository<C, E, V, CM, EM> {
/**
* Fetch events
*
* @param command - Command of type `C` with metadata of type `CM`
*
* @return list of Events with Version and Event Metadata
*/
readonly fetch: (command: C & CM) => Promise<readonly (E & V & EM)[]>;
/**
* Get the latest event stream version / sequence
*
* @param event - Event of type `E`
*
* @return the latest version / sequence of the event stream that this event belongs to.
*/
readonly versionProvider: (event: E) => Promise<V | null>;
/**
* Save events
*
* @param events - list of Events
* @param commandMetadata - Command Metadata of the command that initiated `events`
* @param versionProvider - A provider for the Latest Event in this stream and its Version/Sequence
* @return a list of newly saved Event(s) of type `E` with Version of type `V` and with Event Metadata of type `EM`
*/
readonly save: (
events: readonly E[],
commandMetadata: CM,
versionProvider: (e: E) => Promise<V | null>
) => Promise<readonly (E & V & EM)[]>;
}
/**
* Event sourcing aggregate interface is using/delegating a `decider` of type `IDecider`<`C`, `S`, `E`> to handle commands and produce events.
* In order to handle the command, aggregate needs to fetch the current state (represented as a list of events) via `IEventRepository.fetchEvents` function, and then delegate the command to the `decider` which can produce new event(s) as a result.
*
* Produced events are then stored via `IEventRepository.save` function.
*
* @typeParam C - Commands of type `C` that this aggregate can handle
* @typeParam S - Aggregate state of type `S`
* @typeParam E - Events of type `E` that this aggregate can publish
* @typeParam V - Version
* @typeParam CM - Command Metadata
* @typeParam EM - Event Metadata
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
export interface IEventSourcingAggregate<C, S, E, V, CM, EM>
extends IDecider<C, S, E>,
IEventRepository<C, E, V, CM, EM> {
/**
* Handles the command of type `C`, and returns new persisted list of pairs of event and its version.
*
* @param command - Command of type `C` with Command Metadata
* @return list of persisted events with Version and Event Metadata
*/
readonly handle: (command: C & CM) => Promise<readonly (E & V & EM)[]>;
}
/**
* Event sourcing orchestrating aggregate interface is using/delegating a `decider` of type `IDecider`<`C`, `S`, `E`> to handle commands and produce events.
* In order to handle the command, aggregate needs to fetch the current state (represented as a list of events) via `IEventRepository.fetchEvents` function, and then delegate the command to the `decider` which can produce new event(s) as a result.
*
* If the `decider` is combined out of many deciders via `combine` function, an optional `EventSourcingOrchestratingAggregate.saga` could be used to react on new events and send new commands to the `decider` recursively, in one transaction.
*
* Produced events are then stored via `IEventRepository.save` function.
*
* @typeParam C - Commands of type `C` that this aggregate can handle
* @typeParam S - Aggregate state of type `S`
* @typeParam E - Events of type `E` that this aggregate can publish
* @typeParam V - Version
* @typeParam CM - Command Metadata
* @typeParam EM - Event Metadata
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
export interface IEventSourcingOrchestratingAggregate<C, S, E, V, CM, EM>
extends IEventSourcingAggregate<C, S, E, V, CM, EM>,
ISaga<E, C> {}
/**
* An abstract algorithm to compute new events based on the old events and the command being handled.
*/
export abstract class EventComputation<C, S, E> implements IDecider<C, S, E> {
protected constructor(protected readonly decider: IDecider<C, S, E>) {
this.initialState = decider.initialState;
}
readonly initialState: S;
decide(command: C, state: S): readonly E[] {
return this.decider.decide(command, state);
}
evolve(state: S, event: E): S {
return this.decider.evolve(state, event);
}
protected computeNewEvents(events: readonly E[], command: C): readonly E[] {
const currentState = events.reduce(
this.decider.evolve,
this.decider.initialState
);
return this.decider.decide(command, currentState);
}
}
/**
* An abstract algorithm to compute new events based on the old events and the command being handled.
* It returns all the events, including the events created by handling commands which are triggered by Saga - orchestration included.
*/
export abstract class EventOrchestratingComputation<C, S, E, CM>
implements IDecider<C, S, E>, ISaga<E, C>
{
protected constructor(
protected readonly decider: IDecider<C, S, E>,
protected readonly saga: ISaga<E, C>
) {
this.initialState = decider.initialState;
}
readonly initialState: S;
decide(command: C, state: S): readonly E[] {
return this.decider.decide(command, state);
}
evolve(state: S, event: E): S {
return this.decider.evolve(state, event);
}
react(event: E): readonly C[] {
return this.saga.react(event);
}
private computeNewEventsInternally(
events: readonly E[],
command: C
): readonly E[] {
const currentState = events.reduce(
this.decider.evolve,
this.decider.initialState
);
return this.decider.decide(command, currentState);
}
protected async computeNewEvents(
events: readonly E[],
command: C & CM,
fetch: (c: C & CM) => Promise<readonly E[]>
): Promise<readonly E[]> {
// eslint-disable-next-line functional/no-let
let resultingEvents = this.computeNewEventsInternally(events, command);
await asyncForEach(
resultingEvents.flatMap((evt) => this.saga.react(evt)),
async (cmd) => {
const newEvents = this.computeNewEvents(
(await fetch(cmd)).map((evt) => evt as E).concat(resultingEvents),
cmd,
fetch
);
resultingEvents = resultingEvents.concat(await newEvents);
}
);
return resultingEvents;
}
}
/**
* Event sourcing aggregate is using/delegating a `EventSourcingAggregate.decider` of type `IDecider`<`C`, `S`, `E`> to handle commands and produce events.
* In order to handle the command, aggregate needs to fetch the current state (represented as a list of events) via `IEventRepository.fetchEvents` function, and then delegate the command to the `EventSourcingAggregate.decider` which can produce new event(s) as a result.
*
*
* Produced events are then stored via `IEventRepository.save` function.
*
* @typeParam C - Commands of type `C` that this aggregate can handle
* @typeParam S - Aggregate state of type `S`
* @typeParam E - Events of type `E` that this aggregate can publish
* @typeParam E - Version
* @typeParam CM - Command Metadata
* @typeParam EM - Event Metadata
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
export class EventSourcingAggregate<C, S, E, V, CM, EM>
extends EventComputation<C, S, E>
implements IEventSourcingAggregate<C, S, E, V, CM, EM>
{
constructor(
decider: IDecider<C, S, E>,
protected readonly eventRepository: IEventRepository<C, E, V, CM, EM>
) {
super(decider);
}
async fetch(command: C & CM): Promise<readonly (E & V & EM)[]> {
return this.eventRepository.fetch(command);
}
async versionProvider(event: E): Promise<V | null> {
return this.eventRepository.versionProvider(event);
}
async save(
events: readonly E[],
commandMetadata: CM,
versionProvider: (e: E) => Promise<V | null>
): Promise<readonly (E & V & EM)[]> {
return this.eventRepository.save(events, commandMetadata, versionProvider);
}
async handle(command: C & CM): Promise<readonly (E & V & EM)[]> {
const currentEvents = await this.eventRepository.fetch(command);
return this.eventRepository.save(
this.computeNewEvents(currentEvents, command),
command,
async () => currentEvents[currentEvents.length - 1]
);
}
}
/**
* Event sourcing orchestrating aggregate is using/delegating a `EventSourcingOrchestratingAggregate.decider` of type `IDecider`<`C`, `S`, `E`> to handle commands and produce events.
* In order to handle the command, aggregate needs to fetch the current state (represented as a list of events) via `IEventRepository.fetchEvents` function, and then delegate the command to the `EventSourcingOrchestratingAggregate.decider` which can produce new event(s) as a result.
*
* If the `EventSourcingOrchestratingAggregate.decider` is combined out of many deciders via `combine` function, an optional `EventSourcingOrchestratingAggregate.saga` could be used to react on new events and send new commands to the `EventSourcingOrchestratingAggregate.decider` recursively, in one transaction.
*
* Produced events are then stored via `IEventRepository.save` function.
*
* @typeParam C - Commands of type `C` that this aggregate can handle
* @typeParam S - Aggregate state of type `S`
* @typeParam E - Events of type `E` that this aggregate can publish
* @typeParam V - Version
* @typeParam CM - Command Metadata
* @typeParam EM - Event Metadata
*
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
export class EventSourcingOrchestratingAggregate<C, S, E, V, CM, EM>
extends EventOrchestratingComputation<C, S, E, CM>
implements IEventSourcingOrchestratingAggregate<C, S, E, V, CM, EM>
{
constructor(
decider: IDecider<C, S, E>,
protected readonly eventRepository: IEventRepository<C, E, V, CM, EM>,
saga: ISaga<E, C>
) {
super(decider, saga);
}
async fetch(command: C & CM): Promise<readonly (E & V & EM)[]> {
return this.eventRepository.fetch(command);
}
async versionProvider(event: E): Promise<V | null> {
return this.eventRepository.versionProvider(event);
}
async save(
events: readonly E[],
commandMetadata: CM,
versionProvider: (e: E) => Promise<V | null>
): Promise<readonly (E & V & EM)[]> {
return this.eventRepository.save(events, commandMetadata, versionProvider);
}
async handle(command: C & CM): Promise<readonly (E & V & EM)[]> {
const currentEvents = await this.eventRepository.fetch(command);
return this.eventRepository.save(
await this.computeNewEvents(
currentEvents,
command,
async (cmd: C & CM) => await this.eventRepository.fetch(cmd)
),
command,
this.versionProvider.bind(this)
);
}
}
async function asyncForEach(
array: readonly any[],
callback: (arg0: any, arg1: number, arg2: any) => any
) {
for (let index = 0; index < array.length; index++) {
await callback(array[index], index, array);
}
}