Skip to content

Commit

Permalink
Integrate quorum reads into beacon live explorer
Browse files Browse the repository at this point in the history
  • Loading branch information
Samuel committed Jun 13, 2022
1 parent d9ea1d5 commit bdb8f49
Showing 1 changed file with 10 additions and 59 deletions.
69 changes: 10 additions & 59 deletions lib/archethic_web/live/chains/beacon_live.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,13 @@ defmodule ArchethicWeb.BeaconChainLive do
alias Archethic.Election

alias Archethic.P2P
alias Archethic.P2P.Message.GetLastTransactionAddress
alias Archethic.P2P.Message.GetTransactionChain
alias Archethic.P2P.Message.LastTransactionAddress
alias Archethic.P2P.Message.TransactionList

alias Archethic.P2P.Node

alias Archethic.PubSub

alias Archethic.SelfRepair.Sync.BeaconSummaryHandler

alias Archethic.TransactionChain
alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.TransactionData

Expand Down Expand Up @@ -288,13 +284,15 @@ defmodule ArchethicWeb.BeaconChainLive do
|> Flow.partition(key: {:elem, 2})
|> Flow.reduce(fn -> [] end, fn {address, nodes, _subset}, acc ->
transactions =
with {:ok, last_address} <- get_last_beacon_address(nodes, address),
{:ok, transactions} <- get_beacon_chain(nodes, last_address) do
transactions
|> Stream.filter(&(&1.type == :beacon))
|> Stream.map(&deserialize_beacon_transaction/1)
|> Enum.to_list()
else
case TransactionChain.fetch_last_address_remotely(address, nodes) do
{:ok, last_address} ->
last_address
|> TransactionChain.stream_remotely(nodes)
|> Stream.flat_map(& &1)
|> Stream.filter(&(&1.type == :beacon))
|> Stream.map(&deserialize_beacon_transaction/1)
|> Enum.to_list()

{:error, :network_issue} ->
[]
end
Expand All @@ -307,53 +305,6 @@ defmodule ArchethicWeb.BeaconChainLive do
|> Enum.sort_by(& &1.timestamp, {:desc, DateTime})
end

defp get_last_beacon_address([node | rest], address) do
case P2P.send_message(node, %GetLastTransactionAddress{
address: address,
timestamp: DateTime.utc_now()
}) do
{:ok, %LastTransactionAddress{address: last_address}} ->
{:ok, last_address}

{:error, _} ->
get_last_beacon_address(rest, address)
end
end

defp get_last_beacon_address([], _), do: {:error, :network_issue}

defp get_beacon_chain(nodes, address, opts \\ [], acc \\ [])

defp get_beacon_chain(nodes = [node | rest], address, opts, acc) do
message = %GetTransactionChain{
address: address,
paging_state: Keyword.get(opts, :paging_state)
}

case P2P.send_message(node, message) do
{:ok, %TransactionList{transactions: transactions, more?: false}} ->
{:ok, Enum.uniq_by(acc ++ transactions, & &1.address)}

{:ok, %TransactionList{transactions: transactions, more?: true, paging_state: paging_state}} ->
get_beacon_chain(
nodes,
address,
[paging_state: paging_state],
Enum.uniq_by(acc ++ transactions, & &1.address)
)

{:error, _} ->
get_beacon_chain(
rest,
address,
opts,
acc
)
end
end

defp get_beacon_chain([], _, _, _), do: {:error, :network_issue}

defp deserialize_beacon_transaction(%Transaction{
type: :beacon,
data: %TransactionData{content: content}
Expand Down

0 comments on commit bdb8f49

Please sign in to comment.