-
Notifications
You must be signed in to change notification settings - Fork 159
/
message.ex
172 lines (141 loc) · 6.23 KB
/
message.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
defmodule Broadway.Message do
  @moduledoc """
  The struct that carries all the information about a single message.

  Messages are created by the producers. As a message travels downstream
  it is updated several times, either by the module implementing the
  `Broadway` behaviour (through the `c:Broadway.handle_message/3` callback)
  or internally by one of Broadway's built-in stages.

  Do not change the struct fields by hand; use the functions in this
  module to manipulate messages instead.
  """

  alias __MODULE__, as: Message
  alias Broadway.Acknowledger
  alias Broadway.NoopAcknowledger

  @type t :: %Message{
          data: term,
          metadata: %{optional(atom) => term},
          acknowledger: {module, ack_ref :: term, data :: term},
          batcher: atom,
          batch_key: term,
          batch_mode: :bulk | :flush,
          status:
            :ok
            | {:failed, reason :: binary}
            | {:throw | :error | :exit, term, Exception.stacktrace()}
        }

  @enforce_keys [:data, :acknowledger]
  defstruct data: nil,
            metadata: %{},
            acknowledger: nil,
            batcher: :default,
            batch_key: :default,
            batch_mode: :bulk,
            status: :ok

  # The only batching modes accepted by put_batch_mode/2.
  @batch_modes [:bulk, :flush]

  # Acknowledger installed on messages that have already been acked.
  @noop_acknowledger {NoopAcknowledger, nil, nil}

  @doc """
  Transforms the message data with the given one-arity function.

  `fun` receives the current data and its result becomes the new data.
  Typically called from the `c:Broadway.handle_message/3` implementation
  to replace the data with its processed form.
  """
  @spec update_data(message :: Message.t(), fun :: (term -> term)) :: Message.t()
  def update_data(%Message{data: current} = message, fun) when is_function(fun, 1) do
    %{message | data: fun.(current)}
  end

  @doc """
  Replaces the message data with `data`.

  Typically called from the `c:Broadway.handle_message/3` implementation
  to store freshly processed data in the message.
  """
  @spec put_data(message :: Message.t(), term) :: Message.t()
  def put_data(%Message{} = message, data), do: %{message | data: data}

  @doc """
  Sets the batcher the message should be forwarded to.
  """
  @spec put_batcher(message :: Message.t(), batcher :: atom) :: Message.t()
  def put_batcher(%Message{} = message, batcher) when is_atom(batcher),
    do: %{message | batcher: batcher}

  @doc """
  Sets the batch key of the message.

  Within a given batcher, the batch key identifies the batch the message
  belongs to. Each batcher groups messages sharing a `batch_key` into
  batches of at most `batch_size` messages collected within `batch_timeout`.
  Both `batch_size` and `batch_timeout` are tracked per batch key, so one
  batcher can build batches for several keys at the same time, regardless
  of the concurrency level.

  When a batcher has multiple batch processors (concurrency > 1), all
  messages with the same batch key are routed to the same processor.
  Different batch keys may therefore run concurrently, while a single batch
  key always runs serially and in the same batch processor.
  """
  @spec put_batch_key(message :: Message.t(), batch_key :: term) :: Message.t()
  def put_batch_key(%Message{} = message, batch_key), do: %{message | batch_key: batch_key}

  @doc """
  Sets the batching mode of the message.

  With `:bulk`, the batch containing the message is delivered once the
  batch size or the batch timeout is reached.

  With `:flush`, the batch containing the message is delivered immediately
  after processing. This does not mean the batch holds a single element,
  but rather that all messages received from the processor are delivered
  without waiting.

  Messages default to `:bulk`.
  """
  @spec put_batch_mode(message :: Message.t(), mode :: :bulk | :flush) :: Message.t()
  def put_batch_mode(%Message{} = message, mode) when mode in @batch_modes,
    do: %{message | batch_mode: mode}

  @doc """
  Reconfigures the acknowledger attached to `message`.

  Invokes the `c:Broadway.Acknowledger.configure/3` callback of the
  message's acknowledger module with the given `options` and stores the
  acknowledger data it returns back in the message.

  The acknowledger must implement `configure/3`; otherwise an error is
  raised.
  """
  @spec configure_ack(message :: Message.t(), options :: keyword) :: Message.t()
  def configure_ack(%Message{} = message, options) when is_list(options) do
    %Message{acknowledger: {module, ack_ref, ack_data}} = message

    # configure/3 is an optional callback, so bail out early when it is missing.
    if not (Code.ensure_loaded?(module) and function_exported?(module, :configure, 3)) do
      raise "the configure/3 callback is not defined by acknowledger #{inspect(module)}"
    end

    {:ok, configured_data} = module.configure(ack_ref, ack_data, options)
    %{message | acknowledger: {module, ack_ref, configured_data}}
  end

  @doc """
  Marks the message as failed with the given `reason`.

  Failed messages are handed straight to their acknowledger at the end of
  the current step, so they are not forwarded to the next step of the
  pipeline.

  Failing a message emits no log, but it does trigger the
  `c:Broadway.handle_failed/2` callback.
  """
  @spec failed(message :: Message.t(), reason :: binary) :: Message.t()
  def failed(%Message{} = message, reason), do: %{message | status: {:failed, reason}}

  @doc """
  Acknowledges a message, or a non-empty list of messages, right away.

  Use it to ack without waiting for the rest of the pipeline. Every acked
  message gets a no-op acknowledger, so that it is safe to ack again at the
  end of the pipeline.

  Returns the updated message when given a single message, or the updated
  list of messages when given a list.
  """
  @spec ack_immediately(message :: Message.t()) :: Message.t()
  @spec ack_immediately(messages :: [Message.t(), ...]) :: [Message.t(), ...]
  def ack_immediately(message_or_messages)

  def ack_immediately(%Message{} = message) do
    # Reuse the list clause and unwrap its single result.
    hd(ack_immediately([message]))
  end

  def ack_immediately([_ | _] = messages) do
    # Messages with status :ok are acked as successful, all others as failed.
    {successful, failed} = Enum.split_with(messages, fn message -> message.status == :ok end)
    _ = Acknowledger.ack_messages(successful, failed)

    Enum.map(messages, fn message -> %{message | acknowledger: @noop_acknowledger} end)
  end
end