Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websocket v2 improvements #7474

Merged
merged 3 commits into from May 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -17,6 +17,7 @@
### Fixes

- [#7490](https://github.com/blockscout/blockscout/pull/7490) - Fix pending txs is not a map
- [#7474](https://github.com/blockscout/blockscout/pull/7474) - Websocket v2 improvements
- [#7472](https://github.com/blockscout/blockscout/pull/7472) - Fix RE_CAPTCHA_DISABLED variable parsing
- [#7391](https://github.com/blockscout/blockscout/pull/7391) - Fix: cannot read properties of null (reading 'value')
- [#7377](https://github.com/blockscout/blockscout/pull/7377), [#7454](https://github.com/blockscout/blockscout/pull/7454) - API v2 improvements
Expand Down
Expand Up @@ -267,18 +267,23 @@ defmodule BlockScoutWeb.AddressChannel do
end

def handle_transaction(
%{address: _address, transaction: transaction},
%{transactions: transactions},
%Phoenix.Socket{handler: BlockScoutWeb.UserSocketV2} = socket,
event
) do
transaction_json = TransactionViewAPI.render("transaction.json", %{transaction: transaction, conn: nil})
)
when is_list(transactions) do
transaction_json = TransactionViewAPI.render("transactions.json", %{transactions: transactions, conn: nil})

push(socket, event, %{transaction: transaction_json})
push(socket, event, %{transactions: transaction_json})

{:noreply, socket}
end

def handle_transaction(%{address: address, transaction: transaction}, socket, event) do
def handle_transaction(
%{address: address, transaction: transaction},
%Phoenix.Socket{handler: BlockScoutWeb.UserSocket} = socket,
event
) do
Gettext.put_locale(BlockScoutWeb.Gettext, socket.assigns.locale)

rendered =
Expand All @@ -301,19 +306,29 @@ defmodule BlockScoutWeb.AddressChannel do
{:noreply, socket}
end

def handle_transaction(_, socket, _event) do
{:noreply, socket}
end

def handle_token_transfer(
%{address: _address, token_transfer: token_transfer},
%{token_transfers: token_transfers},
%Phoenix.Socket{handler: BlockScoutWeb.UserSocketV2} = socket,
event
) do
token_transfer_json = TransactionViewAPI.render("token_transfer.json", %{token_transfer: token_transfer, conn: nil})
)
when is_list(token_transfers) do
token_transfer_json =
TransactionViewAPI.render("token_transfers.json", %{token_transfers: token_transfers, conn: nil})

push(socket, event, %{token_transfer: token_transfer_json})
push(socket, event, %{token_transfers: token_transfer_json})

{:noreply, socket}
end

def handle_token_transfer(%{address: address, token_transfer: token_transfer}, socket, event) do
def handle_token_transfer(
%{address: address, token_transfer: token_transfer},
%Phoenix.Socket{handler: BlockScoutWeb.UserSocket} = socket,
event
) do
Gettext.put_locale(BlockScoutWeb.Gettext, socket.assigns.locale)

transaction =
Expand Down Expand Up @@ -347,6 +362,10 @@ defmodule BlockScoutWeb.AddressChannel do
{:noreply, socket}
end

def handle_token_transfer(_, socket, _event) do
{:noreply, socket}
end

defp render_balance_card(address, exchange_rate, socket) do
Gettext.put_locale(BlockScoutWeb.Gettext, socket.assigns.locale)

Expand Down
17 changes: 13 additions & 4 deletions apps/block_scout_web/lib/block_scout_web/channels/token_channel.ex
Expand Up @@ -21,15 +21,20 @@ defmodule BlockScoutWeb.TokenChannel do

def handle_out(
"token_transfer",
%{token_transfer: _token_transfer},
%{token_transfers: token_transfers},
%Phoenix.Socket{handler: BlockScoutWeb.UserSocketV2} = socket
) do
push(socket, "token_transfer", %{token_transfer: 1})
)
when is_list(token_transfers) do
push(socket, "token_transfer", %{token_transfer: Enum.count(token_transfers)})

{:noreply, socket}
end

def handle_out("token_transfer", %{token_transfer: token_transfer}, socket) do
def handle_out(
"token_transfer",
%{token_transfer: token_transfer},
%Phoenix.Socket{handler: BlockScoutWeb.UserSocket} = socket
) do
Gettext.put_locale(BlockScoutWeb.Gettext, socket.assigns.locale)

rendered_token_transfer =
Expand All @@ -50,6 +55,10 @@ defmodule BlockScoutWeb.TokenChannel do
{:noreply, socket}
end

def handle_out("token_transfer", _, socket) do
{:noreply, socket}
end

def handle_out(
"token_total_supply",
%{token: %Explorer.Chain.Token{total_supply: total_supply}},
Expand Down
Expand Up @@ -33,15 +33,20 @@ defmodule BlockScoutWeb.TransactionChannel do

def handle_out(
"pending_transaction",
%{transaction: _transaction},
%{transactions: transactions},
%Phoenix.Socket{handler: BlockScoutWeb.UserSocketV2} = socket
) do
push(socket, "pending_transaction", %{pending_transaction: 1})
)
when is_list(transactions) do
push(socket, "pending_transaction", %{pending_transaction: Enum.count(transactions)})

{:noreply, socket}
end

def handle_out("pending_transaction", %{transaction: transaction}, socket) do
def handle_out(
"pending_transaction",
%{transaction: transaction},
%Phoenix.Socket{handler: BlockScoutWeb.UserSocket} = socket
) do
Gettext.put_locale(BlockScoutWeb.Gettext, socket.assigns.locale)

rendered_transaction =
Expand All @@ -61,17 +66,26 @@ defmodule BlockScoutWeb.TransactionChannel do
{:noreply, socket}
end

def handle_out("pending_transaction", _, socket) do
{:noreply, socket}
end

def handle_out(
"transaction",
%{transaction: _transaction},
%{transactions: transactions},
%Phoenix.Socket{handler: BlockScoutWeb.UserSocketV2} = socket
) do
push(socket, "transaction", %{transaction: 1})
)
when is_list(transactions) do
push(socket, "transaction", %{transaction: Enum.count(transactions)})

{:noreply, socket}
end

def handle_out("transaction", %{transaction: transaction}, socket) do
def handle_out(
"transaction",
%{transaction: transaction},
%Phoenix.Socket{handler: BlockScoutWeb.UserSocket} = socket
) do
Gettext.put_locale(BlockScoutWeb.Gettext, socket.assigns.locale)

rendered_transaction =
Expand All @@ -91,6 +105,10 @@ defmodule BlockScoutWeb.TransactionChannel do
{:noreply, socket}
end

def handle_out("transaction", _, socket) do
{:noreply, socket}
end

def handle_out(
"raw_trace",
%{raw_trace_origin: transaction_hash},
Expand Down
103 changes: 79 additions & 24 deletions apps/block_scout_web/lib/block_scout_web/notifier.ex
Expand Up @@ -5,6 +5,8 @@ defmodule BlockScoutWeb.Notifier do

alias Absinthe.Subscription

alias BlockScoutWeb.API.V2, as: API_V2

alias BlockScoutWeb.{
AddressContractVerificationViaFlattenedCodeView,
AddressContractVerificationViaJsonView,
Expand All @@ -15,7 +17,7 @@ defmodule BlockScoutWeb.Notifier do
}

alias Explorer.{Chain, Market, Repo}
alias Explorer.Chain.{Address, InternalTransaction, TokenTransfer, Transaction}
alias Explorer.Chain.{Address, InternalTransaction, Transaction}
alias Explorer.Chain.Supply.RSK
alias Explorer.Chain.Transaction.History.TransactionStats
alias Explorer.Counters.{AverageBlockTime, Helper}
Expand Down Expand Up @@ -143,6 +145,7 @@ defmodule BlockScoutWeb.Notifier do
Endpoint.broadcast("transactions:#{transaction_hash}", "raw_trace", %{raw_trace_origin: transaction_hash})
end

# internal txs broadcast disabled on the indexer level, therefore it out of scope of the refactoring within https://github.com/blockscout/blockscout/pull/7474
def handle_event({:chain_event, :internal_transactions, :realtime, internal_transactions}) do
internal_transactions
|> Stream.map(
Expand All @@ -154,7 +157,16 @@ defmodule BlockScoutWeb.Notifier do
end

def handle_event({:chain_event, :token_transfers, :realtime, all_token_transfers}) do
transfers_by_token = Enum.group_by(all_token_transfers, fn tt -> to_string(tt.token_contract_address_hash) end)
all_token_transfers_full =
all_token_transfers
|> Enum.map(
&(&1
|> Repo.preload([:from_address, :to_address, :token, transaction: :block]))
)

transfers_by_token = Enum.group_by(all_token_transfers_full, fn tt -> to_string(tt.token_contract_address_hash) end)

broadcast_token_transfers_websocket_v2(all_token_transfers_full, transfers_by_token)

for {token_contract_address_hash, token_transfers} <- transfers_by_token do
Subscription.publish(
Expand All @@ -163,35 +175,20 @@ defmodule BlockScoutWeb.Notifier do
token_transfers: token_contract_address_hash
)

token_transfers_full =
token_transfers
|> Stream.map(
&(TokenTransfer
|> Repo.get_by(
block_hash: &1.block_hash,
transaction_hash: &1.transaction_hash,
token_contract_address_hash: &1.token_contract_address_hash,
log_index: &1.log_index
)
|> Repo.preload([:from_address, :to_address, :token, transaction: :block]))
)

token_transfers_full
token_transfers
|> Enum.each(&broadcast_token_transfer/1)
end
end

def handle_event({:chain_event, :transactions, :realtime, transactions}) do
preloads = [:block, created_contract_address: :names, from_address: :names, to_address: :names]

transactions
|> Enum.map(& &1.hash)
|> Chain.hashes_to_transactions(
necessity_by_association: %{
:block => :optional,
[created_contract_address: :names] => :optional,
[from_address: :names] => :optional,
[to_address: :names] => :optional
}
|> Enum.map(
&(&1
|> Repo.preload(if API_V2.enabled?(), do: [:token_transfers | preloads], else: preloads))
)
|> broadcast_transactions_websocket_v2()
|> Enum.map(fn tx ->
# Disable parsing of token transfers from websocket for transaction tab because we display token transfers at a separate tab
Map.put(tx, :token_transfers, [])
Expand Down Expand Up @@ -375,6 +372,40 @@ defmodule BlockScoutWeb.Notifier do
end
end

defp broadcast_transactions_websocket_v2(transactions) do
pending_transactions =
Enum.filter(transactions, fn
%Transaction{block_number: nil} -> true
_ -> false
end)

validated_transactions =
Enum.filter(transactions, fn
%Transaction{block_number: nil} -> false
_ -> true
end)

broadcast_transactions_websocket_v2_inner(
pending_transactions,
"transactions:new_pending_transaction",
"pending_transaction"
)

broadcast_transactions_websocket_v2_inner(validated_transactions, "transactions:new_transaction", "transaction")

transactions
end

defp broadcast_transactions_websocket_v2_inner(transactions, default_channel, event) do
if Enum.count(transactions) > 0 do
Endpoint.broadcast(default_channel, event, %{
transactions: transactions
})
end

group_by_address_hashes_and_broadcast(transactions, event, :transactions)
end

defp broadcast_transaction(%Transaction{block_number: nil} = pending) do
broadcast_transaction(pending, "transactions:new_pending_transaction", "pending_transaction")
end
Expand Down Expand Up @@ -403,6 +434,14 @@ defmodule BlockScoutWeb.Notifier do
end
end

defp broadcast_token_transfers_websocket_v2(tokens_transfers, transfers_by_token) do
for {token_contract_address_hash, token_transfers} <- transfers_by_token do
Endpoint.broadcast("tokens:#{token_contract_address_hash}", "token_transfer", %{token_transfers: token_transfers})
end

group_by_address_hashes_and_broadcast(tokens_transfers, "token_transfer", :token_transfers)
end

defp broadcast_token_transfer(token_transfer) do
broadcast_token_transfer(token_transfer, "token_transfer")
end
Expand All @@ -425,4 +464,20 @@ defmodule BlockScoutWeb.Notifier do
})
end
end

defp group_by_address_hashes_and_broadcast(elements, event, map_key) do
grouped_by_from =
elements
|> Enum.group_by(fn el -> el.from_address_hash end)

grouped_by_to =
elements
|> Enum.group_by(fn el -> el.to_address_hash end)

grouped = Map.merge(grouped_by_to, grouped_by_from, fn _k, v1, v2 -> Enum.uniq(v1 ++ v2) end)

for {address_hash, elements} <- grouped do
Endpoint.broadcast("addresses:#{address_hash}", event, %{map_key => elements})
end
end
end