From 29edb2117f12af6ce3cd530e4002960af160461f Mon Sep 17 00:00:00 2001 From: "busariev.r" Date: Tue, 9 Apr 2024 15:38:13 +0300 Subject: [PATCH] feat: complex pod selection --- lib/client/client.ex | 34 ++++++++++++++++++++++++++-------- test/client_test.exs | 27 +++++++++++++++++++++++++-- 2 files changed, 51 insertions(+), 10 deletions(-) diff --git a/lib/client/client.ex b/lib/client/client.ex index b49c4e9..10fba02 100644 --- a/lib/client/client.ex +++ b/lib/client/client.ex @@ -16,16 +16,18 @@ defmodule KubeRPC.Client do defp get_timeout(_), do: config()[:timeout] || 5_000 - defp do_run(basename, module, function, args, timeout, attempts, skip_servers) do + defp do_run(selector, module, function, args, timeout, attempts, skip_servers) do + basename = get_basename(selector) + with :ok <- check_attempts(attempts), - servers <- filter_servers(basename, skip_servers), + servers <- filter_servers(selector, skip_servers), {:ok, server} <- get_random_rpc_server(servers), - {:ok, pid} <- get_rpc_server_process_pid(basename, server), + {:ok, pid} <- get_rpc_server_process_pid(selector, server), {:ok, response} <- call_rpc(pid, server, module, function, args, timeout) do response else {:error, {:bad_server, server}} -> - do_run(basename, module, function, args, timeout, attempts + 1, [server | skip_servers]) + do_run(selector, module, function, args, timeout, attempts + 1, [server | skip_servers]) {:error, :too_many_attempts} -> Logger.warn("Failed RPC request to: #{basename}. #{module}.#{function}: #{sanitized_inspect(args)}") @@ -37,6 +39,12 @@ defmodule KubeRPC.Client do end end + defp get_basename({basename, _regex}), + do: basename + + defp get_basename(basename), + do: basename + defp check_attempts(attempts) do cond do attempts >= config()[:max_attempts] -> @@ -47,15 +55,25 @@ defmodule KubeRPC.Client do end end - defp filter_servers(basename, skip_servers) do + defp filter_servers(selector, skip_servers) do Node.list() - |> Enum.filter(fn node -> - case String.split(to_string(node), "@") do + |> filter_servers_by_selector(selector) + |> Enum.filter(fn server -> server not in skip_servers end) + end + + defp filter_servers_by_selector(servers, {basename, regex}) do + servers + |> filter_servers_by_selector(basename) + |> Enum.filter(&(&1 |> to_string() |> String.match?(regex))) + end + + defp filter_servers_by_selector(servers, basename) do + Enum.filter(servers, fn server -> + case String.split(to_string(server), "@") do [^basename | _] -> true _ -> false end end) - |> Enum.filter(fn server -> server not in skip_servers end) end # Invalid basename or all servers are down diff --git a/test/client_test.exs b/test/client_test.exs index 83fad07..0714c9c 100644 --- a/test/client_test.exs +++ b/test/client_test.exs @@ -11,7 +11,7 @@ defmodule KubeRPC.ClientTest do alias KubeRPC.TestClient - test "returns :ok from remote server handler" do + test "returns :ok from remote server handler when selector is basename" do [node] = LocalCluster.start_nodes("testing_server", 1) log = @@ -25,12 +25,35 @@ defmodule KubeRPC.ClientTest do assert Enum.any?(logs, &(&1 =~ "RPC request to: testing_server1@127.0.0.1, Elixir.TestHandler.respond finished")) end - test "returns {:error, :badrpc} when no servers available" do + test "returns :ok from remote server handler when selector is basename and regex" do + [node] = LocalCluster.start_nodes("testing_server", 1) + + log = + capture_log(fn -> + assert :ok == TestClient.run({get_node_basename(node), ~r/127/}, TestHandler, :respond, [:ok]) + end) + + logs = String.split(log, "\n") + + assert Enum.any?(logs, &(&1 =~ "RPC request to: testing_server1@127.0.0.1, Elixir.TestHandler.respond started")) + assert Enum.any?(logs, &(&1 =~ "RPC request to: testing_server1@127.0.0.1, Elixir.TestHandler.respond finished")) + end + + test "returns {:error, :badrpc} when no servers available with simple selector" do assert capture_log(fn -> assert {:error, :badrpc} == TestClient.run("wrong_basename", TestHandler, :respond, [:ok]) end) =~ "No RPC servers available for basename: wrong_basename" end + test "returns {:error, :badrpc} when no servers available with complex selector" do + [node] = LocalCluster.start_nodes("testing_server", 1) + + assert capture_log(fn -> + assert {:error, :badrpc} == + TestClient.run({get_node_basename(node), ~r/128/}, TestHandler, :respond, [:ok]) + end) =~ "No RPC servers available for basename: testing_server" + end + test "returns {:error, :badrpc} when all attempts are exhausted" do [node] = LocalCluster.start_nodes("testing_server", 1)