Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send events through postgres notify #2449

Merged
merged 21 commits into from
Oct 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## Current

### Features
- [#2449](https://github.com/poanetwork/blockscout/pull/2449) - add ability to send notification events through postgres notify

### Fixes

Expand Down
12 changes: 12 additions & 0 deletions apps/explorer/config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ config :explorer,
include_uncles_in_average_block_time:
if(System.get_env("UNCLES_IN_AVERAGE_BLOCK_TIME") == "true", do: true, else: false),
healthy_blocks_period: System.get_env("HEALTHY_BLOCKS_PERIOD") || :timer.minutes(5),
realtime_events_sender:
if(System.get_env("DISABLE_WEBAPP") != "true",
do: Explorer.Chain.Events.SimpleSender,
else: Explorer.Chain.Events.DBSender
),
index_internal_transactions_for_token_transfers:
if(System.get_env("INTERNAL_TRANSACTIONOS_FOR_TOKEN_TRANSFERS") == "true", do: true, else: false)

Expand All @@ -29,6 +34,13 @@ config :explorer, Explorer.Counters.AverageBlockTime,
enabled: true,
period: average_block_period

config :explorer, Explorer.Chain.Events.Listener,
enabled:
if(System.get_env("DISABLE_WEBAPP") == nil && System.get_env("DISABLE_INDEXER") == nil,
do: false,
else: true
)

config :explorer, Explorer.ChainSpec.GenesisData,
enabled: true,
chain_spec_path: System.get_env("CHAIN_SPEC_PATH"),
Expand Down
3 changes: 3 additions & 0 deletions apps/explorer/config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ end
config :explorer, Explorer.ExchangeRates.Source.TransactionAndLog,
secondary_source: Explorer.ExchangeRates.Source.OneCoinSource

config :explorer,
realtime_events_sender: Explorer.Chain.Events.SimpleSender

variant =
if is_nil(System.get_env("ETHEREUM_JSONRPC_VARIANT")) do
"parity"
Expand Down
1 change: 1 addition & 0 deletions apps/explorer/lib/explorer/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ defmodule Explorer.Application do
configure(Explorer.ChainSpec.GenesisData),
configure(Explorer.KnownTokens),
configure(Explorer.Market.History.Cataloger),
configure(Explorer.Chain.Events.Listener),
configure(Explorer.Counters.AddressesWithBalanceCounter),
configure(Explorer.Counters.AddressesCounter),
configure(Explorer.Counters.AverageBlockTime),
Expand Down
26 changes: 26 additions & 0 deletions apps/explorer/lib/explorer/chain/events/db_sender.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
defmodule Explorer.Chain.Events.DBSender do
@moduledoc """
Sends events to Postgres.
"""
alias Explorer.Repo

def send_data(event_type) do
payload = encode_payload({:chain_event, event_type})
send_notify(payload)
end

def send_data(event_type, broadcast_type, event_data) do
payload = encode_payload({:chain_event, event_type, broadcast_type, event_data})
send_notify(payload)
end

defp encode_payload(payload) do
payload
|> :erlang.term_to_binary([:compressed])
|> Base.encode64()
end

defp send_notify(payload) do
Repo.query!("select pg_notify('chain_event', $1::text);", [payload])
end
end
55 changes: 55 additions & 0 deletions apps/explorer/lib/explorer/chain/events/listener.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
defmodule Explorer.Chain.Events.Listener do
@moduledoc """
Listens and publishes events from PG
"""

use GenServer

alias Postgrex.Notifications

def start_link(_) do
GenServer.start_link(__MODULE__, "chain_event", name: __MODULE__)
end

def init(channel) do
{:ok, pid} =
:explorer
|> Application.get_env(Explorer.Repo)
|> Notifications.start_link()

ref = Notifications.listen!(pid, channel)

{:ok, {pid, ref, channel}}
end

def handle_info({:notification, _pid, _ref, _topic, payload}, state) do
payload
|> decode_payload!()
|> broadcast()

{:noreply, state}
end

# sobelow_skip ["Misc.BinToTerm"]
defp decode_payload!(payload) do
payload
|> Base.decode64!()
|> :erlang.binary_to_term()
end

defp broadcast({:chain_event, event_type} = event) do
Registry.dispatch(Registry.ChainEvents, event_type, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, event)
end
end)
end

defp broadcast({:chain_event, event_type, broadcast_type, _data} = event) do
Registry.dispatch(Registry.ChainEvents, {event_type, broadcast_type}, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, event)
end
end)
end
end
17 changes: 7 additions & 10 deletions apps/explorer/lib/explorer/chain/events/publisher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ defmodule Explorer.Chain.Events.Publisher do
@spec broadcast(atom()) :: :ok
def broadcast(event_type) do
send_data(event_type)
:ok
end

