Skip to content

Commit

Permalink
Add event_producer
Browse files Browse the repository at this point in the history
  • Loading branch information
burmajam committed Jun 17, 2024
1 parent 8ed68c6 commit 41e76f6
Show file tree
Hide file tree
Showing 14 changed files with 1,351 additions and 70 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# Changelog for extreme v1.1.1

- Add event_producer functionality for module that uses `Extreme`.

# Changelog for extreme v1.1.0

- Rename `RequestManager._unregister_subscription/2` (removed leading `_`)
Expand Down
22 changes: 22 additions & 0 deletions lib/extreme.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,28 @@ defmodule Extreme do
)
end

@spec start_event_producer(stream :: String.t(), subscriber :: pid(), opts :: Keyword.t()) ::
Supervisor.on_start_child()
def start_event_producer(stream, subscriber, opts \\ []) do
Extreme.EventProducer.Supervisor.start_event_producer(
__MODULE__,
[{:stream, stream}, {:subscriber, subscriber} | opts]
)
end

@spec subscribe_producer(producer :: pid()) :: :ok
def subscribe_producer(producer),
do: Extreme.EventProducer.subscribe(producer)

@spec unsubscribe_producer(producer :: pid()) :: :unsubscribed
def unsubscribe_producer(producer),
do: Extreme.EventProducer.unsubscribe(producer)

@spec producer_subscription_status(producer :: pid()) ::
:disconnected | :catching_up | :live | :paused
def producer_subscription_status(producer),
do: Extreme.EventProducer.subscription_status(producer)

def unsubscribe(subscription) when is_pid(subscription),
do: Extreme.Subscription.unsubscribe(subscription)

Expand Down
95 changes: 95 additions & 0 deletions lib/extreme/event_producer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
defmodule Extreme.EventProducer do
use GenServer
require Logger
alias Extreme.EventProducer.EventBuffer
alias Extreme.SharedSubscription, as: Shared

defmodule State do
defstruct ~w(base_name subscriber ack_timeout buffer_pid)a
end

def start_link(base_name, opts) do
GenServer.start_link(
__MODULE__,
{base_name, opts}
)
end

def subscribe(pid),
do: GenServer.cast(pid, :subscribe)

def unsubscribe(pid),
do: GenServer.call(pid, :unsubscribe)

def on_sync_event(pid, event, timeout),
do: GenServer.call(pid, {:on_sync_event, event}, timeout)

def on_async_event(pid, event),
do: GenServer.cast(pid, {:on_async_event, event})

def send_to_subscriber(pid, msg),
do: GenServer.cast(pid, {:send_to_subscriber, msg})

def subscription_status(pid),
do: GenServer.call(pid, :subscription_status)

@impl GenServer
def init({base_name, opts}) do
{subscriber, opts} = Keyword.pop!(opts, :subscriber)
ack_timeout = Keyword.get(opts, :ack_timeout, 5_000)
{:ok, buffer_pid} = EventBuffer.start_link(base_name, self(), opts)

state = %State{
base_name: base_name,
subscriber: subscriber,
buffer_pid: buffer_pid,
ack_timeout: ack_timeout
}

{:ok, state}
end

@impl GenServer
def handle_call({:on_sync_event, event}, _from, %State{} = state) do
Logger.debug("Sending sync event")
response = Shared.on_event(state.subscriber, event, state.ack_timeout)
{:reply, response, state}
end

def handle_call(:unsubscribe, _from, %State{} = state) do
response = EventBuffer.unsubscribe(state.buffer_pid)
{:reply, response, state}
end

def handle_call(:subscription_status, _from, %State{} = state) do
response = EventBuffer.subscription_status(state.buffer_pid)
{:reply, response, state}
end

@impl GenServer
def handle_cast({:on_async_event, event}, %State{} = state) do
Logger.debug("Sending async event")

spawn_link(fn ->
response = Shared.on_event(state.subscriber, event, state.ack_timeout)
# Process.sleep(5)
:ok = EventBuffer.ack(state.buffer_pid, response)
end)

{:noreply, state}
end

