Skip to content

Commit

Permalink
Merge pull request #2449 from poanetwork/realtime-pubsub-by-postges-n…
Browse files Browse the repository at this point in the history
…otify

Send events through postgres notify
  • Loading branch information
vbaranov committed Oct 24, 2019
2 parents 4f987c2 + f50b73e commit a4e6eeb
Show file tree
Hide file tree
Showing 11 changed files with 136 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## Current

### Features
- [#2449](https://github.com/poanetwork/blockscout/pull/2449) - add ability to send notification events through postgres notify

### Fixes

Expand Down
12 changes: 12 additions & 0 deletions apps/explorer/config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ config :explorer,
include_uncles_in_average_block_time:
if(System.get_env("UNCLES_IN_AVERAGE_BLOCK_TIME") == "true", do: true, else: false),
healthy_blocks_period: System.get_env("HEALTHY_BLOCKS_PERIOD") || :timer.minutes(5),
realtime_events_sender:
if(System.get_env("DISABLE_WEBAPP") != "true",
do: Explorer.Chain.Events.SimpleSender,
else: Explorer.Chain.Events.DBSender
),
index_internal_transactions_for_token_transfers:
if(System.get_env("INTERNAL_TRANSACTIONOS_FOR_TOKEN_TRANSFERS") == "true", do: true, else: false)

Expand All @@ -29,6 +34,13 @@ config :explorer, Explorer.Counters.AverageBlockTime,
enabled: true,
period: average_block_period

config :explorer, Explorer.Chain.Events.Listener,
enabled:
if(System.get_env("DISABLE_WEBAPP") == nil && System.get_env("DISABLE_INDEXER") == nil,
do: false,
else: true
)

config :explorer, Explorer.ChainSpec.GenesisData,
enabled: true,
chain_spec_path: System.get_env("CHAIN_SPEC_PATH"),
Expand Down
3 changes: 3 additions & 0 deletions apps/explorer/config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ end
config :explorer, Explorer.ExchangeRates.Source.TransactionAndLog,
secondary_source: Explorer.ExchangeRates.Source.OneCoinSource

config :explorer,
realtime_events_sender: Explorer.Chain.Events.SimpleSender

variant =
if is_nil(System.get_env("ETHEREUM_JSONRPC_VARIANT")) do
"parity"
Expand Down
1 change: 1 addition & 0 deletions apps/explorer/lib/explorer/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ defmodule Explorer.Application do
configure(Explorer.ChainSpec.GenesisData),
configure(Explorer.KnownTokens),
configure(Explorer.Market.History.Cataloger),
configure(Explorer.Chain.Events.Listener),
configure(Explorer.Counters.AddressesWithBalanceCounter),
configure(Explorer.Counters.AddressesCounter),
configure(Explorer.Counters.AverageBlockTime),
Expand Down
26 changes: 26 additions & 0 deletions apps/explorer/lib/explorer/chain/events/db_sender.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
defmodule Explorer.Chain.Events.DBSender do
@moduledoc """
Sends events to Postgres.
"""
alias Explorer.Repo

def send_data(event_type) do
payload = encode_payload({:chain_event, event_type})
send_notify(payload)
end

def send_data(event_type, broadcast_type, event_data) do
payload = encode_payload({:chain_event, event_type, broadcast_type, event_data})
send_notify(payload)
end

defp encode_payload(payload) do
payload
|> :erlang.term_to_binary([:compressed])
|> Base.encode64()
end

defp send_notify(payload) do
Repo.query!("select pg_notify('chain_event', $1::text);", [payload])
end
end
55 changes: 55 additions & 0 deletions apps/explorer/lib/explorer/chain/events/listener.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
defmodule Explorer.Chain.Events.Listener do
@moduledoc """
Listens and publishes events from PG
"""

use GenServer

alias Postgrex.Notifications

def start_link(_) do
GenServer.start_link(__MODULE__, "chain_event", name: __MODULE__)
end

def init(channel) do
{:ok, pid} =
:explorer
|> Application.get_env(Explorer.Repo)
|> Notifications.start_link()

ref = Notifications.listen!(pid, channel)

{:ok, {pid, ref, channel}}
end

def handle_info({:notification, _pid, _ref, _topic, payload}, state) do
payload
|> decode_payload!()
|> broadcast()

{:noreply, state}
end

# sobelow_skip ["Misc.BinToTerm"]
defp decode_payload!(payload) do
payload
|> Base.decode64!()
|> :erlang.binary_to_term()
end

defp broadcast({:chain_event, event_type} = event) do
Registry.dispatch(Registry.ChainEvents, event_type, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, event)
end
end)
end

defp broadcast({:chain_event, event_type, broadcast_type, _data} = event) do
Registry.dispatch(Registry.ChainEvents, {event_type, broadcast_type}, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, event)
end
end)
end
end
17 changes: 7 additions & 10 deletions apps/explorer/lib/explorer/chain/events/publisher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ defmodule Explorer.Chain.Events.Publisher do
@spec broadcast(atom()) :: :ok
def broadcast(event_type) do
send_data(event_type)
:ok
end

