/
handler.ex
614 lines (465 loc) · 18.1 KB
/
handler.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
defmodule Commanded.Event.Handler do
@moduledoc """
Defines the behaviour an event handler must implement and
provides a convenience macro that implements the behaviour, allowing you to
handle only the events you are interested in processing.
You should start your event handlers using a [Supervisor](supervision.html) to
ensure they are restarted on error.
## Example
defmodule ExampleHandler do
use Commanded.Event.Handler, name: "ExampleHandler"
def handle(%AnEvent{..}, _metadata) do
# ... process the event
:ok
end
end
Start your event handler process (or use a [Supervisor](supervision.html)):
{:ok, _handler} = ExampleHandler.start_link()
## Event handler name
The name you specify is used when subscribing to the event store. Therefore
you *should not* change the name once the handler has been deployed. A new
subscription will be created when you change the name, and you event handler
will receive already handled events.
You can use the module name of your event handler using the `__MODULE__`
special form:
defmodule ExampleHandler do
use Commanded.Event.Handler,
name: __MODULE__
end
## Subscription options
You can choose to start the event handler's event store subscription from
`:origin`, `:current` position, or an exact event number using the
`start_from` option. The default is to use the origin so your handler will
receive *all* events.
Use the `:current` position when you don't want newly created event handlers
to go through all previous events. An example would be adding an event handler
to send transactional emails to an already deployed system containing many
historical events.
### Example
Set the `start_from` option (`:origin`, `:current`, or an explicit event
number) when using `Commanded.Event.Handler`:
defmodule ExampleHandler do
use Commanded.Event.Handler,
name: "ExampleHandler",
start_from: :origin
end
You can optionally override `:start_from` by passing it as option when
starting your handler:
{:ok, _handler} = ExampleHandler.start_link(start_from: :current)
### Subscribing to an individual stream
By default event handlers will subscribe to all events appended to any stream.
Provide a `subscribe_to` option to subscribe to a single stream.
defmodule ExampleHandler do
use Commanded.Event.Handler,
name: __MODULE__,
subscribe_to: "stream1234"
end
This will ensure the handler only receives events appended to that stream.
## `c:init/0` callback
You can define an `c:init/0` function in your handler to be called once it has
started and successfully subscribed to the event store.
This callback function must return `:ok`, any other return value will
terminate the event handler with an error.
defmodule ExampleHandler do
use Commanded.Event.Handler, name: "ExampleHandler"
def init do
# optional initialisation
:ok
end
def handle(%AnEvent{..}, _metadata) do
# ... process the event
:ok
end
end
## `c:error/3` callback
You can define an `c:error/3` callback function to handle any errors returned
from your event handler's `handle/2` functions. The `c:error/3` function is
passed the actual error (e.g. `{:error, :failure}`), the failed event, and a
failure context.
Use pattern matching on the error and/or failed event to explicitly handle
certain errors or events. You can choose to retry, skip, or stop the event
handler after an error.
The default behaviour if you don't provide an `c:error/3` callback is to stop
the event handler using the exact error reason returned from the `handle/2`
function. You should supervise event handlers to ensure they are correctly
restarted on error.
### Example error handling
defmodule ExampleHandler do
use Commanded.Event.Handler, name: __MODULE__
require Logger
alias Commanded.Event.FailureContext
def handle(%AnEvent{}, _metadata) do
# simulate event handling failure
{:error, :failed}
end
def error({:error, :failed}, %AnEvent{} = event, %FailureContext{context: context}) do
context = record_failure(context)
case Map.get(context, :failures) do
too_many when too_many >= 3 ->
# skip bad event after third failure
Logger.warn(fn -> "Skipping bad event, too many failures: " <> inspect(event) end)
:skip
_ ->
# retry event, failure count is included in context map
{:retry, context}
end
end
defp record_failure(context) do
Map.update(context, :failures, 1, fn failures -> failures + 1 end)
end
end
## Consistency
For each event handler you can define its consistency, as one of either
`:strong` or `:eventual`.
This setting is used when dispatching commands and specifying the
`consistency` option.
When you dispatch a command using `:strong` consistency, after successful
command dispatch the process will block until all event handlers configured to
use `:strong` consistency have processed the domain events created by the
command. This is useful when you have a read model updated by an event handler
that you wish to query for data affected by the command dispatch. With
`:strong` consistency you are guaranteed that the read model will be
up-to-date after the command has successfully dispatched. It can be safely
queried for data updated by any of the events created by the command.
The default setting is `:eventual` consistency. Command dispatch will return
immediately upon confirmation of event persistence, not waiting for any event
handlers.
### Example
defmodule ExampleHandler do
use Commanded.Event.Handler,
name: "ExampleHandler",
consistency: :strong
end
"""
use GenServer
use Commanded.Registration
require Logger
alias Commanded.Event.{FailureContext, Handler}
alias Commanded.EventStore
alias Commanded.EventStore.RecordedEvent
alias Commanded.Subscriptions
@type domain_event :: struct()
@type metadata :: map()
@type subscribe_from :: :origin | :current | non_neg_integer()
@type consistency :: :eventual | :strong
@doc """
Optional initialisation callback function called when the handler starts.
Can be used to start any related processes when the event handler is started.
Return `:ok` on success, or `{:stop, reason}` to stop the handler process.
"""
@callback init() :: :ok | {:stop, reason :: any()}
@doc """
Event handler behaviour to handle a domain event and its metadata.
Return `:ok` on success, `{:error, :already_seen_event}` to ack and skip the
event, or `{:error, reason}` on failure.
"""
@callback handle(domain_event, metadata) ::
:ok
| {:error, :already_seen_event}
| {:error, reason :: any()}
@doc """
Called when an event `handle/2` callback returns an error.
The `c:error/3` function allows you to control how event handling failures
are handled. The function is passed the error returned by the event handler
(e.g. `{:error, :failure}`), the event causing the error, and a context map
containing state passed between retries. Use the context map to track any
transient state you need to access between retried failures.
You can return one of the following responses depending upon the
error severity:
- `{:retry, context}` - retry the failed event, provide a context
map containing any state passed to subsequent failures. This could be used
to count the number of failures, stopping after too many.
- `{:retry, delay, context}` - retry the failed event, after sleeping for
the requested delay (in milliseconds). Context is a map as described in
`{:retry, context}` above.
- `:skip` - skip the failed event by acknowledging receipt.
- `{:stop, reason}` - stop the event handler with the given reason.
"""
@callback error(
error :: term(),
failed_event :: domain_event,
failure_context :: FailureContext.t()
) ::
{:retry, context :: map()}
| {:retry, delay :: non_neg_integer(), context :: map()}
| :skip
| {:stop, reason :: term()}
@doc """
Macro as a convenience for defining an event handler.
"""
defmacro __using__(opts) do
quote location: :keep do
@before_compile unquote(__MODULE__)
@behaviour Commanded.Event.Handler
@opts unquote(opts) || []
@name Commanded.Event.Handler.parse_name(__MODULE__, @opts[:name])
@doc false
def start_link(opts \\ []) do
opts = Commanded.Event.Handler.start_opts(__MODULE__, Keyword.drop(@opts, [:name]), opts)
Commanded.Event.Handler.start_link(@name, __MODULE__, opts)
end
@doc """
Provides a child specification to allow the event handler to be easily
supervised.
## Example
Supervisor.start_link([
{ExampleHandler, []}
], strategy: :one_for_one)
"""
def child_spec(opts) do
default = %{
id: {__MODULE__, @name},
start: {__MODULE__, :start_link, [opts]},
restart: :permanent,
type: :worker
}
Supervisor.child_spec(default, [])
end
@doc false
def __name__, do: @name
@doc false
def init, do: :ok
defoverridable init: 0
end
end
@doc false
def parse_name(module, name) when name in [nil, ""],
do: raise("#{inspect(module)} expects `:name` to be given")
def parse_name(_module, name) when is_binary(name), do: name
def parse_name(_module, name), do: inspect(name)
@doc false
def start_opts(module, module_opts, local_opts, additional_allowed_opts \\ []) do
{valid, invalid} =
module_opts
|> Keyword.merge(local_opts)
|> Keyword.split([:consistency, :start_from, :subscribe_to] ++ additional_allowed_opts)
if Enum.any?(invalid) do
raise "#{inspect(module)} specifies invalid options: #{inspect(Keyword.keys(invalid))}"
else
valid
end
end
# Include default `handle/2` and `error/3` callback functions in module
@doc false
defmacro __before_compile__(_env) do
quote generated: true do
@doc false
def handle(_event, _metadata), do: :ok
@doc false
def error({:error, reason}, _failed_event, _failure_context), do: {:stop, reason}
end
end
@doc false
defstruct [
:consistency,
:handler_name,
:handler_module,
:last_seen_event,
:subscribe_from,
:subscribe_to,
:subscription
]
@doc false
def start_link(handler_name, handler_module, opts \\ []) do
name = name(handler_name)
handler = %Handler{
handler_name: handler_name,
handler_module: handler_module,
consistency: consistency(opts),
subscribe_from: start_from(opts),
subscribe_to: subscribe_to(opts)
}
Registration.start_link(name, __MODULE__, handler)
end
defp name(name), do: {__MODULE__, name}
@doc false
def init(%Handler{} = state) do
:ok = GenServer.cast(self(), :subscribe_to_events)
{:ok, state}
end
@doc false
def handle_call(:last_seen_event, _from, %Handler{} = state) do
%Handler{last_seen_event: last_seen_event} = state
{:reply, last_seen_event, state}
end
@doc false
def handle_call(:config, _from, %Handler{} = state) do
%Handler{consistency: consistency, subscribe_from: subscribe_from, subscribe_to: subscribe_to} =
state
config = [consistency: consistency, start_from: subscribe_from, subscribe_to: subscribe_to]
{:reply, config, state}
end
@doc false
def handle_cast(:subscribe_to_events, %Handler{} = state) do
{:noreply, subscribe_to_events(state)}
end
@doc false
# Subscription to event store has successfully subscribed, init event handler
def handle_info({:subscribed, subscription}, %Handler{subscription: subscription} = state) do
Logger.debug(fn -> describe(state) <> " has successfully subscribed to event store" end)
%Handler{
consistency: consistency,
handler_module: handler_module,
handler_name: handler_name
} = state
case handler_module.init() do
:ok ->
# Register this event handler as a subscription with the given consistency
:ok = Subscriptions.register(handler_name, consistency)
{:noreply, state}
{:stop, reason} ->
Logger.debug(fn -> describe(state) <> " `init/0` callback has requested to stop" end)
{:stop, reason, state}
end
end
@doc false
def handle_info({:events, events}, %Handler{} = state) do
Logger.debug(fn -> describe(state) <> " received events: #{inspect(events)}" end)
try do
state = Enum.reduce(events, state, &handle_event/2)
{:noreply, state}
catch
{:error, reason} ->
# stop after event handling returned an error
{:stop, reason, state}
end
end
defp subscribe_to_events(%Handler{} = state) do
%Handler{
handler_name: handler_name,
subscribe_from: subscribe_from,
subscribe_to: subscribe_to
} = state
{:ok, subscription} =
EventStore.subscribe_to(subscribe_to, handler_name, self(), subscribe_from)
%Handler{state | subscription: subscription}
end
defp handle_event(event, handler, context \\ %{})
# Ignore already seen event.
defp handle_event(
%RecordedEvent{event_number: event_number} = event,
%Handler{last_seen_event: last_seen_event} = state,
_context
)
when not is_nil(last_seen_event) and event_number <= last_seen_event do
Logger.debug(fn -> describe(state) <> " has already seen event ##{inspect(event_number)}" end)
confirm_receipt(event, state)
end
# Delegate event to handler module.
defp handle_event(%RecordedEvent{} = event, %Handler{} = state, context) do
case delegate_event_to_handler(event, state) do
:ok ->
confirm_receipt(event, state)
{:error, :already_seen_event} ->
confirm_receipt(event, state)
{:error, reason} = error ->
Logger.error(fn ->
describe(state) <>
" failed to handle event #{inspect(event)} due to: #{inspect(reason)}"
end)
handle_event_error(error, event, state, context)
end
end
defp delegate_event_to_handler(%RecordedEvent{} = event, %Handler{} = state) do
%RecordedEvent{data: data} = event
%Handler{handler_module: handler_module} = state
metadata = enrich_metadata(event)
try do
handler_module.handle(data, metadata)
rescue
e ->
{:error, e}
end
end
defp handle_event_error(error, %RecordedEvent{} = failed_event, %Handler{} = state, context) do
%RecordedEvent{data: data} = failed_event
%Handler{handler_module: handler_module} = state
failure_context = %FailureContext{
context: context,
metadata: enrich_metadata(failed_event)
}
case handler_module.error(error, data, failure_context) do
{:retry, context} when is_map(context) ->
# Retry the failed event
Logger.info(fn -> describe(state) <> " is retrying failed event" end)
handle_event(failed_event, state, context)
{:retry, delay, context} when is_map(context) and is_integer(delay) and delay >= 0 ->
# Retry the failed event after waiting for the given delay, in milliseconds
Logger.info(fn ->
describe(state) <> " is retrying failed event after #{inspect(delay)}ms"
end)
:timer.sleep(delay)
handle_event(failed_event, state, context)
:skip ->
# Skip the failed event by confirming receipt
Logger.info(fn -> describe(state) <> " is skipping event" end)
confirm_receipt(failed_event, state)
{:stop, reason} ->
# Stop event handler
Logger.warn(fn -> describe(state) <> " has requested to stop: #{inspect(reason)}" end)
throw({:error, reason})
invalid ->
Logger.warn(fn ->
describe(state) <> " returned an invalid error reponse: #{inspect(invalid)}"
end)
# Stop event handler with original error
throw(error)
end
end
# Confirm receipt of event
defp confirm_receipt(%RecordedEvent{} = event, %Handler{} = state) do
%RecordedEvent{event_number: event_number} = event
Logger.debug(fn ->
describe(state) <> " confirming receipt of event ##{inspect(event_number)}"
end)
ack_event(event, state)
%Handler{state | last_seen_event: event_number}
end
defp ack_event(event, %Handler{} = state) do
%Handler{
consistency: consistency,
handler_name: handler_name,
subscription: subscription
} = state
:ok = EventStore.ack_event(subscription, event)
:ok = Subscriptions.ack_event(handler_name, consistency, event)
end
@enrich_metadata_fields [
:event_id,
:event_number,
:stream_id,
:stream_version,
:correlation_id,
:causation_id,
:created_at
]
defp enrich_metadata(%RecordedEvent{} = event) do
%RecordedEvent{metadata: metadata} = event
event
|> Map.from_struct()
|> Map.take(@enrich_metadata_fields)
|> Map.merge(metadata || %{})
end
defp consistency(opts) do
case opts[:consistency] || Application.get_env(:commanded, :default_consistency, :eventual) do
consistency when consistency in [:eventual, :strong] -> consistency
invalid -> raise "Invalid `consistency` option: #{inspect(invalid)}"
end
end
defp start_from(opts) do
case opts[:start_from] || :origin do
start_from when start_from in [:origin, :current] -> start_from
start_from when is_integer(start_from) -> start_from
invalid -> "Invalid `start_from` option: #{inspect(invalid)}"
end
end
defp subscribe_to(opts) do
case opts[:subscribe_to] || :all do
:all -> :all
stream when is_binary(stream) -> stream
invalid -> "Invalid `subscribe_to` option: #{inspect(invalid)}"
end
end
defp describe(%Handler{handler_module: handler_module}),
do: inspect(handler_module)
end