Skip to content

Commit

Permalink
Send events through postgres notify
Browse files Browse the repository at this point in the history
  • Loading branch information
saneery committed Jul 26, 2019
1 parent 185d8be commit 77cd8f3
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 10 deletions.
3 changes: 3 additions & 0 deletions apps/explorer/lib/explorer/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ defmodule Explorer.Application do
Transactions
}

alias Explorer.Chain.Events.Listener

alias Explorer.Chain.Supply.RSK

alias Explorer.Market.MarketHistoryCache
Expand Down Expand Up @@ -43,6 +45,7 @@ defmodule Explorer.Application do
{Admin.Recovery, [[], [name: Admin.Recovery]]},
{TransactionCount, [[], []]},
{BlockCount, []},
{Listener, []},
con_cache_child_spec(Blocks.cache_name()),
con_cache_child_spec(NetVersion.cache_name()),
con_cache_child_spec(MarketHistoryCache.cache_name()),
Expand Down
54 changes: 54 additions & 0 deletions apps/explorer/lib/explorer/chain/events/listener.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
defmodule Explorer.Chain.Events.Listener do
@moduledoc """
Listens and publishes events from PG
"""

use GenServer

alias Postgrex.Notifications

def start_link(_) do
GenServer.start_link(__MODULE__, "chain_event")
end

def init(channel) do
{:ok, pid} =
:explorer
|> Application.get_env(Explorer.Repo)
|> Notifications.start_link()

ref = Notifications.listen!(pid, channel)

{:ok, {pid, ref, channel}}
end

def handle_info({:notification, _pid, _ref, _topic, payload}, state) do
payload
|> decode_payload!()
|> broadcast()

{:noreply, state}
end

defp decode_payload!(payload) do
payload
|> Base.decode64!()
|> :erlang.binary_to_term()
end

defp broadcast({:chain_event, event_type} = event) do
Registry.dispatch(Registry.ChainEvents, event_type, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, event)
end
end)
end

defp broadcast({:chain_event, event_type, broadcast_type, _data} = event) do
Registry.dispatch(Registry.ChainEvents, {event_type, broadcast_type}, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, event)
end
end)
end
end
22 changes: 12 additions & 10 deletions apps/explorer/lib/explorer/chain/events/publisher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ defmodule Explorer.Chain.Events.Publisher do
@moduledoc """
Publishes events related to the Chain context.
"""
alias Ecto.Adapters.SQL
alias Explorer.Repo

@allowed_events ~w(addresses address_coin_balances blocks block_rewards internal_transactions token_transfers transactions contract_verification_result)a

Expand All @@ -19,11 +21,8 @@ defmodule Explorer.Chain.Events.Publisher do
end

defp send_data(event_type) do
Registry.dispatch(Registry.ChainEvents, event_type, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, {:chain_event, event_type})
end
end)
payload = encode_payload({:chain_event, event_type})
SQL.query(Repo, "NOTIFY chain_event, '#{payload}';")
end

# The :catchup type of event is not being consumed right now.
Expand All @@ -32,10 +31,13 @@ defmodule Explorer.Chain.Events.Publisher do
defp send_data(_event_type, :catchup, _event_data), do: :ok

defp send_data(event_type, broadcast_type, event_data) do
Registry.dispatch(Registry.ChainEvents, {event_type, broadcast_type}, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, {:chain_event, event_type, broadcast_type, event_data})
end
end)
payload = encode_payload({:chain_event, event_type, broadcast_type, event_data})
SQL.query(Repo, "NOTIFY chain_event, '#{payload}';")
end

defp encode_payload(payload) do
payload
|> :erlang.term_to_binary()
|> Base.encode64()
end
end

0 comments on commit 77cd8f3

Please sign in to comment.