defp send_data(event_type) do
Registry.dispatch(Registry.ChainEvents, event_type, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, {:chain_event, event_type})
end
end)
sender().send_data(event_type)
end

defp sender do
Application.get_env(:explorer, :realtime_events_sender)
end

# The :catchup type of event is not being consumed right now.
Expand All @@ -32,10 +33,6 @@ defmodule Explorer.Chain.Events.Publisher do
defp send_data(_event_type, :catchup, _event_data), do: :ok

defp send_data(event_type, broadcast_type, event_data) do
Registry.dispatch(Registry.ChainEvents, {event_type, broadcast_type}, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, {:chain_event, event_type, broadcast_type, event_data})
end
end)
sender().send_data(event_type, broadcast_type, event_data)
end
end
21 changes: 21 additions & 0 deletions apps/explorer/lib/explorer/chain/events/simple_sender.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
defmodule Explorer.Chain.Events.SimpleSender do
@moduledoc """
Publishes events through Registry without intermediate levels.
"""

def send_data(event_type, broadcast_type, event_data) do
Registry.dispatch(Registry.ChainEvents, {event_type, broadcast_type}, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, {:chain_event, event_type, broadcast_type, event_data})
end
end)
end

def send_data(event_type) do
Registry.dispatch(Registry.ChainEvents, event_type, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, {:chain_event, event_type})
end
end)
end
end
4 changes: 2 additions & 2 deletions apps/explorer/test/explorer/chain/events/publisher_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule Explorer.Chain.Events.PublisherTest do

Publisher.broadcast([{event_type, event_data}], broadcast_type)

assert_received {:chain_event, ^event_type, ^broadcast_type, []}
assert_receive {:chain_event, ^event_type, ^broadcast_type, []}
end

test "won't send chain_event of catchup type" do
Expand Down Expand Up @@ -59,7 +59,7 @@ defmodule Explorer.Chain.Events.PublisherTest do

Publisher.broadcast(event_type)

assert_received {:chain_event, ^event_type}
assert_receive {:chain_event, ^event_type}
end
end
end
4 changes: 2 additions & 2 deletions apps/explorer/test/explorer/chain/events/subscriber_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule Explorer.Chain.Events.SubscriberTest do

Publisher.broadcast([{event_type, event_data}], broadcast_type)

assert_received {:chain_event, :blocks, :realtime, []}
assert_receive {:chain_event, :blocks, :realtime, []}
end
end

Expand All @@ -27,7 +27,7 @@ defmodule Explorer.Chain.Events.SubscriberTest do

Publisher.broadcast(event_type)

assert_received {:chain_event, :exchange_rate}
assert_receive {:chain_event, :exchange_rate}
end
end
end
12 changes: 6 additions & 6 deletions apps/explorer/test/explorer/chain/import_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -470,35 +470,35 @@ defmodule Explorer.Chain.ImportTest do
test "publishes addresses with updated fetched_coin_balance data to subscribers on insert" do
Subscriber.to(:addresses, :realtime)
Import.all(@import_data)
assert_received {:chain_event, :addresses, :realtime, [%Address{}, %Address{}, %Address{}]}
assert_receive {:chain_event, :addresses, :realtime, [%Address{}, %Address{}, %Address{}]}
end

test "publishes block data to subscribers on insert" do
Subscriber.to(:blocks, :realtime)
Import.all(@import_data)
assert_received {:chain_event, :blocks, :realtime, [%Block{}]}
assert_receive {:chain_event, :blocks, :realtime, [%Block{}]}
end

test "publishes internal_transaction data to subscribers on insert" do
Subscriber.to(:internal_transactions, :realtime)
Import.all(@import_data)

assert_received {:chain_event, :internal_transactions, :realtime,
[%{transaction_hash: _, index: _}, %{transaction_hash: _, index: _}]}
assert_receive {:chain_event, :internal_transactions, :realtime,
[%{transaction_hash: _, index: _}, %{transaction_hash: _, index: _}]}
end

test "publishes transactions data to subscribers on insert" do
Subscriber.to(:transactions, :realtime)
Import.all(@import_data)
assert_received {:chain_event, :transactions, :realtime, [%Transaction{}]}
assert_receive {:chain_event, :transactions, :realtime, [%Transaction{}]}
end

test "publishes token_transfers data to subscribers on insert" do
Subscriber.to(:token_transfers, :realtime)

Import.all(@import_data)

assert_received {:chain_event, :token_transfers, :realtime, [%TokenTransfer{}]}
assert_receive {:chain_event, :token_transfers, :realtime, [%TokenTransfer{}]}
end

test "does not broadcast if broadcast option is false" do
Expand Down

0 comments on commit a4e6eeb

Please sign in to comment.