Skip to content

Commit

Permalink
[Listener] Start following events from now on, ignoring old events
Browse files Browse the repository at this point in the history
  • Loading branch information
burmajam committed Nov 7, 2017
1 parent 3d9cb53 commit 7f6e939
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 15 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# Changelog for Extreme v0.10.3
* Extreme.Listener - if get_last_event/1 returns `:from_now`, catching events will start from current event

# Changelog for Extreme v0.10.2
* Fix end of patching in Listener

Expand Down
16 changes: 15 additions & 1 deletion lib/listener.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ defmodule Extreme.Listener do
use Extreme.Listener
import MyApp.MyProcessor
# returns last processed event by MyListener on stream_name, -1 if none has been processed so far
# returns last processed event by MyListener on stream_name,
# -1 if none has been processed so far, or `:from_now` if you don't care for previous events
defp get_last_event(stream_name), do: DB.get_last_event MyListener, stream_name
defp process_push(push, stream_name) do
Expand Down Expand Up @@ -105,6 +106,9 @@ defmodule Extreme.Listener do

def handle_cast(:subscribe, state) do
{:ok, state} = case get_last_event(state.stream_name) do
:from_now ->
{:ok, %{last_event_number: last_event}} = Extreme.execute state.event_store, _read_events_backward(state.stream_name)
start_live_subscription(last_event, state)
{:patch, last_event, patch_until} -> start_patching(last_event, patch_until, state)
last_event -> start_live_subscription(last_event, state)
end
Expand Down Expand Up @@ -175,6 +179,16 @@ defmodule Extreme.Listener do
end
def handle_info(_msg, state), do: {:noreply, state}

defp _read_events_backward(stream, start \\ -1, count \\ 1) do
Extreme.Msg.ReadStreamEventsBackward.new(
event_stream_id: stream,
from_event_number: start,
max_count: count,
resolve_link_tos: true,
require_master: false
)
end

def caught_up, do: Logger.debug "We are up to date"
def register_patching_start(_, _, _), do: {:error, :not_implemented}
def patching_done(_), do: {:error, :not_implemented}
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule Extreme.Mixfile do

def project do
[app: :extreme,
version: "0.10.2",
version: "0.10.3",
elixir: "~> 1.3.0 or ~> 1.4.0 or ~> 1.5.0",
source_url: "https://github.com/exponentially/extreme",
description: """
Expand Down
72 changes: 59 additions & 13 deletions test/listener_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,33 @@ defmodule Extreme.ListenerTest do
defmodule PersonChangedName, do: defstruct [:name]

defmodule DB do
def start_link,
do: Agent.start_link(fn -> %{} end, name: :db)
def start_link(name \\ :db, start_from \\ -1),
do: Agent.start_link(fn -> %{start_from: start_from} end, name: name)

def get_last_event(listener, stream),
do: Agent.get(:db, fn state -> Map.get(state, {listener, stream}, -1) end)
def get_last_event(name \\ :db, listener, stream),
do: Agent.get(name, fn state -> Map.get(state, {listener, stream}, state[:start_from]) end)

def get_patch_range(listener, stream),
do: Agent.get(:db, fn state -> Map.get(state, {listener, stream}) end)
def get_patch_range(name \\ :db, listener, stream),
do: Agent.get(name, fn state -> Map.get(state, {listener, stream}) end)

def start_patching(listener, stream, from, until) do
Agent.update(:db, fn state -> Map.put(state, {listener, stream}, %{last_event: from, patch_until: until}) end)
def start_patching(name \\ :db, listener, stream, from, until) do
Agent.update(name, fn state -> Map.put(state, {listener, stream}, %{last_event: from, patch_until: until}) end)
:ok
end

def patching_done(listener, stream) do
Agent.update(:db, fn state -> Map.delete(state, {listener, stream}) end)
def patching_done(name \\ :db, listener, stream) do
Agent.update(name, fn state -> Map.delete(state, {listener, stream}) end)
:ok
end

def ack_event(listener, stream, event_number),
do: Agent.update(:db, fn state -> Map.put(state, {listener, stream}, event_number) end)
def ack_event(name \\ :db, listener, stream, event_number),
do: Agent.update(name, fn state -> Map.put(state, {listener, stream}, event_number) end)

def in_transaction(fun), do: fun.()
end

defmodule MyListener do
use Extreme.Listener
use Extreme.Listener
alias Extreme.ListenerTest.DB

defp get_last_event(stream_name) do
Expand Down Expand Up @@ -79,6 +79,24 @@ defmodule Extreme.ListenerTest do
def caught_up, do: Logger.debug("We are up to date. YEEEY!!!")
end

defmodule NewListener do
use Extreme.Listener
alias Extreme.ListenerTest.DB

defp get_last_event(stream_name) do
DB.get_last_event :ignores_old_events_db, NewListener, stream_name
end

defp process_push(push, stream_name) do
DB.in_transaction fn ->
send :test, {:processing_push, push.event.event_type, push.event.data}
DB.ack_event(:ignores_old_events_db, NewListener, stream_name, push.event.event_number)
Logger.debug "Processed event ##{push.event.event_number}"
end
{:ok, push.event.event_number}
end
end

setup do
{:ok, server} = Application.get_env(:extreme, :event_store)
|> Extreme.start_link
Expand Down Expand Up @@ -155,6 +173,34 @@ defmodule Extreme.ListenerTest do
assert DB.get_last_event(MyListener, stream) == 2
end

test "Listener doesn't process previous events but keeps listening for new ones", %{server: server} do
Logger.debug "TEST: Listener doesn't process previous events but keeps listening for new ones"
stream = to_string(UUID.uuid1)
event1 = %PersonCreated{name: "Pera Peric"}
event2 = %PersonChangedName{name: "Zika"}
{:ok, _db} = DB.start_link :ignores_old_events_db, :from_now
assert DB.get_last_event(:ignores_old_events_db, NewListener, stream) == :from_now

# write 2 events to stream
{:ok, %{result: :Success}} = Extreme.execute server, write_events(stream, [event1, event2])

# run listener and expect it NOT to read them
{:ok, _listener} = NewListener.start_link(server, stream)
refute_receive {:processing_push, _, _}
refute_receive {:processing_push, _, _}
assert DB.get_last_event(:ignores_old_events_db, NewListener, stream) == :from_now

# write one more event to stream
event3 = %PersonChangedName{name: "Laza"}
{:ok, %{result: :Success}} = Extreme.execute server, write_events(stream, [event3])

# expect that listener got new event
assert_receive {:processing_push, event_type, event}
assert event_type == "Elixir.Extreme.ListenerTest.PersonChangedName"
assert event3 == :erlang.binary_to_term(event)
assert DB.get_last_event(:ignores_old_events_db, NewListener, stream) == 2
end

test "Listener can be paused and resumed", %{server: server} do
Logger.debug "TEST: Listener can be paused and resumed"
stream = to_string(UUID.uuid1)
Expand Down

0 comments on commit 7f6e939

Please sign in to comment.