defp send_data(event_type) do
Registry.dispatch(Registry.ChainEvents, event_type, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, {:chain_event, event_type})
end
end)
sender().send_data(event_type)
end

defp sender do
Application.get_env(:explorer, :realtime_events_sender)
end

# The :catchup type of event is not being consumed right now.
Expand All @@ -32,10 +33,6 @@ defmodule Explorer.Chain.Events.Publisher do
defp send_data(_event_type, :catchup, _event_data), do: :ok

defp send_data(event_type, broadcast_type, event_data) do
Registry.dispatch(Registry.ChainEvents, {event_type, broadcast_type}, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, {:chain_event, event_type, broadcast_type, event_data})
end
end)
sender().send_data(event_type, broadcast_type, event_data)
end
end
21 changes: 21 additions & 0 deletions apps/explorer/lib/explorer/chain/events/simple_sender.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
defmodule Explorer.Chain.Events.SimpleSender do
@moduledoc """
Publishes events through Registry without intermediate levels.
"""

def send_data(event_type, broadcast_type, event_data) do
Registry.dispatch(Registry.ChainEvents, {event_type, broadcast_type}, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, {:chain_event, event_type, broadcast_type, event_data})
end
end)
end

def send_data(event_type) do
Registry.dispatch(Registry.ChainEvents, event_type, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, {:chain_event, event_type})
end
end)
end
end
4 changes: 2 additions & 2 deletions apps/explorer/test/explorer/chain/events/publisher_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule Explorer.Chain.Events.PublisherTest do

Publisher.broadcast([{event_type, event_data}], broadcast_type)

assert_received {:chain_event, ^event_type, ^broadcast_type, []}
assert_receive {:chain_event, ^event_type, ^broadcast_type, []}
end

test "won't send chain_event of catchup type" do
Expand Down Expand Up @@ -59,7 +59,7 @@ defmodule Explorer.Chain.Events.PublisherTest do

Publisher.broadcast(event_type)

assert_received {:chain_event, ^event_type}
assert_receive {:chain_event, ^event_type}
end
end
end
4 changes: 2 additions & 2 deletions apps/explorer/test/explorer/chain/events/subscriber_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule Explorer.Chain.Events.SubscriberTest do

Publisher.broadcast([{event_type, event_data}], broadcast_type)

assert_received {:chain_event, :blocks, :realtime, []}
assert_receive {:chain_event, :blocks, :realtime, []}
end
end

Expand All @@ -27,7 +27,7 @@ defmodule Explorer.Chain.Events.SubscriberTest do

Publisher.broadcast(event_type)

assert_received {:chain_event, :exchange_rate}
assert_receive {:chain_event, :exchange_rate}
end
end
end
12 changes: 6 additions & 6 deletions apps/explorer/test/explorer/chain/import_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -470,35 +470,35 @@ defmodule Explorer.Chain.ImportTest do
test "publishes addresses with updated fetched_coin_balance data to subscribers on insert" do
Subscriber.to(:addresses, :realtime)
Import.all(@import_data)
assert_received {:chain_event, :addresses, :realtime, [%Address{}, %Address{}, %Address{}]}
assert_receive {:chain_event, :addresses, :realtime, [%Address{}, %Address{}, %Address{}]}
end

test "publishes block data to subscribers on insert" do
Subscriber.to(:blocks, :realtime)
Import.all(@import_data)
assert_received {:chain_event, :blocks, :realtime, [%Block{}]}
assert_receive {:chain_event, :blocks, :realtime, [%Block{}]}
end

test "publishes internal_transaction data to subscribers on insert" do
Subscriber.to(:internal_transactions, :realtime)
Import.all(@import_data)

assert_received {:chain_event, :internal_transactions, :realtime,
[%{transaction_hash: _, index: _}, %{transaction_hash: _, index: _}]}
assert_receive {:chain_event, :internal_transactions, :realtime,
[%{transaction_hash: _, index: _}, %{transaction_hash: _, index: _}]}
end

test "publishes transactions data to subscribers on insert" do
Subscriber.to(:transactions, :realtime)
Import.all(@import_data)
assert_received {:chain_event, :transactions, :realtime, [%Transaction{}]}
assert_receive {:chain_event, :transactions, :realtime, [%Transaction{}]}
end

test "publishes token_transfers data to subscribers on insert" do
Subscriber.to(:token_transfers, :realtime)

Import.all(@import_data)

assert_received {:chain_event, :token_transfers, :realtime, [%TokenTransfer{}]}
assert_receive {:chain_event, :token_transfers, :realtime, [%TokenTransfer{}]}
end

test "does not broadcast if broadcast option is false" do
Expand Down