Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug with catch-up all streams subscription where the checkpoint is not committed for hard deleted streams #238

Merged
merged 3 commits into from May 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Expand Up @@ -5,6 +5,10 @@
- Improve performance of appending events under normal and degraded network conditions ([#230](https://github.com/commanded/eventstore/pull/230)).
- Subscription checkpoint tuning ([#237](https://github.com/commanded/eventstore/pull/237)).

## Bug fixes

- Fix bug with catch-up all streams subscription where the checkpoint is not committed for hard deleted streams ([#238](https://github.com/commanded/eventstore/pull/238)).

## v1.2.3

- Add `:configure` to postgrex connection options ([#233](https://github.com/commanded/eventstore/pull/233)).
Expand Down
148 changes: 75 additions & 73 deletions lib/event_store/subscriptions/subscription_fsm.ex
Expand Up @@ -38,13 +38,7 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do

defstate initial do
defevent subscribe, data: %SubscriptionState{transient: true} = data do
data = %SubscriptionState{
data
| queue_size: 0,
partitions: %{},
processed_event_numbers: MapSet.new(),
checkpoints_pending: 0
}
data = SubscriptionState.reset_event_tracking(data)

with :ok <- subscribe_to_events(data) do
last_seen = data.start_from
Expand All @@ -68,13 +62,7 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do

defevent subscribe,
data: %SubscriptionState{} = data do
data = %SubscriptionState{
data
| queue_size: 0,
partitions: %{},
processed_event_numbers: MapSet.new(),
checkpoints_pending: 0
}
data = SubscriptionState.reset_event_tracking(data)

with {:ok, subscription} <- create_subscription(data),
{:ok, lock_ref} <- try_acquire_exclusive_lock(data, subscription),
Expand Down Expand Up @@ -290,14 +278,8 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do

defevent disconnect(lock_ref), data: %SubscriptionState{lock_ref: lock_ref} = data do
data =
%SubscriptionState{
data
| lock_ref: nil,
queue_size: 0,
partitions: %{},
processed_event_numbers: MapSet.new(),
checkpoints_pending: 0
}
%SubscriptionState{data | lock_ref: nil}
|> SubscriptionState.reset_event_tracking()
|> purge_in_flight_events()

next_state(:disconnected, data)
Expand Down Expand Up @@ -378,15 +360,14 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do
{:ok, %Subscriber{} = subscriber} ->
%Subscriber{in_flight: in_flight} = subscriber

# Prepend in-flight events for the removed subscriber to the pending
# event queue so they can be sent to another available subscriber.
# Prepend in-flight events sent to the removed subscriber back into the
# pending queue so they can be sent to another available subscriber.

data =
in_flight
|> Enum.sort_by(fn %RecordedEvent{event_number: event_number} -> -event_number end)
|> Enum.reduce(data, fn event, data ->
enqueue_event(data, event, &:queue.in_r/2)
end)
|> Enum.reduce(data, fn event, data -> enqueue_event(data, event, &:queue.in_r/2) end)
|> SubscriptionState.track_in_flight(in_flight)

%SubscriptionState{data | subscribers: Map.delete(subscribers, subscriber_pid)}

Expand Down Expand Up @@ -421,10 +402,10 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do
%SubscriptionState{data | last_received: max(last_received, event_number)}
end

defp track_last_sent(%SubscriptionState{} = data, event_number) do
%SubscriptionState{last_sent: last_sent} = data

%SubscriptionState{data | last_sent: max(last_sent, event_number)}
defp track_sent(%SubscriptionState{} = data, event_number) do
data
|> SubscriptionState.track_last_sent(event_number)
|> SubscriptionState.track_in_flight(event_number)
end

defp first_event_number([%RecordedEvent{event_number: event_number} | _]), do: event_number
Expand Down Expand Up @@ -489,22 +470,20 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do
defp enqueue_events(%SubscriptionState{} = data, []), do: data

defp enqueue_events(%SubscriptionState{} = data, [event | events]) do
%SubscriptionState{processed_event_numbers: processed_event_numbers} = data
%SubscriptionState{acknowledged_event_numbers: acknowledged_event_numbers} = data
%RecordedEvent{event_number: event_number} = event

data =
case selected?(event, data) do
true ->
# Unfiltered event, enqueue to send to a subscriber
enqueue_event(data, event)

false ->
# Filtered event, don't send to subscriber, but track it as processed.
%SubscriptionState{
data
| processed_event_numbers: MapSet.put(processed_event_numbers, event_number)
}
|> track_last_sent(event_number)
if selected?(event, data) do
# Unfiltered event, enqueue to send to a subscriber
enqueue_event(data, event)
else
# Filtered event, don't send to subscriber, but track it as ack'd.
%SubscriptionState{
data
| acknowledged_event_numbers: MapSet.put(acknowledged_event_numbers, event_number)
}
|> track_sent(event_number)
end

data
Expand Down Expand Up @@ -540,13 +519,13 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do
|> Enum.reduce(data, fn {partition_key, _pending_events}, data ->
notify_partition_subscriber(data, partition_key)
end)
|> checkpoint_last_seen()
|> checkpoint_acknowledged()
end

defp peek_event_number(pending_events) do
case :queue.peek(pending_events) do
{:value, %RecordedEvent{event_number: event_number}} -> event_number
_ -> nil
:empty -> nil
end
end

Expand Down Expand Up @@ -577,7 +556,7 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do
subscribers: Map.put(subscribers, subscriber_pid, subscriber),
queue_size: max(queue_size - 1, 0)
}
|> track_last_sent(event_number)
|> track_sent(event_number)
|> notify_partition_subscriber(partition_key, [{subscriber_pid, event} | events_to_send])
else
_ ->
Expand Down Expand Up @@ -644,21 +623,23 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do
defp map(events, _mapper), do: events

defp ack_events(%SubscriptionState{} = data, ack, subscriber_pid) do
%SubscriptionState{subscribers: subscribers, processed_event_numbers: processed_event_numbers} =
data
%SubscriptionState{
subscribers: subscribers,
acknowledged_event_numbers: acknowledged_event_numbers
} = data

with {:ok, subscriber} <- subscriber_by_pid(subscribers, subscriber_pid),
{:ok, subscriber, acknowledged_events} <- Subscriber.acknowledge(subscriber, ack) do
processed_event_numbers =
acknowledged_event_numbers =
acknowledged_events
|> Enum.map(& &1.event_number)
|> Enum.reduce(processed_event_numbers, &MapSet.put(&2, &1))
|> Enum.reduce(acknowledged_event_numbers, &MapSet.put(&2, &1))

data =
%SubscriptionState{
data
| subscribers: Map.put(subscribers, subscriber_pid, subscriber),
processed_event_numbers: processed_event_numbers
acknowledged_event_numbers: acknowledged_event_numbers
}
|> notify_subscribers()

Expand All @@ -673,32 +654,50 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do
end
end

defp checkpoint_last_seen(%SubscriptionState{} = data, persist \\ false) do
defp checkpoint_acknowledged(data, persist \\ false)

defp checkpoint_acknowledged(%SubscriptionState{in_flight_event_numbers: []} = data, persist) do
if persist, do: maybe_persist_checkpoint(data), else: data
end

defp checkpoint_acknowledged(%SubscriptionState{} = data, persist) do
%SubscriptionState{
processed_event_numbers: processed_event_numbers,
checkpoint_after: checkpoint_after,
checkpoint_threshold: checkpoint_threshold,
checkpoint_timer_ref: checkpoint_timer_ref,
acknowledged_event_numbers: acknowledged_event_numbers,
checkpoints_pending: checkpoints_pending,
last_ack: last_ack
in_flight_event_numbers: [ack | in_flight_event_numbers]
} = data

ack = last_ack + 1
if MapSet.member?(acknowledged_event_numbers, ack) do
%SubscriptionState{
data
| acknowledged_event_numbers: MapSet.delete(acknowledged_event_numbers, ack),
in_flight_event_numbers: in_flight_event_numbers,
checkpoints_pending: checkpoints_pending + 1,
last_ack: ack
}
|> checkpoint_acknowledged(true)
else
if persist, do: maybe_persist_checkpoint(data), else: data
end
end

cond do
MapSet.member?(processed_event_numbers, ack) ->
%SubscriptionState{
data
| processed_event_numbers: MapSet.delete(processed_event_numbers, ack),
checkpoints_pending: checkpoints_pending + 1,
last_ack: ack
}
|> checkpoint_last_seen(true)
defp maybe_persist_checkpoint(%SubscriptionState{transient: true} = data) do
%SubscriptionState{data | checkpoints_pending: 0}
end

persist and checkpoints_pending >= checkpoint_threshold ->
defp maybe_persist_checkpoint(%SubscriptionState{transient: false} = data) do
%SubscriptionState{
checkpoint_after: checkpoint_after,
checkpoints_pending: checkpoints_pending,
checkpoint_threshold: checkpoint_threshold,
checkpoint_timer_ref: checkpoint_timer_ref
} = data

cond do
checkpoints_pending >= checkpoint_threshold ->
persist_checkpoint(data)

persist and checkpoint_after > 0 ->
checkpoint_after > 0 ->
if checkpoint_timer_ref, do: Process.cancel_timer(checkpoint_timer_ref)

checkpoint_timer_ref = Process.send_after(self(), :checkpoint, checkpoint_after)
Expand All @@ -710,18 +709,21 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do
end
end

defp persist_checkpoint(%SubscriptionState{} = data) do
defp persist_checkpoint(%SubscriptionState{transient: true} = data) do
%SubscriptionState{data | checkpoints_pending: 0}
end

defp persist_checkpoint(%SubscriptionState{transient: false} = data) do
%SubscriptionState{
conn: conn,
schema: schema,
stream_uuid: stream_uuid,
subscription_name: subscription_name,
transient: transient,
last_ack: last_ack,
checkpoints_pending: checkpoints_pending
} = data

if checkpoints_pending > 0 and !transient do
if checkpoints_pending > 0 do
Storage.Subscription.ack_last_seen_event(conn, stream_uuid, subscription_name, last_ack,
schema: schema
)
Expand Down
48 changes: 47 additions & 1 deletion lib/event_store/subscriptions/subscription_state.ex
@@ -1,6 +1,9 @@
defmodule EventStore.Subscriptions.SubscriptionState do
@moduledoc false

alias EventStore.RecordedEvent
alias __MODULE__

defstruct [
:conn,
:event_store,
Expand All @@ -26,7 +29,50 @@ defmodule EventStore.Subscriptions.SubscriptionState do
checkpoints_pending: 0,
subscribers: %{},
partitions: %{},
processed_event_numbers: MapSet.new(),
acknowledged_event_numbers: MapSet.new(),
in_flight_event_numbers: [],
transient: false
]

def reset_event_tracking(%SubscriptionState{} = state) do
%SubscriptionState{
state
| queue_size: 0,
partitions: %{},
acknowledged_event_numbers: MapSet.new(),
in_flight_event_numbers: [],
checkpoints_pending: 0
}
end

def track_in_flight(%SubscriptionState{} = state, event_number) when is_number(event_number) do
%SubscriptionState{in_flight_event_numbers: in_flight_event_numbers} = state

in_flight_event_numbers =
Enum.sort([event_number | in_flight_event_numbers])
|> Enum.uniq()

%SubscriptionState{state | in_flight_event_numbers: in_flight_event_numbers}
end

def track_in_flight(%SubscriptionState{} = state, []), do: state

def track_in_flight(%SubscriptionState{} = state, events) when is_list(events) do
%SubscriptionState{in_flight_event_numbers: in_flight_event_numbers} = state

in_flight_event_numbers =
events
|> Enum.map(fn %RecordedEvent{event_number: event_number} -> event_number end)
|> Enum.concat(in_flight_event_numbers)
|> Enum.sort()
|> Enum.uniq()

%SubscriptionState{state | in_flight_event_numbers: in_flight_event_numbers}
end

def track_last_sent(%SubscriptionState{} = data, event_number) when is_number(event_number) do
%SubscriptionState{last_sent: last_sent} = data

%SubscriptionState{data | last_sent: max(last_sent, event_number)}
end
end
2 changes: 1 addition & 1 deletion test/subscriptions/subscription_acknowledgement_test.exs
Expand Up @@ -60,7 +60,7 @@ defmodule EventStore.Subscriptions.SubscriptionAcknowledgementTest do

test "should checkpoint after inactivity", %{conn: conn} do
{:ok, subscription} =
subscribe_to_all_streams(buffer_size: 3, checkpoint_after: 10, checkpoint_threshold: 100)
subscribe_to_all_streams(buffer_size: 3, checkpoint_after: 25, checkpoint_threshold: 100)

:ok = append_to_stream("stream1", 3)
:ok = receive_and_ack(subscription, "stream1", [1, 2, 3])
Expand Down