Skip to content

Commit

Permalink
handle rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
tenmoves committed Jan 4, 2023
1 parent ea01aaa commit d8f58d1
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 59 deletions.
58 changes: 0 additions & 58 deletions lib/archethic/beacon_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ defmodule Archethic.BeaconChain do
alias Archethic.P2P.Message.GetBeaconSummariesAggregate
alias Archethic.P2P.Message.BeaconSummaryList
alias Archethic.P2P.Message.NotFound
alias Archethic.P2P.Message.GetCurrentSummaries
alias Archethic.P2P.Message.TransactionSummaryList

alias Archethic.TaskSupervisor

alias Archethic.TransactionChain.TransactionSummary
Expand Down Expand Up @@ -416,59 +413,4 @@ defmodule Archethic.BeaconChain do
e
end
end

@doc """
Fetch transaction from current slots for a given date
Slots which are already has been added
Real time transaction can be get from pubsub
"""
@spec list_transactions_from_current_slots(DateTime.t()) :: list(TransactionSummaryList.t())
def list_transactions_from_current_slots(date = %DateTime{} \\ DateTime.utc_now()) do
authorized_nodes = P2P.authorized_and_available_nodes()

ref_time = DateTime.truncate(date, :millisecond)

next_summary_date = next_summary_date(ref_time)

list_subsets()
|> Flow.from_enumerable(stages: 256)
|> Flow.flat_map(fn subset ->
# Foreach subset and date we compute concurrently the node election
subset
|> Election.beacon_storage_nodes(next_summary_date, authorized_nodes)
|> Enum.filter(&Node.locally_available?/1)
|> P2P.nearest_nodes()
|> Enum.take(3)
|> Enum.map(&{&1, subset})
end)
# We partition by node
|> Flow.partition(key: {:elem, 0})
|> Flow.reduce(fn -> %{} end, fn {node, subset}, acc ->
# We aggregate the subsets for a given node
Map.update(acc, node, [subset], &[subset | &1])
end)
|> Flow.flat_map(fn {node, subsets} ->
# For this node we fetch the summaries
fetch_summaries_for_list_transactions(node, subsets)
end)
|> Stream.uniq_by(& &1.address)
|> Enum.sort_by(& &1.timestamp, {:desc, DateTime})
end

defp fetch_summaries_for_list_transactions(node, subsets) do
subsets
|> Stream.chunk_every(10)
|> Task.async_stream(fn subsets ->
case P2P.send_message(node, %GetCurrentSummaries{subsets: subsets}) do
{:ok, %TransactionSummaryList{transaction_summaries: transaction_summaries}} ->
transaction_summaries

_ ->
[]
end
end)
|> Stream.filter(&match?({:ok, _}, &1))
|> Stream.flat_map(&elem(&1, 1))
|> Enum.to_list()
end
end
2 changes: 1 addition & 1 deletion lib/archethic_web/graphql_schema/resolver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ defmodule ArchethicWeb.GraphQLSchema.Resolver do

if DateTime.compare(next_datetime_summary_time, next_current_date_summary_time) == :eq do
datetime
|> BeaconChain.list_transactions_from_current_slots()
|> Archethic.list_transactions_summaries_from_current_slot()
|> create_empty_beacon_summary_aggregate(next_current_date_summary_time)
else
{
Expand Down

0 comments on commit d8f58d1

Please sign in to comment.