Skip to content

Commit

Permalink
Refactor subscription mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
burmajam committed Jun 24, 2024
1 parent 945c089 commit af76d6c
Show file tree
Hide file tree
Showing 25 changed files with 355 additions and 256 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ jobs:
runs-on: ubuntu-20.04
strategy:
matrix:
elixir: ['1.11.4', '1.12.3']
erlang: ['22.3', '23.3', '24.1']
elixir: ['1.15', '1.16']
erlang: ['24', '25', '26']

services:
es:
Expand Down Expand Up @@ -62,8 +62,8 @@ jobs:
runs-on: ubuntu-20.04
strategy:
matrix:
elixir: ['1.11.4', '1.12.3']
erlang: ['22.3', '23.3', '24.1']
elixir: ['1.15', '1.16']
erlang: ['24', '25', '26']

services:
es:
Expand Down Expand Up @@ -108,8 +108,8 @@ jobs:
runs-on: ubuntu-20.04
strategy:
matrix:
elixir: ['1.12.3', '1.13', '1.14', '1.15', '1.16', '1.17']
erlang: ['24.3', '25.3', '26.3', '27.0']
elixir: ['1.15', '1.16']
erlang: ['24', '25', '26']

services:
es:
Expand Down
4 changes: 2 additions & 2 deletions lib/extreme.ex
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ defmodule Extreme do
def subscribe_producer(producer),
do: Extreme.EventProducer.subscribe(producer)

@spec unsubscribe_producer(producer :: pid()) :: :unsubscribed
@spec unsubscribe_producer(producer :: pid()) :: :ok
def unsubscribe_producer(producer),
do: Extreme.EventProducer.unsubscribe(producer)

Expand Down Expand Up @@ -131,7 +131,7 @@ defmodule Extreme do
@doc """
TODO
"""
@callback unsubscribe(subscription :: pid()) :: :unsubscribed
@callback unsubscribe(subscription :: pid()) :: :ok

