defmodule Cluster.Strategy.Kubernetes.DNSSRV do
  @moduledoc """
  This clustering strategy works by issuing a SRV query for the kubernetes headless service
  under which the stateful set containing your nodes is running.

  For more information, see the kubernetes stateful-application [documentation](https://kubernetes.io/docs/tutorials/stateful-application/basic-stateful-set/#using-stable-network-identities)

  * It will fetch the FQDN of all pods under the headless service and attempt to connect.
  * It will continually monitor and update its connections according to the polling_interval (default 5s)

  The `application_name` is configurable (you may have launched erlang with a different configured name),
  but will in most cases be the name of your application.

  An example configuration is below:

      config :libcluster,
        topologies: [
          k8s_example: [
            strategy: #{__MODULE__},
            config: [
              service: "elixir-plug-poc",
              application_name: "elixir_plug_poc",
              namespace: "default",
              polling_interval: 10_000]]]

  An example of how this strategy extracts topology information from DNS follows:

  ```
  bash-5.0# hostname -f
  elixir-plug-poc-1.elixir-plug-poc.default.svc.cluster.local
  bash-5.0# dig SRV elixir-plug-poc.default.svc.cluster.local

  ; <<>> DiG 9.14.3 <<>> SRV elixir-plug-poc.default.svc.cluster.local
  ;; global options: +cmd
  ;; Got answer:
  ;; WARNING: .local is reserved for Multicast DNS
  ;; You are currently testing what happens when an mDNS query is leaked to DNS
  ;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 7169
  ;; flags: qr aa rd ra; QUERY: 1, ANSWER: 2, AUTHORITY: 0, ADDITIONAL: 2

  ;; QUESTION SECTION:
  ;elixir-plug-poc.default.svc.cluster.local. IN SRV

  ;; ANSWER SECTION:
  elixir-plug-poc.default.svc.cluster.local. 30 IN SRV 10 50 0 elixir-plug-poc-0.elixir-plug-poc.default.svc.cluster.local.
  elixir-plug-poc.default.svc.cluster.local. 30 IN SRV 10 50 0 elixir-plug-poc-1.elixir-plug-poc.default.svc.cluster.local.

  ;; ADDITIONAL SECTION:
  elixir-plug-poc-0.elixir-plug-poc.default.svc.cluster.local. 30 IN A 10.1.0.95
  elixir-plug-poc-1.elixir-plug-poc.default.svc.cluster.local. 30 IN A 10.1.0.96

  ;; Query time: 0 msec
  ;; SERVER: 10.96.0.10#53(10.96.0.10)
  ;; WHEN: Wed Jul 03 11:55:27 UTC 2019
  ;; MSG SIZE rcvd: 167
  ```

  And here is an example of a corresponding kubernetes statefulset/service definition:

  ```yaml
  apiVersion: v1
  kind: Service
  metadata:
    name: elixir-plug-poc
    labels:
      app: elixir-plug-poc
  spec:
    ports:
    - port: 4000
      name: web
    clusterIP: None
    selector:
      app: elixir-plug-poc
  ---
  apiVersion: apps/v1
  kind: StatefulSet
  metadata:
    name: elixir-plug-poc
  spec:
    serviceName: "elixir-plug-poc"
    replicas: 2
    selector:
      matchLabels:
        app: elixir-plug-poc
    template:
      metadata:
        labels:
          app: elixir-plug-poc
      spec:
        containers:
        - name: elixir-plug-poc
          image: binarytemple/elixir_plug_poc
          args:
            - foreground
          env:
            - name: ERLANG_COOKIE
              value: "cookie"
          imagePullPolicy: Always
          ports:
          - containerPort: 4000
            name: http
            protocol: TCP
  ```
  """
  use GenServer
  use Cluster.Strategy
  import Cluster.Logger

  alias Cluster.Strategy.State

  # Re-resolution period (ms) used when :polling_interval is not configured.
  @default_polling_interval 5_000

  def start_link(args), do: GenServer.start_link(__MODULE__, args)

  @impl true
  # First boot: no node set recorded yet, so start from an empty MapSet.
  def init([%State{meta: nil} = state]) do
    init([%State{state | :meta => MapSet.new()}])
  end

  def init([%State{} = state]) do
    # The 0 timeout delivers an immediate :timeout message, triggering the first poll.
    {:ok, load(state), 0}
  end

  @impl true
  def handle_info(:timeout, state) do
    handle_info(:load, state)
  end

  def handle_info(:load, state) do
    {:noreply, load(state)}
  end

  def handle_info(_, state) do
    {:noreply, state}
  end

  # Resolves the current node list via DNS, reconciles connections against the
  # previously-known set in `meta`, schedules the next poll, and returns the new state.
  defp load(%State{topology: topology, meta: meta} = state) do
    new_nodelist = MapSet.new(get_nodes(state))
    added = MapSet.difference(new_nodelist, meta)
    removed = MapSet.difference(meta, new_nodelist)

    new_nodelist =
      case Cluster.Strategy.disconnect_nodes(
             topology,
             state.disconnect,
             state.list_nodes,
             MapSet.to_list(removed)
           ) do
        :ok ->
          new_nodelist

        {:error, bad_nodes} ->
          # Add back the nodes which should have been removed, but which couldn't be for some reason
          Enum.reduce(bad_nodes, new_nodelist, fn {n, _}, acc ->
            MapSet.put(acc, n)
          end)
      end

    new_nodelist =
      case Cluster.Strategy.connect_nodes(
             topology,
             state.connect,
             state.list_nodes,
             MapSet.to_list(added)
           ) do
        :ok ->
          new_nodelist

        {:error, bad_nodes} ->
          # Remove the nodes which should have been added, but couldn't be for some reason
          Enum.reduce(bad_nodes, new_nodelist, fn {n, _}, acc ->
            MapSet.delete(acc, n)
          end)
      end

    Process.send_after(
      self(),
      :load,
      polling_interval(state)
    )

    %State{state | :meta => new_nodelist}
  end

  # Issues the SRV lookup for the headless service and converts each answer into
  # a node name. Returns [] (after logging) on lookup failure or misconfiguration.
  @spec get_nodes(State.t()) :: [atom()]
  defp get_nodes(%State{topology: topology, config: config}) do
    # Keyword.get (not fetch!) so a missing key falls through to the warning
    # branches below instead of raising before they can run.
    app_name = Keyword.get(config, :application_name)
    service = Keyword.get(config, :service)
    namespace = Keyword.fetch!(config, :namespace)
    resolver = Keyword.get(config, :resolver, &:inet_res.getbyname(&1, :srv))

    cond do
      app_name != nil and service != nil ->
        headless_service = to_charlist("#{service}.#{namespace}.svc.cluster.local.")

        case resolver.(headless_service) do
          {:ok, {:hostent, _fqdn, _aliases, :srv, _count, addresses}} ->
            parse_response(addresses, app_name)

          {:error, reason} ->
            error(
              topology,
              "#{inspect(headless_service)} : lookup against #{service} failed: #{inspect(reason)}"
            )

            []
        end

      app_name == nil ->
        warn(
          topology,
          "Kubernetes.DNSSRV strategy is selected, but :application_name is not configured!"
        )

        []

      service == nil ->
        warn(topology, "Kubernetes.DNSSRV strategy is selected, but :service is not configured!")
        []

      :else ->
        warn(topology, "Kubernetes.DNSSRV strategy is selected, but is not configured!")
        []
    end
  end

  defp polling_interval(%State{config: config}) do
    Keyword.get(config, :polling_interval, @default_polling_interval)
  end

  # Each SRV answer is {priority, weight, port, target}; the target charlist is the
  # pod FQDN. Builds "app_name@fqdn" node atoms. Dynamic atom creation is required
  # here (node names are atoms); the set is bounded by the pods behind the service.
  defp parse_response(addresses, app_name) do
    addresses
    |> Enum.map(&:erlang.list_to_binary(elem(&1, 3)))
    |> Enum.map(&:"#{app_name}@#{&1}")
  end
