diff --git a/lib/archethic/crypto.ex b/lib/archethic/crypto.ex index f9edaf921..22c2158a0 100755 --- a/lib/archethic/crypto.ex +++ b/lib/archethic/crypto.ex @@ -906,9 +906,9 @@ defmodule Archethic.Crypto do defp do_hash(data, :blake2b), do: :crypto.hash(:blake2b, data) @doc """ - Generate an address as per ARCHEthic specification + Generate an address as per Archethic specification - The fist-byte representing the curve type second-byte representing hash algorithm used and rest is the hash of publicKey as per ARCHEthic specifications . + The fist-byte representing the curve type second-byte representing hash algorithm used and rest is the hash of publicKey as per Archethic specifications . ## Examples diff --git a/lib/archethic_web/live/chains/beacon_live.ex b/lib/archethic_web/live/chains/beacon_live.ex index f82aa6a5c..f24c7b406 100644 --- a/lib/archethic_web/live/chains/beacon_live.ex +++ b/lib/archethic_web/live/chains/beacon_live.ex @@ -13,40 +13,41 @@ defmodule ArchethicWeb.BeaconChainLive do alias Archethic.Election alias Archethic.P2P + alias Archethic.P2P.Message.GetLastTransactionAddress alias Archethic.P2P.Message.GetTransactionChain - alias Archethic.P2P.Message.GetBeaconSummaries - alias Archethic.P2P.Message.BeaconSummaryList - alias Archethic.P2P.Message.NotFound + alias Archethic.P2P.Message.LastTransactionAddress alias Archethic.P2P.Message.TransactionList + alias Archethic.P2P.Node + alias Archethic.PubSub alias Archethic.SelfRepair.Sync.BeaconSummaryHandler - alias Archethic.TaskSupervisor - alias Archethic.TransactionChain.Transaction alias Archethic.TransactionChain.TransactionData alias ArchethicWeb.ExplorerView + alias ArchethicWeb.TransactionCache alias Phoenix.View require Logger - alias ArchethicWeb.TransactionCache def mount(_params, _session, socket) do next_summary_time = BeaconChain.next_summary_date(DateTime.utc_now()) if connected?(socket) do PubSub.register_to_next_summary_time() - # register for client to able to get the current added transaction to the beacon pool PubSub.register_to_current_epoch_of_slot_time() PubSub.register_to_new_replication_attestations() + + # register for client to able to get the current added transaction to the beacon pool + BeaconChain.register_to_beacon_pool_updates() end beacon_dates = - case get_beacon_dates() |> Enum.to_list() do + case get_beacon_dates() do [] -> [next_summary_time] @@ -54,18 +55,14 @@ defmodule ArchethicWeb.BeaconChainLive do [next_summary_time | dates] end - BeaconChain.register_to_beacon_pool_updates() - new_assign = socket |> assign(:next_summary_time, next_summary_time) |> assign(:dates, beacon_dates) |> assign(:current_date_page, 1) |> assign(:update_time, DateTime.utc_now()) - |> assign( - :transactions, - [] - ) + |> assign(:transactions, []) + |> assign(:live_cache, []) |> assign(:fetching, true) {:ok, new_assign} @@ -79,36 +76,32 @@ defmodule ArchethicWeb.BeaconChainLive do page = Map.get(params, "page", "1") case Integer.parse(page) do - {number, ""} when number > 0 and is_list(dates) -> - if number > length(dates) do - {:noreply, - push_redirect(socket, to: Routes.live_path(socket, __MODULE__, %{"page" => 1}))} - else - new_assign = - socket - |> assign(:current_date_page, number) - |> assign(:transactions, []) - |> assign(:fetching, true) + {1, ""} -> + send(self(), :initial_load) - if Enum.at(dates, 0) == Enum.at(dates, number - 1) do - next_summary_date = - dates - |> Enum.at(number - 1) + new_assign = + socket + |> assign(:current_date_page, 1) + |> assign(:transactions, []) + |> assign(:fetching, true) - send(self(), {:initial_load, next_summary_date}) - else - date = - dates - |> Enum.at(number - 1) + {:noreply, new_assign} - send(self(), {:load_at, date}) - end + {number, ""} when number > 1 and number <= length(dates) -> + new_assign = + socket + |> assign(:current_date_page, number) + |> assign(:transactions, []) + |> assign(:fetching, true) - {:noreply, new_assign} - end + date = Enum.at(dates, number - 1) + send(self(), {:load_at, date}) + + {:noreply, new_assign} _ -> - {:noreply, socket} + {:noreply, + push_redirect(socket, to: Routes.live_path(socket, __MODULE__, %{"page" => 1}))} end end @@ -120,12 +113,19 @@ defmodule ArchethicWeb.BeaconChainLive do {:noreply, push_redirect(socket, to: Routes.live_path(socket, __MODULE__, %{"page" => page}))} end - def handle_info({:initial_load, next_summary_time}, socket) do - transactions = list_transaction_by_date_from_tx_chain(next_summary_time) + def handle_info( + :initial_load, + socket = %{assigns: %{live_cache: live_cache}} + ) do + # Fetch from the latest transaction on the chain + transactions = list_transaction_from_chain() + + # Integrate the transactions updates from the live feed + all_txs = Enum.sort_by(live_cache ++ transactions, & &1.timestamp, {:desc, DateTime}) new_socket = socket - |> assign(:transactions, transactions) + |> assign(:transactions, all_txs) |> assign(:update_time, DateTime.utc_now()) |> assign(:fetching, false) @@ -133,10 +133,10 @@ defmodule ArchethicWeb.BeaconChainLive do end def handle_info({:load_at, date}, socket) do - # Caching transactions = list_transaction_by_date(date) + # Try to fetch from the cache, other fetch from the beacon summary {:ok, transactions} = TransactionCache.resolve(date, fn -> - list_transaction_by_date(date) + list_transactions_from_summary(date) end) new_assign = @@ -157,30 +157,27 @@ defmodule ArchethicWeb.BeaconChainLive do } } ) do - new_assign = - socket - |> assign(:update_time, DateTime.utc_now()) - if page == 1 and !Enum.any?(transactions, fn tx -> tx.address == tx_summary.address end) do # Only update the transaction listed when you are on the first page - new_assign = + new_socket = case Map.get(assigns, :summary_passed?) do true -> - new_assign + socket |> assign(:transactions, Enum.uniq([tx_summary | transactions])) + |> update(:live_cache, &[tx_summary | &1]) |> assign(:summary_passed?, false) + |> assign(:update_time, DateTime.utc_now()) _ -> - update( - new_assign, - :transactions, - &Enum.uniq([tx_summary | &1]) - ) + socket + |> update(:transactions, &Enum.uniq([tx_summary | &1])) + |> update(:live_cache, &[tx_summary | &1]) + |> assign(:update_time, DateTime.utc_now()) end - {:noreply, new_assign} + {:noreply, new_socket} else - {:noreply, new_assign} + {:noreply, socket} end end @@ -188,38 +185,27 @@ defmodule ArchethicWeb.BeaconChainLive do {:next_summary_time, next_summary_date}, socket = %{ assigns: %{ - current_date_page: page, dates: dates } } ) do - new_next_summary = - if :gt == DateTime.compare(next_summary_date, DateTime.utc_now()) do - next_summary_date - else - BeaconChain.next_summary_date(DateTime.utc_now()) - end + new_dates = [next_summary_date | dates] - new_dates = [new_next_summary | dates] + [last_date | _] = dates - # Caching transactions = - # new_dates - # |> Enum.at(page - 1) - # |> list_transaction_by_date() - date = - new_dates - |> Enum.at(page - 1) - - {:ok, transactions} = - TransactionCache.resolve(date, fn -> - list_transaction_by_date_from_beacon_summary(date) - end) + # Flush into cache the transactions from the chain for the last beacon cycle + TransactionCache.put( + last_date, + list_transaction_from_chain(last_date) + ) new_assign = socket - |> assign(:transactions, transactions) + |> assign(:transactions, []) + |> assign(:live_cache, []) |> assign(:dates, new_dates) - |> assign(:next_summary_time, new_next_summary) + |> assign(:next_summary_time, next_summary_date) + |> assign(:update_time, DateTime.utc_now()) {:noreply, new_assign} end @@ -228,164 +214,150 @@ defmodule ArchethicWeb.BeaconChainLive do {:current_epoch_of_slot_timer, date}, socket ) do - date - |> BeaconChain.register_to_beacon_pool_updates() + # We refresh the live feed subscription at each slot time + BeaconChain.register_to_beacon_pool_updates(date) {:noreply, socket} end defp get_beacon_dates do %Node{enrollment_date: enrollment_date} = - P2P.list_nodes() |> Enum.sort_by(& &1.enrollment_date, {:asc, DateTime}) |> Enum.at(0) + P2P.list_nodes() + |> Enum.sort_by(& &1.enrollment_date, {:asc, DateTime}) + |> Enum.at(0) enrollment_date |> SummaryTimer.previous_summaries() |> Enum.sort({:desc, DateTime}) end - defp get_beacon_summary_transaction_chain(beacon_address, nodes, patch) do - nodes - |> P2P.nearest_nodes(patch) - |> do_get_download_summary_transaction_chain(beacon_address) - end - - defp do_get_download_summary_transaction_chain(nodes, address, opts \\ [], acc \\ []) - - defp do_get_download_summary_transaction_chain( - nodes = [node | rest], - address, - opts, - acc - ) do - message = %GetTransactionChain{ - address: address, - paging_state: Keyword.get(opts, :paging_state) - } - - case P2P.send_message(node, message) do - {:ok, %TransactionList{transactions: transactions, more?: false}} -> - {:ok, Enum.uniq_by(acc ++ transactions, & &1.address)} - - {:ok, %TransactionList{transactions: transactions, more?: true, paging_state: paging_state}} -> - do_get_download_summary_transaction_chain( - nodes, - address, - [paging_state: paging_state], - Enum.uniq_by(acc ++ transactions, & &1.address) - ) - - {:error, _} -> - do_get_download_summary_transaction_chain( - rest, - address, - opts, - acc - ) - end - end + defp list_transactions_from_summary(date = %DateTime{}) do + %Node{network_patch: patch} = P2P.get_node_info() - defp do_get_download_summary_transaction_chain([], _, _, _), do: {:error, :network_issue} + node_list = P2P.authorized_and_available_nodes() - defp list_transaction_by_date(date = %DateTime{}) do - Enum.reduce(BeaconChain.list_subsets(), %{}, fn subset, acc -> - b_address = Crypto.derive_beacon_chain_address(subset, date, true) - node_list = P2P.authorized_and_available_nodes() + BeaconChain.list_subsets() + |> Flow.from_enumerable() + |> Flow.map(fn subset -> nodes = Election.beacon_storage_nodes(subset, date, node_list) - - Enum.reduce(nodes, acc, fn node, acc -> - Map.update(acc, node, [b_address], &[b_address | &1]) - end) + {subset, nodes} end) - |> Stream.transform([], fn - {_, []}, acc -> - {[], acc} + |> Flow.partition(key: {:elem, 0}) + |> Flow.reduce(fn -> [] end, fn {subset, nodes}, acc -> + address = Crypto.derive_beacon_chain_address(subset, date, true) - {node, addresses}, acc -> - addresses_to_fetch = Enum.reject(addresses, &(&1 in acc)) + case BeaconSummaryHandler.download_summary(address, nodes, patch) do + {:ok, %BeaconSummary{transaction_attestations: attestations}} -> + tx_summaries = Enum.map(attestations, & &1.transaction_summary) + tx_summaries ++ acc - case P2P.send_message(node, %GetBeaconSummaries{addresses: addresses_to_fetch}) do - {:ok, %BeaconSummaryList{summaries: summaries}} -> - summaries_address_resolved = - Enum.map( - summaries, - &Crypto.derive_beacon_chain_address(&1.subset, &1.summary_time, true) - ) - - {summaries, acc ++ summaries_address_resolved} - - _ -> - {[], acc} - end - end) - |> Stream.map(fn %BeaconSummary{transaction_attestations: transaction_attestations} -> - Enum.map(transaction_attestations, & &1.transaction_summary) + _ -> + acc + end end) - |> Stream.flat_map(& &1) + |> Enum.to_list() + |> List.flatten() + |> Enum.uniq_by(& &1.address) |> Enum.sort_by(& &1.timestamp, {:desc, DateTime}) end - defp list_transaction_by_date(nil), do: [] + defp list_transactions_from_summary(nil), do: [] - defp list_transaction_by_date_from_tx_chain(date = %DateTime{}) do + defp list_transaction_from_chain(date = %DateTime{} \\ DateTime.utc_now()) do %Node{network_patch: patch} = P2P.get_node_info() - Task.Supervisor.async_stream(TaskSupervisor, BeaconChain.list_subsets(), fn subset -> - b_address = Crypto.derive_beacon_chain_address(subset, date, true) - node_list = P2P.authorized_and_available_nodes() - nodes = Election.beacon_storage_nodes(subset, date, node_list) - - {:ok, transactions} = get_beacon_summary_transaction_chain(b_address, nodes, patch) + node_list = P2P.authorized_nodes() - transactions - |> Stream.map(&deserialize_beacon_transaction/1) - |> Enum.to_list() - end) - |> Enum.map(fn {:ok, txs} -> txs end) - |> :lists.flatten() - |> Enum.sort_by(& &1.timestamp, {:desc, DateTime}) - end + ref_time = DateTime.truncate(date, :millisecond) - def list_transaction_by_date_from_beacon_summary(nil), do: [] + genesis_date = BeaconChain.previous_summary_time(ref_time) + next_summary_date = BeaconChain.next_summary_date(ref_time) - def list_transaction_by_date_from_beacon_summary(date = %DateTime{}) do BeaconChain.list_subsets() |> Flow.from_enumerable() |> Flow.map(fn subset -> - b_address = Crypto.derive_beacon_chain_address(subset, date, true) - node_list = P2P.authorized_nodes() - nodes = Election.beacon_storage_nodes(subset, date, node_list) - %Node{network_patch: patch} = P2P.get_node_info() - {b_address, nodes, patch} - end) - |> Flow.partition() - |> Flow.reduce(fn -> [] end, fn {address, nodes, patch}, acc -> - transaction_attestations = - case BeaconSummaryHandler.download_summary(address, nodes, patch) do - {:ok, beacon_summary = %BeaconSummary{}} -> - %BeaconSummary{transaction_attestations: tx_ats} = beacon_summary - tx_ats - - {:ok, %NotFound{}} -> - [] + address = Crypto.derive_beacon_chain_address(subset, genesis_date) - _ -> + nodes = + subset + |> Election.beacon_storage_nodes(next_summary_date, node_list) + |> P2P.nearest_nodes(patch) + + {address, nodes, subset} + end) + |> Flow.partition(key: {:elem, 2}) + |> Flow.reduce(fn -> [] end, fn {address, nodes, _subset}, acc -> + transactions = + with {:ok, last_address} <- get_last_beacon_address(nodes, address), + {:ok, transactions} <- get_beacon_chain(nodes, last_address) do + transactions + |> Stream.filter(&(&1.type == :beacon)) + |> Stream.map(&deserialize_beacon_transaction/1) + |> Enum.to_list() + else + {:error, :network_issue} -> [] end - if Enum.empty?(transaction_attestations) do - acc - else - [%ReplicationAttestation{transaction_summary: tx_summary}] = transaction_attestations - [tx_summary | acc] - end + transactions ++ acc end) |> Enum.to_list() - |> Enum.uniq() |> List.flatten() + |> Enum.uniq_by(& &1.address) |> Enum.sort_by(& &1.timestamp, {:desc, DateTime}) end - defp deserialize_beacon_transaction(%Transaction{data: %TransactionData{content: content}}) do + defp get_last_beacon_address([node | rest], address) do + case P2P.send_message(node, %GetLastTransactionAddress{ + address: address, + timestamp: DateTime.utc_now() + }) do + {:ok, %LastTransactionAddress{address: last_address}} -> + {:ok, last_address} + + {:error, _} -> + get_last_beacon_address(rest, address) + end + end + + defp get_last_beacon_address([], _), do: {:error, :network_issue} + + defp get_beacon_chain(nodes, address, opts \\ [], acc \\ []) + + defp get_beacon_chain(nodes = [node | rest], address, opts, acc) do + message = %GetTransactionChain{ + address: address, + paging_state: Keyword.get(opts, :paging_state) + } + + case P2P.send_message(node, message) do + {:ok, %TransactionList{transactions: transactions, more?: false}} -> + {:ok, Enum.uniq_by(acc ++ transactions, & &1.address)} + + {:ok, %TransactionList{transactions: transactions, more?: true, paging_state: paging_state}} -> + get_beacon_chain( + nodes, + address, + [paging_state: paging_state], + Enum.uniq_by(acc ++ transactions, & &1.address) + ) + + {:error, _} -> + get_beacon_chain( + rest, + address, + opts, + acc + ) + end + end + + defp get_beacon_chain([], _, _, _), do: {:error, :network_issue} + + defp deserialize_beacon_transaction(%Transaction{ + type: :beacon, + data: %TransactionData{content: content} + }) do {slot, _} = Slot.deserialize(content) %Slot{transaction_attestations: transaction_attestations} = slot Enum.map(transaction_attestations, & &1.transaction_summary) diff --git a/lib/archethic_web/templates/explorer/beacon_chain_index.html.leex b/lib/archethic_web/templates/explorer/beacon_chain_index.html.leex index 91903c141..bed5a16f2 100644 --- a/lib/archethic_web/templates/explorer/beacon_chain_index.html.leex +++ b/lib/archethic_web/templates/explorer/beacon_chain_index.html.leex @@ -3,7 +3,7 @@
Last Changes From <%= format_date(@update_time) %>