Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send events through postgres notify #2449

Merged
merged 21 commits into from
Oct 24, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion apps/explorer/config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ config :explorer,
"homestead,tangerineWhistle,spuriousDragon,byzantium,constantinople,petersburg,default",
include_uncles_in_average_block_time:
if(System.get_env("UNCLES_IN_AVERAGE_BLOCK_TIME") == "false", do: false, else: true),
healthy_blocks_period: System.get_env("HEALTHY_BLOCKS_PERIOD") || :timer.minutes(5)
healthy_blocks_period: System.get_env("HEALTHY_BLOCKS_PERIOD") || :timer.minutes(5),
realtime_events_sender: Explorer.Chain.Events.Sender

average_block_period =
case Integer.parse(System.get_env("AVERAGE_BLOCK_CACHE_PERIOD", "")) do
Expand Down
3 changes: 3 additions & 0 deletions apps/explorer/config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ end
config :explorer, Explorer.ExchangeRates.Source.TransactionAndLog,
secondary_source: Explorer.ExchangeRates.Source.OneCoinSource

config :explorer,
realtime_events_sender: Explorer.Chain.Events.SenderMock

variant =
if is_nil(System.get_env("ETHEREUM_JSONRPC_VARIANT")) do
"parity"
Expand Down
3 changes: 3 additions & 0 deletions apps/explorer/lib/explorer/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ defmodule Explorer.Application do
Transactions
}

alias Explorer.Chain.Events.Listener

alias Explorer.Chain.Supply.RSK

alias Explorer.Market.MarketHistoryCache
Expand Down Expand Up @@ -43,6 +45,7 @@ defmodule Explorer.Application do
{Admin.Recovery, [[], [name: Admin.Recovery]]},
{TransactionCount, [[], []]},
{BlockCount, []},
{Listener, []},
con_cache_child_spec(Blocks.cache_name()),
con_cache_child_spec(NetVersion.cache_name()),
con_cache_child_spec(MarketHistoryCache.cache_name()),
Expand Down
55 changes: 55 additions & 0 deletions apps/explorer/lib/explorer/chain/events/listener.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
defmodule Explorer.Chain.Events.Listener do
@moduledoc """
Listens and publishes events from PG
"""

use GenServer

alias Postgrex.Notifications

def start_link(_) do
GenServer.start_link(__MODULE__, "chain_event", name: __MODULE__)
end

def init(channel) do
{:ok, pid} =
:explorer
|> Application.get_env(Explorer.Repo)
|> Notifications.start_link()

ref = Notifications.listen!(pid, channel)

{:ok, {pid, ref, channel}}
end

def handle_info({:notification, _pid, _ref, _topic, payload}, state) do
payload
|> decode_payload!()
|> broadcast()

{:noreply, state}
end

# sobelow_skip ["Misc.BinToTerm"]
defp decode_payload!(payload) do
payload
|> Base.decode64!()
|> :erlang.binary_to_term()
end

defp broadcast({:chain_event, event_type} = event) do
Registry.dispatch(Registry.ChainEvents, event_type, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, event)
end
end)
end

defp broadcast({:chain_event, event_type, broadcast_type, _data} = event) do
Registry.dispatch(Registry.ChainEvents, {event_type, broadcast_type}, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, event)
end
end)
end
end
22 changes: 12 additions & 10 deletions apps/explorer/lib/explorer/chain/events/publisher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ defmodule Explorer.Chain.Events.Publisher do
@moduledoc """
Publishes events related to the Chain context.
"""
@sender Application.get_env(:explorer, :realtime_events_sender)

@allowed_events ~w(addresses address_coin_balances blocks block_rewards internal_transactions token_transfers transactions contract_verification_result)a

Expand All @@ -16,14 +17,12 @@ defmodule Explorer.Chain.Events.Publisher do
@spec broadcast(atom()) :: :ok
def broadcast(event_type) do
send_data(event_type)
:ok
end

defp send_data(event_type) do
Registry.dispatch(Registry.ChainEvents, event_type, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, {:chain_event, event_type})
end
end)
payload = encode_payload({:chain_event, event_type})
@sender.send_notify(payload)
end

# The :catchup type of event is not being consumed right now.
Expand All @@ -32,10 +31,13 @@ defmodule Explorer.Chain.Events.Publisher do
defp send_data(_event_type, :catchup, _event_data), do: :ok