end
defmodule Cluster.Strategy.KubernetesSRVDNSTest do
  @moduledoc false

  use ExUnit.Case, async: true
  import ExUnit.CaptureLog

  alias Cluster.Strategy.Kubernetes.DNSSRV
  alias Cluster.Strategy.State
  alias Cluster.Nodes

  require Cluster.Nodes

  # Node atoms for the two pods behind the headless service used throughout.
  @pod_0 :"node@elixir-plug-poc-0.elixir-plug-poc.default.svc.cluster.local"
  @pod_1 :"node@elixir-plug-poc-1.elixir-plug-poc.default.svc.cluster.local"

  describe "start_link/1" do
    test "adds new nodes" do
      capture_log(fn ->
        # SRV lookup resolves both pods; neither is connected yet.
        resolver = fn _query ->
          {:ok,
           {:hostent, 'elixir-plug-poc.default.svc.cluster.local', [], :srv, 2,
            [
              {10, 50, 0, 'elixir-plug-poc-0.elixir-plug-poc.default.svc.cluster.local'},
              {10, 50, 0, 'elixir-plug-poc-1.elixir-plug-poc.default.svc.cluster.local'}
            ]}}
        end

        start_strategy(
          config: [
            polling_interval: 100,
            service: "elixir-plug-poc",
            namespace: "default",
            application_name: "node",
            resolver: resolver
          ]
        )

        assert_receive {:connect, @pod_0}, 100
        assert_receive {:connect, @pod_1}, 100
      end)
    end

    test "removes nodes" do
      capture_log(fn ->
        # Only pod 0 remains in DNS while both pods are currently connected,
        # so pod 1 must be disconnected.
        resolver = fn _query ->
          {:ok,
           {:hostent, 'elixir-plug-poc.default.svc.cluster.local', [], :srv, 1,
            [
              {10, 50, 0, 'elixir-plug-poc-0.elixir-plug-poc.default.svc.cluster.local'}
            ]}}
        end

        start_strategy(
          config: [
            polling_interval: 100,
            service: "elixir-plug-poc",
            namespace: "default",
            application_name: "node",
            resolver: resolver
          ],
          list_nodes: {Nodes, :list_nodes, [[@pod_0, @pod_1]]},
          meta: MapSet.new([@pod_0, @pod_1])
        )

        assert_receive {:disconnect, @pod_1}, 100
      end)
    end

    test "keeps state" do
      capture_log(fn ->
        # DNS answer matches the already-connected set exactly: no churn expected.
        resolver = fn _query ->
          {:ok,
           {:hostent, 'elixir-plug-poc.default.svc.cluster.local', [], :srv, 2,
            [
              {10, 50, 0, 'elixir-plug-poc-1.elixir-plug-poc.default.svc.cluster.local'}
            ]}}
        end

        start_strategy(
          config: [
            polling_interval: 100,
            service: "app",
            namespace: "default",
            application_name: "node",
            resolver: resolver
          ],
          list_nodes: {Nodes, :list_nodes, [[@pod_1]]},
          meta: MapSet.new([@pod_1])
        )

        refute_receive {:disconnect, _}, 100
        refute_receive {:connect, _}, 100
      end)
    end

    test "does not connect to anything if name is not resolved" do
      capture_log(fn ->
        start_strategy(
          config: [
            polling_interval: 100,
            service: "app",
            namespace: "default",
            application_name: "node",
            resolver: fn _query -> {:error, :nxdomain} end
          ]
        )

        refute_receive {:connect, _}, 100
      end)
    end
  end

  # Boots the strategy with connect/disconnect hooks that report back to the
  # test process, merging any per-test overrides into the base state.
  defp start_strategy(overrides) do
    base = %State{
      topology: :k8s_dns_example,
      connect: {Nodes, :connect, [self()]},
      disconnect: {Nodes, :disconnect, [self()]},
      list_nodes: {Nodes, :list_nodes, [[]]}
    }

    DNSSRV.start_link([struct!(base, overrides)])
  end
end