Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: complex pod selection #10

Merged
merged 1 commit into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 26 additions & 8 deletions lib/client/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@ defmodule KubeRPC.Client do
defp get_timeout(_),
do: config()[:timeout] || 5_000

defp do_run(basename, module, function, args, timeout, attempts, skip_servers) do
defp do_run(selector, module, function, args, timeout, attempts, skip_servers) do
basename = get_basename(selector)

with :ok <- check_attempts(attempts),
servers <- filter_servers(basename, skip_servers),
servers <- filter_servers(selector, skip_servers),
{:ok, server} <- get_random_rpc_server(servers),
{:ok, pid} <- get_rpc_server_process_pid(basename, server),
{:ok, pid} <- get_rpc_server_process_pid(selector, server),
{:ok, response} <- call_rpc(pid, server, module, function, args, timeout) do
response
else
{:error, {:bad_server, server}} ->
do_run(basename, module, function, args, timeout, attempts + 1, [server | skip_servers])
do_run(selector, module, function, args, timeout, attempts + 1, [server | skip_servers])

{:error, :too_many_attempts} ->
Logger.warn("Failed RPC request to: #{basename}. #{module}.#{function}: #{sanitized_inspect(args)}")
Expand All @@ -37,6 +39,12 @@ defmodule KubeRPC.Client do
end
end

defp get_basename({basename, _regex}),
do: basename

defp get_basename(basename),
do: basename

defp check_attempts(attempts) do
cond do
attempts >= config()[:max_attempts] ->
Expand All @@ -47,15 +55,25 @@ defmodule KubeRPC.Client do
end
end

defp filter_servers(basename, skip_servers) do
defp filter_servers(selector, skip_servers) do
Node.list()
|> Enum.filter(fn node ->
case String.split(to_string(node), "@") do
|> filter_servers_by_selector(selector)
|> Enum.filter(fn server -> server not in skip_servers end)
end

defp filter_servers_by_selector(servers, {basename, regex}) do
servers
|> filter_servers_by_selector(basename)
|> Enum.filter(&(&1 |> to_string() |> String.match?(regex)))
end

defp filter_servers_by_selector(servers, basename) do
Enum.filter(servers, fn server ->
case String.split(to_string(server), "@") do
[^basename | _] -> true
_ -> false
end
end)
|> Enum.filter(fn server -> server not in skip_servers end)
end

# Invalid basename or all servers are down
Expand Down
27 changes: 25 additions & 2 deletions test/client_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ defmodule KubeRPC.ClientTest do

alias KubeRPC.TestClient

test "returns :ok from remote server handler" do
test "returns :ok from remote server handler when selector is basename" do
[node] = LocalCluster.start_nodes("testing_server", 1)

log =
Expand All @@ -25,12 +25,35 @@ defmodule KubeRPC.ClientTest do
assert Enum.any?(logs, &(&1 =~ "RPC request to: testing_server1@127.0.0.1, Elixir.TestHandler.respond finished"))
end

test "returns {:error, :badrpc} when no servers available" do
test "returns :ok from remote server handler when selector is basename and regex" do
[node] = LocalCluster.start_nodes("testing_server", 1)

log =
capture_log(fn ->
assert :ok == TestClient.run({get_node_basename(node), ~r/127/}, TestHandler, :respond, [:ok])
end)

logs = String.split(log, "\n")

assert Enum.any?(logs, &(&1 =~ "RPC request to: testing_server1@127.0.0.1, Elixir.TestHandler.respond started"))
assert Enum.any?(logs, &(&1 =~ "RPC request to: testing_server1@127.0.0.1, Elixir.TestHandler.respond finished"))
end

test "returns {:error, :badrpc} when no servers available with simple selector" do
assert capture_log(fn ->
assert {:error, :badrpc} == TestClient.run("wrong_basename", TestHandler, :respond, [:ok])
end) =~ "No RPC servers available for basename: wrong_basename"
end

test "returns {:error, :badrpc} when no servers available with complex selector" do
[node] = LocalCluster.start_nodes("testing_server", 1)

assert capture_log(fn ->
assert {:error, :badrpc} ==
TestClient.run({get_node_basename(node), ~r/128/}, TestHandler, :respond, [:ok])
end) =~ "No RPC servers available for basename: testing_server"
end

test "returns {:error, :badrpc} when all attempts are exhausted" do
[node] = LocalCluster.start_nodes("testing_server", 1)

Expand Down