-
Notifications
You must be signed in to change notification settings - Fork 16
/
dead_letter.ex
61 lines (45 loc) · 1.47 KB
/
dead_letter.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
defmodule Conduit.Plug.DeadLetter do
  use Conduit.Plug.Builder

  @moduledoc """
  Publishes messages that were nacked or raised an exception to a
  dead letter destination.

  The following options are required:

    * `broker` - The broker to publish the message with.
    * `publish_to` - The place to publish the message.

  In order for this to work, you must also define an outgoing publish with the same
  name as the `publish_to` option in your broker.

      publish :error, to: "my_app.error"

  ## Examples

      plug Conduit.Plug.DeadLetter, broker: MyApp.Broker, publish_to: :error

  """

  @doc """
  Validates the plug options.

  Raises `KeyError` when either `:publish_to` or `:broker` is missing;
  otherwise returns `opts` unchanged.
  """
  def init(opts) do
    # Fail fast if a required option is missing.
    _ = Keyword.fetch!(opts, :publish_to)
    _ = Keyword.fetch!(opts, :broker)

    opts
  end

  @doc """
  Publishes messages that were nacked or raised an exception to a
  dead letter destination.

  Runs `next` on the message. If the resulting status is `:nack`, the
  resulting message is published to the dead letter destination and
  returned; if it is `:ack`, it is returned untouched.

  If anything in that flow raises, the incoming message is published with
  an `"exception"` header containing the formatted error, and the error is
  then reraised with its original stacktrace.
  """
  def call(message, next, opts) do
    message = next.(message)

    case message.status do
      :nack -> publish_dead_letter(message, opts)
      :ack -> message
    end
  rescue
    error ->
      # Bindings made in the function body are not visible here, so `message`
      # is the message this plug received, not the result of `next`.
      # `put_header/3` is not defined in this module; presumably it is brought
      # into scope by `use Conduit.Plug.Builder`.
      message
      |> put_header("exception", Exception.format(:error, error))
      |> publish_dead_letter(opts)

      # `__STACKTRACE__` is bound to the rescued exception, so the publish call
      # above cannot affect the trace attached to the reraise.
      reraise error, __STACKTRACE__
  end

  # Publishes `message` through the configured broker and returns the message
  # unchanged; the return value of `broker.publish/3` is ignored.
  @spec publish_dead_letter(Conduit.Message.t(), Keyword.t()) :: Conduit.Message.t()
  defp publish_dead_letter(message, opts) do
    # Both options are required (see `init/1`), so fetch them assertively.
    broker = Keyword.fetch!(opts, :broker)
    publish_to = Keyword.fetch!(opts, :publish_to)

    broker.publish(message, publish_to, opts)

    message
  end
end