Skip to content

Commit

Permalink
Merge 90f7d77 into 8b4697d
Browse files Browse the repository at this point in the history
  • Loading branch information
svrdlans committed Sep 28, 2018
2 parents 8b4697d + 90f7d77 commit 6eacdab
Showing 1 changed file with 16 additions and 12 deletions.
28 changes: 16 additions & 12 deletions lib/listener.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule Extreme.Listener do
#for indexed stream we need to follow push.link.event_number, otherwise push.event.event_number
event_number = push.link.event_number
DB.in_transaction fn ->
Logger.info "Do some processing of event #{inspect push.event.event_type}"
Logger.info fn -> "Do some processing of event #{inspect push.event.event_type}" end
:ok = push.event.data
|> :erlang.binary_to_term
|> process_event(push.event.event_type)
Expand All @@ -25,12 +25,12 @@ defmodule Extreme.Listener do
end
# This override is optional
defp caught_up, do: Logger.debug("We are up to date. YEEEY!!!")
defp caught_up, do: Logger.debug(fn -> "We are up to date. YEEEY!!!" end)
end
defmodule MyApp.MyProcessor do
def process_event(data, "Elixir.MyApp.Events.PersonCreated") do
Logger.debug "Doing something with #{inspect data}"
Logger.debug fn -> "Doing something with #{inspect data}" end
:ok
end
def process_event(_, _), do: :ok # Just acknowledge events we are not interested in
Expand Down Expand Up @@ -154,11 +154,11 @@ defmodule Extreme.Listener do

ref = Process.monitor(subscription)

Logger.info(
Logger.info(fn ->
"Listener subscribed to stream #{state.stream_name}. Start processing live events from event no: #{
last_event + 1
}"
)
end)

{:ok,
%{
Expand All @@ -182,11 +182,11 @@ defmodule Extreme.Listener do

ref = Process.monitor(subscription)

Logger.info(
Logger.info(fn ->
"Listener patching from stream #{state.stream_name} from event no: #{last_event + 1} until #{
patch_until
}"
)
end)

{:ok,
%{
Expand All @@ -201,15 +201,15 @@ defmodule Extreme.Listener do

def handle_call(:pause, _from, state) do
true = Process.exit(state.subscription, :pause)
Logger.info("Pausing listening stream #{state.stream_name}")
Logger.info(fn -> "Pausing listening stream #{state.stream_name}" end)

{:reply, {:ok, state.last_event},
%{state | subscription: nil, subscription_ref: nil, mode: :pause}}
end

def handle_call(:resume, _from, state) do
GenServer.cast(self(), :subscribe)
Logger.info("Resuming listening stream #{state.stream_name}")
Logger.info(fn -> "Resuming listening stream #{state.stream_name}" end)
{:reply, :ok, state}
end

Expand All @@ -227,13 +227,17 @@ defmodule Extreme.Listener do

def handle_info({:DOWN, ref, :process, _pid, reason}, %{subscription_ref: ref} = state)
when reason in [:pause, :done] do
Logger.info("Subscription to stream #{state.stream_name} is #{inspect(reason)}")
Logger.info(fn -> "Subscription to stream #{state.stream_name} is #{inspect(reason)}" end)
{:noreply, %{state | subscription: nil, subscription_ref: nil, mode: reason}}
end

def handle_info({:DOWN, ref, :process, _pid, _reason}, %{subscription_ref: ref} = state) do
reconnect_delay = 1_000
Logger.warn("Subscription to EventStore is down. Will retry in #{reconnect_delay} ms.")

Logger.warn(fn ->
"Subscription to EventStore is down. Will retry in #{reconnect_delay} ms."
end)

:timer.sleep(reconnect_delay)
GenServer.cast(self(), :subscribe)
{:noreply, state}
Expand Down Expand Up @@ -287,7 +291,7 @@ defmodule Extreme.Listener do
)
end

def caught_up, do: Logger.debug("We are up to date")
def caught_up, do: Logger.debug(fn -> "We are up to date" end)
def register_patching_start(_, _, _), do: {:error, :not_implemented}
def patching_done(_), do: {:error, :not_implemented}
def process_patch(_, _), do: {:error, :not_implemented}
Expand Down

0 comments on commit 6eacdab

Please sign in to comment.