def handle_cast({:send_to_subscriber, msg}, %State{} = state) do
Logger.debug("Proxing message #{inspect(msg)} to subscriber")
send(state.subscriber, msg)

{:noreply, state}
end

def handle_cast(:subscribe, %State{} = state) do
Logger.debug("Starting subscription")
:ok = EventBuffer.subscribe(state.buffer_pid)

{:noreply, state}
end
end
244 changes: 244 additions & 0 deletions lib/extreme/event_producer/event_buffer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
defmodule Extreme.EventProducer.EventBuffer do
use GenServer
require Logger
alias Extreme.{Subscription, EventProducer}

defmodule State do
defstruct ~w(base_name producer_pid subscription_params_fn subscription subscription_ref
stream ack_timeout buffered_events last_buffered_event_number max_buffered auto_subscribe status)a
end

def start_link(base_name, producer_pid, opts),
do: GenServer.start_link(__MODULE__, {base_name, producer_pid, opts})

def ack(pid, response),
do: GenServer.cast(pid, {:ack, response})

def unsubscribe(pid),
do: GenServer.call(pid, :unsubscribe)

def subscribe(pid),
do: GenServer.cast(pid, :subscribe)

def subscription_status(pid),
do: GenServer.call(pid, :subscription_status)

@impl GenServer
def init({base_name, producer_pid, opts}) do
stream = Keyword.fetch!(opts, :stream)
from_event_number = Keyword.get(opts, :from_event_number, -1)
per_page = Keyword.get(opts, :per_page, 100)
resolve_link_tos = Keyword.get(opts, :resolve_link_tos, true)
require_master = Keyword.get(opts, :require_master, false)
ack_timeout = Keyword.get(opts, :ack_timeout, 5_000)

max_buffered = Keyword.get(opts, :max_buffered, per_page * 2)
auto_subscribe = Keyword.get(opts, :auto_subscribe, true)

subscription_params_fn = fn from_event_number ->
{stream, from_event_number, per_page, resolve_link_tos, require_master, ack_timeout}
end

state = %State{
base_name: base_name,
producer_pid: producer_pid,
subscription_params_fn: subscription_params_fn,
stream: stream,
ack_timeout: ack_timeout,
status: :disconnected,
buffered_events: :queue.new(),
last_buffered_event_number: from_event_number,
max_buffered: max_buffered,
auto_subscribe: auto_subscribe
}

Logger.debug("Initialized event buffer #{inspect(state)}")

if auto_subscribe,
do: subscribe(self())

{:ok, state}
end

@impl GenServer
def handle_cast(:subscribe, state) do
Logger.debug(
"Subscribing to #{state.stream} starting with #{state.last_buffered_event_number}"
)

{:ok, subscription} =
Extreme.RequestManager.read_and_stay_subscribed(
state.base_name,
self(),
state.subscription_params_fn.(state.last_buffered_event_number + 1)
)

ref = Process.monitor(subscription)

{:noreply,
%{
state
| subscription: subscription,
subscription_ref: ref,
status: :catching_up
}}
end

def handle_cast({:ack, :ok}, %State{} = state),
do: _ack_ok(state)

def handle_cast({:ack, {:ok, _event_number}}, %State{} = state),
do: _ack_ok(state)

def handle_cast({:ack, :stop}, %State{} = state) do
Logger.info("Received `:stop` response from subscriber")
:ok = Subscription.unsubscribe(state.subscription)
{:noreply, %State{state | subscription: nil, subscription_ref: nil, status: :disconnected}}
end

@impl GenServer
def handle_call(:unsubscribe, _from, %State{subscription: nil} = state),
do: {:reply, :unsubscribed, state}

def handle_call(:unsubscribe, _from, %State{} = state) do
Logger.debug("Received `:unsubscribe` request")
response = Subscription.unsubscribe(state.subscription)
state = %State{state | subscription: nil, subscription_ref: nil, status: :disconnected}

{:reply, response, state}
end

def handle_call(:subscription_status, _from, %State{} = state),
do: {:reply, state.status, state}

def handle_call({:on_event, event}, _from, %State{status: :catching_up} = state) do
Logger.debug("Got event while catching up")
_sync_push_event_to_producer(event, state)
end

