Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restart watcher when server is gone #226

Merged
merged 5 commits into from Feb 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

### Fixed

- Resume watch when API server has gone away - [#226](https://github.com/coryodaniel/k8s/pull/226), [#222](https://github.com/coryodaniel/k8s/issues/222)

<!-- Add your changelog entry to the relevant subsection -->

<!-- ### Added | Changed | Deprecated | Removed | Fixed | Security -->
Expand Down
102 changes: 55 additions & 47 deletions lib/k8s/client/mint/connection_registry.ex
Expand Up @@ -15,6 +15,9 @@ defmodule K8s.Client.Mint.ConnectionRegistry do
alias K8s.Client.HTTPError
alias K8s.Client.Mint.HTTPAdapter

require Logger
import K8s.Sys.Logger, only: [log_prefix: 1]

@poolboy_config [
worker_module: K8s.Client.Mint.HTTPAdapter,
size: 10,
Expand All @@ -34,27 +37,6 @@ defmodule K8s.Client.Mint.ConnectionRegistry do
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
end

@doc """
Gets a `HTTPAdapter` process from the registry and runs the given `callback`
function, passing it the adapter's PID.

If the process returned by the registry is a pool, it runs the given
`callback` in a `:poolboy` transaction.
"""
@spec run(uriopts(), (pid() -> any())) :: any()
def run({uri, opts}, callback) do
case GenServer.call(__MODULE__, {:get_or_open, HTTPAdapter.connection_args(uri, opts)}) do
{:ok, {:singleton, adapter_pid}} ->
callback.(adapter_pid)

{:ok, {:adapter_pool, pool_pid}} ->
:poolboy.transaction(pool_pid, callback)

{:error, error} ->
{:error, error}
end
end

@doc """
ets a `HTTPAdapter` process from the registry.

Expand All @@ -66,9 +48,24 @@ defmodule K8s.Client.Mint.ConnectionRegistry do
"""
@spec checkout(uriopts()) :: {:ok, adapter_pool_t()} | {:error, HTTPError.t()}
def checkout({uri, opts}) do
case GenServer.call(__MODULE__, {:get_or_open, HTTPAdapter.connection_args(uri, opts)}) do
key = HTTPAdapter.connection_args(uri, opts)

case GenServer.call(__MODULE__, {:get_or_open, key}, :infinity) do
{:ok, {:singleton, pid}} ->
{:ok, %{adapter: pid, pool: nil}}
# Check if the connection is open for writing.
if HTTPAdapter.open?(pid, :write) do
{:ok, %{adapter: pid, pool: nil}}
else
# The connection is closed for writing and needs to be removed from
# the registry
Logger.debug(
log_prefix("Connection is not open for writing. Removing it."),
library: :k8s
)

GenServer.cast(__MODULE__, {:remove, key})
checkout({uri, opts})
end

{:ok, {:adapter_pool, pool_pid}} ->
try do
Expand Down Expand Up @@ -99,40 +96,51 @@ defmodule K8s.Client.Mint.ConnectionRegistry do
end

@impl true
def handle_call({:get_or_open, key}, _from, {adapters, refs}) do
if Map.has_key?(adapters, key) do
{:reply, Map.fetch(adapters, key), {adapters, refs}}
def handle_call({:get_or_open, key}, _from, {adapters, refs}) when is_map_key(adapters, key) do
{:reply, Map.fetch(adapters, key), {adapters, refs}}
end

def handle_call({:get_or_open, key}, from, {adapters, refs}) do
{scheme, host, port, opts} = key

# Connect to the server to see if the server supports HTTP/2
with {:ok, conn} <- Mint.HTTP.connect(scheme, host, port, opts),
{type, adapter_spec} <- get_adapter_spec(conn, key),
{:ok, adapter} <-
DynamicSupervisor.start_child(K8s.Client.Mint.ConnectionSupervisor, adapter_spec) do
Mint.HTTP.close(conn)
ref = Process.monitor(adapter)
refs = Map.put(refs, ref, key)
adapters = Map.put(adapters, key, {type, adapter})
GenServer.reply(from, {:ok, {type, adapter}})
{:reply, {:ok, {type, adapter}}, {adapters, refs}}
else
{scheme, host, port, opts} = key

# Connect to the server to see if the server supports HTTP/2
with {:ok, conn} <- Mint.HTTP.connect(scheme, host, port, opts),
{type, adapter_spec} <- get_adapter_spec(conn, key),
{:ok, adapter} <-
DynamicSupervisor.start_child(K8s.Client.Mint.ConnectionSupervisor, adapter_spec) do
Mint.HTTP.close(conn)
ref = Process.monitor(adapter)
refs = Map.put(refs, ref, key)
adapters = Map.put(adapters, key, {type, adapter})
{:reply, {:ok, {type, adapter}}, {adapters, refs}}
else
{:error, %HTTPError{} = error} ->
{:error, error}

{:error, error} ->
{:reply, {:error, HTTPError.from_exception(error)}, {adapters, refs}}
end
{:error, %HTTPError{} = error} ->
{:reply, {:error, error}, {adapters, refs}}

{:error, error} ->
{:reply, {:error, HTTPError.from_exception(error)}, {adapters, refs}}
end
end

@impl true
def handle_info({:DOWN, ref, :process, _pid, _reason}, {adapters, refs}) do
def handle_cast({:remove, key}, {adapters, refs}) do
adapters = Map.delete(adapters, key)
{:noreply, {adapters, refs}}
end

@impl true
def handle_info({:DOWN, ref, :process, pid, _reason}, {adapters, refs}) do
Logger.debug(log_prefix("DOWN of process #{inspect(pid)} received."), library: :k8s)
{key, refs} = Map.pop(refs, ref)
adapters = Map.delete(adapters, key)
{:noreply, {adapters, refs}}
end

def handle_info(_, state), do: {:noreply, state}
def handle_info(other, state) do
Logger.debug(log_prefix("other message received: #{inspect(other)}"), library: :k8s)
{:noreply, state}
end

@spec get_adapter_spec(Mint.HTTP.t(), HTTPAdapter.connection_args_t()) ::
{adapter_type_t(), :supervisor.child_spec()}
Expand Down
117 changes: 80 additions & 37 deletions lib/k8s/client/mint/http_adapter.ex
Expand Up @@ -29,9 +29,13 @@ defmodule K8s.Client.Mint.HTTPAdapter do
alias K8s.Client.HTTPError
alias K8s.Client.Mint.Request

import K8s.Sys.Logger, only: [log_prefix: 1]
require Logger
require Mint.HTTP

# healthcheck frequency in seconds
@healthcheck_freq 30

@type connection_args_t ::
{scheme :: atom(), host :: binary(), port :: integer(), opts :: keyword()}

Expand Down Expand Up @@ -65,6 +69,13 @@ defmodule K8s.Client.Mint.HTTPAdapter do
{String.to_atom(uri.scheme), uri.host, uri.port, opts}
end

@spec open?(GenServer.server(), :read | :write | :read_write) :: boolean()
def open?(pid, type \\ :read_write) do
GenServer.call(pid, {:open?, type})
catch
:exit, _ -> false
end

@doc """
Starts a HTTP request. The way the response parts are returned depends on the
`stream_to` argument passed to it.
Expand Down Expand Up @@ -138,16 +149,21 @@ defmodule K8s.Client.Mint.HTTPAdapter do
def init({scheme, host, port, opts}) do
case Mint.HTTP.connect(scheme, host, port, opts) do
{:ok, conn} ->
Process.send_after(self(), :healthcheck, @healthcheck_freq * 1_000)
state = %__MODULE__{conn: conn}
{:ok, state}

{:error, error} ->
Logger.error("Failed initializing HTTPAdapter GenServer", library: :k8s)
Logger.error(log_prefix("Failed initializing HTTPAdapter GenServer"), library: :k8s)
{:stop, HTTPError.from_exception(error)}
end
end

@impl true
def handle_call({:open?, type}, _from, state) do
{:reply, Mint.HTTP.open?(state.conn, type), state}
end

def handle_call({:request, method, path, headers, body, pool, stream_to}, from, state) do
caller_ref = from |> elem(0) |> Process.monitor()
conn = state.conn
Expand Down Expand Up @@ -217,16 +233,13 @@ defmodule K8s.Client.Mint.HTTPAdapter do
end

def handle_call({:recv, request_ref}, from, state) do
case get_and_update_in(
state.requests[request_ref],
&Request.recv(&1, from)
) do
{:stop, state} ->
{:stop, :normal, state}
{_, state} =
get_and_update_in(
state.requests[request_ref],
&Request.recv(&1, from)
)

{_, state} ->
{:noreply, state}
end
{:noreply, state}
end

@impl true
Expand All @@ -253,20 +266,25 @@ defmodule K8s.Client.Mint.HTTPAdapter do
Logger.debug("The connection was closed.", library: :k8s)

# We could terminate the process here. But there might still be chunks
# in the buffer, so we don't.
# in the buffer, so we let the healthcheck take care of that.
{:noreply, struct!(state, conn: conn)}

{:error, conn, error} ->
Logger.error(
"An error occurred when streaming the request body: #{Exception.message(error)}",
Logger.warn(
log_prefix(
"An error occurred when streaming the request body: #{Exception.message(error)}"
),
error: error,
library: :k8s
)

struct!(state, conn: conn)

{:error, conn, error, responses} ->
Logger.error("An error occurred when streaming the response: #{Exception.message(error)}",
Logger.warn(
log_prefix(
"An error occurred when streaming the response: #{Exception.message(error)}"
),
error: error,
library: :k8s
)
Expand Down Expand Up @@ -297,7 +315,9 @@ defmodule K8s.Client.Mint.HTTPAdapter do
case state do
{:stop, state} ->
Logger.debug(
"Received :DOWN signal from parent process. Terminating HTTPAdapter #{inspect(self())}.",
log_prefix(
"Received :DOWN signal from parent process. Terminating HTTPAdapter #{inspect(self())}."
),
library: :k8s
)

Expand All @@ -308,19 +328,49 @@ defmodule K8s.Client.Mint.HTTPAdapter do
end
end

# This is called regularly to check whether the connection is still open. If
# it's not open, and all buffers are emptied, this process is considered
# garbage and is stopped.
def handle_info(:healthcheck, state) do
any_non_empty_buffers? =
Enum.any?(state.requests, fn {_, request} -> request.buffer != [] end)

if Mint.HTTP.open?(state.conn) or any_non_empty_buffers? do
Process.send_after(self(), :healthcheck, @healthcheck_freq * 1_000)
{:noreply, state}
else
Logger.warn(
log_prefix("Connection closed for reading and writing - stopping this process."),
library: :k8s
)

{:stop, :closed, state}
end
end

@impl true
def terminate(_reason, state) do
def terminate(reason, state) do
state = state

state.requests
|> Enum.filter(fn {_ref, request} -> !is_nil(request.websocket) end)
|> Enum.each(fn {request_ref, request} ->
{:ok, _websocket, data} = Mint.WebSocket.encode(request.websocket, :close)
Mint.WebSocket.stream_request_body(state.conn, request_ref, data)
|> Enum.each(fn
{_request_ref, request} when is_nil(request.websocket) ->
Request.put_response(
request,
{:error, reason}
)

{request_ref, request} ->
{:ok, _websocket, data} = Mint.WebSocket.encode(request.websocket, :close)
Mint.WebSocket.stream_request_body(state.conn, request_ref, data)
end)

Mint.HTTP.close(state.conn)
Logger.debug("Terminating HTTPAdapter GenServer #{inspect(self())}", library: :k8s)

Logger.debug(log_prefix("Terminating HTTPAdapter GenServer #{inspect(self())}"),
library: :k8s
)

:ok
end

Expand Down Expand Up @@ -375,24 +425,17 @@ defmodule K8s.Client.Mint.HTTPAdapter do
state =
frames
|> Enum.map(&Request.map_frame/1)
|> Enum.reduce_while(state, fn mapped_frame, state ->
case get_and_update_in(
state.requests[request_ref],
&Request.put_response(&1, mapped_frame)
) do
{:stop, state} ->
# StreamTo requests need to be stopped from inside the GenServer.
{:halt, {:stop, :normal, state}}

{_, state} ->
{:cont, state}
end
|> Enum.reduce(state, fn mapped_frame, state ->
{_, state} =
get_and_update_in(
state.requests[request_ref],
&Request.put_response(&1, mapped_frame)
)

state
end)

case state do
{:stop, :normal, state} -> {:stop, :normal, state}
state -> {:noreply, state}
end
{:noreply, state}
end

@spec process_responses(t(), [Mint.Types.response()]) :: {:noreply, t()}
Expand Down
12 changes: 5 additions & 7 deletions lib/k8s/client/mint/request.ex
Expand Up @@ -12,9 +12,8 @@ defmodule K8s.Client.Mint.Request do
- `:receiving` - The request is currently receiving response parts / frames
- `:closing` - Websocket requests only: The `:close` frame was received but the process wasn't terminated yet
- `:terminating` - HTTP requests only: The `:done` part was received but the request isn't cleaned up yet
- `:closed` - The websocket request is closed. The process is going to terminate any moment now
"""
@type request_modes :: :pending | :receiving | :closing | :terminating | :closed
@type request_modes :: :pending | :receiving | :closing | :terminating

@typedoc """
Defines the state of the request.
Expand Down Expand Up @@ -58,7 +57,7 @@ defmodule K8s.Client.Mint.Request do
struct!(__MODULE__, fields)
end

@spec put_response(t(), :done | {atom(), any()}) :: :pop | {t() | :stop, t()}
@spec put_response(t(), :done | {atom(), any()}) :: :pop | {t(), t()}
def put_response(request, response) do
request
|> struct!(buffer: [response | request.buffer])
Expand All @@ -67,7 +66,7 @@ defmodule K8s.Client.Mint.Request do
|> maybe_terminate_request()
end

@spec recv(t(), GenServer.from()) :: :pop | {t() | :stop, t()}
@spec recv(t(), GenServer.from()) :: :pop | {t(), t()}
def recv(request, from) do
request
|> struct!(stream_to: {:reply, from})
Expand All @@ -88,9 +87,8 @@ defmodule K8s.Client.Mint.Request do

defp update_mode(request, _), do: request

@spec maybe_terminate_request(t()) :: {t() | :stop, t()} | :pop
def maybe_terminate_request(%__MODULE__{mode: :closing, buffer: []} = request),
do: {:stop, struct!(request, mode: :closed)}
@spec maybe_terminate_request(t()) :: {t(), t()} | :pop
def maybe_terminate_request(%__MODULE__{mode: :closing, buffer: []}), do: :pop

def maybe_terminate_request(%__MODULE__{mode: :terminating, buffer: []} = request) do
Process.demonitor(request.caller_ref)
Expand Down