Skip to content

Commit

Permalink
Adding Wait operation runner
Browse files Browse the repository at this point in the history
  • Loading branch information
coryodaniel committed Jan 16, 2019
1 parent 86baf1c commit a3a59d2
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 0 deletions.
111 changes: 111 additions & 0 deletions lib/k8s/client/wait.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
defmodule K8s.Client.Wait do
@moduledoc """
Waiting functionality for `K8s.Client`
"""

alias K8s.Client.{Operation, Wait}

@typedoc "A wait configuration"
@type t :: %__MODULE__{
timeout: pos_integer,
sleep: pos_integer,
eval: any | (any -> any),
find: list(binary) | (any -> any),
timeout_after: NaiveDateTime.t(),
processor: (map(), map() -> {:ok, map} | {:error, binary})
}
defstruct [:timeout, :sleep, :eval, :find, :timeout_after, :processor]

@doc """
Continually perform a GET based operation until a condition is met.
## Example
Checking the number of job completions:
```elixir
op = K8s.Client.get("batch/v1", :job, namespace: "default", name: "sleep")
conf = K8s.Conf.from_file("~/.kube/config")
opts = [find: ["status", "succeeded"], eval: 1, timeout: 60]
resp = K8s.Client.Wait.until(op, conf, opts)
```
"""
@spec until(Operation.t(), map(), keyword(atom())) ::
{:ok, map()} | {:error, binary()}
def until(%Operation{method: :get} = op, conf, opts) do
conditions =
Wait
|> struct(opts)
|> process_opts()

case conditions do
{:ok, opts} -> run_operation(op, conf, opts)
error -> error
end
end

def until(op, _, _), do: {:error, "Only HTTP GET operations are supported. #{inspect(op)}"}

defp process_opts(%Wait{eval: nil}), do: {:error, ":eval is required"}
defp process_opts(%Wait{find: nil}), do: {:error, ":find is required"}

defp process_opts(opts) when is_map(opts) do
timeout = Map.get(opts, :timeout) || 30
sleep = Map.get(opts, :sleep) || 1
now = NaiveDateTime.utc_now()
timeout_after = NaiveDateTime.add(now, timeout, :second)
processor = Map.get(opts, :processor) || (&K8s.Client.run/2)

processed =
opts
|> Map.put(:timeout, timeout)
|> Map.put(:sleep, sleep * 1000)
|> Map.put(:timeout_after, timeout_after)
|> Map.put(:processor, processor)

{:ok, processed}
end

defp run_operation(op, conf, %Wait{timeout_after: timeout_after} = opts) do
case timed_out?(timeout_after) do
true -> {:error, :timeout}
false -> evaluate_operation(op, conf, opts)
end
end

defp evaluate_operation(
op,
conf,
%Wait{processor: processor, sleep: sleep, eval: eval, find: find} = opts
) do
with {:ok, resp} <- processor.(op, conf),
true <- satisfied?(resp, find, eval) do
{:ok, resp}
else
_not_satisfied ->
Process.sleep(sleep)
run_operation(op, conf, opts)
end
end

defp satisfied?(resp = %{}, find, eval) when is_list(find) do
value = get_in(resp, find)
compare(value, eval)
end

defp satisfied?(resp = %{}, find, eval) when is_function(find) do
value = find.(resp)
compare(value, eval)
end

defp compare(value, eval) when not is_function(eval), do: value == eval
defp compare(value, eval) when is_function(eval), do: eval.(value)

defp timed_out?(timeout_after) do
case NaiveDateTime.compare(NaiveDateTime.utc_now(), timeout_after) do
:gt -> true
_ -> false
end
end
end
61 changes: 61 additions & 0 deletions test/k8s/client/wait_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
defmodule K8s.Client.WaitTest do
use ExUnit.Case, async: true
doctest K8s.Client.Wait
alias K8s.Client.Wait

def operation() do
%K8s.Client.Operation{path: "/foo", method: :get}
end

describe "until/3" do
test "returns an error when `:find` is not provided" do
{:error, msg} = Wait.until(operation(), %{}, eval: 1)
assert msg == ":find is required"
end

test "returns an error when `:eval` is not provided" do
{:error, msg} = Wait.until(operation(), %{}, find: ["foo"])
assert msg == ":eval is required"
end

test "returns an error the operation is not a GET" do
operation = %K8s.Client.Operation{path: "/foo", method: :post}
{:error, msg} = Wait.until(operation, %{}, find: ["foo"])
assert Regex.match?(~r/Only HTTP GET operations are supported/, msg)
end

test "returns an :ok tuple when the primitive conditions are met" do
processor = fn _, _ -> {:ok, %{"foo" => "bar"}} end
opts = [find: ["foo"], eval: "bar", processor: processor]

assert {:ok, _} = Wait.until(operation(), %{}, opts)
end

test "returns :ok when evaluating with a function" do
processor = fn _, _ -> {:ok, %{"foo" => "bar"}} end
eval = fn val -> val == "bar" end
opts = [find: ["foo"], eval: eval, processor: processor]

assert {:ok, _} = Wait.until(operation(), %{}, opts)
end

test "returns :ok when finding with a function" do
processor = fn _, _ -> {:ok, %{"foo" => "bar"}} end
find = fn result -> result["foo"] end
opts = [find: find, eval: "bar", processor: processor]

assert {:ok, _} = Wait.until(operation(), %{}, opts)
end

test "timeouting out" do
processor = fn _, _ ->
Process.sleep(1001)
{:ok, %{"foo" => "bar"}}
end

opts = [find: ["foo"], eval: "bar", processor: processor, timeout: 1]

assert {:ok, _} = Wait.until(operation(), %{}, opts)
end
end
end

0 comments on commit a3a59d2

Please sign in to comment.