Skip to content

Commit

Permalink
Refactor token and address channel
Browse files Browse the repository at this point in the history
  • Loading branch information
nikitosing committed May 15, 2023
1 parent cf0cae0 commit 2123a00
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -311,18 +311,24 @@ defmodule BlockScoutWeb.AddressChannel do
end

def handle_token_transfer(
%{address: _address, token_transfer: token_transfer},
%{token_transfers: token_transfers},
%Phoenix.Socket{handler: BlockScoutWeb.UserSocketV2} = socket,
event
) do
token_transfer_json = TransactionViewAPI.render("token_transfer.json", %{token_transfer: token_transfer, conn: nil})
)
when is_list(token_transfers) do
token_transfer_json =
TransactionViewAPI.render("token_transfers.json", %{token_transfers: token_transfers, conn: nil})

push(socket, event, %{token_transfer: token_transfer_json})
push(socket, event, %{token_transfers: token_transfer_json})

{:noreply, socket}
end

def handle_token_transfer(%{address: address, token_transfer: token_transfer}, socket, event) do
def handle_token_transfer(
%{address: address, token_transfer: token_transfer},
%Phoenix.Socket{handler: BlockScoutWeb.UserSocket} = socket,
event
) do
Gettext.put_locale(BlockScoutWeb.Gettext, socket.assigns.locale)

transaction =
Expand Down Expand Up @@ -356,6 +362,10 @@ defmodule BlockScoutWeb.AddressChannel do
{:noreply, socket}
end

def handle_token_transfer(_, socket, _event) do
{:noreply, socket}
end

defp render_balance_card(address, exchange_rate, socket) do
Gettext.put_locale(BlockScoutWeb.Gettext, socket.assigns.locale)

Expand Down
17 changes: 13 additions & 4 deletions apps/block_scout_web/lib/block_scout_web/channels/token_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,20 @@ defmodule BlockScoutWeb.TokenChannel do

def handle_out(
"token_transfer",
%{token_transfer: _token_transfer},
%{token_transfers: token_transfers},
%Phoenix.Socket{handler: BlockScoutWeb.UserSocketV2} = socket
) do
push(socket, "token_transfer", %{token_transfer: 1})
)
when is_list(token_transfers) do
push(socket, "token_transfer", %{token_transfer: Enum.count(token_transfers)})

{:noreply, socket}
end

def handle_out("token_transfer", %{token_transfer: token_transfer}, socket) do
def handle_out(
"token_transfer",
%{token_transfer: token_transfer},
%Phoenix.Socket{handler: BlockScoutWeb.UserSocket} = socket
) do
Gettext.put_locale(BlockScoutWeb.Gettext, socket.assigns.locale)

rendered_token_transfer =
Expand All @@ -50,6 +55,10 @@ defmodule BlockScoutWeb.TokenChannel do
{:noreply, socket}
end

def handle_out("token_transfer", _, socket) do
{:noreply, socket}
end

def handle_out(
"token_total_supply",
%{token: %Explorer.Chain.Token{total_supply: total_supply}},
Expand Down
80 changes: 50 additions & 30 deletions apps/block_scout_web/lib/block_scout_web/notifier.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule BlockScoutWeb.Notifier do
}

alias Explorer.{Chain, Market, Repo}
alias Explorer.Chain.{Address, InternalTransaction, TokenTransfer, Transaction}
alias Explorer.Chain.{Address, InternalTransaction, Transaction}
alias Explorer.Chain.Supply.RSK
alias Explorer.Chain.Transaction.History.TransactionStats
alias Explorer.Counters.{AverageBlockTime, Helper}
Expand Down Expand Up @@ -154,7 +154,16 @@ defmodule BlockScoutWeb.Notifier do
end

def handle_event({:chain_event, :token_transfers, :realtime, all_token_transfers}) do
transfers_by_token = Enum.group_by(all_token_transfers, fn tt -> to_string(tt.token_contract_address_hash) end)
all_token_transfers_full =
all_token_transfers
|> Stream.map(
&(&1
|> Repo.preload([:from_address, :to_address, :token, transaction: :block]))
)

transfers_by_token = Enum.group_by(all_token_transfers_full, fn tt -> to_string(tt.token_contract_address_hash) end)

broadcast_token_transfers_websocket_v2(all_token_transfers_full, transfers_by_token)

for {token_contract_address_hash, token_transfers} <- transfers_by_token do
Subscription.publish(
Expand All @@ -163,20 +172,7 @@ defmodule BlockScoutWeb.Notifier do
token_transfers: token_contract_address_hash
)

token_transfers_full =
token_transfers
|> Stream.map(
&(TokenTransfer
|> Repo.get_by(
block_hash: &1.block_hash,
transaction_hash: &1.transaction_hash,
token_contract_address_hash: &1.token_contract_address_hash,
log_index: &1.log_index
)
|> Repo.preload([:from_address, :to_address, :token, transaction: :block]))
)

token_transfers_full
token_transfers
|> Enum.each(&broadcast_token_transfer/1)
end
end
Expand Down Expand Up @@ -407,20 +403,7 @@ defmodule BlockScoutWeb.Notifier do
})
end

transactions_grouped_by_from =
transactions
|> Enum.group_by(fn transaction -> transaction.from_address_hash end)

transactions_grouped_by_to =
transactions
|> Enum.group_by(fn transaction -> transaction.to_address_hash end)

grouped_transactions =
Map.merge(transactions_grouped_by_to, transactions_grouped_by_from, fn _k, v1, v2 -> Enum.uniq(v1 ++ v2) end)

for {address_hash, transactions} <- grouped_transactions do
Endpoint.broadcast("addresses:#{address_hash}", event, %{transactions: transactions})
end
group_by_address_hashes_and_broadcast(transactions, event, :transactions)
end

defp broadcast_transaction(%Transaction{block_number: nil} = pending) do
Expand Down Expand Up @@ -451,6 +434,14 @@ defmodule BlockScoutWeb.Notifier do
end
end

defp broadcast_token_transfers_websocket_v2(tokens_transfers, transfers_by_token) do
for {token_contract_address_hash, token_transfers} <- transfers_by_token do
Endpoint.broadcast("tokens:#{token_contract_address_hash}", "token_transfer", %{token_transfers: token_transfers})
end

group_by_address_hashes_and_broadcast(tokens_transfers, "token_transfer", :token_transfers)
end

defp broadcast_token_transfer(token_transfer) do
broadcast_token_transfer(token_transfer, "token_transfer")
end
Expand All @@ -473,4 +464,33 @@ defmodule BlockScoutWeb.Notifier do
})
end
end

defp group_by_address_hashes_and_broadcast(elements, event, map_key) do
grouped_by_from =
elements
|> Enum.group_by(fn el -> el.from_address_hash end)

grouped_by_to =
elements
|> Enum.group_by(fn el -> el.to_address_hash end)

grouped = Map.merge(grouped_by_to, grouped_by_from, fn _k, v1, v2 -> Enum.uniq(v1 ++ v2) end)

for {address_hash, elements} <- grouped do
if "0xbb36c792b9b45aaf8b848a1392b0d6559202729e" == "#{address_hash}" do
debug("addresses:#{address_hash}", event)
debug(%{map_key => elements}, "12312312313")
end

Endpoint.broadcast("addresses:#{address_hash}", event, %{map_key => elements})
end
end

defp debug(value, key) do
require Logger
Logger.configure(truncate: :infinity)
Logger.info(key)
Logger.info(Kernel.inspect(value, limit: :infinity, printable_limit: :infinity))
value
end
end

0 comments on commit 2123a00

Please sign in to comment.