Skip to content

Commit

Permalink
async dispatch
Browse files Browse the repository at this point in the history
  • Loading branch information
coryodaniel committed Mar 11, 2019
1 parent fd8ca61 commit e149358
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 29 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Expand Up @@ -15,11 +15,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- Async watcher event dispatch
- Replaced `HTTPoison` with [k8s](https://github.com/coryodaniel/k8s).
- Removed `Bypass` from test suite

### Removed

- Removed `Bypass` from test suite
- Removed `Impl.parse_metadata/1`.
- Removed `kubeconf_file` and `kubeconf_opts` config options

Expand Down
26 changes: 13 additions & 13 deletions lib/bonny/watcher/impl.ex
Expand Up @@ -75,7 +75,7 @@ defmodule Bonny.Watcher.Impl do
@doc """
Dispatches an `ADDED`, `MODIFIED`, and `DELETED` events to an controller
"""
@spec dispatch(map, atom) :: nil
@spec dispatch(map, atom) :: no_return
def dispatch(%{"type" => "ADDED", "object" => object}, controller),
do: do_dispatch(controller, :add, object)

Expand All @@ -87,22 +87,22 @@ defmodule Bonny.Watcher.Impl do

@spec do_dispatch(atom, atom, map) :: nil
defp do_dispatch(controller, event, object) do
Logger.debug(fn -> "Dispatching: #{inspect(controller)}.#{event}/1" end)
Task.start fn ->
Logger.debug(fn -> "Dispatching: #{inspect(controller)}.#{event}/1" end)

case apply(controller, event, [object]) do
:ok ->
Logger.debug(fn -> "#{inspect(controller)}.#{event}/1 succeeded" end)
case apply(controller, event, [object]) do
:ok ->
Logger.debug(fn -> "#{inspect(controller)}.#{event}/1 succeeded" end)

:error ->
Logger.error(fn -> "#{inspect(controller)}.#{event}/1 failed" end)
:error ->
Logger.error(fn -> "#{inspect(controller)}.#{event}/1 failed" end)

invalid ->
Logger.error(fn ->
"Unsupported response from #{inspect(controller)}.#{event}/1: #{inspect(invalid)}"
end)
invalid ->
Logger.error(fn ->
"Unsupported response from #{inspect(controller)}.#{event}/1: #{inspect(invalid)}"
end)
end
end

nil
end

@spec list_operation(Impl.t()) :: K8s.Operation.t()
Expand Down
43 changes: 28 additions & 15 deletions test/bonny/watcher/impl_test.exs
Expand Up @@ -3,16 +3,28 @@ defmodule Bonny.Watcher.ImplTest do
use ExUnit.Case, async: true
alias Bonny.Watcher.Impl

defp chunk(), do: "{\"type\":\"ADDED\",\"object\":{\"apiVersion\":\"example.com/v1\",\"kind\":\"Widget\",\"metadata\":{\"annotations\":{\"kubectl.kubernetes.io/last-applied-configuration\":\"{\\\"apiVersion\\\":\\\"example.com/v1\\\",\\\"kind\\\":\\\"Widget\\\",\\\"metadata\\\":{\\\"annotations\\\":{},\\\"name\\\":\\\"test-widget\\\",\\\"namespace\\\":\\\"default\\\"}}\\n\"},\"clusterName\":\"\",\"creationTimestamp\":\"2018-12-17T06:26:41Z\",\"generation\":1,\"name\":\"test-widget\",\"namespace\":\"default\",\"resourceVersion\":\"705460\",\"selfLink\":\"/apis/example.com/v1/namespaces/default/widgets/test-widget\",\"uid\":\"b7464e30-01c4-11e9-9066-025000000001\"}}}\n"

defmodule Whizbang do
@moduledoc false
def add(evt), do: emit(:added, evt)
def modify(evt), do: emit(:modified, evt)
def delete(evt), do: emit(:deleted, evt)

defp emit(type, evt) do
send(self(), {type, evt})
:ok
use Agent

def start_link() do
Agent.start_link(fn -> [] end, name: __MODULE__)
end

def get() do
Agent.get(__MODULE__, fn(events) -> events end)
end

def put(event) do
Agent.update(__MODULE__, fn(events) -> [event|events] end)
end

def add(evt), do: put({:added, evt})
def modify(evt), do: put({:modified, evt})
def delete(evt), do: put({:deleted, evt})
end

describe "new/1" do
Expand Down Expand Up @@ -64,22 +76,23 @@ defmodule Bonny.Watcher.ImplTest do

describe "parse_chunk/1" do
test "strips whitespace and parses json" do
chunk =
"{\"type\":\"ADDED\",\"object\":{\"apiVersion\":\"example.com/v1\",\"kind\":\"Widget\",\"metadata\":{\"annotations\":{\"kubectl.kubernetes.io/last-applied-configuration\":\"{\\\"apiVersion\\\":\\\"example.com/v1\\\",\\\"kind\\\":\\\"Widget\\\",\\\"metadata\\\":{\\\"annotations\\\":{},\\\"name\\\":\\\"test-widget\\\",\\\"namespace\\\":\\\"default\\\"}}\\n\"},\"clusterName\":\"\",\"creationTimestamp\":\"2018-12-17T06:26:41Z\",\"generation\":1,\"name\":\"test-widget\",\"namespace\":\"default\",\"resourceVersion\":\"705460\",\"selfLink\":\"/apis/example.com/v1/namespaces/default/widgets/test-widget\",\"uid\":\"b7464e30-01c4-11e9-9066-025000000001\"}}}\n"

result = Impl.parse_chunk(chunk)
result = Impl.parse_chunk(chunk())
assert %{"type" => "ADDED"} = result
end
end

describe "dispatch/2" do
test "dispatches a kubernetes API event to the given module" do
chunk =
"{\"type\":\"ADDED\",\"object\":{\"apiVersion\":\"example.com/v1\",\"kind\":\"Widget\",\"metadata\":{\"annotations\":{\"kubectl.kubernetes.io/last-applied-configuration\":\"{\\\"apiVersion\\\":\\\"example.com/v1\\\",\\\"kind\\\":\\\"Widget\\\",\\\"metadata\\\":{\\\"annotations\\\":{},\\\"name\\\":\\\"test-widget\\\",\\\"namespace\\\":\\\"default\\\"}}\\n\"},\"clusterName\":\"\",\"creationTimestamp\":\"2018-12-17T06:26:41Z\",\"generation\":1,\"name\":\"test-widget\",\"namespace\":\"default\",\"resourceVersion\":\"705460\",\"selfLink\":\"/apis/example.com/v1/namespaces/default/widgets/test-widget\",\"uid\":\"b7464e30-01c4-11e9-9066-025000000001\"}}}\n"
{:ok, _} = Whizbang.start_link()

chunk()
|> Impl.parse_chunk
|> Impl.dispatch(Whizbang)

added_chunk = Impl.parse_chunk(chunk)
Impl.dispatch(added_chunk, Whizbang)
assert_received({:added, added_event})
# Professional.
:timer.sleep(1000)
assert [{:added, event}] = Whizbang.get
assert %{"apiVersion" => "example.com/v1"} = event
end
end
end

0 comments on commit e149358

Please sign in to comment.