Skip to content

Commit

Permalink
Update tests to receive realtime events
Browse files Browse the repository at this point in the history
  • Loading branch information
saneery committed Jul 29, 2019
1 parent 51f6a68 commit 19d84ab
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 14 deletions.
1 change: 1 addition & 0 deletions apps/explorer/lib/explorer/chain/events/listener.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ defmodule Explorer.Chain.Events.Listener do
{:noreply, state}
end

# sobelow_skip ["Misc.BinToTerm"]
defp decode_payload!(payload) do
payload
|> Base.decode64!()
Expand Down
20 changes: 17 additions & 3 deletions apps/explorer/lib/explorer/chain/events/publisher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ defmodule Explorer.Chain.Events.Publisher do
@moduledoc """
Publishes events related to the Chain context.
"""
alias Ecto.Adapters.SQL.Sandbox
alias Explorer.Repo

@allowed_events ~w(addresses address_coin_balances blocks block_rewards internal_transactions token_transfers transactions contract_verification_result)a
Expand All @@ -17,11 +18,12 @@ defmodule Explorer.Chain.Events.Publisher do
@spec broadcast(atom()) :: :ok
def broadcast(event_type) do
send_data(event_type)
:ok
end

defp send_data(event_type) do
payload = encode_payload({:chain_event, event_type})
Repo.query("NOTIFY chain_event, '#{payload}';")
send_notify(payload)
end

# The :catchup type of event is not being consumed right now.
Expand All @@ -31,12 +33,24 @@ defmodule Explorer.Chain.Events.Publisher do

defp send_data(event_type, broadcast_type, event_data) do
payload = encode_payload({:chain_event, event_type, broadcast_type, event_data})
Repo.query("NOTIFY chain_event, '#{payload}';")
send_notify(payload)
end

defp encode_payload(payload) do
payload
|> :erlang.term_to_binary()
|> :erlang.term_to_binary([:compressed])
|> Base.encode64()
end

defp send_notify(payload) do
fun = fn ->
Repo.query("select pg_notify('chain_event', $1::text);", [payload])
end

if Mix.env() == :test do
Sandbox.unboxed_run(Repo, fun)
else
fun.()
end
end
end
6 changes: 3 additions & 3 deletions apps/explorer/test/explorer/chain/events/publisher_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule Explorer.Chain.Events.PublisherTest do

Publisher.broadcast([{event_type, event_data}], broadcast_type)

assert_received {:chain_event, ^event_type, ^broadcast_type, []}
assert_receive {:chain_event, ^event_type, ^broadcast_type, []}
end

test "won't send chain_event of catchup type" do
Expand All @@ -27,7 +27,7 @@ defmodule Explorer.Chain.Events.PublisherTest do

Publisher.broadcast([{event_type, event_data}], broadcast_type)

refute_received {:chain_event, ^event_type, ^broadcast_type, []}
refute_receive {:chain_event, ^event_type, ^broadcast_type, []}
end

test "won't send event that is not allowed" do
Expand Down Expand Up @@ -59,7 +59,7 @@ defmodule Explorer.Chain.Events.PublisherTest do

Publisher.broadcast(event_type)

assert_received {:chain_event, ^event_type}
assert_receive {:chain_event, ^event_type}
end
end
end
4 changes: 2 additions & 2 deletions apps/explorer/test/explorer/chain/events/subscriber_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule Explorer.Chain.Events.SubscriberTest do

Publisher.broadcast([{event_type, event_data}], broadcast_type)

assert_received {:chain_event, :blocks, :realtime, []}
assert_receive {:chain_event, :blocks, :realtime, []}
end
end

Expand All @@ -27,7 +27,7 @@ defmodule Explorer.Chain.Events.SubscriberTest do

Publisher.broadcast(event_type)

assert_received {:chain_event, :exchange_rate}
assert_receive {:chain_event, :exchange_rate}
end
end
end
12 changes: 6 additions & 6 deletions apps/explorer/test/explorer/chain/import_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -470,35 +470,35 @@ defmodule Explorer.Chain.ImportTest do
test "publishes addresses with updated fetched_coin_balance data to subscribers on insert" do
Subscriber.to(:addresses, :realtime)
Import.all(@import_data)
assert_received {:chain_event, :addresses, :realtime, [%Address{}, %Address{}, %Address{}]}
assert_receive {:chain_event, :addresses, :realtime, [%Address{}, %Address{}, %Address{}]}
end

test "publishes block data to subscribers on insert" do
Subscriber.to(:blocks, :realtime)
Import.all(@import_data)
assert_received {:chain_event, :blocks, :realtime, [%Block{}]}
assert_receive {:chain_event, :blocks, :realtime, [%Block{}]}
end

test "publishes internal_transaction data to subscribers on insert" do
Subscriber.to(:internal_transactions, :realtime)
Import.all(@import_data)

assert_received {:chain_event, :internal_transactions, :realtime,
[%{transaction_hash: _, index: _}, %{transaction_hash: _, index: _}]}
assert_receive {:chain_event, :internal_transactions, :realtime,
[%{transaction_hash: _, index: _}, %{transaction_hash: _, index: _}]}
end

test "publishes transactions data to subscribers on insert" do
Subscriber.to(:transactions, :realtime)
Import.all(@import_data)
assert_received {:chain_event, :transactions, :realtime, [%Transaction{}]}
assert_receive {:chain_event, :transactions, :realtime, [%Transaction{}]}
end

test "publishes token_transfers data to subscribers on insert" do
Subscriber.to(:token_transfers, :realtime)

Import.all(@import_data)

assert_received {:chain_event, :token_transfers, :realtime, [%TokenTransfer{}]}
assert_receive {:chain_event, :token_transfers, :realtime, [%TokenTransfer{}]}
end

test "does not broadcast if broadcast option is false" do
Expand Down

0 comments on commit 19d84ab

Please sign in to comment.