From d8f58d10386ae64d877cd43877bdfc0470f08d8c Mon Sep 17 00:00:00 2001 From: tenmoves Date: Wed, 4 Jan 2023 17:22:49 +0100 Subject: [PATCH] handle rebase --- lib/archethic/beacon_chain.ex | 58 -------------------- lib/archethic_web/graphql_schema/resolver.ex | 2 +- 2 files changed, 1 insertion(+), 59 deletions(-) diff --git a/lib/archethic/beacon_chain.ex b/lib/archethic/beacon_chain.ex index fba1f6c4ba..f1e8fbf0a6 100644 --- a/lib/archethic/beacon_chain.ex +++ b/lib/archethic/beacon_chain.ex @@ -26,9 +26,6 @@ defmodule Archethic.BeaconChain do alias Archethic.P2P.Message.GetBeaconSummariesAggregate alias Archethic.P2P.Message.BeaconSummaryList alias Archethic.P2P.Message.NotFound - alias Archethic.P2P.Message.GetCurrentSummaries - alias Archethic.P2P.Message.TransactionSummaryList - alias Archethic.TaskSupervisor alias Archethic.TransactionChain.TransactionSummary @@ -416,59 +413,4 @@ defmodule Archethic.BeaconChain do e end end - - @doc """ - Fetch transaction from current slots for a given date - Slots which are already has been added - Real time transaction can be get from pubsub - """ - @spec list_transactions_from_current_slots(DateTime.t()) :: list(TransactionSummaryList.t()) - def list_transactions_from_current_slots(date = %DateTime{} \\ DateTime.utc_now()) do - authorized_nodes = P2P.authorized_and_available_nodes() - - ref_time = DateTime.truncate(date, :millisecond) - - next_summary_date = next_summary_date(ref_time) - - list_subsets() - |> Flow.from_enumerable(stages: 256) - |> Flow.flat_map(fn subset -> - # Foreach subset and date we compute concurrently the node election - subset - |> Election.beacon_storage_nodes(next_summary_date, authorized_nodes) - |> Enum.filter(&Node.locally_available?/1) - |> P2P.nearest_nodes() - |> Enum.take(3) - |> Enum.map(&{&1, subset}) - end) - # We partition by node - |> Flow.partition(key: {:elem, 0}) - |> Flow.reduce(fn -> %{} end, fn {node, subset}, acc -> - # We aggregate the subsets for a given node - Map.update(acc, node, [subset], &[subset | &1]) - end) - |> Flow.flat_map(fn {node, subsets} -> - # For this node we fetch the summaries - fetch_summaries_for_list_transactions(node, subsets) - end) - |> Stream.uniq_by(& &1.address) - |> Enum.sort_by(& &1.timestamp, {:desc, DateTime}) - end - - defp fetch_summaries_for_list_transactions(node, subsets) do - subsets - |> Stream.chunk_every(10) - |> Task.async_stream(fn subsets -> - case P2P.send_message(node, %GetCurrentSummaries{subsets: subsets}) do - {:ok, %TransactionSummaryList{transaction_summaries: transaction_summaries}} -> - transaction_summaries - - _ -> - [] - end - end) - |> Stream.filter(&match?({:ok, _}, &1)) - |> Stream.flat_map(&elem(&1, 1)) - |> Enum.to_list() - end end diff --git a/lib/archethic_web/graphql_schema/resolver.ex b/lib/archethic_web/graphql_schema/resolver.ex index 24d0f0bd7c..68f1c7b5cf 100644 --- a/lib/archethic_web/graphql_schema/resolver.ex +++ b/lib/archethic_web/graphql_schema/resolver.ex @@ -239,7 +239,7 @@ defmodule ArchethicWeb.GraphQLSchema.Resolver do if DateTime.compare(next_datetime_summary_time, next_current_date_summary_time) == :eq do datetime - |> BeaconChain.list_transactions_from_current_slots() + |> Archethic.list_transactions_summaries_from_current_slot() |> create_empty_beacon_summary_aggregate(next_current_date_summary_time) else {