diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3e258a4..887ce3f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -36,8 +36,8 @@ jobs: elixir: 1.7.4 otp: 21.3.8.17 - pair: - elixir: 1.11.3 - otp: 23.2.5 + elixir: 1.15.6 + otp: 26.1.1 lint: lint steps: - uses: actions/checkout@v2 diff --git a/lib/broadway_kafka/brod_client.ex b/lib/broadway_kafka/brod_client.ex index d6ed97c..e291eb3 100644 --- a/lib/broadway_kafka/brod_client.ex +++ b/lib/broadway_kafka/brod_client.ex @@ -47,6 +47,8 @@ defmodule BroadwayKafka.BrodClient do @default_begin_offset :assigned + @default_shared_client false + @impl true def init(opts) do with {:ok, hosts} <- validate(opts, :hosts, required: true), @@ -62,29 +64,34 @@ defmodule BroadwayKafka.BrodClient do validate(opts, :offset_reset_policy, default: @default_offset_reset_policy), {:ok, begin_offset} <- validate(opts, :begin_offset, default: @default_begin_offset), + {:ok, shared_client} <- + validate(opts, :shared_client, default: @default_shared_client), {:ok, group_config} <- validate_group_config(opts), {:ok, fetch_config} <- validate_fetch_config(opts), {:ok, client_config} <- validate_client_config(opts) do - {:ok, - %{ - hosts: parse_hosts(hosts), - group_id: group_id, - topics: topics, - receive_interval: receive_interval, - reconnect_timeout: reconnect_timeout, - offset_commit_on_ack: offset_commit_on_ack, - offset_reset_policy: offset_reset_policy, - begin_offset: begin_offset, - group_config: [{:offset_commit_policy, @offset_commit_policy} | group_config], - fetch_config: Map.new(fetch_config || []), - client_config: client_config - }} + config = %{ + hosts: parse_hosts(hosts), + group_id: group_id, + topics: topics, + receive_interval: receive_interval, + reconnect_timeout: reconnect_timeout, + offset_commit_on_ack: offset_commit_on_ack, + offset_reset_policy: offset_reset_policy, + begin_offset: begin_offset, + group_config: [{:offset_commit_policy, @offset_commit_policy} | group_config], + fetch_config: Map.new(fetch_config || []), + client_config: client_config, + shared_client: shared_client, + shared_client_id: build_shared_client_id(opts) + } + + {:ok, shared_client_child_spec(config), config} end end @impl true def setup(stage_pid, client_id, callback_module, config) do - with :ok <- :brod.start_client(config.hosts, client_id, config.client_config), + with :ok <- do_start_brod_client(config.hosts, client_id, config.client_config), {:ok, group_coordinator} <- start_link_group_coordinator(stage_pid, client_id, callback_module, config) do Process.monitor(client_id) @@ -147,6 +154,19 @@ defmodule BroadwayKafka.BrodClient do end end + defp shared_client_child_spec(%{shared_client: false}), do: [] + + defp shared_client_child_spec(%{shared_client: true} = config) do + [ + %{ + id: config.shared_client_id, + start: + {:brod, :start_link_client, + [config.hosts, config.shared_client_id, config.client_config]} + } + ] + end + defp lookup_offset(hosts, topic, partition, policy, client_config) do case :brod.resolve_offset(hosts, topic, partition, policy, client_config) do {:ok, offset} -> @@ -268,6 +288,9 @@ defmodule BroadwayKafka.BrodClient do defp validate_option(:client_id_prefix, value) when not is_binary(value), do: validation_error(:client_id_prefix, "a string", value) + defp validate_option(:shared_client, value) when not is_boolean(value), + do: validation_error(:shared_client, "a boolean", value) + defp validate_option(:sasl, :undefined), do: {:ok, :undefined} @@ -387,4 +410,29 @@ defmodule BroadwayKafka.BrodClient do end defp parse_hosts(hosts), do: hosts + + defp build_shared_client_id(opts) do + if opts[:shared_client] do + prefix = get_in(opts, [:client_config, :client_id_prefix]) + broadway_name = opts[:broadway][:name] + :"#{prefix}#{Module.concat(broadway_name, SharedClient)}" + end + end + + defp do_start_brod_client(hosts, client_id, client_config) do + case :brod.start_client(hosts, client_id, client_config) do + :ok -> + :ok + + # Because we are starting the client on the broadway supervison tree + # instead of the :brod supervisor, the already_started error + # is not properly handled by :brod.start_client/3 for shared clients + # So we must handle it here. + {:error, {{:already_started, _}, _}} -> + :ok + + error -> + error + end + end end diff --git a/lib/broadway_kafka/kafka_client.ex b/lib/broadway_kafka/kafka_client.ex index 5ba44d2..dda9c3e 100644 --- a/lib/broadway_kafka/kafka_client.ex +++ b/lib/broadway_kafka/kafka_client.ex @@ -9,7 +9,9 @@ defmodule BroadwayKafka.KafkaClient do offset_commit_on_ack: boolean, topics: [:brod.topic()], group_config: keyword, - client_config: keyword + client_config: keyword, + shared_client: boolean(), + shared_client_id: atom() | nil } @typep offset_reset_policy :: :earliest | :latest diff --git a/lib/broadway_kafka/producer.ex b/lib/broadway_kafka/producer.ex index 40e9728..0a42952 100644 --- a/lib/broadway_kafka/producer.ex +++ b/lib/broadway_kafka/producer.ex @@ -49,6 +49,10 @@ defmodule BroadwayKafka.Producer do When set to `:reset`, the starting offset will be dictated by the `:offset_reset_policy` option, either starting from the `:earliest` or the `:latest` offsets of the topic. Default is `:assigned`. + * `:shared_client` - Optional. A boolean that defines how many clients will be started. + If `true`, only one shared client will be started for all producers, if `false` each producer + will have it's own client. Default is `false` + * `:group_config` - Optional. A list of options used to configure the group coordinator. See the ["Group config options"](#module-group-config-options) section below for a list of all available options. @@ -231,51 +235,48 @@ defmodule BroadwayKafka.Producer do def init(opts) do Process.flag(:trap_exit, true) - client = opts[:client] || BroadwayKafka.BrodClient + config = opts[:initialized_client_config] - case client.init(opts) do - {:error, message} -> - raise ArgumentError, "invalid options given to #{inspect(client)}.init/1, " <> message + draining_after_revoke_flag = + self() + |> drain_after_revoke_table_name!() + |> drain_after_revoke_table_init!() - {:ok, config} -> - {_, producer_name} = Process.info(self(), :registered_name) + prefix = get_in(config, [:client_config, :client_id_prefix]) - draining_after_revoke_flag = - self() - |> drain_after_revoke_table_name!() - |> drain_after_revoke_table_init!() + {_, producer_name} = Process.info(self(), :registered_name) - prefix = get_in(config, [:client_config, :client_id_prefix]) - client_id = :"#{prefix}#{Module.concat([producer_name, Client])}" + client_id = + config[:shared_client_id] || :"#{prefix}#{Module.concat([producer_name, Client])}" - max_demand = - with [{_first, processor_opts}] <- opts[:broadway][:processors], - max_demand when is_integer(max_demand) <- processor_opts[:max_demand] do - max_demand - else - _ -> 10 - end + max_demand = + with [{_first, processor_opts}] <- opts[:broadway][:processors], + max_demand when is_integer(max_demand) <- processor_opts[:max_demand] do + max_demand + else + _ -> 10 + end - state = %{ - client: client, - client_id: client_id, - group_coordinator: nil, - receive_timer: nil, - receive_interval: config.receive_interval, - reconnect_timeout: config.reconnect_timeout, - acks: Acknowledger.new(), - config: config, - allocator_names: allocator_names(opts[:broadway]), - revoke_caller: nil, - draining_after_revoke_flag: draining_after_revoke_flag, - demand: 0, - shutting_down?: false, - buffer: :queue.new(), - max_demand: max_demand - } + state = %{ + client: opts[:client] || BroadwayKafka.BrodClient, + client_id: client_id, + group_coordinator: nil, + receive_timer: nil, + receive_interval: config.receive_interval, + reconnect_timeout: config.reconnect_timeout, + acks: Acknowledger.new(), + config: config, + allocator_names: allocator_names(opts[:broadway]), + revoke_caller: nil, + draining_after_revoke_flag: draining_after_revoke_flag, + demand: 0, + shutting_down?: false, + buffer: :queue.new(), + max_demand: max_demand, + shared_client: config.shared_client + } - {:producer, connect(state)} - end + {:producer, connect(state)} end defp allocator_names(broadway_config) do @@ -509,7 +510,23 @@ defmodule BroadwayKafka.Producer do |> Keyword.put(:processors, [updated_processor_entry | other_processors_entries]) |> Keyword.put(:batchers, updated_batchers_entries) - {allocators, updated_opts} + {producer_mod, producer_opts} = opts[:producer][:module] + + client = producer_opts[:client] || BroadwayKafka.BrodClient + + case client.init(Keyword.put(producer_opts, :broadway, opts)) do + {:error, message} -> + raise ArgumentError, "invalid options given to #{client}.init/1, " <> message + + {:ok, extra_child_specs, config} -> + new_producer_opts = + Keyword.put(producer_opts, :initialized_client_config, config) + + updated_opts = + put_in(updated_opts, [:producer, :module], {producer_mod, new_producer_opts}) + + {allocators ++ extra_child_specs, updated_opts} + end end @impl :brod_group_member @@ -547,7 +564,11 @@ defmodule BroadwayKafka.Producer do def terminate(_reason, state) do %{client: client, group_coordinator: group_coordinator, client_id: client_id} = state group_coordinator && Process.exit(group_coordinator, :shutdown) - client.disconnect(client_id) + + if state.shared_client == false do + client.disconnect(client_id) + end + :ok end diff --git a/test/brod_client_test.exs b/test/brod_client_test.exs index 83fad3d..121ad05 100644 --- a/test/brod_client_test.exs +++ b/test/brod_client_test.exs @@ -33,16 +33,16 @@ defmodule BroadwayKafka.BrodClientTest do assert BrodClient.init(opts) == {:error, expected_msg <> ~s/"host:9092,"/} opts = Keyword.put(@opts, :hosts, host: 9092) - assert {:ok, %{hosts: [host: 9092]}} = BrodClient.init(opts) + assert {:ok, [], %{hosts: [host: 9092]}} = BrodClient.init(opts) opts = Keyword.put(@opts, :hosts, [{"host", 9092}]) - assert {:ok, %{hosts: [{"host", 9092}]}} = BrodClient.init(opts) + assert {:ok, [], %{hosts: [{"host", 9092}]}} = BrodClient.init(opts) opts = Keyword.put(@opts, :hosts, "host:9092") - assert {:ok, %{hosts: [{"host", 9092}]}} = BrodClient.init(opts) + assert {:ok, [], %{hosts: [{"host", 9092}]}} = BrodClient.init(opts) opts = Keyword.put(@opts, :hosts, "host1:9092,host2:9092") - assert {:ok, %{hosts: [{"host1", 9092}, {"host2", 9092}]}} = BrodClient.init(opts) + assert {:ok, [], %{hosts: [{"host1", 9092}, {"host2", 9092}]}} = BrodClient.init(opts) end test ":group_id is a required string" do @@ -55,7 +55,7 @@ defmodule BroadwayKafka.BrodClientTest do {:error, "expected :group_id to be a non empty string, got: :an_atom"} opts = Keyword.put(@opts, :group_id, "my_group") - assert {:ok, %{group_id: "my_group"}} = BrodClient.init(opts) + assert {:ok, [], %{group_id: "my_group"}} = BrodClient.init(opts) end test ":topics is a required list of strings" do @@ -68,12 +68,12 @@ defmodule BroadwayKafka.BrodClientTest do {:error, "expected :topics to be a list of strings, got: :an_atom"} opts = Keyword.put(@opts, :topics, ["topic_1", "topic_2"]) - assert {:ok, %{topics: ["topic_1", "topic_2"]}} = BrodClient.init(opts) + assert {:ok, [], %{topics: ["topic_1", "topic_2"]}} = BrodClient.init(opts) end test ":receive_interval is a non-negative integer with default value 2000" do opts = Keyword.delete(@opts, :receive_interval) - assert {:ok, %{receive_interval: 2000}} = BrodClient.init(opts) + assert {:ok, [], %{receive_interval: 2000}} = BrodClient.init(opts) opts = Keyword.put(@opts, :receive_interval, :an_atom) @@ -81,11 +81,11 @@ defmodule BroadwayKafka.BrodClientTest do {:error, "expected :receive_interval to be a non-negative integer, got: :an_atom"} opts = Keyword.put(@opts, :receive_interval, 1000) - assert {:ok, %{receive_interval: 1000}} = BrodClient.init(opts) + assert {:ok, [], %{receive_interval: 1000}} = BrodClient.init(opts) end test ":reconnect_timeout is a non-negative integer with default value 1000" do - assert {:ok, %{reconnect_timeout: 1000}} = BrodClient.init(@opts) + assert {:ok, [], %{reconnect_timeout: 1000}} = BrodClient.init(@opts) opts = Keyword.put(@opts, :reconnect_timeout, :an_atom) @@ -93,11 +93,11 @@ defmodule BroadwayKafka.BrodClientTest do {:error, "expected :reconnect_timeout to be a non-negative integer, got: :an_atom"} opts = Keyword.put(@opts, :reconnect_timeout, 2000) - assert {:ok, %{reconnect_timeout: 2000}} = BrodClient.init(opts) + assert {:ok, [], %{reconnect_timeout: 2000}} = BrodClient.init(opts) end test ":offset_commit_on_ack is a boolean with default value true" do - assert {:ok, %{offset_commit_on_ack: true}} = BrodClient.init(@opts) + assert {:ok, [], %{offset_commit_on_ack: true}} = BrodClient.init(@opts) opts = Keyword.put(@opts, :offset_commit_on_ack, :an_atom) @@ -105,11 +105,11 @@ defmodule BroadwayKafka.BrodClientTest do {:error, "expected :offset_commit_on_ack to be a boolean, got: :an_atom"} opts = Keyword.put(@opts, :offset_commit_on_ack, false) - assert {:ok, %{offset_commit_on_ack: false}} = BrodClient.init(opts) + assert {:ok, [], %{offset_commit_on_ack: false}} = BrodClient.init(opts) end test ":offset_reset_policy can be :earliest or :latest. Default is :latest" do - assert {:ok, %{offset_reset_policy: :latest}} = BrodClient.init(@opts) + assert {:ok, [], %{offset_reset_policy: :latest}} = BrodClient.init(@opts) opts = Keyword.put(@opts, :offset_reset_policy, :an_atom) @@ -118,14 +118,14 @@ defmodule BroadwayKafka.BrodClientTest do "expected :offset_reset_policy to be one of [:earliest, :latest], got: :an_atom"} opts = Keyword.put(@opts, :offset_reset_policy, :earliest) - assert {:ok, %{offset_reset_policy: :earliest}} = BrodClient.init(opts) + assert {:ok, [], %{offset_reset_policy: :earliest}} = BrodClient.init(opts) opts = Keyword.put(@opts, :offset_reset_policy, :latest) - assert {:ok, %{offset_reset_policy: :latest}} = BrodClient.init(opts) + assert {:ok, [], %{offset_reset_policy: :latest}} = BrodClient.init(opts) end test ":begin_offset can be :assigned or :reset. Default is :assigned" do - assert {:ok, %{begin_offset: :assigned}} = BrodClient.init(@opts) + assert {:ok, [], %{begin_offset: :assigned}} = BrodClient.init(@opts) opts = Keyword.put(@opts, :begin_offset, :an_atom) @@ -133,10 +133,10 @@ defmodule BroadwayKafka.BrodClientTest do {:error, "expected :begin_offset to be one of [:assigned, :reset], got: :an_atom"} opts = Keyword.put(@opts, :begin_offset, :assigned) - assert {:ok, %{begin_offset: :assigned}} = BrodClient.init(opts) + assert {:ok, [], %{begin_offset: :assigned}} = BrodClient.init(opts) opts = Keyword.put(@opts, :begin_offset, :reset) - assert {:ok, %{begin_offset: :reset}} = BrodClient.init(opts) + assert {:ok, [], %{begin_offset: :reset}} = BrodClient.init(opts) end test ":offset_commit_interval_seconds is an optional non-negative integer" do @@ -148,7 +148,7 @@ defmodule BroadwayKafka.BrodClientTest do "a positive integer, got: :an_atom"} opts = put_in(@opts, [:group_config, :offset_commit_interval_seconds], 3) - {:ok, %{group_config: group_config}} = BrodClient.init(opts) + {:ok, [], %{group_config: group_config}} = BrodClient.init(opts) assert group_config[:offset_commit_interval_seconds] == 3 end @@ -160,7 +160,7 @@ defmodule BroadwayKafka.BrodClientTest do "expected :rejoin_delay_seconds to be a non-negative integer, got: :an_atom"} opts = put_in(@opts, [:group_config, :rejoin_delay_seconds], 3) - {:ok, %{group_config: group_config}} = BrodClient.init(opts) + {:ok, [], %{group_config: group_config}} = BrodClient.init(opts) assert group_config[:rejoin_delay_seconds] == 3 end @@ -172,7 +172,7 @@ defmodule BroadwayKafka.BrodClientTest do "expected :session_timeout_seconds to be a positive integer, got: :an_atom"} opts = put_in(@opts, [:group_config, :session_timeout_seconds], 3) - {:ok, %{group_config: group_config}} = BrodClient.init(opts) + {:ok, [], %{group_config: group_config}} = BrodClient.init(opts) assert group_config[:session_timeout_seconds] == 3 end @@ -184,7 +184,7 @@ defmodule BroadwayKafka.BrodClientTest do "expected :heartbeat_rate_seconds to be a positive integer, got: :an_atom"} opts = put_in(@opts, [:group_config, :heartbeat_rate_seconds], 3) - {:ok, %{group_config: group_config}} = BrodClient.init(opts) + {:ok, [], %{group_config: group_config}} = BrodClient.init(opts) assert group_config[:heartbeat_rate_seconds] == 3 end @@ -196,7 +196,7 @@ defmodule BroadwayKafka.BrodClientTest do "expected :rebalance_timeout_seconds to be a positive integer, got: :an_atom"} opts = put_in(@opts, [:group_config, :rebalance_timeout_seconds], 3) - {:ok, %{group_config: group_config}} = BrodClient.init(opts) + {:ok, [], %{group_config: group_config}} = BrodClient.init(opts) assert group_config[:rebalance_timeout_seconds] == 3 end @@ -207,7 +207,7 @@ defmodule BroadwayKafka.BrodClientTest do {:error, "expected :min_bytes to be a positive integer, got: :an_atom"} opts = put_in(@opts, [:fetch_config, :min_bytes], 3) - {:ok, %{fetch_config: fetch_config}} = BrodClient.init(opts) + {:ok, [], %{fetch_config: fetch_config}} = BrodClient.init(opts) assert fetch_config[:min_bytes] == 3 end @@ -218,7 +218,7 @@ defmodule BroadwayKafka.BrodClientTest do {:error, "expected :max_bytes to be a positive integer, got: :an_atom"} opts = put_in(@opts, [:fetch_config, :max_bytes], 3) - {:ok, %{fetch_config: fetch_config}} = BrodClient.init(opts) + {:ok, [], %{fetch_config: fetch_config}} = BrodClient.init(opts) assert fetch_config[:max_bytes] == 3 end @@ -228,11 +228,11 @@ defmodule BroadwayKafka.BrodClientTest do assert BrodClient.init(opts) == {:error, "expected :max_wait_time to be a positive integer, got: :an_atom"} - {:ok, %{fetch_config: fetch_config}} = BrodClient.init(@opts) + {:ok, [], %{fetch_config: fetch_config}} = BrodClient.init(@opts) assert not Map.has_key?(fetch_config, :max_wait_time) opts = put_in(@opts, [:fetch_config, :max_wait_time], 3) - {:ok, %{fetch_config: fetch_config}} = BrodClient.init(opts) + {:ok, [], %{fetch_config: fetch_config}} = BrodClient.init(opts) assert fetch_config[:max_wait_time] == 3 end @@ -244,7 +244,7 @@ defmodule BroadwayKafka.BrodClientTest do opts = put_in(@opts, [:client_config, :client_id_prefix], "a string") - assert {:ok, + assert {:ok, [], %{ client_config: [ client_id_prefix: "a string" @@ -267,7 +267,7 @@ defmodule BroadwayKafka.BrodClientTest do opts = put_in(@opts, [:client_config, :sasl], {:plain, "username", "password"}) - assert {:ok, + assert {:ok, [], %{ client_config: [ sasl: {:plain, "username", "password"} @@ -278,7 +278,7 @@ defmodule BroadwayKafka.BrodClientTest do test ":sasl is an optional tuple of :callback, SASL Authentication Plugin module and opts" do opts = put_in(@opts, [:client_config, :sasl], {:callback, FakeSaslMechanismPlugin, {}}) - assert {:ok, + assert {:ok, [], %{ client_config: [ sasl: {:callback, FakeSaslMechanismPlugin, {}} @@ -300,7 +300,7 @@ defmodule BroadwayKafka.BrodClientTest do certfile: "client.crt" ) - assert {:ok, + assert {:ok, [], %{ client_config: [ ssl: [cacertfile: "ca.crt", keyfile: "client.key", certfile: "client.crt"] @@ -309,7 +309,7 @@ defmodule BroadwayKafka.BrodClientTest do opts = put_in(@opts, [:client_config, :ssl], true) - assert {:ok, + assert {:ok, [], %{ client_config: [ssl: true] }} = BrodClient.init(opts) @@ -323,7 +323,7 @@ defmodule BroadwayKafka.BrodClientTest do opts = put_in(@opts, [:client_config, :connect_timeout], 5000) - assert {:ok, + assert {:ok, [], %{ client_config: [ connect_timeout: 5000 @@ -345,7 +345,7 @@ defmodule BroadwayKafka.BrodClientTest do opts = put_in(@opts, [:client_config, :request_timeout], 5000) - assert {:ok, + assert {:ok, [], %{ client_config: [ request_timeout: 5000 @@ -361,7 +361,61 @@ defmodule BroadwayKafka.BrodClientTest do opts = put_in(@opts, [:client_config, :query_api_versions], false) - assert {:ok, %{client_config: [query_api_versions: false]}} = BrodClient.init(opts) + assert {:ok, [], %{client_config: [query_api_versions: false]}} = BrodClient.init(opts) + end + + test ":shared_client is an optional boolean" do + opts = Keyword.put(@opts, :shared_client, "true") + + assert BrodClient.init(opts) == + {:error, "expected :shared_client to be a boolean, got: \"true\""} + + opts = + @opts + |> Keyword.put(:shared_client, true) + |> Keyword.put(:broadway, name: :my_broadway_name) + |> put_in([:client_config, :client_id_prefix], "my_prefix.") + + assert {:ok, _specs, %{shared_client: true}} = BrodClient.init(opts) + end + + test "return shared_client_id when :shared_client is true" do + opts = + @opts + |> Keyword.put(:shared_client, true) + |> Keyword.put(:broadway, name: :my_broadway_name) + |> put_in([:client_config, :client_id_prefix], "my_prefix.") + + assert {:ok, child_specs, + %{ + shared_client: true, + shared_client_id: :"my_prefix.Elixir.my_broadway_name.SharedClient" + }} = + BrodClient.init(opts) + + assert [ + %{ + id: shared_client_id, + start: {:brod, :start_link_client, [hosts, shared_client_id, client_config]} + } + ] = child_specs + + assert [{:host, 9092}] = hosts + assert :"my_prefix.Elixir.my_broadway_name.SharedClient" = shared_client_id + assert [client_id_prefix: "my_prefix."] = client_config + + opts = + @opts + |> Keyword.put(:shared_client, false) + |> Keyword.put(:broadway, name: :my_broadway_name) + |> put_in([:client_config, :client_id_prefix], "my_prefix.") + + assert {:ok, [], + %{ + shared_client: false, + shared_client_id: nil + }} = + BrodClient.init(opts) end end diff --git a/test/integration/consume_test.exs b/test/integration/consume_test.exs index 02419e1..5071264 100644 --- a/test/integration/consume_test.exs +++ b/test/integration/consume_test.exs @@ -130,7 +130,270 @@ defmodule BroadwayKafka.ConsumerTest do end # Let's wait a bit to see if we get more messages - Process.sleep(1000) + Process.sleep(2000) + + messages = Agent.get(messages_agent, & &1) + + on_exit(fn -> + stop_broadway(broadway_pid) + end) + + {:ok, %{broadway_pid: broadway_pid, messages: messages}} + end + + test "number of processed messages = total messages ", %{messages: messages} do + assert length(messages) == Config.n_messages() + end + + test "messages are not duplicated", %{messages: messages} do + messages_with_count = + Enum.reduce(messages, %{}, fn msg, acc -> + Map.update(acc, msg.data, %{count: 1, list: [msg]}, fn %{count: count, list: list} -> + %{list: [msg | list], count: count + 1} + end) + end) + + duplicated_messages = Enum.filter(messages_with_count, fn {_k, v} -> v.count > 1 end) + + assert duplicated_messages == [] + end + + test "order of messages and offsets", %{messages: messages} do + assert get_ordering_proplems(messages) == [] + end + + defp reset_topic(topic) do + brokers = [{"localhost", 9092}] + + :brod.delete_topics(brokers, [topic], 1_000) + + topic_config = [ + %{ + num_partitions: 3, + replication_factor: 1, + name: topic, + assignments: [], + configs: [] + } + ] + + wait_until_create_topic(brokers, topic_config, %{timeout: 1_000}) + end + + defp wait_until_create_topic(brokers, topic_config, opts) do + case :brod.create_topics(brokers, topic_config, opts) do + :ok -> + :ok + + _error -> + Process.sleep(10) + wait_until_create_topic(brokers, topic_config, opts) + end + end + + defp send_messages(n_messages, hosts, topic) do + client_id = :test_client + :ok = :brod.start_client(hosts, client_id, _client_config = []) + :ok = :brod.start_producer(client_id, topic, _producer_config = []) + + Enum.each(1..n_messages, fn i -> + partition = rem(i, 3) + :ok = :brod.produce_sync(client_id, topic, partition, _key = "", "#{i}") + end) + + :brod.stop_client(client_id) + end + + defp start_broadway() do + {:ok, messages_agent} = Agent.start_link(fn -> [] end) + context = %{messages_agent: messages_agent, caller_pid: self()} + {:ok, broadway_pid} = MyBroadway.start_link(context) + {broadway_pid, messages_agent} + end + + defp stop_broadway(pid) do + ref = Process.monitor(pid) + Process.exit(pid, :normal) + + receive do + {:DOWN, ^ref, _, _, _} -> :ok + end + end + + defp get_ordering_proplems(messages) do + init_acc = %{last_messages: %{0 => nil, 1 => nil, 2 => nil}, problems: []} + + %{problems: ordering_problems} = + Enum.reduce(messages, init_acc, fn msg, acc -> + %{last_messages: last_messages, problems: problems} = acc + partition = msg.metadata.partition + + problems = + case last_messages[partition] do + nil -> + problems + + last_message -> + if String.to_integer(msg.data) <= String.to_integer(last_message.data) do + message = + "Data out of order #{msg.data}->#{last_message.data} in partition #{partition}" + + [message | problems] + else + problems + end + end + + last_messages = Map.put(last_messages, partition, msg) + %{acc | problems: Enum.reverse(problems), last_messages: last_messages} + end) + + Enum.reverse(ordering_problems) + end + + defp wait_for_assignments(broadway_name) do + producers = + broadway_name + |> Broadway.producer_names() + |> Enum.map(fn producer -> + pid = Process.whereis(producer) + :erlang.trace(pid, true, [:receive, tracer: self()]) + pid + end) + + Enum.each(producers, fn pid -> + receive do + {:trace, ^pid, :receive, {:put_assignments, _, _}} -> + IO.puts("Assignment received. Producer: #{inspect(pid)}") + end + end) + end +end + +defmodule BroadwayKafka.ConsumerSharedClientTest do + @moduledoc """ + Kafka integration tests. + + # Setup + + 1. Run Docker + $ docker compose up -d + + # Running only integration tests + + mix test --only integration + + # Running all tests + + mix test --include integration + """ + + use ExUnit.Case + require Logger + + @moduletag :integration + + alias BroadwayKafka.ConsumerTest.Config + + defmodule MyBroadway do + use Broadway + + alias BroadwayKafka.ConsumerTest.Config + + def start_link(context) do + Broadway.start_link(__MODULE__, + name: __MODULE__, + context: context, + producer: [ + module: + {BroadwayKafka.Producer, + [ + hosts: [localhost: 9092], + group_id: "brod_my_group_2", + topics: ["test_2"], + receive_interval: 100, + group_config: [ + offset_commit_interval_seconds: 1, + rejoin_delay_seconds: 2 + ], + fetch_config: [ + max_bytes: 10_240 + ], + shared_client: true + ]}, + concurrency: 3 + ], + processors: [ + default: [ + concurrency: 3 + ] + ], + batchers: [ + default: [ + batch_size: 20, + batch_timeout: 50, + concurrency: 4 + ] + ] + ) + end + + def handle_message(_, message, %{caller_pid: caller_pid}) do + if message.data in Config.last_messages() do + send(caller_pid, {:last_message, message.metadata.partition, message.data}) + end + + message + end + + def handle_batch(_, messages, _info, %{messages_agent: messages_agent}) do + Agent.update(messages_agent, fn list -> list ++ messages end) + last_message = List.last(messages) + last_offset = last_message.metadata.offset + partition = last_message.metadata.partition + + IO.puts( + "Batch handled with #{length(messages)} messages. " <> + "Partition: #{partition} Last offset: #{last_offset}" + ) + + messages + end + end + + setup_all do + topic = "test_2" + hosts = [localhost: 9092] + + reset_topic(topic) + + {broadway_pid, messages_agent} = start_broadway() + + # Let's wait for the assignments before start sending messages + wait_for_assignments(MyBroadway) + + IO.puts("Sending messages...") + send_messages(Config.n_messages(), hosts, topic) + + [last_message_2, last_message_0, last_message_1] = Config.last_messages() + + receive do + {:last_message, 2, ^last_message_2} -> + IO.puts("Got last message from partition 2") + end + + receive do + {:last_message, 0, ^last_message_0} -> + IO.puts("Got last message from partition 0") + end + + receive do + {:last_message, 1, ^last_message_1} -> + IO.puts("Got last message from partition 1") + end + + # Let's wait a bit to see if we get more messages + Process.sleep(2000) messages = Agent.get(messages_agent, & &1) diff --git a/test/producer_test.exs b/test/producer_test.exs index a1afb39..47f0192 100644 --- a/test/producer_test.exs +++ b/test/producer_test.exs @@ -43,12 +43,12 @@ defmodule BroadwayKafka.ProducerTest do defrecord :kafka_message, extract(:kafka_message, from_lib: "brod/include/brod.hrl") @impl true - def init(opts), do: {:ok, Map.new(opts)} + def init(opts), do: {:ok, opts[:child_specs], Map.new(opts)} @impl true def setup(_stage_pid, client_id, _callback_module, config) do if !Process.whereis(client_id) do - {:ok, _pid} = Agent.start(fn -> true end, name: client_id) + {:ok, _pid} = Agent.start(fn -> Map.put(config, :connected, true) end, name: client_id) Process.monitor(client_id) end @@ -96,14 +96,17 @@ defmodule BroadwayKafka.ProducerTest do def connected?(client_id) do connected? = if pid = Process.whereis(client_id) do - Process.alive?(pid) && Agent.get(client_id, & &1) + Process.alive?(pid) && Agent.get(client_id, fn config -> config.connected end) end connected? end @impl true - def disconnect(_client_id) do + def disconnect(client_id) do + test_pid = Agent.get(client_id, fn config -> config.test_pid end) + send(test_pid, :disconnected) + :ok end @@ -238,6 +241,44 @@ defmodule BroadwayKafka.ProducerTest do stop_broadway(pid) end + test "start all child process returned by config" do + {:ok, message_server} = MessageServer.start_link() + + parent_pid = self() + + child_specs = [ + Supervisor.child_spec( + {Task, fn -> send(parent_pid, :child_started_1) end}, + id: :child_started_1 + ), + Supervisor.child_spec( + {Task, fn -> send(parent_pid, :child_started_2) end}, + id: :child_started_2 + ) + ] + + {:ok, pid} = start_broadway(message_server, shared_client: true, child_specs: child_specs) + + assert_receive :child_started_1 + assert_receive :child_started_2 + + stop_broadway(pid) + end + + test "should not disconnect client if shared_client true" do + {:ok, message_server} = MessageServer.start_link() + {:ok, pid} = start_broadway(message_server, shared_client: false) + stop_broadway(pid) + + assert_receive :disconnected + + {:ok, message_server} = MessageServer.start_link() + {:ok, pid} = start_broadway(message_server, shared_client: true) + stop_broadway(pid) + + refute_receive :disconnected + end + test "single producer receiving messages from multiple topic/partitions" do {:ok, message_server} = MessageServer.start_link() {:ok, pid} = start_broadway(message_server) @@ -384,13 +425,29 @@ defmodule BroadwayKafka.ProducerTest do [topic: "topic_2", partition: 0, begin_offset: 301] ]) - MessageServer.push_messages(message_server, 1..50, topic: "topic_1", partition: 0, offset: 110) + MessageServer.push_messages(message_server, 1..50, + topic: "topic_1", + partition: 0, + offset: 110 + ) - MessageServer.push_messages(message_server, 1..50, topic: "topic_1", partition: 1, offset: 210) + MessageServer.push_messages(message_server, 1..50, + topic: "topic_1", + partition: 1, + offset: 210 + ) - MessageServer.push_messages(message_server, 1..50, topic: "topic_2", partition: 0, offset: 310) + MessageServer.push_messages(message_server, 1..50, + topic: "topic_2", + partition: 0, + offset: 310 + ) - MessageServer.push_messages(message_server, 1..50, topic: "topic_2", partition: 1, offset: 410) + MessageServer.push_messages(message_server, 1..50, + topic: "topic_2", + partition: 1, + offset: 410 + ) assert_receive {:batch_handled, %{topic: "topic_1", partition: 0, pid: consumer_1}} assert_receive {:batch_handled, %{topic: "topic_1", partition: 1, pid: consumer_2}} @@ -569,10 +626,10 @@ defmodule BroadwayKafka.ProducerTest do Process.exit(Process.whereis(client_id), :kill) refute_receive {:setup, _} - {:ok, _} = Agent.start(fn -> false end, name: client_id) + {:ok, _} = Agent.start_link(fn -> %{test_pid: self(), connected: false} end, name: client_id) refute_receive {:setup, _} - Agent.update(client_id, fn _ -> true end) + Agent.update(client_id, fn state -> Map.put(state, :connected, true) end) assert_receive {:setup, ^client_id} stop_broadway(pid) @@ -630,7 +687,9 @@ defmodule BroadwayKafka.ProducerTest do max_bytes: 10, offset_commit_on_ack: false, begin_offset: :assigned, - ack_raises_on_offset: ack_raises_on_offset + ack_raises_on_offset: ack_raises_on_offset, + shared_client: opts[:shared_client] || false, + child_specs: opts[:child_specs] || [] ]}, concurrency: producers_concurrency ],