Skip to content
This repository has been archived by the owner on Jul 28, 2023. It is now read-only.

Commit

Permalink
Add exponential back off for marathon connect
Browse files Browse the repository at this point in the history
  • Loading branch information
emptyflash committed Nov 22, 2016
1 parent 637ef45 commit 886925a
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 13 deletions.
78 changes: 68 additions & 10 deletions lib/marathon/binge_watch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,66 @@ defmodule Heimdall.Marathon.BingeWatch do
streaming Marathon events to itself in `start_link/0`.
"""

@name Heimdall.Marathon.BingeWatch

use GenServer

require Logger
alias Heimdall.DynamicRoutes
alias Plug.Router.Utils

# Starts the BingeWatch server and registers it under `@name`.
#
# `args` is passed unchanged to `init/1`, where it becomes the server state.
# The return value is whatever `GenServer.start_link/3` returns.
#
# Exactly one server is started here, and no HTTP request is made: `init/1`
# queues a `:connect` cast, so the connection to Marathon is opened by the
# server itself and a failed connection cannot make `start_link/1` raise.
def start_link(args) do
  GenServer.start_link(__MODULE__, args, name: @name)
end

# GenServer callback. Queues the first `:connect` cast for the server
# registered under `@name` and uses `args`, with the retry counter set to
# zero, as the initial state.
def init(args) do
  :ok = GenServer.cast(@name, :connect)
  {:ok, reset_retries(args)}
end

# Handles the `:connect` cast: waits out the back-off delay for the current
# retry count, then tries to open the Marathon event stream.
#
# When the attempt fails, the failure is logged and `retry_connect/1` queues
# another attempt with an incremented retry count.
def handle_cast(:connect, state) do
  # NOTE: the sleep runs inside the server process, so no other message is
  # handled until the delay has elapsed.
  state
  |> Keyword.get(:retries, 0)
  |> delay_seconds()
  |> :timer.sleep()

  case connect_to_marathon(Keyword.get(state, :marathon_url)) do
    {:ok, _response} ->
      # The retry counter is deliberately left untouched here. The response
      # status is handled later by the `handle_info/2` clauses for
      # `HTTPoison.AsyncStatus`, and the clause for code 200 is the one that
      # resets the counter. Resetting it at this point would zero the counter
      # before a non-200 status arrives, so repeated error statuses would
      # never back off.
      {:noreply, state}

    {:error, %HTTPoison.Error{reason: reason}} ->
      Logger.warn "Failed to connect to Marathon: #{inspect(reason)}"
      retry_connect(state)

    {:error, reason} ->
      Logger.warn "Failed to connect to Marathon: #{inspect(reason)}"
      retry_connect(state)
  end
end

# Catch-all clause: any cast not matched by an earlier clause is ignored and
# the state is returned unchanged.
def handle_cast(_request, state), do: {:noreply, state}

# Largest exponent used for the back-off, i.e. the delay never exceeds
# 2^6 = 64 seconds no matter how many attempts have failed.
@max_backoff_exponent 6

# Returns the back-off delay for the given retry count.
#
# The delay is 2^retries seconds, with the exponent clamped to the range
# 0..@max_backoff_exponent so that a long outage cannot push the wait to
# hours or days. Despite the function name, the result is expressed in
# milliseconds (it is converted with `:timer.seconds/1`), which is the unit
# `:timer.sleep/1` expects.
defp delay_seconds(retries) do
  exponent = retries |> max(0) |> min(@max_backoff_exponent)
  back_off = round(:math.pow(2, exponent))
  :timer.seconds(back_off)
end

# Returns `state` with the `:retries` counter increased by one.
#
# A missing `:retries` key is treated as a counter of zero, so the result
# holds 1 in that case; the failed attempt being recorded is counted rather
# than lost.
defp increment_retries(state) do
  Keyword.update(state, :retries, 1, &(&1 + 1))
end

# Returns `state` with the `:retries` counter set back to zero, adding the
# key when it is not present yet.
defp reset_retries(state), do: Keyword.put(state, :retries, 0)

# Queues another `:connect` cast for the server registered under `@name` and
# builds the `{:noreply, state}` reply with the retry counter incremented.
# Callers return the result directly from their GenServer callback.
defp retry_connect(state) do
  GenServer.cast(@name, :connect)
  {:noreply, increment_retries(state)}
end

# Issues a GET request for `marathon_url <> "/v2/events"`, asking for a
# `text/event-stream` response and directing the streamed response to the
# calling process (`stream_to: self()`).
#
# Returns the result of `HTTPoison.get/3` unchanged; the caller matches it
# against `{:ok, _}` and `{:error, _}`.
defp connect_to_marathon(marathon_url) do
  HTTPoison.get(
    marathon_url <> "/v2/events",
    %{"Accept": "text/event-stream"},
    # `self()` needs its parentheses: a bare `self` is treated as a variable
    # lookup, which is deprecated and rejected by newer compilers.
    stream_to: self(),
    recv_timeout: 15_000
  )
end

@doc """
Expand Down Expand Up @@ -122,8 +171,8 @@ defmodule Heimdall.Marathon.BingeWatch do
`handle_info/2` handles responses streamed in from Marathon
If a response from Marathon gives back anything other than 200,
or if there is an error connecting, BingeWatch will attempt to
reconnect with exponential back-off.
Any chunked response other than a carriage return (which is used
as a keep-alive) from Marathon will trigger a reload of the
Expand All @@ -132,11 +181,14 @@ defmodule Heimdall.Marathon.BingeWatch do
Any other message sent to `handle_info/2` will be ignored.
"""
# A 200 status means the event stream is established: log it and reset the
# retry counter so the next reconnect starts from the shortest back-off.
def handle_info(%HTTPoison.AsyncStatus{code: 200}, state) do
  Logger.info "Successfully connected to Marathon stream"
  {:noreply, reset_retries(state)}
end

# Any status not matched by an earlier clause is treated as a failed
# connection: log the status and queue a reconnect through
# `retry_connect/1`.
#
# The status is interpolated into the message rather than joined with `<>`,
# because `<>` only accepts binaries and would raise for an integer code.
def handle_info(%HTTPoison.AsyncStatus{code: status}, state) do
  Logger.warn "Got error status code from Marathon: #{status}"
  retry_connect(state)
end

def handle_info(%HTTPoison.AsyncChunk{chunk: "\r\n"}, state) do
Expand All @@ -150,16 +202,22 @@ defmodule Heimdall.Marathon.BingeWatch do
{:ok, _routes} ->
{:noreply, state}
{:error, reason} ->
Logger.warn "Creating routes failed: #{reason}"
Logger.warn "Creating routes failed: #{inspect(reason)}"
{:noreply, state}
_ ->
Logger.warn "Creating routes failed for unknown reason"
{:noreply, state}
end
end

# The stream has ended: log the disconnect and queue a reconnect through
# `retry_connect/1`, which also increments the retry counter.
def handle_info(%HTTPoison.AsyncEnd{}, state) do
  Logger.warn("Disconnected from Marathon, attempting to reconnect now")
  retry_connect(state)
end

# An error delivered as a message no longer stops the server: the reason is
# logged and a reconnect is queued through `retry_connect/1`.
def handle_info(%HTTPoison.Error{reason: reason}, state) do
  Logger.warn "Got error from Marathon stream: #{inspect(reason)}"
  retry_connect(state)
end

def handle_info(_msg, state) do
Expand Down
7 changes: 4 additions & 3 deletions lib/marathon/binge_watch/supervisor.ex
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
defmodule Heimdall.Marathon.BingeWatch.Supervisor do
use Supervisor

@name Heimdall.Marathon.BingeWatch.Supervisor

# Starts the supervisor and registers it under `@name`.
#
# `args` is passed unchanged to `init/1`. The return value is whatever
# `Supervisor.start_link/3` returns. Exactly one supervisor is started.
def start_link(args) do
  Supervisor.start_link(__MODULE__, args, name: @name)
end

def init(args) do
marathon_url = Keyword.get(args, :marathon_url)
children = [
worker(Heimdall.Marathon.BingeWatch, [[marathon_url: marathon_url]], restart: :permanent)
worker(Heimdall.Marathon.BingeWatch, [args], restart: :permanent)
]

supervise(children, strategy: :one_for_one, max_retries: :infinity, max_seconds: 1)
Expand Down

0 comments on commit 886925a

Please sign in to comment.