defmodule Cluster.Strategy.Kubernetes.DNS do
  @moduledoc """
  This clustering strategy works by loading all your Erlang nodes (within Pods) in the current
  Kubernetes namespace. It will fetch the addresses of all pods under a shared headless service
  and attempt to connect. It will continually monitor and update its connections according to
  `:polling_interval` (default 5_000 ms).

  It assumes that all Erlang nodes were launched under a base name, are using longnames, and are
  unique based on their FQDN, rather than the base hostname. In other words, in the following
  longname, `<basename>@<ip>`, `basename` would be the value configured through
  `:application_name`.

  An example configuration is below:

      config :libcluster,
        topologies: [
          k8s_example: [
            strategy: #{__MODULE__},
            config: [
              service: "myapp-headless",
              application_name: "myapp",
              polling_interval: 10_000]]]
  """
  use GenServer
  use Cluster.Strategy
  import Cluster.Logger

  alias Cluster.Strategy.State

  @default_polling_interval 5_000

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  def init(opts) do
    state = %State{
      topology: Keyword.fetch!(opts, :topology),
      connect: Keyword.fetch!(opts, :connect),
      disconnect: Keyword.fetch!(opts, :disconnect),
      list_nodes: Keyword.fetch!(opts, :list_nodes),
      config: Keyword.fetch!(opts, :config),
      meta: MapSet.new([])
    }

    # Perform the first lookup synchronously so the cluster starts forming before
    # the first polling interval elapses. `load/1` schedules the next poll itself,
    # so no `:timeout` is needed here (a trailing `0` would double-poll — and
    # `load/1` returns a bare %State{}, never a GenServer reply tuple, so wrapping
    # it in `{:ok, _}` here is valid).
    {:ok, load(state)}
  end

  def handle_info(:timeout, state), do: handle_info(:load, state)
  def handle_info(:load, %State{} = state), do: {:noreply, load(state)}
  def handle_info(_, state), do: {:noreply, state}

  # One poll cycle: resolve the current node set via DNS, diff it against the
  # last known set (state.meta), disconnect the removed nodes, connect the added
  # ones, schedule the next poll, and return the updated %State{}.
  defp load(
         %State{
           topology: topology,
           connect: connect,
           disconnect: disconnect,
           list_nodes: list_nodes
         } = state
       ) do
    new_nodelist = MapSet.new(get_nodes(state))
    added = MapSet.difference(new_nodelist, state.meta)
    removed = MapSet.difference(state.meta, new_nodelist)

    new_nodelist =
      case Cluster.Strategy.disconnect_nodes(
             topology,
             disconnect,
             list_nodes,
             MapSet.to_list(removed)
           ) do
        :ok ->
          new_nodelist

        {:error, bad_nodes} ->
          # Add back the nodes which should have been removed, but which couldn't
          # be for some reason — they will be retried on the next cycle.
          Enum.reduce(bad_nodes, new_nodelist, fn {n, _}, acc ->
            MapSet.put(acc, n)
          end)
      end

    new_nodelist =
      case Cluster.Strategy.connect_nodes(topology, connect, list_nodes, MapSet.to_list(added)) do
        :ok ->
          new_nodelist

        {:error, bad_nodes} ->
          # Remove the nodes which should have been added, but couldn't be for
          # some reason — they will be retried on the next cycle.
          Enum.reduce(bad_nodes, new_nodelist, fn {n, _}, acc ->
            MapSet.delete(acc, n)
          end)
      end

    Process.send_after(self(), :load, polling_interval(state.config))

    %{state | meta: new_nodelist}
  end

  # Resolves the headless service's A records and turns each address into a node
  # name. Uses Keyword.get/2 (not fetch!/2) so a missing option produces the
  # warning below instead of an unreachable branch behind a raise.
  @spec get_nodes(State.t()) :: [atom()]
  defp get_nodes(%State{topology: topology, config: config}) do
    app_name = Keyword.get(config, :application_name)
    service = Keyword.get(config, :service)

    cond do
      app_name != nil and service != nil ->
        # :inet_res.getbyname/2 expects a charlist; the hostent's fqdn and
        # address-length fields are not needed here.
        case :inet_res.getbyname(to_charlist(service), :a) do
          {:ok, {:hostent, _fqdn, [], :inet, _length, addresses}} ->
            parse_response(addresses, app_name)

          {:error, reason} ->
            error(topology, "lookup against #{service} failed: #{inspect(reason)}")
            []
        end

      app_name == nil ->
        warn(
          topology,
          "kubernetes.DNS strategy is selected, but :application_name is not configured!"
        )

        []

      service == nil ->
        warn(topology, "kubernetes.DNS strategy is selected, but :service is not configured!")
        []
    end
  end

  defp polling_interval(config),
    do: Keyword.get(config, :polling_interval, @default_polling_interval)

  # Builds `app_name@ip` node atoms from the resolved IP tuples.
  # NOTE(review): node names must be atoms, so atom creation from DNS data is
  # unavoidable here; the set is bounded by the number of pods behind the service.
  defp parse_response(addresses, app_name) do
    Enum.map(addresses, fn address ->
      :"#{app_name}@#{:inet_parse.ntoa(address)}"
    end)
  end
end