def handle_call({:on_event, event}, _from, %State{status: :live} = state) do
Logger.debug("Got event in live mode")

buffered_events = :queue.in(event, state.buffered_events)
buffer_size = :queue.len(buffered_events)
{:ok, event_number} = _get_event_number(event)

if buffer_size == 1,
do: :ok = EventProducer.on_async_event(state.producer_pid, event)

response =
if buffer_size >= state.max_buffered do
Logger.warning(
"Event buffer is full (#{inspect(buffer_size)}). Turning off subscription on #{state.stream}"
)

:stop
else
:ok
end

{:reply, response,
%State{state | buffered_events: buffered_events, last_buffered_event_number: event_number}}
end

def handle_call({:on_event, _event}, _from, %State{} = state) do
Logger.warning("We shouldn't get event in #{inspect(state.status)} status")
{:reply, :stop, state}
end

@impl GenServer
def handle_info(:caught_up, %State{subscription: subscription} = state)
when not is_nil(subscription) do
Logger.debug("Caught up now")
:ok = EventProducer.send_to_subscriber(state.producer_pid, :caught_up)
{:noreply, %State{state | status: :live}}
end

def handle_info(
{:DOWN, ref, :process, subscription, {:shutdown, reason}},
%State{subscription: subscription, subscription_ref: ref} = state
)
when reason in [
:processing_of_event_requested_stopping_subscription,
:processing_of_read_events_failed,
:processing_of_buffered_events_failed,
:unsubscribed,
:stream_deleted
] do
Logger.info("Subscription is disconnected: #{reason}")
{:noreply, %State{state | subscription: nil, subscription_ref: nil, status: :disconnected}}
end

def handle_info(
{:DOWN, _ref, :process, _subscription, {:shutdown, _reason}},
%State{subscription: nil, subscription_ref: nil, status: :disconnected} = state
) do
# we are already aware of that
{:noreply, state}
end

def handle_info({:DOWN, _ref, :process, _subscription, reason}, %State{} = state) do
reconnect_delay = 1_000

Logger.warning(
"Subscription to EventStore is down: #{inspect(reason)}. Will try to reconnect in #{reconnect_delay} ms."
)

:timer.sleep(reconnect_delay)
GenServer.cast(self(), :subscribe)
{:noreply, %State{state | subscription: nil, subscription_ref: nil, status: :paused}}
end

def handle_info(msg, %State{} = state) do
:ok = EventProducer.send_to_subscriber(state.producer_pid, msg)
{:noreply, state}
end

defp _sync_push_event_to_producer(event, %State{} = state) do
state.producer_pid
|> EventProducer.on_sync_event(event, state.ack_timeout)
|> case do
:ok ->
{:ok, event_number} = _get_event_number(event)
{:reply, :ok, %State{state | last_buffered_event_number: event_number}}

{:ok, event_number} ->
{:reply, :ok, %State{state | last_buffered_event_number: event_number}}

:stop ->
{:reply, :stop, %State{state | status: :paused}}
end
end

defp _get_event_number(%{link: %{event_number: event_number}}),
do: {:ok, event_number}

defp _get_event_number(%{link: nil, event: %{event_number: event_number}}),
do: {:ok, event_number}

defp _ack_ok(%State{} = state) do
{{:value, _}, buffered_events} = :queue.out(state.buffered_events)
Logger.debug("Event acked")

# push new buffered event if there is one
buffered_events
|> :queue.out()
|> case do
{{:value, event}, _} ->
:ok = EventProducer.on_async_event(state.producer_pid, event)

_ ->
:ok
end

# start subscription if status is :paused and we don't have buffered events
# I wanted to make it if it was < max/2 but applying back pressure would be hard
# in cahtching up mode while we still have buffered events
if state.status == :paused and :queue.len(buffered_events) == 0 do
Logger.info("Resuming subscription on #{state.stream}")
:ok = GenServer.cast(self(), :subscribe)
end

{:noreply, %State{state | buffered_events: buffered_events}}
end
end
Loading

0 comments on commit 41e76f6

Please sign in to comment.