defmodule BroadwayKafka.Producer do
  @moduledoc """
  A Kafka connector for Broadway.

  BroadwayKafka can subscribe as a consumer to one or more topics and process streams
  of records within the same consumer group. Communication is done through Kafka's
  [Consumer API](https://kafka.apache.org/documentation.html#consumerapi) using the
  [:brod](https://github.com/klarna/brod/) client.

  ## Options

    * `:hosts` - Required. A list of host and port tuples or a single string of comma
      separated HOST:PORT pairs to use for establishing the initial connection to Kafka,
      e.g. `[localhost: 9092]`. Examples:

          # Keyword
          ["kafka-vm1": 9092, "kafka-vm2": 9092, "kafka-vm3": 9092]

          # List of tuples
          [{"kafka-vm1", 9092}, {"kafka-vm2", 9092}, {"kafka-vm3", 9092}]

          # String
          "kafka-vm1:9092,kafka-vm2:9092,kafka-vm3:9092"

    * `:group_id` - Required. A unique string that identifies the consumer group the producer
      will belong to.

    * `:topics` - Required. A list of topics that the producer will subscribe to.

    * `:receive_interval` - Optional. The duration (in milliseconds) for which the producer
      waits before making a request for more messages. Default is 2000 (2 seconds).

    * `:offset_commit_on_ack` - Optional. Tells Broadway whether to send an offset commit
      request after each acknowledgement. Default is `true`. Setting this value to `false` can
      increase performance, since commit requests will then respect the
      `:offset_commit_interval_seconds` option. However, setting long commit intervals might
      lead to a large number of duplicated records being processed after a server restart or
      connection loss. If that's the case, make sure your logic is idempotent when consuming
      records to avoid inconsistencies. Also, bear in mind that the negative performance impact
      might be insignificant if you're using batchers, since only one commit request will be
      performed per batch. See the configuration sketch after this list.

    * `:offset_reset_policy` - Optional. Defines the offset to be used when there's no initial
      offset in Kafka or if the current offset has expired. Possible values are `:earliest` or
      `:latest`. Default is `:latest`.

    * `:group_config` - Optional. A list of options used to configure the group coordinator.
      See the ["Group config options"](#module-group-config-options) section below for a list
      of all available options.

    * `:fetch_config` - Optional. A list of options used when fetching messages. See the
      ["Fetch config options"](#module-fetch-config-options) section below for a list of all
      available options.

    * `:client_config` - Optional. A list of options used when creating the client. See the
      ["Client config options"](#module-client-config-options) section below for a list of all
      available options.

  ## Group config options

  The available options that will be passed to `:brod`'s group coordinator.

    * `:offset_commit_interval_seconds` - Optional. The time interval between two
      OffsetCommitRequest messages. Default is 5.

    * `:rejoin_delay_seconds` - Optional. Delay in seconds before rejoining the group.
      Default is 1.

    * `:session_timeout_seconds` - Optional. Time in seconds the group coordinator broker waits
      before considering a member 'down' if no heartbeat or any kind of request is received.
      A group member may also consider the coordinator broker 'down' if no heartbeat response
      is received in the past N seconds. Default is 10 seconds.

  ## Fetch config options

  The available options that will be internally passed to `:brod.fetch/5`.

    * `:min_bytes` - Optional. The minimum amount of data to be fetched from the server.
      If not enough data is available, the request will wait for that much data to accumulate
      before answering. Default is 1 byte. Setting this value greater than 1 can improve
      server throughput a bit at the cost of additional latency.

    * `:max_bytes` - Optional. The maximum amount of data to be fetched at a time from a single
      partition. Default is 1048576 (1 MiB). Setting greater values can improve server
      throughput at the cost of more memory consumption.

    * `:max_wait_time` - Optional. The maximum time (in milliseconds) allowed for the broker to
      collect `min_bytes` of messages in a fetch response. Default is 1000 ms.

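  For example, a pipeline that favors throughput over latency might pass something
  like the following (the numbers are illustrative, not recommendations):

      fetch_config: [
        min_bytes: 16_384,
        max_bytes: 2_097_152,
        max_wait_time: 500
      ]
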
  ## Client config options

  The available options that will be internally passed to `:brod.start_client/3`.

    * `:client_id_prefix` - Optional. A string that will be used to build the client id passed
      to `:brod`. The example value `client_id_prefix: :"\#{Node.self()} -"` would generate the
      following connection log from our integration tests:

          20:41:37.717 [info] :supervisor: {:local, :brod_sup}
          :started: [
            pid: #PID<0.286.0>,
            id: :"nonode@nohost - Elixir.BroadwayKafka.ConsumerTest.MyBroadway.Broadway.Producer_0.Client",
            mfargs: {:brod_client, :start_link,
              [
                [localhost: 9092],
                :"nonode@nohost - Elixir.BroadwayKafka.ConsumerTest.MyBroadway.Broadway.Producer_0.Client",
                [client_id_prefix: :"nonode@nohost - "]
              ]},
            restart_type: {:permanent, 10},
            shutdown: 5000,
            child_type: :worker
          ]

    * `:sasl` - Optional. A tuple of `{mechanism, username, password}`, where `mechanism` can
      be `:plain`, `:scram_sha_256` or `:scram_sha_512`. See `:brod`'s
      [Authentication Support](https://github.com/klarna/brod#authentication-support)
      documentation for more information. Default is no sasl options.

    * `:ssl` - Optional. A boolean or a list of options to use when connecting via SSL/TLS.
      See the [`tls_client_option`](http://erlang.org/doc/man/ssl.html#type-tls_client_option)
      documentation for more information. Default is no ssl options.

    * `:connect_timeout` - Optional. Time in milliseconds to be used as a timeout for `:brod`'s
      communication with Kafka. Default is to use `:brod`'s default timeout, which is currently
      5 seconds.

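  As a sketch, connecting to a cluster that requires SASL authentication over TLS
  might look like this (the credentials are placeholders):

      client_config: [
        sasl: {:plain, "my-username", "my-password"},
        ssl: true,
        connect_timeout: 10_000
      ]
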
  > **Note**: Currently, Broadway does not support all options provided by `:brod`. If you
  > have a scenario where you need any extra option that is not listed above, please open an
  > issue, so we can consider adding it.

  ## Example

      Broadway.start_link(MyBroadway,
        name: MyBroadway,
        producer: [
          module: {BroadwayKafka.Producer, [
            hosts: [localhost: 9092],
            group_id: "group_1",
            topics: ["test"]
          ]},
          concurrency: 1
        ],
        processors: [
          default: [
            concurrency: 10
          ]
        ]
      )

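  The `MyBroadway` module above is not defined in this file. A minimal sketch of
  it, assuming you only need per-message processing, could be:

      defmodule MyBroadway do
        use Broadway

        @impl true
        def handle_message(_processor, message, _context) do
          # message.data holds the record value; topic, partition, offset,
          # key, ts and headers are available in message.metadata.
          IO.inspect(message.data, label: "Got message")
          message
        end
      end
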
  ## Concurrency and partitioning

  The concurrency model provided by Kafka is based on partitioning, i.e., the more partitions
  you have, the more concurrency you get. However, in order to take advantage of this model
  you need to set up the `:concurrency` options for your processors and batchers accordingly.
  Having less concurrency than the number of assigned topic/partitions will result in
  individual processors handling more than one partition, decreasing the overall level of
  concurrency. Therefore, if you want to always be able to process messages at maximum
  concurrency (assuming you have enough resources to do it), you should increase the
  concurrency up front to make sure you have enough processors to handle the extra messages
  received from new partitions assigned.

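  For instance, assuming a pipeline consuming a single topic with 12 partitions (an
  illustrative number), you could size the processors so that every partition can be
  handled concurrently:

      processors: [
        default: [
          # at least one processor per assigned partition, plus headroom for
          # partitions that may be reassigned from other consumers
          concurrency: 12
        ]
      ]
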
  > **Note**: Even if you don't plan to add more partitions to a Kafka topic, your pipeline
  > can still receive more assignments than planned. For instance, if another consumer crashes,
  > the server will reassign all its topic/partitions to other available consumers, including
  > any Broadway producer subscribed to the same topic.

  ## Handling failed messages

  `broadway_kafka` never stops the flow of the stream, i.e. it will **always ack** the
  messages, even when they fail. Unlike queue-based connectors, where you can mark an
  individual message as failed, in Kafka that's not possible due to its single offset per
  topic/partition ack strategy. If you want to reprocess failed messages, you need to roll
  your own strategy. A possible way to do that is to implement `c:Broadway.handle_failed/2`
  and send failed messages to a separate stream or queue for later processing, as sketched
  below.

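  A minimal sketch of that approach, assuming a hypothetical
  `MyApp.DeadLetterQueue.enqueue/1` helper on your side:

      @impl true
      def handle_failed(messages, _context) do
        # Failed messages are acked regardless, so push them somewhere
        # durable for later reprocessing.
        Enum.each(messages, &MyApp.DeadLetterQueue.enqueue/1)
        messages
      end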
"""
  use GenStage

  require Logger
  import Record, only: [defrecord: 2, extract: 2]

  alias Broadway.{Message, Acknowledger, Producer}
  alias BroadwayKafka.Allocator
  alias BroadwayKafka.Acknowledger

  @behaviour Producer
  @behaviour :brod_group_member

  defrecord :kafka_message, extract(:kafka_message, from_lib: "brod/include/brod.hrl")

  defrecord :brod_received_assignment,
            extract(:brod_received_assignment, from_lib: "brod/include/brod.hrl")

  @impl GenStage
  def init(opts) do
    client = opts[:client] || BroadwayKafka.BrodClient

    case client.init(opts) do
      {:error, message} ->
        raise ArgumentError, "invalid options given to #{inspect(client)}.init/1, " <> message

      {:ok, config} ->
        {_, producer_name} = Process.info(self(), :registered_name)
        prefix = get_in(config, [:client_config, :client_id_prefix])
        client_id = :"#{prefix}#{Module.concat([producer_name, Client])}"

        state = %{
          client: client,
          client_id: client_id,
          group_coordinator: nil,
          receive_timer: nil,
          receive_interval: config.receive_interval,
          reconnect_timeout: config.reconnect_timeout,
          acks: Acknowledger.new(),
          config: config,
          allocator_names: allocator_names(opts[:broadway]),
          revoke_caller: nil,
          demand: 0,
          shutting_down?: false,
          buffer: :queue.new()
        }

        {:producer, connect(state)}
    end
  end

  defp allocator_names(broadway_config) do
    broadway_name = broadway_config[:name]
    broadway_index = broadway_config[:index]

    processors_allocators =
      for {name, _} <- broadway_config[:processors] do
        Module.concat([broadway_name, "Allocator_processor_#{name}"])
      end

    batchers_allocators =
      for {name, _} <- broadway_config[:batchers] do
        Module.concat([broadway_name, "Allocator_batcher_consumer_#{name}"])
      end

    {broadway_index, processors_allocators, batchers_allocators}
  end

  @impl GenStage
  def handle_demand(incoming_demand, %{demand: demand} = state) do
    maybe_schedule_poll(%{state | demand: demand + incoming_demand}, 0)
  end

  @impl GenStage
  def handle_call(:drain_after_revoke, _from, %{group_coordinator: nil} = state) do
    {:reply, :ok, [], state}
  end

  @impl GenStage
  def handle_call(:drain_after_revoke, from, %{revoke_caller: nil} = state) do
    state = reset_buffer(state)

    if Acknowledger.all_drained?(state.acks) do
      {:reply, :ok, [], %{state | acks: Acknowledger.new()}}
    else
      {:noreply, [], %{state | revoke_caller: from}}
    end
  end

  @impl GenStage
  def handle_cast({:update_topics, topics}, state) do
    state.client.update_topics(state.group_coordinator, topics)
    {:noreply, [], state}
  end

  @impl GenStage
  def handle_info({:poll, key}, %{acks: acks, demand: demand} = state) do
    # We only poll if:
    #
    #   1. We are not shutting down
    #   2. Our assignments have not been revoked
    #   3. We know the key being acked
    #
    # Note the key may be out of date when polling has been scheduled and
    # assignments were revoked afterwards, which is why check 3 is necessary.
    offset = Acknowledger.last_offset(acks, key)

    if not state.shutting_down? and state.revoke_caller == nil and offset != nil do
      messages = fetch_messages_from_kafka(state, key, offset)
      {new_acks, new_demand, messages, pending} = split_demand(messages, acks, key, demand)
      new_buffer = enqueue_many(state.buffer, key, pending)

      new_state = %{state | acks: new_acks, demand: new_demand, buffer: new_buffer}
      {:noreply, messages, new_state}
    else
      {:noreply, [], state}
    end
  end

  @impl GenStage
  def handle_info(:maybe_schedule_poll, state) do
    maybe_schedule_poll(%{state | receive_timer: nil}, state.receive_interval)
  end

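  # Invoked via assignments_received/4: resolves a starting offset for each
  # assignment received from the group coordinator and registers the new
  # topic/partitions with the processors' and batchers' allocators.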
  @impl GenStage
  def handle_info({:put_assignments, group_generation_id, assignments}, state) do
    list =
      Enum.map(assignments, fn assignment ->
        brod_received_assignment(
          topic: topic,
          partition: partition,
          begin_offset: begin_offset
        ) = assignment

        offset_reset_policy = state.config[:offset_reset_policy]

        offset =
          state.client.resolve_offset(
            topic,
            partition,
            begin_offset,
            offset_reset_policy,
            state.config
          )

        {group_generation_id, topic, partition, offset}
      end)

    topics_partitions = Enum.map(list, fn {_, topic, partition, _} -> {topic, partition} end)
    {broadway_index, processors_allocators, batchers_allocators} = state.allocator_names

    for allocator_name <- processors_allocators do
      Allocator.allocate(allocator_name, broadway_index, topics_partitions)
    end

    for allocator_name <- batchers_allocators do
      Allocator.allocate(allocator_name, broadway_index, topics_partitions)
    end

    {:noreply, [], %{state | acks: Acknowledger.add(state.acks, list)}}
  end

  @impl GenStage
  def handle_info({:ack, key, offsets}, state) do
    %{group_coordinator: group_coordinator, client: client, acks: acks, config: config} = state
    {generation_id, topic, partition} = key
    {drained?, new_offset, updated_acks} = Acknowledger.update_current_offset(acks, key, offsets)

    if new_offset do
      try do
        client.ack(group_coordinator, generation_id, topic, partition, new_offset, config)
      catch
        kind, reason ->
          Logger.error(Exception.format(kind, reason, __STACKTRACE__))
      end
    end

    new_state =
      if drained? && state.revoke_caller && Acknowledger.all_drained?(updated_acks) do
        GenStage.reply(state.revoke_caller, :ok)
        %{state | revoke_caller: nil, acks: Acknowledger.new()}
      else
        %{state | acks: updated_acks}
      end

    {:noreply, [], new_state}
  end

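  # Either the brod client or the group coordinator went down. Discard the
  # buffer, forget the coordinator and periodically try to reconnect.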
  def handle_info({:DOWN, _ref, _, {client_id, _}, _reason}, %{client_id: client_id} = state) do
    state.client.stop_group_coordinator(state.group_coordinator)
    state = reset_buffer(state)
    schedule_reconnect(state.reconnect_timeout)

    {:noreply, [], %{state | group_coordinator: nil}}
  end

  def handle_info({:DOWN, _ref, _, coord, _reason}, %{group_coordinator: coord} = state) do
    state = reset_buffer(state)
    schedule_reconnect(state.reconnect_timeout)
    {:noreply, [], %{state | group_coordinator: nil}}
  end

  @impl GenStage
  def handle_info(:reconnect, state) do
    if state.client.connected?(state.client_id) do
      {:noreply, [], connect(state)}
    else
      schedule_reconnect(state.reconnect_timeout)
      {:noreply, [], state}
    end
  end

  @impl GenStage
  def handle_info(_, state) do
    {:noreply, [], state}
  end

  @impl Producer
  def prepare_for_draining(state) do
    # On draining, we will continue scheduling the polls, but they will be a no-op.
    {:noreply, [], %{state | shutting_down?: true}}
  end

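  # Runs once before the pipeline starts: injects one allocator per
  # processor/batcher stage and sets their :partition_by so that messages
  # from the same topic/partition always go to the same consumer.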
  @impl Producer
  def prepare_for_start(_module, opts) do
    broadway_name = opts[:name]

    producers_concurrency = opts[:producer][:concurrency]
    [first_processor_entry | other_processors_entries] = opts[:processors]

    {allocator, updated_processor_entry} =
      build_allocator_spec_and_consumer_entry(
        broadway_name,
        :processors,
        "processor",
        producers_concurrency,
        first_processor_entry
      )

    {allocators, updated_batchers_entries} =
      Enum.reduce(opts[:batchers], {[allocator], []}, fn entry, {allocators, entries} ->
        {allocator, updated_entry} =
          build_allocator_spec_and_consumer_entry(
            broadway_name,
            :batchers,
            "batcher_consumer",
            producers_concurrency,
            entry
          )

        {[allocator | allocators], [updated_entry | entries]}
      end)

    updated_opts =
      opts
      |> Keyword.put(:processors, [updated_processor_entry | other_processors_entries])
      |> Keyword.put(:batchers, updated_batchers_entries)

    {allocators, updated_opts}
  end

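  # :brod_group_member callbacks, invoked by :brod's group coordinator.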
  @impl :brod_group_member
  def get_committed_offsets(_pid, _topics_partitions) do
    raise "not implemented"
  end

  @impl :brod_group_member
  def assignments_received(pid, _group_member_id, group_generation_id, received_assignments) do
    send(pid, {:put_assignments, group_generation_id, received_assignments})
    :ok
  end

  @impl :brod_group_member
  def assignments_revoked(producer_pid) do
    GenStage.call(producer_pid, :drain_after_revoke, :infinity)
    :ok
  end

  @impl GenStage
  def terminate(_reason, state) do
    state.client.stop_group_coordinator(state.group_coordinator)
    :ok
  end

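  # Serves as much demand as possible from the internal buffer. If there is
  # still demand left, schedules a {:poll, key} for each assignment so more
  # messages are fetched from Kafka.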
  defp maybe_schedule_poll(%{demand: 0} = state, _interval) do
    {:noreply, [], state}
  end

  defp maybe_schedule_poll(state, interval) do
    %{buffer: buffer, demand: demand, acks: acks, receive_timer: receive_timer} = state

    case dequeue_many(buffer, acks, demand, []) do
      {acks, 0, events, buffer} ->
        {:noreply, events, %{state | demand: 0, buffer: buffer, acks: acks}}

      {acks, demand, events, buffer} ->
        receive_timer = receive_timer || schedule_poll(state, interval)

        state = %{
          state
          | demand: demand,
            buffer: buffer,
            receive_timer: receive_timer,
            acks: acks
        }

        {:noreply, events, state}
    end
  end

  defp schedule_poll(state, interval) do
    for key <- Acknowledger.keys(state.acks) do
      Process.send_after(self(), {:poll, key}, interval)
    end

    Process.send_after(self(), :maybe_schedule_poll, interval)
  end

  defp fetch_messages_from_kafka(state, key, offset) do
    %{
      client: client,
      client_id: client_id,
      config: config
    } = state

    {generation_id, topic, partition} = key

    case client.fetch(client_id, topic, partition, offset, config[:fetch_config], config) do
      {:ok, {_watermark_offset, kafka_messages}} ->
        Enum.map(kafka_messages, fn k_msg ->
          wrap_message(k_msg, topic, partition, generation_id)
        end)

      {:error, reason} ->
        raise "cannot fetch records from Kafka (topic=#{topic} partition=#{partition} " <>
                "offset=#{offset}). Reason: #{inspect(reason)}"
    end
  end

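  # Wraps a :brod kafka_message record into a Broadway.Message, exposing
  # topic/partition/offset/key/ts/headers as metadata and batching messages
  # by {topic, partition}.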
  defp wrap_message(kafka_msg, topic, partition, generation_id) do
    kafka_message(value: data, offset: offset, key: key, ts: ts, headers: headers) = kafka_msg

    ack_data = %{offset: offset}
    ack_ref = {self(), {generation_id, topic, partition}}

    message = %Message{
      data: data,
      metadata: %{
        topic: topic,
        partition: partition,
        offset: offset,
        key: key,
        ts: ts,
        headers: headers
      },
      acknowledger: {Acknowledger, ack_ref, ack_data}
    }

    Message.put_batch_key(message, {topic, partition})
  end

  defp connect(state) do
    %{client: client, client_id: client_id, config: config} = state

    case client.setup(self(), client_id, __MODULE__, config) do
      {:ok, group_coordinator} ->
        %{state | group_coordinator: group_coordinator}

      error ->
        raise "Cannot connect to Kafka. Reason #{inspect(error)}"
    end
  end

  defp build_allocator_spec_and_consumer_entry(
         broadway_name,
         group,
         prefix,
         producers_concurrency,
         consumer_entry
       ) do
    {consumer_name, consumer_config} = consumer_entry
    validate_partition_by(group, consumer_name, consumer_config)

    consumer_concurrency = consumer_config[:concurrency]
    allocator_name = Module.concat([broadway_name, "Allocator_#{prefix}_#{consumer_name}"])
    partition_by = &Allocator.fetch!(allocator_name, {&1.metadata.topic, &1.metadata.partition})
    new_config = Keyword.put(consumer_config, :partition_by, partition_by)

    allocator =
      {BroadwayKafka.Allocator, {allocator_name, producers_concurrency, consumer_concurrency}}

    allocator_spec = Supervisor.child_spec(allocator, id: allocator_name)

    {allocator_spec, {consumer_name, new_config}}
  end

  defp validate_partition_by(group, consumer_name, consumer_config) do
    if Keyword.has_key?(consumer_config, :partition_by) do
      raise ArgumentError,
            "cannot set option :partition_by for #{group} #{inspect(consumer_name)}. " <>
              "The option will be set automatically by BroadwayKafka.Producer"
    end
  end

  ## Buffer handling

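  # split_demand/4 takes up to `demand` messages from `list`, records their
  # offsets under `key` in `acks`, and returns the messages to emit along
  # with the pending rest (to be buffered).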
  defp split_demand(list, acks, key, demand) do
    {rest, demand, reversed, acc} = reverse_split_demand(list, demand, [], [])
    acks = update_last_offset(acks, key, reversed)
    {acks, demand, Enum.reverse(acc), rest}
  end

  defp reverse_split_demand(rest, 0, reversed, acc) do
    {rest, 0, reversed, acc}
  end

  defp reverse_split_demand([], demand, reversed, acc) do
    {[], demand, reversed, acc}
  end

  defp reverse_split_demand([head | tail], demand, reversed, acc) do
    reverse_split_demand(tail, demand - 1, [head | reversed], [head | acc])
  end

  defp enqueue_many(queue, _key, []), do: queue
  defp enqueue_many(queue, key, list), do: :queue.in({key, list}, queue)

  defp dequeue_many(queue, acks, demand, acc) when demand > 0 do
    case :queue.out(queue) do
      {{:value, {key, list}}, queue} ->
        {rest, demand, reversed, acc} = reverse_split_demand(list, demand, [], acc)
        acks = update_last_offset(acks, key, reversed)

        case {demand, rest} do
          {0, []} ->
            {acks, demand, Enum.reverse(acc), queue}

          {0, _} ->
            {acks, demand, Enum.reverse(acc), :queue.in_r({key, rest}, queue)}

          {_, []} ->
            dequeue_many(queue, acks, demand, acc)
        end

      {:empty, queue} ->
        {acks, demand, Enum.reverse(acc), queue}
    end
  end

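  # `reversed` has the newest message first, so its offset + 1 becomes the
  # next offset to poll from; all offsets are collected (oldest first) so
  # they can be tracked for acknowledgement.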
  defp update_last_offset(acks, key, [message | _] = reversed) do
    last = message.metadata.offset + 1
    offsets = Enum.reduce(reversed, [], &[&1.metadata.offset | &2])
    Acknowledger.update_last_offset(acks, key, last, offsets)
  end

  defp update_last_offset(acks, _key, []) do
    acks
  end

  defp reset_buffer(state) do
    put_in(state.buffer, :queue.new())
  end

  defp schedule_reconnect(timeout) do
    Process.send_after(self(), :reconnect, timeout)
  end
end