@doc """
TODO
Expand Down
8 changes: 4 additions & 4 deletions lib/extreme/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ defmodule Extreme.Connection do
|> GenServer.cast({:execute, message})
end

@impl true
@impl GenServer
def init({base_name, configuration}) do
GenServer.cast(self(), {:connect, configuration, 1})

Expand All @@ -30,7 +30,7 @@ defmodule Extreme.Connection do
{:ok, state}
end

@impl true
@impl GenServer
def handle_cast({:connect, configuration, attempt}, state) do
configuration
|> _connect(attempt)
Expand All @@ -57,7 +57,7 @@ defmodule Extreme.Connection do
end
end

@impl true
@impl GenServer
def handle_info({:tcp, socket, pkg}, %State{socket: socket} = state) do
{:ok, state} = Impl.receive_package(pkg, state)
{:noreply, state}
Expand All @@ -66,7 +66,7 @@ defmodule Extreme.Connection do
def handle_info({:tcp_closed, _port}, state),
do: {:stop, :tcp_closed, state}

@impl true
@impl GenServer
def terminate(reason, state) do
Logger.warning("[Extreme] Connection terminated: #{inspect(reason)}")
RequestManager.kill_all_subscriptions(state.base_name)
Expand Down
10 changes: 5 additions & 5 deletions lib/extreme/connection_impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,22 @@ defmodule Extreme.ConnectionImpl do

def receive_package(pkg, %State{socket: socket, received_data: received_data} = state) do
:inet.setopts(socket, active: :once)
state = _process_package(state, received_data <> pkg)
state = _process_package(received_data <> pkg, state)
{:ok, state}
end

defp _process_package(
state,
<<message_length::32-unsigned-little-integer, content::binary-size(message_length),
rest::binary>>
rest::binary>>,
%State{} = state
) do
# Handle binary data containing zero, one or many messages
# All messages start with a 32 bit unsigned little endian integer of the content length + a binary body of that size
:ok = RequestManager.process_server_message(state.base_name, content)
_process_package(state, rest)
_process_package(rest, state)
end

# No full message left, keep state in GenServer to reprocess once more data arrives
defp _process_package(state, package_with_incomplete_message),
defp _process_package(package_with_incomplete_message, %State{} = state),
do: %{state | received_data: package_with_incomplete_message}
end
12 changes: 6 additions & 6 deletions lib/extreme/event_producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ defmodule Extreme.EventProducer do
do: GenServer.cast(pid, :subscribe)

def unsubscribe(pid),
do: GenServer.call(pid, :unsubscribe)
do: GenServer.cast(pid, :unsubscribe)

def on_sync_event(pid, event, timeout),
do: GenServer.call(pid, {:on_sync_event, event}, timeout)
Expand Down Expand Up @@ -56,11 +56,6 @@ defmodule Extreme.EventProducer do
{:reply, response, state}
end

def handle_call(:unsubscribe, _from, %State{} = state) do
response = EventBuffer.unsubscribe(state.buffer_pid)
{:reply, response, state}
end

def handle_call(:subscription_status, _from, %State{} = state) do
response = EventBuffer.subscription_status(state.buffer_pid)
{:reply, response, state}
Expand Down Expand Up @@ -92,4 +87,9 @@ defmodule Extreme.EventProducer do

{:noreply, state}
end

def handle_cast(:unsubscribe, %State{} = state) do
:ok = EventBuffer.unsubscribe(state.buffer_pid)
{:noreply, state}
end
end
66 changes: 41 additions & 25 deletions lib/extreme/event_producer/event_buffer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule Extreme.EventProducer.EventBuffer do
do: GenServer.cast(pid, {:ack, response})

def unsubscribe(pid),
do: GenServer.call(pid, :unsubscribe)
do: GenServer.cast(pid, :unsubscribe)

def subscribe(pid),
do: GenServer.cast(pid, :subscribe)
Expand Down Expand Up @@ -61,11 +61,31 @@ defmodule Extreme.EventProducer.EventBuffer do
end

@impl GenServer
def handle_cast(:subscribe, state) do
def handle_cast(:unsubscribe, %State{subscription: nil} = state),
do: {:noreply, state}

def handle_cast(:unsubscribe, %State{} = state) do
Logger.debug("Received `:unsubscribe` request")
:ok = Subscription.unsubscribe(state.subscription)
state = %State{state | subscription: nil, subscription_ref: nil, status: :disconnected}

{:noreply, state}
end

def handle_cast(:subscribe, %{status: :disconnected} = state) do
Logger.debug(
"Subscribing to #{state.stream} starting with #{state.last_buffered_event_number}"
)

handle_cast(:resubscribe, state)
end

def handle_cast(:subscribe, %{} = state) do
Logger.warning("(noop) Subscribe attempted while in #{state.status} status")
{:noreply, state}
end

def handle_cast(:resubscribe, %{} = state) do
{:ok, subscription} =
Extreme.RequestManager.read_and_stay_subscribed(
state.base_name,
Expand Down Expand Up @@ -97,17 +117,6 @@ defmodule Extreme.EventProducer.EventBuffer do
end

@impl GenServer
def handle_call(:unsubscribe, _from, %State{subscription: nil} = state),
do: {:reply, :unsubscribed, state}

def handle_call(:unsubscribe, _from, %State{} = state) do
Logger.debug("Received `:unsubscribe` request")
response = Subscription.unsubscribe(state.subscription)
state = %State{state | subscription: nil, subscription_ref: nil, status: :disconnected}

{:reply, response, state}
end

def handle_call(:subscription_status, _from, %State{} = state),
do: {:reply, state.status, state}

Expand All @@ -126,19 +135,24 @@ defmodule Extreme.EventProducer.EventBuffer do
if buffer_size == 1,
do: :ok = EventProducer.on_async_event(state.producer_pid, event)

response =
state = %State{
state
| buffered_events: buffered_events,
last_buffered_event_number: event_number
}

{response, state} =
if buffer_size >= state.max_buffered do
Logger.warning(
Logger.info(
"Event buffer is full (#{inspect(buffer_size)}). Turning off subscription on #{state.stream}"
)

:stop
{:stop, %State{state | subscription: nil, subscription_ref: nil, status: :paused}}
else
:ok
{:ok, state}
end

{:reply, response,
%State{state | buffered_events: buffered_events, last_buffered_event_number: event_number}}
{:reply, response, state}
end

def handle_call({:on_event, _event}, _from, %State{} = state) do
Expand All @@ -159,7 +173,6 @@ defmodule Extreme.EventProducer.EventBuffer do
%State{subscription: subscription, subscription_ref: ref} = state
)
when reason in [
:processing_of_event_requested_stopping_subscription,
:processing_of_read_events_failed,
:processing_of_buffered_events_failed,
:unsubscribed,
Expand All @@ -171,7 +184,7 @@ defmodule Extreme.EventProducer.EventBuffer do

def handle_info(
{:DOWN, _ref, :process, _subscription, {:shutdown, _reason}},
%State{subscription: nil, subscription_ref: nil, status: :disconnected} = state
%State{subscription: nil, subscription_ref: nil} = state
) do
# we are already aware of that
{:noreply, state}
Expand All @@ -185,7 +198,7 @@ defmodule Extreme.EventProducer.EventBuffer do
)

:timer.sleep(reconnect_delay)
GenServer.cast(self(), :subscribe)
GenServer.cast(self(), :resubscribe)
{:noreply, %State{state | subscription: nil, subscription_ref: nil, status: :paused}}
end

Expand All @@ -202,11 +215,14 @@ defmodule Extreme.EventProducer.EventBuffer do
{:ok, event_number} = _get_event_number(event)
{:reply, :ok, %State{state | last_buffered_event_number: event_number}}

{:ok, event_number} ->
{:ok, event_number} when is_integer(event_number) ->
{:reply, :ok, %State{state | last_buffered_event_number: event_number}}

:stop ->
{:reply, :stop, %State{state | status: :paused}}
:ok = Subscription.unsubscribe(state.subscription)

{:reply, :stop,
%State{state | subscription: nil, subscription_ref: nil, status: :disconnected}}
end
end

Expand Down Expand Up @@ -236,7 +252,7 @@ defmodule Extreme.EventProducer.EventBuffer do
# in cahtching up mode while we still have buffered events
if state.status == :paused and :queue.len(buffered_events) == 0 do
Logger.info("Resuming subscription on #{state.stream}")
:ok = GenServer.cast(self(), :subscribe)
:ok = GenServer.cast(self(), :resubscribe)
end

{:noreply, %State{state | buffered_events: buffered_events}}
Expand Down
18 changes: 5 additions & 13 deletions lib/extreme/persistent_subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ defmodule Extreme.PersistentSubscription do
end
def handle_call(:unsubscribe, _from, state) do
:unsubscribed = MyExtremeClientModule.unsubscribe(state.subscription_pid)
:ok = MyExtremeClientModule.unsubscribe(state.subscription_pid)
{:reply, :ok, state}
end
Expand Down Expand Up @@ -218,7 +218,7 @@ defmodule Extreme.PersistentSubscription do
end

@doc false
@impl true
@impl GenServer
def init({base_name, correlation_id, subscriber, stream, group, allowed_in_flight_messages}) do
state = %State{
base_name: base_name,
Expand All @@ -235,13 +235,12 @@ defmodule Extreme.PersistentSubscription do
{:ok, state}
end

@impl true
def handle_call(:unsubscribe, from, state) do
:ok = Shared.unsubscribe(from, state)
@impl GenServer
def handle_cast(:unsubscribe, state) do
:ok = Shared.unsubscribe(state)
{:noreply, state}
end

@impl true
def handle_cast(:subscribe, state) do
Msg.ConnectToPersistentSubscription.new(
subscription_id: state.group,
Expand Down Expand Up @@ -279,13 +278,6 @@ defmodule Extreme.PersistentSubscription do
{:noreply, state}
end

def handle_cast(:unsubscribe, state) do
Msg.UnsubscribeFromStream.new()
|> cast_request_manager(state.base_name, state.correlation_id)

{:noreply, state}
end

defp cast_request_manager(message, base_name, correlation_id) do
base_name
|> RequestManager._name()
Expand Down
9 changes: 4 additions & 5 deletions lib/extreme/reading_subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ defmodule Extreme.ReadingSubscription do
)
end

@impl true
@impl GenServer
def init(
{base_name, correlation_id, subscriber,
{stream, from_event_number, per_page, resolve_link_tos, require_master, ack_timeout}}
Expand Down Expand Up @@ -50,13 +50,12 @@ defmodule Extreme.ReadingSubscription do
{:ok, %State{state | read_until: read_until}}
end

@impl true
def handle_call(:unsubscribe, from, state) do
:ok = Shared.unsubscribe(from, state)
@impl GenServer
def handle_cast(:unsubscribe, state) do
:ok = Shared.unsubscribe(state)
{:noreply, state}
end

@impl true
def handle_cast({:process_push, fun}, %{status: :subscribed} = state),
do: Shared.process_push(fun, state)

Expand Down
1 change: 1 addition & 0 deletions lib/extreme/request.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ defmodule Extreme.Request do

def prepare(protobuf_msg, credentials, correlation_id) do
cmd = protobuf_msg.__struct__
# IO.inspect([correlation_id, cmd], label: "Sending to ES")
data = cmd.encode(protobuf_msg)
_to_binary(cmd, correlation_id, credentials, data)
end
Expand Down
Loading

0 comments on commit af76d6c

Please sign in to comment.