From bdb8f49338af32e2e157cb2b09c00ff38b0b69c6 Mon Sep 17 00:00:00 2001 From: Samuel Date: Fri, 10 Jun 2022 00:30:01 +0200 Subject: [PATCH] Integrate quorum reads into beacon live explorer --- lib/archethic_web/live/chains/beacon_live.ex | 69 +++----------------- 1 file changed, 10 insertions(+), 59 deletions(-) diff --git a/lib/archethic_web/live/chains/beacon_live.ex b/lib/archethic_web/live/chains/beacon_live.ex index f24c7b4068..0d87823298 100644 --- a/lib/archethic_web/live/chains/beacon_live.ex +++ b/lib/archethic_web/live/chains/beacon_live.ex @@ -13,17 +13,13 @@ defmodule ArchethicWeb.BeaconChainLive do alias Archethic.Election alias Archethic.P2P - alias Archethic.P2P.Message.GetLastTransactionAddress - alias Archethic.P2P.Message.GetTransactionChain - alias Archethic.P2P.Message.LastTransactionAddress - alias Archethic.P2P.Message.TransactionList - alias Archethic.P2P.Node alias Archethic.PubSub alias Archethic.SelfRepair.Sync.BeaconSummaryHandler + alias Archethic.TransactionChain alias Archethic.TransactionChain.Transaction alias Archethic.TransactionChain.TransactionData @@ -288,13 +284,15 @@ defmodule ArchethicWeb.BeaconChainLive do |> Flow.partition(key: {:elem, 2}) |> Flow.reduce(fn -> [] end, fn {address, nodes, _subset}, acc -> transactions = - with {:ok, last_address} <- get_last_beacon_address(nodes, address), - {:ok, transactions} <- get_beacon_chain(nodes, last_address) do - transactions - |> Stream.filter(&(&1.type == :beacon)) - |> Stream.map(&deserialize_beacon_transaction/1) - |> Enum.to_list() - else + case TransactionChain.fetch_last_address_remotely(address, nodes) do + {:ok, last_address} -> + last_address + |> TransactionChain.stream_remotely(nodes) + |> Stream.flat_map(& &1) + |> Stream.filter(&(&1.type == :beacon)) + |> Stream.map(&deserialize_beacon_transaction/1) + |> Enum.to_list() + {:error, :network_issue} -> [] end @@ -307,53 +305,6 @@ defmodule ArchethicWeb.BeaconChainLive do |> Enum.sort_by(& &1.timestamp, {:desc, DateTime}) end - defp get_last_beacon_address([node | rest], address) do - case P2P.send_message(node, %GetLastTransactionAddress{ - address: address, - timestamp: DateTime.utc_now() - }) do - {:ok, %LastTransactionAddress{address: last_address}} -> - {:ok, last_address} - - {:error, _} -> - get_last_beacon_address(rest, address) - end - end - - defp get_last_beacon_address([], _), do: {:error, :network_issue} - - defp get_beacon_chain(nodes, address, opts \\ [], acc \\ []) - - defp get_beacon_chain(nodes = [node | rest], address, opts, acc) do - message = %GetTransactionChain{ - address: address, - paging_state: Keyword.get(opts, :paging_state) - } - - case P2P.send_message(node, message) do - {:ok, %TransactionList{transactions: transactions, more?: false}} -> - {:ok, Enum.uniq_by(acc ++ transactions, & &1.address)} - - {:ok, %TransactionList{transactions: transactions, more?: true, paging_state: paging_state}} -> - get_beacon_chain( - nodes, - address, - [paging_state: paging_state], - Enum.uniq_by(acc ++ transactions, & &1.address) - ) - - {:error, _} -> - get_beacon_chain( - rest, - address, - opts, - acc - ) - end - end - - defp get_beacon_chain([], _, _, _), do: {:error, :network_issue} - defp deserialize_beacon_transaction(%Transaction{ type: :beacon, data: %TransactionData{content: content}