diff --git a/CHANGELOG.md b/CHANGELOG.md index fa83de8..87a54d5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## Unreleased - Use new cypher names +- Allow Epmd strategy to reconnect after connection failures - Detect Self Signed Certificate Authority for Kubernetes Strategy - Remove calls to deprecated `Logger.warn/2` diff --git a/lib/strategy/epmd.ex b/lib/strategy/epmd.ex index 6d1364b..13bdfbc 100644 --- a/lib/strategy/epmd.ex +++ b/lib/strategy/epmd.ex @@ -10,21 +10,56 @@ defmodule Cluster.Strategy.Epmd do epmd_example: [ strategy: #{__MODULE__}, config: [ + timeout: 30_000, hosts: [:"a@127.0.0.1", :"b@127.0.0.1"]]]] + An optional timeout can be specified in the config. This is the timeout that + will be used in the GenServer to connect the nodes. This defaults to + `:infinity` meaning that the connection process will only happen when the + worker is started. Any integer timeout will result in the connection process + being triggered. In the example above, it has been configured for 30 seconds. """ + use GenServer use Cluster.Strategy alias Cluster.Strategy.State + @impl true def start_link([%State{config: config} = state]) do case Keyword.get(config, :hosts, []) do [] -> :ignore nodes when is_list(nodes) -> - Cluster.Strategy.connect_nodes(state.topology, state.connect, state.list_nodes, nodes) - :ignore + GenServer.start_link(__MODULE__, [state]) end end + + @impl true + def init([state]) do + connect_hosts(state) + {:ok, state, configured_timeout(state)} + end + + @impl true + def handle_info(:timeout, state) do + handle_info(:connect, state) + end + + def handle_info(:connect, state) do + connect_hosts(state) + {:noreply, state, configured_timeout(state)} + end + + @spec configured_timeout(State.t()) :: integer() | :infinity + defp configured_timeout(%State{config: config}) do + Keyword.get(config, :timeout, :infinity) + end + + @spec connect_hosts(State.t()) :: State.t() + defp connect_hosts(%State{config: config} = state) do + nodes = Keyword.get(config, :hosts, []) + Cluster.Strategy.connect_nodes(state.topology, state.connect, state.list_nodes, nodes) + state + end end diff --git a/test/epmd_test.exs b/test/epmd_test.exs index 2199b5e..3228043 100644 --- a/test/epmd_test.exs +++ b/test/epmd_test.exs @@ -5,23 +5,56 @@ defmodule Cluster.Strategy.EpmdTest do alias Cluster.Strategy.Epmd - import ExUnit.CaptureLog + require Cluster.Nodes describe "start_link/1" do - test "calls right functions" do - capture_log(fn -> - :ignore = - Epmd.start_link([ - %Cluster.Strategy.State{ - topology: :name, - config: [hosts: [:foo@bar]], - connect: {Cluster.Nodes, :connect, [self()]}, - list_nodes: {Cluster.Nodes, :list_nodes, [[]]} - } - ]) - - assert_receive {:connect, :foo@bar}, 5_000 - end) + @tag capture_log: true + test "starts GenServer and connects nodes" do + {:ok, pid} = + Epmd.start_link([ + %Cluster.Strategy.State{ + topology: :name, + config: [hosts: [:foo@bar]], + connect: {Cluster.Nodes, :connect, [self()]}, + list_nodes: {Cluster.Nodes, :list_nodes, [[]]} + } + ]) + + assert is_pid(pid) + + assert_receive {:connect, :foo@bar}, 5_000 + end + + @tag capture_log: true + test "reconnects every time the configured timeout was reached" do + timeout = 500 + start_timestamp = NaiveDateTime.utc_now() + + {:ok, _pid} = + Epmd.start_link([ + %Cluster.Strategy.State{ + topology: :name, + config: [hosts: [:foo@bar], timeout: timeout], + connect: {Cluster.Nodes, :connect, [self()]}, + list_nodes: {Cluster.Nodes, :list_nodes, [[]]} + } + ]) + + # Initial connect + assert_receive {:connect, :foo@bar}, 5_000 + + # First reconnect should not have happened right away, + # but it should happen after a timeout + refute_received {:connect, _} + assert_receive {:connect, :foo@bar}, 2 * timeout + + # A consecutive reconnect should not have happened right away, + # but it should happen after a timeout + refute_received {:connect, _} + assert_receive {:connect, :foo@bar}, 2 * timeout + + duration = NaiveDateTime.diff(NaiveDateTime.utc_now(), start_timestamp, :millisecond) + assert duration > 2 * timeout end end end