defp send_data(event_type, broadcast_type, event_data) do
Registry.dispatch(Registry.ChainEvents, {event_type, broadcast_type}, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, {:chain_event, event_type, broadcast_type, event_data})
end
end)
payload = encode_payload({:chain_event, event_type, broadcast_type, event_data})
@sender.send_notify(payload)
end

defp encode_payload(payload) do
payload
|> :erlang.term_to_binary([:compressed])
|> Base.encode64()
end
end
12 changes: 12 additions & 0 deletions apps/explorer/lib/explorer/chain/events/sender.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
defmodule Explorer.Chain.Events.Sender do
@moduledoc """
Sends events to Postgres.
"""
alias Explorer.Repo

@callback send_notify(String.t()) :: {:ok, any}

def send_notify(payload) do
Repo.query!("select pg_notify('chain_event', $1::text);", [payload])
end
end
10 changes: 10 additions & 0 deletions apps/explorer/lib/explorer/chain/events/sender_mock.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
defmodule Explorer.Chain.Events.SenderMock do
@moduledoc """
Sends events directly to Listener.
"""
alias Explorer.Chain.Events.Listener

def send_notify(payload) do
send(Listener, {:notification, nil, nil, nil, payload})
end
end
4 changes: 2 additions & 2 deletions apps/explorer/test/explorer/chain/events/publisher_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule Explorer.Chain.Events.PublisherTest do

Publisher.broadcast([{event_type, event_data}], broadcast_type)

assert_received {:chain_event, ^event_type, ^broadcast_type, []}
assert_receive {:chain_event, ^event_type, ^broadcast_type, []}
end

test "won't send chain_event of catchup type" do
Expand Down Expand Up @@ -59,7 +59,7 @@ defmodule Explorer.Chain.Events.PublisherTest do

Publisher.broadcast(event_type)

assert_received {:chain_event, ^event_type}
assert_receive {:chain_event, ^event_type}
end
end
end
4 changes: 2 additions & 2 deletions apps/explorer/test/explorer/chain/events/subscriber_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule Explorer.Chain.Events.SubscriberTest do

Publisher.broadcast([{event_type, event_data}], broadcast_type)

assert_received {:chain_event, :blocks, :realtime, []}
assert_receive {:chain_event, :blocks, :realtime, []}
end
end

Expand All @@ -27,7 +27,7 @@ defmodule Explorer.Chain.Events.SubscriberTest do

Publisher.broadcast(event_type)

assert_received {:chain_event, :exchange_rate}
assert_receive {:chain_event, :exchange_rate}
end
end
end
12 changes: 6 additions & 6 deletions apps/explorer/test/explorer/chain/import_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -470,35 +470,35 @@ defmodule Explorer.Chain.ImportTest do
test "publishes addresses with updated fetched_coin_balance data to subscribers on insert" do
Subscriber.to(:addresses, :realtime)
Import.all(@import_data)
assert_received {:chain_event, :addresses, :realtime, [%Address{}, %Address{}, %Address{}]}
assert_receive {:chain_event, :addresses, :realtime, [%Address{}, %Address{}, %Address{}]}
end

test "publishes block data to subscribers on insert" do
Subscriber.to(:blocks, :realtime)
Import.all(@import_data)
assert_received {:chain_event, :blocks, :realtime, [%Block{}]}
assert_receive {:chain_event, :blocks, :realtime, [%Block{}]}
end

test "publishes internal_transaction data to subscribers on insert" do
Subscriber.to(:internal_transactions, :realtime)
Import.all(@import_data)

assert_received {:chain_event, :internal_transactions, :realtime,
[%{transaction_hash: _, index: _}, %{transaction_hash: _, index: _}]}
assert_receive {:chain_event, :internal_transactions, :realtime,
[%{transaction_hash: _, index: _}, %{transaction_hash: _, index: _}]}
end

test "publishes transactions data to subscribers on insert" do
Subscriber.to(:transactions, :realtime)
Import.all(@import_data)
assert_received {:chain_event, :transactions, :realtime, [%Transaction{}]}
assert_receive {:chain_event, :transactions, :realtime, [%Transaction{}]}
end

test "publishes token_transfers data to subscribers on insert" do
Subscriber.to(:token_transfers, :realtime)

Import.all(@import_data)

assert_received {:chain_event, :token_transfers, :realtime, [%TokenTransfer{}]}
assert_receive {:chain_event, :token_transfers, :realtime, [%TokenTransfer{}]}
end

test "does not broadcast if broadcast option is false" do
Expand Down