From 2123a00ff58fe4f5879147c628861de05c189883 Mon Sep 17 00:00:00 2001 From: Nikita Pozdniakov Date: Thu, 11 May 2023 13:29:32 +0300 Subject: [PATCH] Refactor token and address channel --- .../channels/address_channel.ex | 20 +++-- .../block_scout_web/channels/token_channel.ex | 17 +++- .../lib/block_scout_web/notifier.ex | 80 ++++++++++++------- 3 files changed, 78 insertions(+), 39 deletions(-) diff --git a/apps/block_scout_web/lib/block_scout_web/channels/address_channel.ex b/apps/block_scout_web/lib/block_scout_web/channels/address_channel.ex index 587d20aa9d1e..6072545f34c4 100644 --- a/apps/block_scout_web/lib/block_scout_web/channels/address_channel.ex +++ b/apps/block_scout_web/lib/block_scout_web/channels/address_channel.ex @@ -311,18 +311,24 @@ defmodule BlockScoutWeb.AddressChannel do end def handle_token_transfer( - %{address: _address, token_transfer: token_transfer}, + %{token_transfers: token_transfers}, %Phoenix.Socket{handler: BlockScoutWeb.UserSocketV2} = socket, event - ) do - token_transfer_json = TransactionViewAPI.render("token_transfer.json", %{token_transfer: token_transfer, conn: nil}) + ) + when is_list(token_transfers) do + token_transfer_json = + TransactionViewAPI.render("token_transfers.json", %{token_transfers: token_transfers, conn: nil}) - push(socket, event, %{token_transfer: token_transfer_json}) + push(socket, event, %{token_transfers: token_transfer_json}) {:noreply, socket} end - def handle_token_transfer(%{address: address, token_transfer: token_transfer}, socket, event) do + def handle_token_transfer( + %{address: address, token_transfer: token_transfer}, + %Phoenix.Socket{handler: BlockScoutWeb.UserSocket} = socket, + event + ) do Gettext.put_locale(BlockScoutWeb.Gettext, socket.assigns.locale) transaction = @@ -356,6 +362,10 @@ defmodule BlockScoutWeb.AddressChannel do {:noreply, socket} end + def handle_token_transfer(_, socket, _event) do + {:noreply, socket} + end + defp render_balance_card(address, exchange_rate, socket) do Gettext.put_locale(BlockScoutWeb.Gettext, socket.assigns.locale) diff --git a/apps/block_scout_web/lib/block_scout_web/channels/token_channel.ex b/apps/block_scout_web/lib/block_scout_web/channels/token_channel.ex index fff56d6c0902..ac4295e6b010 100644 --- a/apps/block_scout_web/lib/block_scout_web/channels/token_channel.ex +++ b/apps/block_scout_web/lib/block_scout_web/channels/token_channel.ex @@ -21,15 +21,20 @@ defmodule BlockScoutWeb.TokenChannel do def handle_out( "token_transfer", - %{token_transfer: _token_transfer}, + %{token_transfers: token_transfers}, %Phoenix.Socket{handler: BlockScoutWeb.UserSocketV2} = socket - ) do - push(socket, "token_transfer", %{token_transfer: 1}) + ) + when is_list(token_transfers) do + push(socket, "token_transfer", %{token_transfer: Enum.count(token_transfers)}) {:noreply, socket} end - def handle_out("token_transfer", %{token_transfer: token_transfer}, socket) do + def handle_out( + "token_transfer", + %{token_transfer: token_transfer}, + %Phoenix.Socket{handler: BlockScoutWeb.UserSocket} = socket + ) do Gettext.put_locale(BlockScoutWeb.Gettext, socket.assigns.locale) rendered_token_transfer = @@ -50,6 +55,10 @@ defmodule BlockScoutWeb.TokenChannel do {:noreply, socket} end + def handle_out("token_transfer", _, socket) do + {:noreply, socket} + end + def handle_out( "token_total_supply", %{token: %Explorer.Chain.Token{total_supply: total_supply}}, diff --git a/apps/block_scout_web/lib/block_scout_web/notifier.ex b/apps/block_scout_web/lib/block_scout_web/notifier.ex index a2dcb3bae5ad..078eeb73ea0a 100644 --- a/apps/block_scout_web/lib/block_scout_web/notifier.ex +++ b/apps/block_scout_web/lib/block_scout_web/notifier.ex @@ -15,7 +15,7 @@ defmodule BlockScoutWeb.Notifier do } alias Explorer.{Chain, Market, Repo} - alias Explorer.Chain.{Address, InternalTransaction, TokenTransfer, Transaction} + alias Explorer.Chain.{Address, InternalTransaction, Transaction} alias Explorer.Chain.Supply.RSK alias Explorer.Chain.Transaction.History.TransactionStats alias Explorer.Counters.{AverageBlockTime, Helper} @@ -154,7 +154,16 @@ defmodule BlockScoutWeb.Notifier do end def handle_event({:chain_event, :token_transfers, :realtime, all_token_transfers}) do - transfers_by_token = Enum.group_by(all_token_transfers, fn tt -> to_string(tt.token_contract_address_hash) end) + all_token_transfers_full = + all_token_transfers + |> Stream.map( + &(&1 + |> Repo.preload([:from_address, :to_address, :token, transaction: :block])) + ) + + transfers_by_token = Enum.group_by(all_token_transfers_full, fn tt -> to_string(tt.token_contract_address_hash) end) + + broadcast_token_transfers_websocket_v2(all_token_transfers_full, transfers_by_token) for {token_contract_address_hash, token_transfers} <- transfers_by_token do Subscription.publish( @@ -163,20 +172,7 @@ defmodule BlockScoutWeb.Notifier do token_transfers: token_contract_address_hash ) - token_transfers_full = - token_transfers - |> Stream.map( - &(TokenTransfer - |> Repo.get_by( - block_hash: &1.block_hash, - transaction_hash: &1.transaction_hash, - token_contract_address_hash: &1.token_contract_address_hash, - log_index: &1.log_index - ) - |> Repo.preload([:from_address, :to_address, :token, transaction: :block])) - ) - - token_transfers_full + token_transfers |> Enum.each(&broadcast_token_transfer/1) end end @@ -407,20 +403,7 @@ defmodule BlockScoutWeb.Notifier do }) end - transactions_grouped_by_from = - transactions - |> Enum.group_by(fn transaction -> transaction.from_address_hash end) - - transactions_grouped_by_to = - transactions - |> Enum.group_by(fn transaction -> transaction.to_address_hash end) - - grouped_transactions = - Map.merge(transactions_grouped_by_to, transactions_grouped_by_from, fn _k, v1, v2 -> Enum.uniq(v1 ++ v2) end) - - for {address_hash, transactions} <- grouped_transactions do - Endpoint.broadcast("addresses:#{address_hash}", event, %{transactions: transactions}) - end + group_by_address_hashes_and_broadcast(transactions, event, :transactions) end defp broadcast_transaction(%Transaction{block_number: nil} = pending) do @@ -451,6 +434,14 @@ defmodule BlockScoutWeb.Notifier do end end + defp broadcast_token_transfers_websocket_v2(tokens_transfers, transfers_by_token) do + for {token_contract_address_hash, token_transfers} <- transfers_by_token do + Endpoint.broadcast("tokens:#{token_contract_address_hash}", "token_transfer", %{token_transfers: token_transfers}) + end + + group_by_address_hashes_and_broadcast(tokens_transfers, "token_transfer", :token_transfers) + end + defp broadcast_token_transfer(token_transfer) do broadcast_token_transfer(token_transfer, "token_transfer") end @@ -473,4 +464,33 @@ defmodule BlockScoutWeb.Notifier do }) end end + + defp group_by_address_hashes_and_broadcast(elements, event, map_key) do + grouped_by_from = + elements + |> Enum.group_by(fn el -> el.from_address_hash end) + + grouped_by_to = + elements + |> Enum.group_by(fn el -> el.to_address_hash end) + + grouped = Map.merge(grouped_by_to, grouped_by_from, fn _k, v1, v2 -> Enum.uniq(v1 ++ v2) end) + + for {address_hash, elements} <- grouped do + if "0xbb36c792b9b45aaf8b848a1392b0d6559202729e" == "#{address_hash}" do + debug("addresses:#{address_hash}", event) + debug(%{map_key => elements}, "12312312313") + end + + Endpoint.broadcast("addresses:#{address_hash}", event, %{map_key => elements}) + end + end + + defp debug(value, key) do + require Logger + Logger.configure(truncate: :infinity) + Logger.info(key) + Logger.info(Kernel.inspect(value, limit: :infinity, printable_limit: :infinity)) + value + end end