From 65a83ecf74768eba24d52d15642d690adb3be6b9 Mon Sep 17 00:00:00 2001 From: Samuel <42943690+samuel-uniris@users.noreply.github.com> Date: Thu, 9 Jun 2022 10:17:14 +0200 Subject: [PATCH] Fix beacon live transaction chain loading (#360) Improve transaction loading in the beacon explorer by using transaction chain to fetch the updates and fix cache usage to avoid stale or inconsistent data. --- lib/archethic/crypto.ex | 4 +- lib/archethic_web/live/chains/beacon_live.ex | 362 ++++++++---------- .../explorer/beacon_chain_index.html.leex | 2 +- 3 files changed, 170 insertions(+), 198 deletions(-) diff --git a/lib/archethic/crypto.ex b/lib/archethic/crypto.ex index f9edaf921..22c2158a0 100755 --- a/lib/archethic/crypto.ex +++ b/lib/archethic/crypto.ex @@ -906,9 +906,9 @@ defmodule Archethic.Crypto do defp do_hash(data, :blake2b), do: :crypto.hash(:blake2b, data) @doc """ - Generate an address as per ARCHEthic specification + Generate an address as per Archethic specification - The fist-byte representing the curve type second-byte representing hash algorithm used and rest is the hash of publicKey as per ARCHEthic specifications . + The fist-byte representing the curve type second-byte representing hash algorithm used and rest is the hash of publicKey as per Archethic specifications . ## Examples diff --git a/lib/archethic_web/live/chains/beacon_live.ex b/lib/archethic_web/live/chains/beacon_live.ex index f82aa6a5c..f24c7b406 100644 --- a/lib/archethic_web/live/chains/beacon_live.ex +++ b/lib/archethic_web/live/chains/beacon_live.ex @@ -13,40 +13,41 @@ defmodule ArchethicWeb.BeaconChainLive do alias Archethic.Election alias Archethic.P2P + alias Archethic.P2P.Message.GetLastTransactionAddress alias Archethic.P2P.Message.GetTransactionChain - alias Archethic.P2P.Message.GetBeaconSummaries - alias Archethic.P2P.Message.BeaconSummaryList - alias Archethic.P2P.Message.NotFound + alias Archethic.P2P.Message.LastTransactionAddress alias Archethic.P2P.Message.TransactionList + alias Archethic.P2P.Node + alias Archethic.PubSub alias Archethic.SelfRepair.Sync.BeaconSummaryHandler - alias Archethic.TaskSupervisor - alias Archethic.TransactionChain.Transaction alias Archethic.TransactionChain.TransactionData alias ArchethicWeb.ExplorerView + alias ArchethicWeb.TransactionCache alias Phoenix.View require Logger - alias ArchethicWeb.TransactionCache def mount(_params, _session, socket) do next_summary_time = BeaconChain.next_summary_date(DateTime.utc_now()) if connected?(socket) do PubSub.register_to_next_summary_time() - # register for client to able to get the current added transaction to the beacon pool PubSub.register_to_current_epoch_of_slot_time() PubSub.register_to_new_replication_attestations() + + # register for client to able to get the current added transaction to the beacon pool + BeaconChain.register_to_beacon_pool_updates() end beacon_dates = - case get_beacon_dates() |> Enum.to_list() do + case get_beacon_dates() do [] -> [next_summary_time] @@ -54,18 +55,14 @@ defmodule ArchethicWeb.BeaconChainLive do [next_summary_time | dates] end - BeaconChain.register_to_beacon_pool_updates() - new_assign = socket |> assign(:next_summary_time, next_summary_time) |> assign(:dates, beacon_dates) |> assign(:current_date_page, 1) |> assign(:update_time, DateTime.utc_now()) - |> assign( - :transactions, - [] - ) + |> assign(:transactions, []) + |> assign(:live_cache, []) |> assign(:fetching, true) {:ok, new_assign} @@ -79,36 +76,32 @@ defmodule ArchethicWeb.BeaconChainLive do page = Map.get(params, "page", "1") case Integer.parse(page) do - {number, ""} when number > 0 and is_list(dates) -> - if number > length(dates) do - {:noreply, - push_redirect(socket, to: Routes.live_path(socket, __MODULE__, %{"page" => 1}))} - else - new_assign = - socket - |> assign(:current_date_page, number) - |> assign(:transactions, []) - |> assign(:fetching, true) + {1, ""} -> + send(self(), :initial_load) - if Enum.at(dates, 0) == Enum.at(dates, number - 1) do - next_summary_date = - dates - |> Enum.at(number - 1) + new_assign = + socket + |> assign(:current_date_page, 1) + |> assign(:transactions, []) + |> assign(:fetching, true) - send(self(), {:initial_load, next_summary_date}) - else - date = - dates - |> Enum.at(number - 1) + {:noreply, new_assign} - send(self(), {:load_at, date}) - end + {number, ""} when number > 1 and number <= length(dates) -> + new_assign = + socket + |> assign(:current_date_page, number) + |> assign(:transactions, []) + |> assign(:fetching, true) - {:noreply, new_assign} - end + date = Enum.at(dates, number - 1) + send(self(), {:load_at, date}) + + {:noreply, new_assign} _ -> - {:noreply, socket} + {:noreply, + push_redirect(socket, to: Routes.live_path(socket, __MODULE__, %{"page" => 1}))} end end @@ -120,12 +113,19 @@ defmodule ArchethicWeb.BeaconChainLive do {:noreply, push_redirect(socket, to: Routes.live_path(socket, __MODULE__, %{"page" => page}))} end - def handle_info({:initial_load, next_summary_time}, socket) do - transactions = list_transaction_by_date_from_tx_chain(next_summary_time) + def handle_info( + :initial_load, + socket = %{assigns: %{live_cache: live_cache}} + ) do + # Fetch from the latest transaction on the chain + transactions = list_transaction_from_chain() + + # Integrate the transactions updates from the live feed + all_txs = Enum.sort_by(live_cache ++ transactions, & &1.timestamp, {:desc, DateTime}) new_socket = socket - |> assign(:transactions, transactions) + |> assign(:transactions, all_txs) |> assign(:update_time, DateTime.utc_now()) |> assign(:fetching, false) @@ -133,10 +133,10 @@ defmodule ArchethicWeb.BeaconChainLive do end def handle_info({:load_at, date}, socket) do - # Caching transactions = list_transaction_by_date(date) + # Try to fetch from the cache, other fetch from the beacon summary {:ok, transactions} = TransactionCache.resolve(date, fn -> - list_transaction_by_date(date) + list_transactions_from_summary(date) end) new_assign = @@ -157,30 +157,27 @@ defmodule ArchethicWeb.BeaconChainLive do } } ) do - new_assign = - socket - |> assign(:update_time, DateTime.utc_now()) - if page == 1 and !Enum.any?(transactions, fn tx -> tx.address == tx_summary.address end) do # Only update the transaction listed when you are on the first page - new_assign = + new_socket = case Map.get(assigns, :summary_passed?) do true -> - new_assign + socket |> assign(:transactions, Enum.uniq([tx_summary | transactions])) + |> update(:live_cache, &[tx_summary | &1]) |> assign(:summary_passed?, false) + |> assign(:update_time, DateTime.utc_now()) _ -> - update( - new_assign, - :transactions, - &Enum.uniq([tx_summary | &1]) - ) + socket + |> update(:transactions, &Enum.uniq([tx_summary | &1])) + |> update(:live_cache, &[tx_summary | &1]) + |> assign(:update_time, DateTime.utc_now()) end - {:noreply, new_assign} + {:noreply, new_socket} else - {:noreply, new_assign} + {:noreply, socket} end end @@ -188,38 +185,27 @@ defmodule ArchethicWeb.BeaconChainLive do {:next_summary_time, next_summary_date}, socket = %{ assigns: %{ - current_date_page: page, dates: dates } } ) do - new_next_summary = - if :gt == DateTime.compare(next_summary_date, DateTime.utc_now()) do - next_summary_date - else - BeaconChain.next_summary_date(DateTime.utc_now()) - end + new_dates = [next_summary_date | dates] - new_dates = [new_next_summary | dates] + [last_date | _] = dates - # Caching transactions = - # new_dates - # |> Enum.at(page - 1) - # |> list_transaction_by_date() - date = - new_dates - |> Enum.at(page - 1) - - {:ok, transactions} = - TransactionCache.resolve(date, fn -> - list_transaction_by_date_from_beacon_summary(date) - end) + # Flush into cache the transactions from the chain for the last beacon cycle + TransactionCache.put( + last_date, + list_transaction_from_chain(last_date) + ) new_assign = socket - |> assign(:transactions, transactions) + |> assign(:transactions, []) + |> assign(:live_cache, []) |> assign(:dates, new_dates) - |> assign(:next_summary_time, new_next_summary) + |> assign(:next_summary_time, next_summary_date) + |> assign(:update_time, DateTime.utc_now()) {:noreply, new_assign} end @@ -228,164 +214,150 @@ defmodule ArchethicWeb.BeaconChainLive do {:current_epoch_of_slot_timer, date}, socket ) do - date - |> BeaconChain.register_to_beacon_pool_updates() + # We refresh the live feed subscription at each slot time + BeaconChain.register_to_beacon_pool_updates(date) {:noreply, socket} end defp get_beacon_dates do %Node{enrollment_date: enrollment_date} = - P2P.list_nodes() |> Enum.sort_by(& &1.enrollment_date, {:asc, DateTime}) |> Enum.at(0) + P2P.list_nodes() + |> Enum.sort_by(& &1.enrollment_date, {:asc, DateTime}) + |> Enum.at(0) enrollment_date |> SummaryTimer.previous_summaries() |> Enum.sort({:desc, DateTime}) end - defp get_beacon_summary_transaction_chain(beacon_address, nodes, patch) do - nodes - |> P2P.nearest_nodes(patch) - |> do_get_download_summary_transaction_chain(beacon_address) - end - - defp do_get_download_summary_transaction_chain(nodes, address, opts \\ [], acc \\ []) - - defp do_get_download_summary_transaction_chain( - nodes = [node | rest], - address, - opts, - acc - ) do - message = %GetTransactionChain{ - address: address, - paging_state: Keyword.get(opts, :paging_state) - } - - case P2P.send_message(node, message) do - {:ok, %TransactionList{transactions: transactions, more?: false}} -> - {:ok, Enum.uniq_by(acc ++ transactions, & &1.address)} - - {:ok, %TransactionList{transactions: transactions, more?: true, paging_state: paging_state}} -> - do_get_download_summary_transaction_chain( - nodes, - address, - [paging_state: paging_state], - Enum.uniq_by(acc ++ transactions, & &1.address) - ) - - {:error, _} -> - do_get_download_summary_transaction_chain( - rest, - address, - opts, - acc - ) - end - end + defp list_transactions_from_summary(date = %DateTime{}) do + %Node{network_patch: patch} = P2P.get_node_info() - defp do_get_download_summary_transaction_chain([], _, _, _), do: {:error, :network_issue} + node_list = P2P.authorized_and_available_nodes() - defp list_transaction_by_date(date = %DateTime{}) do - Enum.reduce(BeaconChain.list_subsets(), %{}, fn subset, acc -> - b_address = Crypto.derive_beacon_chain_address(subset, date, true) - node_list = P2P.authorized_and_available_nodes() + BeaconChain.list_subsets() + |> Flow.from_enumerable() + |> Flow.map(fn subset -> nodes = Election.beacon_storage_nodes(subset, date, node_list) - - Enum.reduce(nodes, acc, fn node, acc -> - Map.update(acc, node, [b_address], &[b_address | &1]) - end) + {subset, nodes} end) - |> Stream.transform([], fn - {_, []}, acc -> - {[], acc} + |> Flow.partition(key: {:elem, 0}) + |> Flow.reduce(fn -> [] end, fn {subset, nodes}, acc -> + address = Crypto.derive_beacon_chain_address(subset, date, true) - {node, addresses}, acc -> - addresses_to_fetch = Enum.reject(addresses, &(&1 in acc)) + case BeaconSummaryHandler.download_summary(address, nodes, patch) do + {:ok, %BeaconSummary{transaction_attestations: attestations}} -> + tx_summaries = Enum.map(attestations, & &1.transaction_summary) + tx_summaries ++ acc - case P2P.send_message(node, %GetBeaconSummaries{addresses: addresses_to_fetch}) do - {:ok, %BeaconSummaryList{summaries: summaries}} -> - summaries_address_resolved = - Enum.map( - summaries, - &Crypto.derive_beacon_chain_address(&1.subset, &1.summary_time, true) - ) - - {summaries, acc ++ summaries_address_resolved} - - _ -> - {[], acc} - end - end) - |> Stream.map(fn %BeaconSummary{transaction_attestations: transaction_attestations} -> - Enum.map(transaction_attestations, & &1.transaction_summary) + _ -> + acc + end end) - |> Stream.flat_map(& &1) + |> Enum.to_list() + |> List.flatten() + |> Enum.uniq_by(& &1.address) |> Enum.sort_by(& &1.timestamp, {:desc, DateTime}) end - defp list_transaction_by_date(nil), do: [] + defp list_transactions_from_summary(nil), do: [] - defp list_transaction_by_date_from_tx_chain(date = %DateTime{}) do + defp list_transaction_from_chain(date = %DateTime{} \\ DateTime.utc_now()) do %Node{network_patch: patch} = P2P.get_node_info() - Task.Supervisor.async_stream(TaskSupervisor, BeaconChain.list_subsets(), fn subset -> - b_address = Crypto.derive_beacon_chain_address(subset, date, true) - node_list = P2P.authorized_and_available_nodes() - nodes = Election.beacon_storage_nodes(subset, date, node_list) - - {:ok, transactions} = get_beacon_summary_transaction_chain(b_address, nodes, patch) + node_list = P2P.authorized_nodes() - transactions - |> Stream.map(&deserialize_beacon_transaction/1) - |> Enum.to_list() - end) - |> Enum.map(fn {:ok, txs} -> txs end) - |> :lists.flatten() - |> Enum.sort_by(& &1.timestamp, {:desc, DateTime}) - end + ref_time = DateTime.truncate(date, :millisecond) - def list_transaction_by_date_from_beacon_summary(nil), do: [] + genesis_date = BeaconChain.previous_summary_time(ref_time) + next_summary_date = BeaconChain.next_summary_date(ref_time) - def list_transaction_by_date_from_beacon_summary(date = %DateTime{}) do BeaconChain.list_subsets() |> Flow.from_enumerable() |> Flow.map(fn subset -> - b_address = Crypto.derive_beacon_chain_address(subset, date, true) - node_list = P2P.authorized_nodes() - nodes = Election.beacon_storage_nodes(subset, date, node_list) - %Node{network_patch: patch} = P2P.get_node_info() - {b_address, nodes, patch} - end) - |> Flow.partition() - |> Flow.reduce(fn -> [] end, fn {address, nodes, patch}, acc -> - transaction_attestations = - case BeaconSummaryHandler.download_summary(address, nodes, patch) do - {:ok, beacon_summary = %BeaconSummary{}} -> - %BeaconSummary{transaction_attestations: tx_ats} = beacon_summary - tx_ats - - {:ok, %NotFound{}} -> - [] + address = Crypto.derive_beacon_chain_address(subset, genesis_date) - _ -> + nodes = + subset + |> Election.beacon_storage_nodes(next_summary_date, node_list) + |> P2P.nearest_nodes(patch) + + {address, nodes, subset} + end) + |> Flow.partition(key: {:elem, 2}) + |> Flow.reduce(fn -> [] end, fn {address, nodes, _subset}, acc -> + transactions = + with {:ok, last_address} <- get_last_beacon_address(nodes, address), + {:ok, transactions} <- get_beacon_chain(nodes, last_address) do + transactions + |> Stream.filter(&(&1.type == :beacon)) + |> Stream.map(&deserialize_beacon_transaction/1) + |> Enum.to_list() + else + {:error, :network_issue} -> [] end - if Enum.empty?(transaction_attestations) do - acc - else - [%ReplicationAttestation{transaction_summary: tx_summary}] = transaction_attestations - [tx_summary | acc] - end + transactions ++ acc end) |> Enum.to_list() - |> Enum.uniq() |> List.flatten() + |> Enum.uniq_by(& &1.address) |> Enum.sort_by(& &1.timestamp, {:desc, DateTime}) end - defp deserialize_beacon_transaction(%Transaction{data: %TransactionData{content: content}}) do + defp get_last_beacon_address([node | rest], address) do + case P2P.send_message(node, %GetLastTransactionAddress{ + address: address, + timestamp: DateTime.utc_now() + }) do + {:ok, %LastTransactionAddress{address: last_address}} -> + {:ok, last_address} + + {:error, _} -> + get_last_beacon_address(rest, address) + end + end + + defp get_last_beacon_address([], _), do: {:error, :network_issue} + + defp get_beacon_chain(nodes, address, opts \\ [], acc \\ []) + + defp get_beacon_chain(nodes = [node | rest], address, opts, acc) do + message = %GetTransactionChain{ + address: address, + paging_state: Keyword.get(opts, :paging_state) + } + + case P2P.send_message(node, message) do + {:ok, %TransactionList{transactions: transactions, more?: false}} -> + {:ok, Enum.uniq_by(acc ++ transactions, & &1.address)} + + {:ok, %TransactionList{transactions: transactions, more?: true, paging_state: paging_state}} -> + get_beacon_chain( + nodes, + address, + [paging_state: paging_state], + Enum.uniq_by(acc ++ transactions, & &1.address) + ) + + {:error, _} -> + get_beacon_chain( + rest, + address, + opts, + acc + ) + end + end + + defp get_beacon_chain([], _, _, _), do: {:error, :network_issue} + + defp deserialize_beacon_transaction(%Transaction{ + type: :beacon, + data: %TransactionData{content: content} + }) do {slot, _} = Slot.deserialize(content) %Slot{transaction_attestations: transaction_attestations} = slot Enum.map(transaction_attestations, & &1.transaction_summary) diff --git a/lib/archethic_web/templates/explorer/beacon_chain_index.html.leex b/lib/archethic_web/templates/explorer/beacon_chain_index.html.leex index 91903c141..bed5a16f2 100644 --- a/lib/archethic_web/templates/explorer/beacon_chain_index.html.leex +++ b/lib/archethic_web/templates/explorer/beacon_chain_index.html.leex @@ -3,7 +3,7 @@

Beacon chain

- Beacon Chains are the entire summary of all the transactions based on time slots of the ARCHEThic P2P network. + Beacon Chains are the entire summary of all the transactions based on time slots of the Archethic P2P network. Since no node has the physical ability to know the status of each transaction in an unlimited network, Beacon Chains are a specific set of transaction chains responsible for the global synchronization of the network.

Last Changes From <%= format_date(@update_time) %>