Skip to content

Commit

Permalink
Integrate atomic reads into beacon live explorer
Browse files Browse the repository at this point in the history
  • Loading branch information
Samuel committed Jun 9, 2022
1 parent ef15bf6 commit 19a71ec
Showing 1 changed file with 17 additions and 50 deletions.
67 changes: 17 additions & 50 deletions lib/archethic_web/live/chains/beacon_live.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,15 @@ defmodule ArchethicWeb.BeaconChainLive do

alias Archethic.P2P
alias Archethic.P2P.Message.GetLastTransactionAddress
alias Archethic.P2P.Message.GetTransactionChain
alias Archethic.P2P.Message.LastTransactionAddress
alias Archethic.P2P.Message.TransactionList

alias Archethic.P2P.Node

alias Archethic.PubSub

alias Archethic.SelfRepair.Sync.BeaconSummaryHandler

alias Archethic.TransactionChain
alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.TransactionData

Expand Down Expand Up @@ -288,13 +287,15 @@ defmodule ArchethicWeb.BeaconChainLive do
|> Flow.partition(key: {:elem, 2})
|> Flow.reduce(fn -> [] end, fn {address, nodes, _subset}, acc ->
transactions =
with {:ok, last_address} <- get_last_beacon_address(nodes, address),
{:ok, transactions} <- get_beacon_chain(nodes, last_address) do
transactions
|> Stream.filter(&(&1.type == :beacon))
|> Stream.map(&deserialize_beacon_transaction/1)
|> Enum.to_list()
else
case get_last_beacon_address(nodes, address) do
{:ok, last_address} ->
last_address
|> TransactionChain.stream_remotely(nodes)
|> Stream.flat_map(& &1)
|> Stream.filter(&(&1.type == :beacon))
|> Stream.map(&deserialize_beacon_transaction/1)
|> Enum.to_list()

{:error, :network_issue} ->
[]
end
Expand All @@ -307,53 +308,19 @@ defmodule ArchethicWeb.BeaconChainLive do
|> Enum.sort_by(& &1.timestamp, {:desc, DateTime})
end

defp get_last_beacon_address([node | rest], address) do
case P2P.send_message(node, %GetLastTransactionAddress{
address: address,
timestamp: DateTime.utc_now()
}) do
defp get_last_beacon_address(nodes, address) do
case P2P.atomic_read(
nodes,
%GetLastTransactionAddress{address: address, timestamp: DateTime.utc_now()}
) do
{:ok, %LastTransactionAddress{address: last_address}} ->
{:ok, last_address}

{:error, _} ->
get_last_beacon_address(rest, address)
end
end

defp get_last_beacon_address([], _), do: {:error, :network_issue}

defp get_beacon_chain(nodes, address, opts \\ [], acc \\ [])

defp get_beacon_chain(nodes = [node | rest], address, opts, acc) do
message = %GetTransactionChain{
address: address,
paging_state: Keyword.get(opts, :paging_state)
}

case P2P.send_message(node, message) do
{:ok, %TransactionList{transactions: transactions, more?: false}} ->
{:ok, Enum.uniq_by(acc ++ transactions, & &1.address)}

{:ok, %TransactionList{transactions: transactions, more?: true, paging_state: paging_state}} ->
get_beacon_chain(
nodes,
address,
[paging_state: paging_state],
Enum.uniq_by(acc ++ transactions, & &1.address)
)

{:error, _} ->
get_beacon_chain(
rest,
address,
opts,
acc
)
{:error, :network_issue} ->
{:error, :network_issue}
end
end

defp get_beacon_chain([], _, _, _), do: {:error, :network_issue}

defp deserialize_beacon_transaction(%Transaction{
type: :beacon,
data: %TransactionData{content: content}
Expand Down

0 comments on commit 19a71ec

Please sign in to comment.