diff --git a/lib/archethic/account/mem_tables/token_ledger.ex b/lib/archethic/account/mem_tables/token_ledger.ex index 803a704e6..e5cbe0608 100644 --- a/lib/archethic/account/mem_tables/token_ledger.ex +++ b/lib/archethic/account/mem_tables/token_ledger.ex @@ -15,12 +15,6 @@ defmodule Archethic.Account.MemTables.TokenLedger do Initialize the Token ledger tables: - Main Token ledger as ETS set ({token, to, from, token_id}, amount, spent?) - Token Unspent Output Index as ETS bag (to, {from, token, token_id}) - - ## Examples - - iex> {:ok, _} = TokenLedger.start_link() - iex> { :ets.info(:archethic_token_ledger)[:type], :ets.info(:archethic_token_unspent_output_index)[:type] } - { :set, :bag } """ def start_link(args \\ []) do GenServer.start_link(__MODULE__, args) diff --git a/lib/archethic/account/mem_tables/uco_ledger.ex b/lib/archethic/account/mem_tables/uco_ledger.ex index ecccf562d..435a1f8ee 100644 --- a/lib/archethic/account/mem_tables/uco_ledger.ex +++ b/lib/archethic/account/mem_tables/uco_ledger.ex @@ -16,11 +16,6 @@ defmodule Archethic.Account.MemTables.UCOLedger do - Main UCO ledger as ETS set ({to, from}, amount, spent?) - UCO Unspent Output Index as ETS bag (to, from) - ## Examples - - iex> {:ok, _} = UCOLedger.start_link() - iex> { :ets.info(:archethic_uco_ledger)[:type], :ets.info(:archethic_uco_unspent_output_index)[:type] } - { :set, :bag } """ def start_link(args \\ []) do GenServer.start_link(__MODULE__, args) diff --git a/lib/archethic/beacon_chain.ex b/lib/archethic/beacon_chain.ex index 9a057c2eb..5684887e9 100644 --- a/lib/archethic/beacon_chain.ex +++ b/lib/archethic/beacon_chain.ex @@ -228,12 +228,13 @@ defmodule Archethic.BeaconChain do @doc """ Return a list of beacon summaries from a list of transaction addresses """ - @spec get_beacon_summaries(list(binary)) :: Enumerable.t() | list(Summary.t()) + @spec get_beacon_summaries(list(binary)) :: list(Summary.t()) def get_beacon_summaries(addresses) do addresses |> Stream.map(&get_summary/1) |> Stream.reject(&match?({:error, :not_found}, &1)) |> Stream.map(fn {:ok, summary} -> summary end) + |> Enum.to_list() end @doc """ @@ -342,15 +343,28 @@ defmodule Archethic.BeaconChain do defp fetch_summaries(node, addresses) do addresses |> Stream.chunk_every(10) - |> Stream.flat_map(fn addresses -> - case P2P.send_message(node, %GetBeaconSummaries{addresses: addresses}) do - {:ok, %BeaconSummaryList{summaries: summaries}} -> - summaries - - _ -> - [] - end - end) + |> Stream.flat_map(&batch_summaries_fetching(&1, node)) |> Enum.to_list() end + + defp batch_summaries_fetching(addresses, node) do + %{summaries: local_summaries, remaining: remaining} = + Enum.reduce(addresses, %{summaries: [], remaining: []}, fn addr, acc -> + case get_summary(addr) do + {:ok, summary} -> + Map.update!(acc, :summaries, &[summary | &1]) + + _ -> + Map.update!(acc, :remaining, &[addr | &1]) + end + end) + + case P2P.send_message(node, %GetBeaconSummaries{addresses: remaining}) do + {:ok, %BeaconSummaryList{summaries: fetched_summaries}} -> + local_summaries ++ fetched_summaries + + _ -> + local_summaries + end + end end diff --git a/lib/archethic/beacon_chain/slot/end_of_node_sync.ex b/lib/archethic/beacon_chain/slot/end_of_node_sync.ex index 8c5d4dd01..758613fc2 100644 --- a/lib/archethic/beacon_chain/slot/end_of_node_sync.ex +++ b/lib/archethic/beacon_chain/slot/end_of_node_sync.ex @@ -74,8 +74,8 @@ defmodule Archethic.BeaconChain.Slot.EndOfNodeSync do } end - @spec from_map(map()) :: t() - def from_map(%{public_key: public_key, timestamp: timestamp}) do + @spec cast(map()) :: t() + def cast(%{public_key: public_key, timestamp: timestamp}) do %__MODULE__{ public_key: public_key, timestamp: timestamp diff --git a/lib/archethic/db/embedded_impl/chain_reader.ex b/lib/archethic/db/embedded_impl/chain_reader.ex index 17b459ca1..8362469dd 100644 --- a/lib/archethic/db/embedded_impl/chain_reader.ex +++ b/lib/archethic/db/embedded_impl/chain_reader.ex @@ -36,7 +36,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainReader do Encoding.decode(version, column, data, acc) end) |> Utils.atomize_keys() - |> Transaction.from_map() + |> Transaction.cast() :file.close(fd) @@ -153,7 +153,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainReader do Encoding.decode(version, column, data, acc) end) |> Utils.atomize_keys() - |> Transaction.from_map() + |> Transaction.cast() if tx.address == limit_address do {Enum.reverse([tx | acc]), false, nil} diff --git a/lib/archethic/mining.ex b/lib/archethic/mining.ex index 59e3f77eb..cf7bdc5cd 100644 --- a/lib/archethic/mining.ex +++ b/lib/archethic/mining.ex @@ -169,6 +169,31 @@ defmodule Archethic.Mining do |> DistributedWorkflow.add_cross_validation_stamp(stamp) end + @doc """ + Confirm the replication from a storage node + """ + @spec confirm_replication( + address :: binary(), + signature :: binary(), + node_public_key :: Crypto.key() + ) :: + :ok + def confirm_replication(tx_address, signature, node_public_key) do + pid = get_mining_process!(tx_address) + send(pid, {:ack_replication, signature, node_public_key}) + :ok + end + + @doc """ + Notify replication to the mining process + """ + @spec notify_replication_error(binary(), any()) :: :ok + def notify_replication_error(tx_address, error_reason) do + pid = get_mining_process!(tx_address) + send(pid, {:replication_error, error_reason}) + :ok + end + defp get_mining_process!(tx_address, timeout \\ @mining_timeout) do retry_while with: exponential_backoff(100, 2) |> expiry(timeout) do case Registry.lookup(WorkflowRegistry, tx_address) do diff --git a/lib/archethic/mining/distributed_workflow.ex b/lib/archethic/mining/distributed_workflow.ex index 323729d36..d398ec3de 100644 --- a/lib/archethic/mining/distributed_workflow.ex +++ b/lib/archethic/mining/distributed_workflow.ex @@ -27,20 +27,16 @@ defmodule Archethic.Mining.DistributedWorkflow do alias Archethic.P2P alias Archethic.P2P.Message - alias Archethic.P2P.Message.AcknowledgeStorage alias Archethic.P2P.Message.AddMiningContext alias Archethic.P2P.Message.CrossValidate alias Archethic.P2P.Message.CrossValidationDone - alias Archethic.P2P.Message.Error - alias Archethic.P2P.Message.ValidationError alias Archethic.P2P.Message.ReplicateTransactionChain alias Archethic.P2P.Message.ReplicateTransaction + alias Archethic.P2P.Message.ValidationError alias Archethic.P2P.Node alias Archethic.Replication - alias Archethic.TaskSupervisor - alias Archethic.TransactionChain alias Archethic.TransactionChain.Transaction alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations @@ -673,7 +669,7 @@ defmodule Archethic.Mining.DistributedWorkflow do def handle_event( :info, - {:add_ack_storage, node_public_key, signature}, + {:ack_replication, signature, node_public_key}, :replication, data = %{start_time: start_time, context: context = %ValidationContext{transaction: tx}} ) do @@ -739,14 +735,16 @@ defmodule Archethic.Mining.DistributedWorkflow do beacon_storage_nodes = ValidationContext.get_beacon_replication_nodes(context) P2P.broadcast_message(P2P.distinct_nodes([welcome_node | beacon_storage_nodes]), message) + validated_tx = ValidationContext.get_validated_transaction(context) + context |> ValidationContext.get_io_replication_nodes() |> P2P.broadcast_message( - with tx <- ValidationContext.get_validated_transaction(context) do - if Transaction.network_type?(tx.type), - do: %ReplicateTransactionChain{transaction: tx}, - else: %ReplicateTransaction{transaction: tx} - end + if Transaction.network_type?(validated_tx.type), + do: %ReplicateTransactionChain{ + transaction: validated_tx + }, + else: %ReplicateTransaction{transaction: validated_tx} ) :keep_state_and_data @@ -986,8 +984,7 @@ defmodule Archethic.Mining.DistributedWorkflow do defp request_replication( context = %ValidationContext{ - transaction: tx, - previous_transaction: previous_transaction + transaction: tx } ) do storage_nodes = ValidationContext.get_chain_replication_nodes(context) @@ -998,54 +995,13 @@ defmodule Archethic.Mining.DistributedWorkflow do transaction_type: tx.type ) - # Get transaction chain size to calculate timeout - chain_size = - case previous_transaction do - nil -> - 1 - - previous_transaction -> - # Get transaction chain size to calculate timeout - case Archethic.get_transaction_chain_length(previous_transaction.address) do - {:ok, chain_size} -> - chain_size - - _ -> - 1 - end - end - - timeout = Message.get_max_timeout() + Message.get_max_timeout() * chain_size - validated_tx = ValidationContext.get_validated_transaction(context) message = %ReplicateTransactionChain{ - transaction: validated_tx + transaction: validated_tx, + replying_node: Crypto.first_node_public_key() } - me = self() - - Task.Supervisor.async_stream_nolink( - TaskSupervisor, - storage_nodes, - fn node -> - {P2P.send_message(node, message, timeout), node} - end, - ordered: false, - on_timeout: :kill_task, - timeout: timeout + 2000 - ) - |> Stream.filter(&match?({:ok, {{:ok, _}, _}}, &1)) - |> Stream.map(fn {:ok, {{:ok, response}, node}} -> {response, node} end) - |> Stream.each(fn - {%Error{reason: reason}, _node} -> - send(me, {:replication_error, reason}) - - {%AcknowledgeStorage{ - signature: signature - }, %Node{first_public_key: node_public_key}} -> - send(me, {:add_ack_storage, node_public_key, signature}) - end) - |> Stream.run() + P2P.broadcast_message(storage_nodes, message) end end diff --git a/lib/archethic/mining/standalone_workflow.ex b/lib/archethic/mining/standalone_workflow.ex index 0211c6563..17880824e 100644 --- a/lib/archethic/mining/standalone_workflow.ex +++ b/lib/archethic/mining/standalone_workflow.ex @@ -5,7 +5,7 @@ defmodule Archethic.Mining.StandaloneWorkflow do The single node will auto validate the transaction """ - use Task + use GenServer alias Archethic.BeaconChain alias Archethic.BeaconChain.ReplicationAttestation @@ -17,17 +17,14 @@ defmodule Archethic.Mining.StandaloneWorkflow do alias Archethic.Mining.PendingTransactionValidation alias Archethic.Mining.TransactionContext alias Archethic.Mining.ValidationContext + alias Archethic.Mining.WorkflowRegistry alias Archethic.P2P - alias Archethic.P2P.Message.AcknowledgeStorage - alias Archethic.P2P.Message.Error - alias Archethic.P2P.Message.ValidationError alias Archethic.P2P.Message.ReplicateTransaction alias Archethic.P2P.Message.ReplicateTransactionChain + alias Archethic.P2P.Message.ValidationError alias Archethic.P2P.Node - alias Archethic.TaskSupervisor - alias Archethic.TransactionChain alias Archethic.TransactionChain.Transaction alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations @@ -35,13 +32,17 @@ defmodule Archethic.Mining.StandaloneWorkflow do require Logger - def start_link(opts \\ []) do - Task.start_link(__MODULE__, :run, [opts]) + def start_link(arg \\ []) do + GenServer.start_link(__MODULE__, arg) end - def run(opts) do - tx = Keyword.get(opts, :transaction) + def init(arg) do + tx = Keyword.get(arg, :transaction) + Registry.register(WorkflowRegistry, tx.address, []) + {:ok, %{}, {:continue, {:start_mining, tx}}} + end + def handle_continue({:start_mining, tx}, _state) do Logger.info("Start mining", transaction_address: Base.encode16(tx.address), transaction_type: tx.type @@ -97,29 +98,36 @@ defmodule Archethic.Mining.StandaloneWorkflow do false end - ValidationContext.new( - transaction: tx, - welcome_node: current_node, - coordinator_node: current_node, - cross_validation_nodes: [current_node], - chain_storage_nodes: chain_storage_nodes, - beacon_storage_nodes: beacon_storage_nodes, - io_storage_nodes: io_storage_nodes, - validation_time: validation_time, - resolved_addresses: resolved_addresses - ) - |> ValidationContext.set_pending_transaction_validation(valid_pending_transaction?) - |> ValidationContext.put_transaction_context( - prev_tx, - unspent_outputs, - previous_storage_nodes, - chain_storage_nodes_view, - beacon_storage_nodes_view, - io_storage_nodes_view - ) - |> validate() - |> replicate_and_aggregate_confirmations() - |> notify() + validation_context = + ValidationContext.new( + transaction: tx, + welcome_node: current_node, + coordinator_node: current_node, + cross_validation_nodes: [current_node], + chain_storage_nodes: chain_storage_nodes, + beacon_storage_nodes: beacon_storage_nodes, + io_storage_nodes: io_storage_nodes, + validation_time: validation_time, + resolved_addresses: resolved_addresses + ) + |> ValidationContext.set_pending_transaction_validation(valid_pending_transaction?) + |> ValidationContext.put_transaction_context( + prev_tx, + unspent_outputs, + previous_storage_nodes, + chain_storage_nodes_view, + beacon_storage_nodes_view, + io_storage_nodes_view + ) + |> validate() + + start_replication(validation_context) + + {:noreply, + %{ + context: validation_context, + confirmations: [] + }} end defp validate(context = %ValidationContext{}) do @@ -130,7 +138,7 @@ defmodule Archethic.Mining.StandaloneWorkflow do |> ValidationContext.cross_validate() end - defp replicate_and_aggregate_confirmations(context = %ValidationContext{}) do + defp start_replication(context = %ValidationContext{}) do validated_tx = ValidationContext.get_validated_transaction(context) replication_nodes = ValidationContext.get_chain_replication_nodes(context) @@ -141,104 +149,75 @@ defmodule Archethic.Mining.StandaloneWorkflow do transaction_type: validated_tx.type ) - Task.Supervisor.async_stream_nolink( - TaskSupervisor, - replication_nodes, - fn node -> - {P2P.send_message(node, %ReplicateTransactionChain{ - transaction: validated_tx - }), node} - end, - on_timeout: :kill_task, - ordered: false - ) - |> Stream.filter(&match?({:ok, {{:ok, _res}, _node}}, &1)) - |> Stream.map(fn {:ok, {{:ok, res}, node}} -> {res, node} end) - |> Enum.reduce( - %{ - confirmations: [], - context: context, - transaction_summary: TransactionSummary.from_transaction(validated_tx) - }, - &reduce_confirmations/2 - ) + P2P.broadcast_message(replication_nodes, %ReplicateTransactionChain{ + transaction: validated_tx, + replying_node: Crypto.first_node_public_key() + }) end - defp reduce_confirmations( - {%AcknowledgeStorage{ - signature: signature - }, %Node{first_public_key: node_public_key}}, - acc = %{transaction_summary: tx_summary, context: context} - ) do - if Crypto.verify?(signature, TransactionSummary.serialize(tx_summary), node_public_key) do - {:ok, position} = ValidationContext.get_chain_storage_position(context, node_public_key) - Map.update!(acc, :confirmations, &[{position, signature} | &1]) - else - acc - end - end - - defp reduce_confirmations( - {%Error{reason: reason}, _}, - _acc = %{transaction_summary: tx_summary} - ) do + def handle_info( + {:replication_error, reason}, + _state = %{context: %ValidationContext{transaction: %Transaction{address: tx_address}}} + ) do Logger.warning("Invalid transaction #{inspect(reason)}") # notify welcome node - message = %ValidationError{address: tx_summary.address, reason: reason} + message = %ValidationError{address: tx_address, reason: reason} Task.Supervisor.async_nolink(Archethic.TaskSupervisor, fn -> P2P.send_message( Crypto.last_node_public_key(), message ) - - :ok end) - :error + :stop end - defp reduce_confirmations(_, :error), do: :error - - defp notify(:error), do: :skip - - defp notify(%{ - confirmations: [], - transaction_summary: %TransactionSummary{address: tx_address, type: tx_type} - }) do - # notify welcome node - message = %ValidationError{address: tx_address, reason: :network_issue} - - Task.Supervisor.async_nolink(Archethic.TaskSupervisor, fn -> - P2P.send_message( - Crypto.last_node_public_key(), - message - ) - - :ok - end) - - Logger.error("Not confirmations for the transaction", - transaction_address: Base.encode16(tx_address), - transaction_type: tx_type - ) + def handle_info( + {:ack_replication, signature, node_public_key}, + state = %{context: context = %ValidationContext{transaction: tx}} + ) do + with {:ok, node_index} <- + ValidationContext.get_chain_storage_position(context, node_public_key), + validated_tx <- ValidationContext.get_validated_transaction(context), + tx_summary <- TransactionSummary.from_transaction(validated_tx), + true <- + Crypto.verify?(signature, TransactionSummary.serialize(tx_summary), node_public_key) do + new_context = ValidationContext.add_storage_confirmation(context, node_index, signature) + + new_state = %{state | context: new_context} + + if ValidationContext.enough_storage_confirmations?(new_context) do + notify(new_state) + {:noreply, new_state} + else + {:noreply, new_state} + end + else + _reason -> + Logger.warning("Invalid storage ack", + transaction_address: Base.encode16(tx.address), + transaction_type: tx.type, + node: Base.encode16(node_public_key) + ) + + {:noreply, state} + end end - defp notify(%{ - confirmations: confirmations, - transaction_summary: tx_summary, - context: context - }) do - notify_attestation(confirmations, tx_summary, context) + defp notify(%{context: context}) do + notify_attestation(context) notify_io_nodes(context) end defp notify_attestation( - confirmations, - tx_summary, - context = %ValidationContext{} + context = %ValidationContext{ + welcome_node: welcome_node, + storage_nodes_confirmations: confirmations + } ) do - welcome_node = P2P.get_node_info() + validated_tx = ValidationContext.get_validated_transaction(context) + tx_summary = TransactionSummary.from_transaction(validated_tx) attestation = %ReplicationAttestation{ transaction_summary: tx_summary, @@ -272,7 +251,9 @@ defmodule Archethic.Mining.StandaloneWorkflow do end) |> P2P.broadcast_message( if Transaction.network_type?(validated_tx.type), - do: %ReplicateTransactionChain{transaction: validated_tx}, + do: %ReplicateTransactionChain{ + transaction: validated_tx + }, else: %ReplicateTransaction{transaction: validated_tx} ) end diff --git a/lib/archethic/networking/scheduler.ex b/lib/archethic/networking/scheduler.ex index 4f92e76c6..9ebf0c1ca 100644 --- a/lib/archethic/networking/scheduler.ex +++ b/lib/archethic/networking/scheduler.ex @@ -58,27 +58,31 @@ defmodule Archethic.Networking.Scheduler do {p2p_port, web_port} = open_ports() ip = IPLookup.get_node_ip() - %Node{ip: prev_ip, reward_address: reward_address, transport: transport} = P2P.get_node_info() - - if ip != prev_ip do - origin_public_key = Crypto.origin_node_public_key() - key_certificate = Crypto.get_key_certificate(origin_public_key) - - Transaction.new(:node, %TransactionData{ - content: - Node.encode_transaction_content( - ip, - p2p_port, - web_port, - transport, - reward_address, - origin_public_key, - key_certificate - ) - }) - |> Archethic.send_new_transaction() - else - Logger.debug("Same IP - no need to send a new node transaction") + case P2P.get_node_info(Crypto.first_node_public_key()) do + {:ok, %Node{ip: prev_ip, reward_address: reward_address, transport: transport}} + when ip != prev_ip -> + origin_public_key = Crypto.origin_node_public_key() + key_certificate = Crypto.get_key_certificate(origin_public_key) + + Transaction.new(:node, %TransactionData{ + content: + Node.encode_transaction_content( + ip, + p2p_port, + web_port, + transport, + reward_address, + origin_public_key, + key_certificate + ) + }) + |> Archethic.send_new_transaction() + + {:ok, %Node{}} -> + Logger.debug("Skip node update: Same IP - no need to send a new node transaction") + + {:error, :not_found} -> + Logger.debug("Skip node update: Not yet bootstrapped") end end diff --git a/lib/archethic/p2p/listener_protocol.ex b/lib/archethic/p2p/listener_protocol.ex index de55278f2..d6a36b891 100644 --- a/lib/archethic/p2p/listener_protocol.ex +++ b/lib/archethic/p2p/listener_protocol.ex @@ -1,10 +1,10 @@ defmodule Archethic.P2P.ListenerProtocol do @moduledoc false - alias __MODULE__.MessageProducer - require Logger + alias Archethic.TaskSupervisor + @behaviour :ranch_protocol def start_link(ref, transport, opts) do @@ -31,7 +31,30 @@ defmodule Archethic.P2P.ListenerProtocol do state = %{transport: transport} ) do :inet.setopts(socket, active: :once) - MessageProducer.new_message({socket, transport, msg}) + + %Archethic.P2P.MessageEnvelop{ + message_id: message_id, + message: message, + sender_public_key: sender_public_key + } = Archethic.P2P.MessageEnvelop.decode(msg) + + Archethic.P2P.MemTable.increase_node_availability(sender_public_key) + Archethic.P2P.Client.set_connected(sender_public_key) + + Task.Supervisor.start_child(TaskSupervisor, fn -> + response = Archethic.P2P.Message.process(message) + + encoded_response = + %Archethic.P2P.MessageEnvelop{ + message: response, + message_id: message_id, + sender_public_key: Archethic.Crypto.first_node_public_key() + } + |> Archethic.P2P.MessageEnvelop.encode(sender_public_key) + + transport.send(socket, encoded_response) + end) + {:noreply, state} end diff --git a/lib/archethic/p2p/listener_protocol/broadway_pipeline.ex b/lib/archethic/p2p/listener_protocol/broadway_pipeline.ex deleted file mode 100644 index e6502ef4d..000000000 --- a/lib/archethic/p2p/listener_protocol/broadway_pipeline.ex +++ /dev/null @@ -1,86 +0,0 @@ -defmodule Archethic.P2P.ListenerProtocol.BroadwayPipeline do - @moduledoc false - - alias Archethic.Crypto - - alias Archethic.P2P.ListenerProtocol.MessageProducer - alias Archethic.P2P.MemTable - alias Archethic.P2P.Message - alias Archethic.P2P.MessageEnvelop - alias Archethic.P2P.Client - alias Archethic.TaskSupervisor - - alias Broadway.Message, as: BroadwayMessage - - require Logger - - use Broadway - - def start_link(arg \\ []) do - Broadway.start_link(__MODULE__, - name: __MODULE__, - producer: [ - module: {MessageProducer, arg}, - transformer: {__MODULE__, :transform, []}, - concurrency: 1 - ], - processors: [ - default: [concurrency: System.schedulers_online() * 10, max_demand: 1] - ] - ) - end - - def transform(event, _) do - %BroadwayMessage{ - data: event, - acknowledger: {Broadway.NoopAcknowledger, _ack_ref = nil, _ack_data = nil} - } - end - - def handle_message(_, message, _context) do - # start_time = System.monotonic_time(:millisecond) - - BroadwayMessage.update_data(message, fn {socket, transport, data} -> - Task.Supervisor.async_nolink(TaskSupervisor, fn -> - message = - data - |> decode() - |> process() - |> encode() - - transport.send(socket, message) - end) - - # end_time = System.monotnonic_time(:millisecond) - # Logger.debug("Request processed in #{end_time - start_time} ms") - end) - end - - defp decode(data) do - %MessageEnvelop{ - message_id: message_id, - message: message, - sender_public_key: sender_public_key - } = MessageEnvelop.decode(data) - - MemTable.increase_node_availability(sender_public_key) - Client.set_connected(sender_public_key) - {System.monotonic_time(:millisecond), message_id, message, sender_public_key} - end - - defp process({_start_time, message_id, message, sender_public_key}) do - response = Message.process(message) - # end_time = System.monotonic_time(:millisecond) - - {message_id, response, sender_public_key} - end - - defp encode({message_id, message, sender_public_key}) do - %MessageEnvelop{ - message: message, - message_id: message_id, - sender_public_key: Crypto.first_node_public_key() - } - |> MessageEnvelop.encode(sender_public_key) - end -end diff --git a/lib/archethic/p2p/listener_protocol/message_producer.ex b/lib/archethic/p2p/listener_protocol/message_producer.ex deleted file mode 100644 index b55eb1278..000000000 --- a/lib/archethic/p2p/listener_protocol/message_producer.ex +++ /dev/null @@ -1,45 +0,0 @@ -defmodule Archethic.P2P.ListenerProtocol.MessageProducer do - @moduledoc false - use GenStage - - alias Archethic.P2P.ListenerProtocol.BroadwayPipeline.Broadway.Producer_0, as: Producer - - def start_link(arg, opts \\ [name: __MODULE__]) do - GenStage.start_link(__MODULE__, arg, opts) - end - - def new_message(pid \\ Producer, message) do - GenStage.cast(pid, {:new_message, message}) - end - - def init(_arg) do - {:producer, %{demand: 0, queue: :queue.new()}} - end - - def handle_cast( - {:new_message, {socket, transport, message}}, - state = %{queue: queue, demand: pending_demand} - ) do - queue = :queue.in({socket, transport, message}, queue) - - dispatch_events(queue, pending_demand, [], state) - end - - def handle_demand(incoming_demand, state = %{queue: queue, demand: pending_demand}) do - dispatch_events(queue, incoming_demand + pending_demand, [], state) - end - - defp dispatch_events(queue, 0, events, state) do - {:noreply, Enum.reverse(events), %{state | queue: queue, demand: 0}} - end - - defp dispatch_events(queue, demand, events, state) do - case :queue.out(queue) do - {{:value, event}, queue} -> - dispatch_events(queue, demand - 1, [event | events], state) - - {:empty, queue} -> - {:noreply, Enum.reverse(events), %{state | queue: queue, demand: demand}} - end - end -end diff --git a/lib/archethic/p2p/listener_supervisor.ex b/lib/archethic/p2p/listener_supervisor.ex index 9dfdcbf4c..e5a221b3e 100644 --- a/lib/archethic/p2p/listener_supervisor.ex +++ b/lib/archethic/p2p/listener_supervisor.ex @@ -3,7 +3,6 @@ defmodule Archethic.P2P.ListenerSupervisor do alias Archethic.P2P.BootstrappingSeeds alias Archethic.P2P.Listener - alias Archethic.P2P.ListenerProtocol.BroadwayPipeline alias Archethic.Utils @@ -21,7 +20,6 @@ defmodule Archethic.P2P.ListenerSupervisor do bootstraping_seeds_conf = Application.get_env(:archethic, BootstrappingSeeds) optional_children = [ - BroadwayPipeline, {Listener, Keyword.put(listener_conf, :port, port)}, {BootstrappingSeeds, bootstraping_seeds_conf} ] diff --git a/lib/archethic/p2p/message.ex b/lib/archethic/p2p/message.ex index d0e9cfa06..4777c8cc3 100644 --- a/lib/archethic/p2p/message.ex +++ b/lib/archethic/p2p/message.ex @@ -59,6 +59,7 @@ defmodule Archethic.P2P.Message do alias __MODULE__.RegisterBeaconUpdates alias __MODULE__.ReplicateTransaction alias __MODULE__.ReplicateTransactionChain + alias __MODULE__.ReplicationError alias __MODULE__.StartMining alias __MODULE__.TransactionChainLength alias __MODULE__.TransactionInputList @@ -80,6 +81,8 @@ defmodule Archethic.P2P.Message do alias Archethic.TransactionChain.TransactionInput alias Archethic.TransactionChain.TransactionSummary + alias Archethic.TaskSupervisor + alias Archethic.Utils alias Archethic.Utils.VarInt @@ -143,6 +146,7 @@ defmodule Archethic.P2P.Message do | Summary.t() | BeaconSummaryList.t() | FirstAddress.t() + | ReplicationError.t() @floor_upload_speed Application.compile_env!(:archethic, [__MODULE__, :floor_upload_speed]) @content_max_size Application.compile_env!(:archethic, :transaction_data_content_max_size) @@ -161,22 +165,30 @@ defmodule Archethic.P2P.Message do Return timeout depending of message type """ @spec get_timeout(__MODULE__.t()) :: non_neg_integer() - def get_timeout(message) do - full_size_message = [ - GetTransaction, - GetLastTransaction, - NewTransaction, - StartMining, - ReplicateTransaction - ] - - if message.__struct__ in full_size_message do - get_max_timeout() - else - 3_000 - end + def get_timeout(%GetTransaction{}), do: get_max_timeout() + def get_timeout(%GetLastTransaction{}), do: get_max_timeout() + def get_timeout(%NewTransaction{}), do: get_max_timeout() + def get_timeout(%StartMining{}), do: get_max_timeout() + def get_timeout(%ReplicateTransaction{}), do: get_max_timeout() + def get_timeout(%ReplicateTransactionChain{}), do: get_max_timeout() + + def get_timeout(%GetTransactionChain{}) do + # As we use 10 transaction in the pagination we can estimate the max time + get_max_timeout() * 10 end + # def get_timeout(%GetBeaconSummaries{addresses: addresses}) do + # # We can expect high beacon summary where a transaction replication will contains a single UCO transfer + # # CALC: Tx address + recipient address + tx type + tx timestamp + storage node public key + signature * 200 (max storage nodes) + # beacon_summary_high_estimation_bytes = 34 + 34 + 1 + 8 + (8 + 34 + 34 * 200) + # length(addresses) * trunc(beacon_summary_high_estimation_bytes / @floor_upload_speed * 1000) + # end + + def get_timeout(%GetUnspentOutputs{}), do: get_max_timeout() + def get_timeout(%GetTransactionInputs{}), do: get_max_timeout() + + def get_timeout(_), do: 3_000 + @doc """ Return the maximum timeout for a full sized transaction """ @@ -299,8 +311,12 @@ defmodule Archethic.P2P.Message do <<10::8, address::binary, CrossValidationStamp.serialize(stamp)::bitstring>> end - def encode(%ReplicateTransactionChain{transaction: tx}) do - <<11::8, Transaction.serialize(tx)::bitstring>> + def encode(%ReplicateTransactionChain{transaction: tx, replying_node: nil}) do + <<11::8, Transaction.serialize(tx)::bitstring, 0::1>> + end + + def encode(%ReplicateTransactionChain{transaction: tx, replying_node: replying_node_public_key}) do + <<11::8, Transaction.serialize(tx)::bitstring, 1::1, replying_node_public_key::binary>> end def encode(%ReplicateTransaction{transaction: tx}) do @@ -308,9 +324,12 @@ defmodule Archethic.P2P.Message do end def encode(%AcknowledgeStorage{ - signature: signature + address: address, + signature: signature, + node_public_key: node_public_key }) do - <<13::8, byte_size(signature)::8, signature::binary>> + <<13::8, address::binary, node_public_key::binary, byte_size(signature)::8, + signature::binary>> end def encode(%NotifyEndOfNodeSync{node_public_key: public_key, timestamp: timestamp}) do @@ -389,11 +408,13 @@ defmodule Archethic.P2P.Message do <<31::8, address::binary>> end - def encode(%ValidationError{reason: reason, address: nil}), - do: <<234::8, Error.serialize_reason(reason)::8>> + def encode(%ReplicationError{address: address, reason: reason}) do + <<233::8, address::binary, ReplicationError.serialize_reason(reason)::8>> + end - def encode(%ValidationError{reason: reason, address: address}), - do: <<234::8, 1::8, Error.serialize_reason(reason)::8, address::binary>> + def encode(%ValidationError{reason: reason, address: address}) do + <<234::8, Error.serialize_reason(reason)::8, address::binary>> + end def encode(%FirstAddress{address: address}) do <<235::8, address::binary>> @@ -715,11 +736,20 @@ defmodule Archethic.P2P.Message do end def decode(<<11::8, rest::bitstring>>) do - {tx, <>} = Transaction.deserialize(rest) + {tx, <>} = Transaction.deserialize(rest) - {%ReplicateTransactionChain{ - transaction: tx - }, rest} + if replying_node == 1 do + {node_public_key, rest} = Utils.deserialize_public_key(rest) + + {%ReplicateTransactionChain{ + transaction: tx, + replying_node: node_public_key + }, rest} + else + {%ReplicateTransactionChain{ + transaction: tx + }, rest} + end end def decode(<<12::8, rest::bitstring>>) do @@ -730,11 +760,16 @@ defmodule Archethic.P2P.Message do }, rest} end - def decode( - <<13::8, signature_size::8, signature::binary-size(signature_size), rest::bitstring>> - ) do + def decode(<<13::8, rest::bitstring>>) do + {address, rest} = Utils.deserialize_address(rest) + + {public_key, <>} = + Utils.deserialize_public_key(rest) + {%AcknowledgeStorage{ - signature: signature + address: address, + signature: signature, + node_public_key: public_key }, rest} end @@ -862,14 +897,22 @@ defmodule Archethic.P2P.Message do {%GetFirstAddress{address: address}, rest} end - def decode(<<234::8, 1::8, reason::8, rest::bitstring>>) do - reason = ValidationError.deserialize_reason(reason) - {address, rest} = Utils.deserialize_address(rest) - {%ValidationError{reason: reason, address: address}, rest} + def decode(<<233::8, rest::bitstring>>) do + {address, <>} = Utils.deserialize_address(rest) + + { + %ReplicationError{ + address: address, + reason: ReplicationError.deserialize_reason(reason) + }, + rest + } end def decode(<<234::8, reason::8, rest::bitstring>>) do - {%ValidationError{reason: ValidationError.deserialize_reason(reason), address: nil}, rest} + reason = ValidationError.deserialize_reason(reason) + {address, rest} = Utils.deserialize_address(rest) + {%ValidationError{reason: reason, address: address}, rest} end def decode(<<235::8, rest::bitstring>>) do @@ -1238,21 +1281,36 @@ defmodule Archethic.P2P.Message do %Ok{} end - def process(%ReplicateTransactionChain{transaction: tx}) do - case Replication.validate_and_store_transaction_chain(tx) do - :ok -> - tx_summary = TransactionSummary.from_transaction(tx) - - %AcknowledgeStorage{ - signature: Crypto.sign_with_first_node_key(TransactionSummary.serialize(tx_summary)) - } - - {:error, :transaction_already_exists} -> - %Error{reason: :transaction_already_exists} + def process(%ReplicateTransactionChain{ + transaction: tx, + replying_node: replying_node_public_key + }) do + Task.Supervisor.start_child(TaskSupervisor, fn -> + response = + case Replication.validate_and_store_transaction_chain(tx) do + :ok -> + tx_summary = TransactionSummary.from_transaction(tx) + + %AcknowledgeStorage{ + address: tx.address, + signature: + Crypto.sign_with_first_node_key(TransactionSummary.serialize(tx_summary)), + node_public_key: Crypto.first_node_public_key() + } + + {:error, :transaction_already_exists} -> + %ReplicationError{address: tx.address, reason: :transaction_already_exists} + + {:error, :invalid_transaction} -> + %ReplicationError{address: tx.address, reason: :invalid_transaction} + end + + if replying_node_public_key do + P2P.send_message(replying_node_public_key, response) + end + end) - {:error, :invalid_transaction} -> - %Error{reason: :invalid_transaction} - end + %Ok{} end def process(%ReplicateTransaction{transaction: tx}) do @@ -1268,6 +1326,23 @@ defmodule Archethic.P2P.Message do end end + def process(%AcknowledgeStorage{ + address: address, + signature: signature, + node_public_key: node_public_key + }) do + Mining.confirm_replication(address, signature, node_public_key) + %Ok{} + end + + def process(%ReplicationError{ + address: address, + reason: reason + }) do + Mining.notify_replication_error(address, reason) + %Ok{} + end + def process(%CrossValidate{ address: tx_address, validation_stamp: stamp, diff --git a/lib/archethic/p2p/message/acknowledge_storage.ex b/lib/archethic/p2p/message/acknowledge_storage.ex index 830949a7d..01cd6b1d7 100644 --- a/lib/archethic/p2p/message/acknowledge_storage.ex +++ b/lib/archethic/p2p/message/acknowledge_storage.ex @@ -4,10 +4,14 @@ defmodule Archethic.P2P.Message.AcknowledgeStorage do This message is used during the transaction replication """ - @enforce_keys [:signature] - defstruct [:signature] + @enforce_keys [:address, :signature, :node_public_key] + defstruct [:address, :signature, :node_public_key] + + alias Archethic.Crypto @type t :: %__MODULE__{ - signature: binary() + address: binary(), + signature: binary(), + node_public_key: Crypto.key() } end diff --git a/lib/archethic/p2p/message/replicate_transaction_chain.ex b/lib/archethic/p2p/message/replicate_transaction_chain.ex index 79ddc1951..4cf510d92 100644 --- a/lib/archethic/p2p/message/replicate_transaction_chain.ex +++ b/lib/archethic/p2p/message/replicate_transaction_chain.ex @@ -3,11 +3,13 @@ defmodule Archethic.P2P.Message.ReplicateTransactionChain do Represents a message to initiate the replication of the transaction chain related to the given transaction """ @enforce_keys [:transaction] - defstruct [:transaction] + defstruct [:transaction, :replying_node] + alias Archethic.Crypto alias Archethic.TransactionChain.Transaction @type t :: %__MODULE__{ - transaction: Transaction.t() + transaction: Transaction.t(), + replying_node: nil | Crypto.key() } end diff --git a/lib/archethic/p2p/message/replication_error.ex b/lib/archethic/p2p/message/replication_error.ex new file mode 100644 index 000000000..210a37738 --- /dev/null +++ b/lib/archethic/p2p/message/replication_error.ex @@ -0,0 +1,30 @@ +defmodule Archethic.P2P.Message.ReplicationError do + @moduledoc """ + Represents a replication error message + """ + + @enforce_keys [:address, :reason] + defstruct [:address, :reason] + + @type reason :: + :invalid_transaction | :transaction_already_exists + + @type t :: %__MODULE__{ + address: binary(), + reason: reason() + } + + @doc """ + Serialize an error reason + """ + @spec serialize_reason(reason()) :: non_neg_integer() + def serialize_reason(:invalid_transaction), do: 1 + def serialize_reason(:transaction_already_exists), do: 2 + + @doc """ + Deserialize an error reason + """ + @spec deserialize_reason(non_neg_integer()) :: reason() + def deserialize_reason(1), do: :invalid_transaction + def deserialize_reason(2), do: :transaction_already_exists +end diff --git a/lib/archethic/replication.ex b/lib/archethic/replication.ex index c2a8acf75..aefc901db 100644 --- a/lib/archethic/replication.ex +++ b/lib/archethic/replication.ex @@ -52,20 +52,15 @@ defmodule Archethic.Replication do It will download the transaction chain and unspents to validate the new transaction and store the new transaction chain and update the internal ledger and views """ - @spec validate_and_store_transaction_chain( - validated_tx :: Transaction.t(), - options :: [self_repair?: boolean()] - ) :: + @spec validate_and_store_transaction_chain(validated_tx :: Transaction.t()) :: :ok | {:error, :invalid_transaction} | {:error, :transaction_already_exists} def validate_and_store_transaction_chain( tx = %Transaction{ address: address, type: type, validation_stamp: %ValidationStamp{timestamp: timestamp} - }, - opts \\ [] - ) - when is_list(opts) do + } + ) do if TransactionChain.transaction_exists?(address) do Logger.warning("Transaction already exists", transaction_address: Base.encode16(address), @@ -95,16 +90,9 @@ defmodule Archethic.Replication do Enum.to_list(inputs) ) do :ok -> - # Stream the insertion of the chain - tx - |> stream_previous_chain() - |> Stream.reject(&Enum.empty?/1) - |> Stream.each(&TransactionChain.write/1) - |> Stream.run() - - :ok = TransactionChain.write_transaction(tx) + # Store the previous and the new one, as being the reference for next transactions + :ok = TransactionChain.write(Enum.filter([previous_tx, tx], & &1)) :ok = ingest_transaction(tx) - PubSub.notify_new_transaction(address, type, timestamp) Logger.info("Replication finished", @@ -120,6 +108,18 @@ defmodule Archethic.Replication do %{role: :chain} ) + if previous_tx do + # Load the rest of the chain asynchronously + Task.start(fn -> + # Stream the insertion of the chain + previous_tx + |> stream_previous_chain() + |> Stream.reject(&Enum.empty?/1) + |> Stream.each(&TransactionChain.write/1) + |> Stream.run() + end) + end + :ok {:error, reason} -> diff --git a/lib/archethic/self_repair/sync.ex b/lib/archethic/self_repair/sync.ex index 77bbc9e1d..d7ec8bb55 100644 --- a/lib/archethic/self_repair/sync.ex +++ b/lib/archethic/self_repair/sync.ex @@ -179,7 +179,7 @@ defmodule Archethic.SelfRepair.Sync do ) |> Stream.filter(&match?({:ok, _}, &1)) |> Stream.each(fn {:ok, tx} -> - TransactionHandler.process_transaction(tx) + :ok = TransactionHandler.process_transaction(tx) end) |> Stream.run() end diff --git a/lib/archethic/self_repair/sync/transaction_handler.ex b/lib/archethic/self_repair/sync/transaction_handler.ex index e896a875f..bae275c2f 100644 --- a/lib/archethic/self_repair/sync/transaction_handler.ex +++ b/lib/archethic/self_repair/sync/transaction_handler.ex @@ -100,7 +100,7 @@ defmodule Archethic.SelfRepair.Sync.TransactionHandler do cond do Election.chain_storage_node?(address, type, Crypto.first_node_public_key(), node_list) -> - Replication.validate_and_store_transaction_chain(tx, self_repair?: true) + Replication.validate_and_store_transaction_chain(tx) Election.io_storage_node?(tx, Crypto.first_node_public_key(), node_list) -> Replication.validate_and_store_transaction(tx) diff --git a/lib/archethic/transaction_chain/transaction.ex b/lib/archethic/transaction_chain/transaction.ex index fd4fdae01..d1c8a6d94 100755 --- a/lib/archethic/transaction_chain/transaction.ex +++ b/lib/archethic/transaction_chain/transaction.ex @@ -823,8 +823,8 @@ defmodule Archethic.TransactionChain.Transaction do } end - @spec from_map(map()) :: t() - def from_map(tx = %{}) do + @spec cast(map()) :: t() + def cast(tx = %{}) do type = case Map.get(tx, :type) do nil -> @@ -841,14 +841,14 @@ defmodule Archethic.TransactionChain.Transaction do version: Map.get(tx, :version), address: Map.get(tx, :address), type: type, - data: Map.get(tx, :data, %TransactionData{}) |> TransactionData.from_map(), + data: Map.get(tx, :data, %TransactionData{}) |> TransactionData.cast(), previous_public_key: Map.get(tx, :previous_public_key), previous_signature: Map.get(tx, :previous_signature), origin_signature: Map.get(tx, :origin_signature), - validation_stamp: Map.get(tx, :validation_stamp) |> ValidationStamp.from_map(), + validation_stamp: Map.get(tx, :validation_stamp) |> ValidationStamp.cast(), cross_validation_stamps: (Map.get(tx, :cross_validation_stamps) || []) - |> Enum.map(&CrossValidationStamp.from_map/1) + |> Enum.map(&CrossValidationStamp.cast/1) } end end diff --git a/lib/archethic/transaction_chain/transaction/cross_validation_stamp.ex b/lib/archethic/transaction_chain/transaction/cross_validation_stamp.ex index 15f6793de..ea9349617 100644 --- a/lib/archethic/transaction_chain/transaction/cross_validation_stamp.ex +++ b/lib/archethic/transaction_chain/transaction/cross_validation_stamp.ex @@ -196,8 +196,8 @@ defmodule Archethic.TransactionChain.Transaction.CrossValidationStamp do defp do_reduce_inconsistencies(<<8::8, rest::bitstring>>), do: {:node_movements, rest} defp do_reduce_inconsistencies(<<9::8, rest::bitstring>>), do: {:errors, rest} - @spec from_map(map()) :: t() - def from_map(stamp = %{}) do + @spec cast(map()) :: t() + def cast(stamp = %{}) do %__MODULE__{ node_public_key: Map.get(stamp, :node_public_key), signature: Map.get(stamp, :signature), diff --git a/lib/archethic/transaction_chain/transaction/data.ex b/lib/archethic/transaction_chain/transaction/data.ex index 7705f457f..373de15c0 100755 --- a/lib/archethic/transaction_chain/transaction/data.ex +++ b/lib/archethic/transaction_chain/transaction/data.ex @@ -189,13 +189,13 @@ defmodule Archethic.TransactionChain.TransactionData do reduce_recipients(rest, nb_recipients, [recipient_address | acc]) end - @spec from_map(map()) :: t() - def from_map(data = %{}) do + @spec cast(map()) :: t() + def cast(data = %{}) do %__MODULE__{ content: Map.get(data, :content, ""), code: Map.get(data, :code, ""), - ledger: Map.get(data, :ledger, %Ledger{}) |> Ledger.from_map(), - ownerships: Map.get(data, :ownerships, []) |> Enum.map(&Ownership.from_map/1), + ledger: Map.get(data, :ledger, %Ledger{}) |> Ledger.cast(), + ownerships: Map.get(data, :ownerships, []) |> Enum.map(&Ownership.cast/1), recipients: Map.get(data, :recipients, []) } end diff --git a/lib/archethic/transaction_chain/transaction/data/ledger.ex b/lib/archethic/transaction_chain/transaction/data/ledger.ex index 0dd7836fb..f569f47a0 100755 --- a/lib/archethic/transaction_chain/transaction/data/ledger.ex +++ b/lib/archethic/transaction_chain/transaction/data/ledger.ex @@ -124,11 +124,11 @@ defmodule Archethic.TransactionChain.TransactionData.Ledger do } end - @spec from_map(map()) :: t() - def from_map(ledger = %{}) do + @spec cast(map()) :: t() + def cast(ledger = %{}) do %__MODULE__{ - uco: Map.get(ledger, :uco, %UCOLedger{}) |> UCOLedger.from_map(), - token: Map.get(ledger, :token, %TokenLedger{}) |> TokenLedger.from_map() + uco: Map.get(ledger, :uco, %UCOLedger{}) |> UCOLedger.cast(), + token: Map.get(ledger, :token, %TokenLedger{}) |> TokenLedger.cast() } end diff --git a/lib/archethic/transaction_chain/transaction/data/ledger/token.ex b/lib/archethic/transaction_chain/transaction/data/ledger/token.ex index 1ee71ce71..ade0711d1 100755 --- a/lib/archethic/transaction_chain/transaction/data/ledger/token.ex +++ b/lib/archethic/transaction_chain/transaction/data/ledger/token.ex @@ -107,10 +107,10 @@ defmodule Archethic.TransactionChain.TransactionData.TokenLedger do do_reduce_transfers(rest, nb_transfers, [transfer | acc]) end - @spec from_map(map()) :: t() - def from_map(token_ledger = %{}) do + @spec cast(map()) :: t() + def cast(token_ledger = %{}) do %__MODULE__{ - transfers: Map.get(token_ledger, :transfers, []) |> Enum.map(&Transfer.from_map/1) + transfers: Map.get(token_ledger, :transfers, []) |> Enum.map(&Transfer.cast/1) } end diff --git a/lib/archethic/transaction_chain/transaction/data/ledger/token/transfer.ex b/lib/archethic/transaction_chain/transaction/data/ledger/token/transfer.ex index 85ddd57db..49876bedf 100644 --- a/lib/archethic/transaction_chain/transaction/data/ledger/token/transfer.ex +++ b/lib/archethic/transaction_chain/transaction/data/ledger/token/transfer.ex @@ -108,7 +108,7 @@ defmodule Archethic.TransactionChain.TransactionData.TokenLedger.Transfer do ...> amount: 1_050_000_000, ...> token_id: 0 ...> } - ...> |> Transfer.from_map() + ...> |> Transfer.cast() %Transfer{ token: <<0, 0, 49, 101, 72, 154, 152, 3, 174, 47, 2, 35, 7, 92, 122, 206, 185, 71, 140, 74, 197, 46, 99, 117, 89, 96, 100, 20, 0, 34, 181, 215, 143, 175>>, @@ -118,8 +118,8 @@ defmodule Archethic.TransactionChain.TransactionData.TokenLedger.Transfer do token_id: 0 } """ - @spec from_map(map()) :: t() - def from_map(transfer = %{}) do + @spec cast(map()) :: t() + def cast(transfer = %{}) do %__MODULE__{ token: Map.get(transfer, :token), to: Map.get(transfer, :to), diff --git a/lib/archethic/transaction_chain/transaction/data/ledger/uco.ex b/lib/archethic/transaction_chain/transaction/data/ledger/uco.ex index 1373004c7..efb08bc51 100755 --- a/lib/archethic/transaction_chain/transaction/data/ledger/uco.ex +++ b/lib/archethic/transaction_chain/transaction/data/ledger/uco.ex @@ -96,10 +96,10 @@ defmodule Archethic.TransactionChain.TransactionData.UCOLedger do do_reduce_transfers(rest, nb_transfers, [transfer | acc]) end - @spec from_map(map()) :: t() - def from_map(uco_ledger = %{}) do + @spec cast(map()) :: t() + def cast(uco_ledger = %{}) do %__MODULE__{ - transfers: Map.get(uco_ledger, :transfers, []) |> Enum.map(&Transfer.from_map/1) + transfers: Map.get(uco_ledger, :transfers, []) |> Enum.map(&Transfer.cast/1) } end diff --git a/lib/archethic/transaction_chain/transaction/data/ledger/uco/transfer.ex b/lib/archethic/transaction_chain/transaction/data/ledger/uco/transfer.ex index c427b9561..82106c0cf 100755 --- a/lib/archethic/transaction_chain/transaction/data/ledger/uco/transfer.ex +++ b/lib/archethic/transaction_chain/transaction/data/ledger/uco/transfer.ex @@ -70,8 +70,8 @@ defmodule Archethic.TransactionChain.TransactionData.UCOLedger.Transfer do } end - @spec from_map(map()) :: t() - def from_map(transfer = %{}) do + @spec cast(map()) :: t() + def cast(transfer = %{}) do %__MODULE__{ to: Map.get(transfer, :to), amount: Map.get(transfer, :amount) diff --git a/lib/archethic/transaction_chain/transaction/data/ownership.ex b/lib/archethic/transaction_chain/transaction/data/ownership.ex index a1e582c97..d50e50ef4 100644 --- a/lib/archethic/transaction_chain/transaction/data/ownership.ex +++ b/lib/archethic/transaction_chain/transaction/data/ownership.ex @@ -167,8 +167,8 @@ defmodule Archethic.TransactionChain.TransactionData.Ownership do ) end - @spec from_map(map()) :: t() - def from_map(ownership = %{}) do + @spec cast(map()) :: t() + def cast(ownership = %{}) do %__MODULE__{ secret: Map.get(ownership, :secret, <<>>), authorized_keys: Map.get(ownership, :authorized_keys, %{}) diff --git a/lib/archethic/transaction_chain/transaction/validation_stamp.ex b/lib/archethic/transaction_chain/transaction/validation_stamp.ex index ae7b6b48e..b27ed285a 100755 --- a/lib/archethic/transaction_chain/transaction/validation_stamp.ex +++ b/lib/archethic/transaction_chain/transaction/validation_stamp.ex @@ -272,22 +272,22 @@ defmodule Archethic.TransactionChain.Transaction.ValidationStamp do } end - @spec from_map(map()) :: __MODULE__.t() - def from_map(stamp = %{}) do + @spec cast(map()) :: __MODULE__.t() + def cast(stamp = %{}) do %__MODULE__{ timestamp: Map.get(stamp, :timestamp), proof_of_work: Map.get(stamp, :proof_of_work), proof_of_integrity: Map.get(stamp, :proof_of_integrity), proof_of_election: Map.get(stamp, :proof_of_election), ledger_operations: - Map.get(stamp, :ledger_operations, %LedgerOperations{}) |> LedgerOperations.from_map(), + Map.get(stamp, :ledger_operations, %LedgerOperations{}) |> LedgerOperations.cast(), recipients: Map.get(stamp, :recipients, []), signature: Map.get(stamp, :signature), errors: Map.get(stamp, :errors, []) } end - def from_map(nil), do: nil + def cast(nil), do: nil @spec to_map(__MODULE__.t()) :: map() def to_map(%__MODULE__{ diff --git a/lib/archethic/transaction_chain/transaction/validation_stamp/ledger_operations.ex b/lib/archethic/transaction_chain/transaction/validation_stamp/ledger_operations.ex index fc35047cc..f2927b5c2 100644 --- a/lib/archethic/transaction_chain/transaction/validation_stamp/ledger_operations.ex +++ b/lib/archethic/transaction_chain/transaction/validation_stamp/ledger_operations.ex @@ -568,15 +568,15 @@ defmodule Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperation reduce_unspent_outputs(rest, nb, [unspent_output | acc]) end - @spec from_map(map()) :: t() - def from_map(ledger_ops = %{}) do + @spec cast(map()) :: t() + def cast(ledger_ops = %{}) do %__MODULE__{ transaction_movements: Map.get(ledger_ops, :transaction_movements, []) - |> Enum.map(&TransactionMovement.from_map/1), + |> Enum.map(&TransactionMovement.cast/1), unspent_outputs: Map.get(ledger_ops, :unspent_outputs, []) - |> Enum.map(&UnspentOutput.from_map/1), + |> Enum.map(&UnspentOutput.cast/1), fee: Map.get(ledger_ops, :fee) } end diff --git a/lib/archethic/transaction_chain/transaction/validation_stamp/ledger_operations/transaction_movement.ex b/lib/archethic/transaction_chain/transaction/validation_stamp/ledger_operations/transaction_movement.ex index a4df34978..f4342980d 100644 --- a/lib/archethic/transaction_chain/transaction/validation_stamp/ledger_operations/transaction_movement.ex +++ b/lib/archethic/transaction_chain/transaction/validation_stamp/ledger_operations/transaction_movement.ex @@ -129,9 +129,9 @@ defmodule Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperation iex> %{ ...> to: <<0, 0, 214, 107, 17, 107, 227, 11, 17, 43, 204, 48, 78, 129, 145, 126, 45, 68, 194, 159, 19, 92, 240, 29, 37, 105, 183, 232, 56, 42, 163, 236, 251, 186>>, ...> amount: 30_000_000, - ...> type: "UCO" + ...> type: :UCO ...> } - ...> |> TransactionMovement.from_map() + ...> |> TransactionMovement.cast() %TransactionMovement{ to: <<0, 0, 214, 107, 17, 107, 227, 11, 17, 43, 204, 48, 78, 129, 145, 126, 45, 68, 194, 159, 19, 92, 240, 29, 37, 105, 183, 232, 56, 42, 163, 236, 251, 186>>, amount: 30_000_000, @@ -142,11 +142,10 @@ defmodule Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperation ...> to: <<0, 0, 214, 107, 17, 107, 227, 11, 17, 43, 204, 48, 78, 129, 145, 126, 45, 68, ...> 194,159, 19, 92, 240, 29, 37, 105, 183, 232, 56, 42, 163, 236, 251, 186>>, ...> amount: 30_000_000, - ...> type: "Token", token_address: <<0, 0, 49, 101, 72, 154, 152, 3, 174, 47, 2, 35, 7, 92, - ...> 122, 206, 185, 71, 140, 74, 197, 46, 99, 117, 89, 96, 100, 20, 0, 34, 181, 215, - ...> 143, 175>>, token_id: 0 + ...> type: {:token, <<0, 0, 49, 101, 72, 154, 152, 3, 174, 47, 2, 35, 7, 92, 122, 206, 185, 71, + ...> 140, 74, 197, 46, 99, 117, 89, 96, 100, 20, 0, 34, 181, 215, 143, 175>>, 0} ...> } - ...> |> TransactionMovement.from_map() + ...> |> TransactionMovement.cast() %TransactionMovement{ to: <<0, 0, 214, 107, 17, 107, 227, 11, 17, 43, 204, 48, 78, 129, 145, 126, 45, 68, 194, 159, 19, 92, 240, 29, 37, 105, 183, 232, 56, 42, 163, 236, 251, 186>>, @@ -156,20 +155,13 @@ defmodule Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperation } """ - @spec from_map(map()) :: t() - def from_map(movement = %{}) do - res = %__MODULE__{ + @spec cast(map()) :: t() + def cast(movement = %{}) do + %__MODULE__{ to: Map.get(movement, :to), - amount: Map.get(movement, :amount) + amount: Map.get(movement, :amount), + type: Map.get(movement, :type) } - - case Map.get(movement, :type) do - "Token" -> - %{res | type: {:token, Map.get(movement, :token_address), Map.get(movement, :token_id)}} - - _ -> - %{res | type: :UCO} - end end @doc """ diff --git a/lib/archethic/transaction_chain/transaction/validation_stamp/ledger_operations/unspent_output.ex b/lib/archethic/transaction_chain/transaction/validation_stamp/ledger_operations/unspent_output.ex index e44a70573..d7b5d9e2e 100644 --- a/lib/archethic/transaction_chain/transaction/validation_stamp/ledger_operations/unspent_output.ex +++ b/lib/archethic/transaction_chain/transaction/validation_stamp/ledger_operations/unspent_output.ex @@ -131,7 +131,7 @@ defmodule Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperation ...> 159, 19, 92, 240, 29, 37, 105, 183, 232, 56, 42, 163, 236, 251, 186>>, ...> amount: 1_050_000_000, ...> type: :UCO - ...> } |> UnspentOutput.from_map() + ...> } |> UnspentOutput.cast() %UnspentOutput{ from: <<0, 0, 214, 107, 17, 107, 227, 11, 17, 43, 204, 48, 78, 129, 145, 126, 45, 68, 194,159, 19, 92, 240, 29, 37, 105, 183, 232, 56, 42, 163, 236, 251, 186>>, amount: 1_050_000_000, @@ -147,7 +147,7 @@ defmodule Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperation ...> amount: 1_050_000_000, ...> type: {:token, <<0, 49, 101, 72, 154, 152, 3, 174, 47, 2, 35, 7, 92, 122, 206, 185, 71, 140, 74, ...> 197, 46, 99, 117, 89, 96, 100, 20, 0, 34, 181, 215, 143, 175>>, 0} - ...> } |> UnspentOutput.from_map() + ...> } |> UnspentOutput.cast() %UnspentOutput{ from: <<0, 0, 214, 107, 17, 107, 227, 11, 17, 43, 204, 48, 78, 129, 145, 126, 45, 68, 194,159, 19, 92, 240, 29, 37, 105, 183, 232, 56, 42, 163, 236, 251, 186>>, amount: 1_050_000_000, @@ -156,8 +156,8 @@ defmodule Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperation timestamp: nil } """ - @spec from_map(map()) :: __MODULE__.t() - def from_map(unspent_output = %{}) do + @spec cast(map()) :: __MODULE__.t() + def cast(unspent_output = %{}) do %__MODULE__{ from: Map.get(unspent_output, :from), amount: Map.get(unspent_output, :amount), diff --git a/lib/archethic/transaction_chain/transaction_input.ex b/lib/archethic/transaction_chain/transaction_input.ex index 4ed8e2aea..a0b19ae5a 100644 --- a/lib/archethic/transaction_chain/transaction_input.ex +++ b/lib/archethic/transaction_chain/transaction_input.ex @@ -141,8 +141,8 @@ defmodule Archethic.TransactionChain.TransactionInput do end end - @spec from_map(map()) :: __MODULE__.t() - def from_map(input = %{}) do + @spec cast(map()) :: __MODULE__.t() + def cast(input = %{}) do res = %__MODULE__{ amount: Map.get(input, :amount), from: Map.get(input, :from), diff --git a/lib/archethic/transaction_chain/transaction_summary.ex b/lib/archethic/transaction_chain/transaction_summary.ex index b418fe190..9178d5060 100644 --- a/lib/archethic/transaction_chain/transaction_summary.ex +++ b/lib/archethic/transaction_chain/transaction_summary.ex @@ -155,8 +155,8 @@ defmodule Archethic.TransactionChain.TransactionSummary do } end - @spec from_map(map()) :: t() - def from_map(%{ + @spec cast(map()) :: t() + def cast(%{ address: address, timestamp: timestamp, type: type, diff --git a/lib/archethic_web/controllers/api/transaction_controller.ex b/lib/archethic_web/controllers/api/transaction_controller.ex index b1db69e4f..48f8d8b09 100644 --- a/lib/archethic_web/controllers/api/transaction_controller.ex +++ b/lib/archethic_web/controllers/api/transaction_controller.ex @@ -23,7 +23,7 @@ defmodule ArchethicWeb.API.TransactionController do tx = changeset |> TransactionPayload.to_map() - |> Transaction.from_map() + |> Transaction.cast() case Archethic.send_new_transaction(tx) do :ok -> @@ -99,7 +99,7 @@ defmodule ArchethicWeb.API.TransactionController do fee = changeset |> TransactionPayload.to_map() - |> Transaction.from_map() + |> Transaction.cast() |> Mining.get_transaction_fee(uco_usd) conn diff --git a/mix.exs b/mix.exs index 841cd9c1c..5db319d6e 100644 --- a/mix.exs +++ b/mix.exs @@ -66,7 +66,7 @@ defmodule Archethic.MixProject do {:credo, "~> 1.5", only: [:dev, :test], runtime: false}, {:elixir_make, "~> 0.6.0", runtime: false}, {:dialyxir, "~> 1.0", runtime: false}, - # {:broadway_dashboard, "~> 0.2.0", only: :dev}, + {:logger_file_backend, "~> 0.0.13", only: :dev}, # Test {:mox, "~> 0.5.2", only: [:test]}, @@ -99,11 +99,9 @@ defmodule Archethic.MixProject do {:retry, "~> 0.14.1"}, {:gen_stage, "~> 1.1"}, {:flow, "~> 1.0"}, - {:broadway, "~> 1.0"}, {:knigge, "~> 1.4"}, {:ex_json_schema, "~> 0.9.1", override: true}, - {:pathex, "~> 2.0"}, - {:logger_file_backend, "~> 0.0.13"} + {:pathex, "~> 2.0"} ] end diff --git a/mix.lock b/mix.lock index c49ade581..df4b3c8b0 100644 --- a/mix.lock +++ b/mix.lock @@ -37,6 +37,7 @@ "inet_cidr": {:hex, :erl_cidr, "1.2.0", "9205ffb290c0de8d2b82147976602fbf5bfa6d594834e60556afaf3b82856b95", [:rebar3], [], "hexpm", "3505f5dfac7d862806c7051a3dd475363a45bccf39ca1faee8eda6a6b33cf335"}, "inet_ext": {:hex, :inet_ext, "1.0.0", "40a82557082827a2dc403ee7007bb389869f347465fb9d25d0abf0769b247c34", [:rebar3], [{:inet_cidr, "~>1.0.2", [hex: :erl_cidr, repo: "hexpm", optional: false]}], "hexpm", "62a8aad524b798de3e1617e2ccfad212cb4cce971574f1adc260441ed99a250c"}, "jason": {:hex, :jason, "1.3.0", "fa6b82a934feb176263ad2df0dbd91bf633d4a46ebfdffea0c8ae82953714946", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "53fc1f51255390e0ec7e50f9cb41e751c260d065dcba2bf0d08dc51a4002c2ac"}, + "jobs": {:hex, :jobs, "0.10.0", "329c024a889249ea8262d0ef70bc7a498b7c96da95977ebd026b13b069fb0e13", [:rebar3], [{:parse_trans, "3.4.0", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:setup, "2.1.0", [hex: :setup, repo: "hexpm", optional: false]}], "hexpm", "09d486146bd32897726bb1f63dd95d1eb7f6f5a4c21c38ca58ab1646e8229724"}, "knigge": {:hex, :knigge, "1.4.1", "8258067fc0a1b73730c9136757b6fc8848c19cebae3a9d29212c2683a3b0fa77", [:mix], [{:bunt, "~> 0.2", [hex: :bunt, repo: "hexpm", optional: false]}], "hexpm", "55cbff4648eac4d3a9068e248d27028a66db32a51fcc227da82ca16a60947e10"}, "logger_file_backend": {:hex, :logger_file_backend, "0.0.13", "df07b14970e9ac1f57362985d76e6f24e3e1ab05c248055b7d223976881977c2", [:mix], [], "hexpm", "71a453a7e6e899ae4549fb147b1c6621f4233f8f48f58ca10a64ec67b6c50018"}, "makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"}, @@ -49,6 +50,7 @@ "nimble_options": {:hex, :nimble_options, "0.3.7", "1e52dd7673d36138b1a5dede183b5d86dff175dc46d104a8e98e396b85b04670", [:mix], [], "hexpm", "2086907e6665c6b6579be54ef5001928df5231f355f71ed258f80a55e9f63633"}, "nimble_parsec": {:hex, :nimble_parsec, "1.2.0", "b44d75e2a6542dcb6acf5d71c32c74ca88960421b6874777f79153bbbbd7dccc", [:mix], [], "hexpm", "52b2871a7515a5ac49b00f214e4165a40724cf99798d8e4a65e4fd64ebd002c1"}, "observer_cli": {:hex, :observer_cli, "1.7.1", "c9ca1f623a3ef0158283a3c37cd7b7235bfe85927ad6e26396dd247e2057f5a1", [:mix, :rebar3], [{:recon, "~>2.5.1", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm", "4ccafaaa2ce01b85ddd14591f4d5f6731b4e13b610a70fb841f0701178478280"}, + "parse_trans": {:hex, :parse_trans, "3.4.0", "bb87ac362a03ca674ebb7d9d498f45c03256aded7214c9101f7035ef44b798c7", [:rebar3], [], "hexpm", "f99e368830bea44552224e37e04943a54874f08b8590485de8d13832b63a2dc3"}, "pathex": {:hex, :pathex, "2.0.0", "b129359b22bf2005d5ed6bf41beb7aed64ca83d8510a0624a596a3fa23d34a1c", [:mix], [], "hexpm", "ffe038484ad8313fd91bec3ed51d0f821a4ba3eb6f2a1b14d9ab2685c5c3c876"}, "phoenix": {:hex, :phoenix, "1.5.13", "d4e0805ec0973bed80d67302631130fb47d75b1a0b7335a0b23c4432b6ce55ee", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_html, "~> 2.13 or ~> 3.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.0", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:plug, "~> 1.10", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 1.0 or ~> 2.2", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.1.2 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "1a7c4f1900e6e60bb60ae6680e48418e3f7c360d58bcb9f812487b6d0d281a0f"}, "phoenix_html": {:hex, :phoenix_html, "2.14.3", "51f720d0d543e4e157ff06b65de38e13303d5778a7919bcc696599e5934271b8", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "efd697a7fff35a13eeeb6b43db884705cba353a1a41d127d118fda5f90c8e80f"}, @@ -63,6 +65,7 @@ "recase": {:hex, :recase, "0.6.0", "1dd2dd2f4e06603b74977630e739f08b7fedbb9420cc14de353666c2fc8b99f4", [:mix], [], "hexpm", "8712e318420a228eb2e6366ada230148ed3a4316a798319edd5512f64d78c990"}, "recon": {:hex, :recon, "2.5.2", "cba53fa8db83ad968c9a652e09c3ed7ddcc4da434f27c3eaa9ca47ffb2b1ff03", [:mix, :rebar3], [], "hexpm", "2c7523c8dee91dff41f6b3d63cba2bd49eb6d2fe5bf1eec0df7f87eb5e230e1c"}, "retry": {:hex, :retry, "0.14.1", "722d1b0cf87096b71213f5801d99fface7ca76adc83fc9dbf3e1daee952aef10", [:mix], [], "hexpm", "b3a609f286f6fe4f6b2c15f32cd4a8a60427d78d05d7b68c2dd9110981111ae0"}, + "setup": {:hex, :setup, "2.1.0", "05f69185a5eb71474c9bc6ba892565651ec7507791f85632b7b914dbfe130510", [:rebar3], [], "hexpm", "efd072578f0cf85bea96caaffc7adb0992398272522660a136e10567377071c5"}, "sizeable": {:hex, :sizeable, "1.0.2", "625fe06a5dad188b52121a140286f1a6ae1adf350a942cf419499ecd8a11ee29", [:mix], [], "hexpm", "4bab548e6dfba777b400ca50830a9e3a4128e73df77ab1582540cf5860601762"}, "stream_data": {:hex, :stream_data, "0.5.0", "b27641e58941685c75b353577dc602c9d2c12292dd84babf506c2033cd97893e", [:mix], [], "hexpm", "012bd2eec069ada4db3411f9115ccafa38540a3c78c4c0349f151fc761b9e271"}, "telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"}, diff --git a/test/archethic/beacon_chain/subset_test.exs b/test/archethic/beacon_chain/subset_test.exs index 17f7acc53..6131c98b1 100644 --- a/test/archethic/beacon_chain/subset_test.exs +++ b/test/archethic/beacon_chain/subset_test.exs @@ -415,7 +415,7 @@ defmodule Archethic.BeaconChain.SubsetTest do ) MockClient - |> expect(:send_message, fn + |> stub(:send_message, fn _, %BeaconUpdate{transaction_attestations: transaction_attestations}, _ -> send(me, {:transaction_attestations, transaction_attestations}) {:ok, %Ok{}} @@ -425,6 +425,9 @@ defmodule Archethic.BeaconChain.SubsetTest do _, %NewBeaconTransaction{}, _ -> {:ok, %Ok{}} + + _, %Ping{}, _ -> + {:ok, %Ok{}} end) Subset.subscribe_for_beacon_updates(subset, first_public_key) diff --git a/test/archethic/bootstrap/network_init_test.exs b/test/archethic/bootstrap/network_init_test.exs index 223fb0c01..bc2af2a59 100644 --- a/test/archethic/bootstrap/network_init_test.exs +++ b/test/archethic/bootstrap/network_init_test.exs @@ -181,7 +181,7 @@ defmodule Archethic.Bootstrap.NetworkInitTest do me = self() MockDB - |> stub(:write_transaction, fn ^tx -> + |> stub(:write_transaction_chain, fn [^tx] -> send(me, :write_transaction) :ok end) @@ -230,7 +230,7 @@ defmodule Archethic.Bootstrap.NetworkInitTest do me = self() MockDB - |> expect(:write_transaction, fn tx -> + |> expect(:write_transaction_chain, fn [tx] -> send(me, {:transaction, tx}) :ok end) @@ -382,7 +382,7 @@ defmodule Archethic.Bootstrap.NetworkInitTest do me = self() MockDB - |> expect(:write_transaction, fn tx -> + |> expect(:write_transaction_chain, fn [tx] -> send(me, {:transaction, tx}) :ok end) diff --git a/test/archethic/mining/distributed_workflow_test.exs b/test/archethic/mining/distributed_workflow_test.exs index 500306169..3a25eb97f 100644 --- a/test/archethic/mining/distributed_workflow_test.exs +++ b/test/archethic/mining/distributed_workflow_test.exs @@ -16,7 +16,6 @@ defmodule Archethic.Mining.DistributedWorkflowTest do alias Archethic.Mining.ValidationContext alias Archethic.P2P - alias Archethic.P2P.Message.AcknowledgeStorage alias Archethic.P2P.Message.AddMiningContext alias Archethic.P2P.Message.CrossValidate alias Archethic.P2P.Message.CrossValidationDone @@ -789,38 +788,43 @@ defmodule Archethic.Mining.DistributedWorkflowTest do {other_validator_pub, other_validator_pv} = Crypto.generate_deterministic_keypair("seed") - sig = + {sig, pub} = cond do first_public_key == Crypto.first_node_public_key() -> - Crypto.sign_with_first_node_key(TransactionSummary.serialize(tx_summary)) + { + Crypto.sign_with_first_node_key(TransactionSummary.serialize(tx_summary)), + Crypto.first_node_public_key() + } first_public_key == elem(storage_node_keypair, 0) -> - Crypto.sign( - TransactionSummary.serialize(tx_summary), - elem(storage_node_keypair, 1) - ) + {Crypto.sign( + TransactionSummary.serialize(tx_summary), + elem(storage_node_keypair, 1) + ), elem(storage_node_keypair, 0)} first_public_key == elem(storage_node_keypair2, 0) -> - Crypto.sign( - TransactionSummary.serialize(tx_summary), - elem(storage_node_keypair2, 1) - ) + { + Crypto.sign( + TransactionSummary.serialize(tx_summary), + elem(storage_node_keypair2, 1) + ), + elem(storage_node_keypair2, 0) + } first_public_key == other_validator_pub -> - Crypto.sign( - TransactionSummary.serialize(tx_summary), - other_validator_pv - ) + {Crypto.sign( + TransactionSummary.serialize(tx_summary), + other_validator_pv + ), other_validator_pub} end - {:ok, - %AcknowledgeStorage{ - signature: sig - }} + send(me, {:ack_replication, sig, pub}) + + {:ok, %Ok{}} _, %ReplicationAttestation{}, _ -> send(me, :replication_done) - %Ok{} + {:ok, %Ok{}} end) {:ok, coordinator_pid} = @@ -914,6 +918,26 @@ defmodule Archethic.Mining.DistributedWorkflowTest do Workflow.add_cross_validation_stamp(coordinator_pid, stamp) end + receive do + {:ack_replication, sig, pub} -> + send(coordinator_pid, {:ack_replication, sig, pub}) + end + + receive do + {:ack_replication, sig, pub} -> + send(cross_validator_pid, {:ack_replication, sig, pub}) + end + + receive do + {:ack_replication, sig, pub} -> + send(cross_validator_pid, {:ack_replication, sig, pub}) + end + + receive do + {:ack_replication, sig, pub} -> + send(coordinator_pid, {:ack_replication, sig, pub}) + end + assert_receive :replication_done end end diff --git a/test/archethic/mining/standalone_workflow_test.exs b/test/archethic/mining/standalone_workflow_test.exs index 5a96bacd0..55ed06d86 100644 --- a/test/archethic/mining/standalone_workflow_test.exs +++ b/test/archethic/mining/standalone_workflow_test.exs @@ -8,7 +8,6 @@ defmodule Archethic.Mining.StandaloneWorkflowTest do alias Archethic.Mining.StandaloneWorkflow alias Archethic.P2P - alias Archethic.P2P.Message.AcknowledgeStorage alias Archethic.P2P.Message.GetTransaction alias Archethic.P2P.Message.GetUnspentOutputs alias Archethic.P2P.Message.NotFound @@ -61,12 +60,11 @@ defmodule Archethic.Mining.StandaloneWorkflowTest do _, %ReplicateTransactionChain{transaction: tx}, _ -> tx_summary = TransactionSummary.from_transaction(tx) - sig = Crypto.sign_with_last_node_key(TransactionSummary.serialize(tx_summary)) + sig = Crypto.sign_with_first_node_key(TransactionSummary.serialize(tx_summary)) - {:ok, - %AcknowledgeStorage{ - signature: sig - }} + send(me, {:ack_replication, sig, Crypto.first_node_public_key()}) + + {:ok, %Ok{}} _, %ReplicationAttestation{}, _ -> send(me, :transaction_replicated) @@ -74,7 +72,12 @@ defmodule Archethic.Mining.StandaloneWorkflowTest do end) tx = Transaction.new(:transfer, %TransactionData{}, "seed", 0) - assert :ok = StandaloneWorkflow.run(transaction: tx) + assert {:ok, pid} = StandaloneWorkflow.start_link(transaction: tx) + + receive do + {:ack_replication, sig, public_key} -> + send(pid, {:ack_replication, sig, public_key}) + end assert_receive :transaction_replicated end diff --git a/test/archethic/p2p/messages_test.exs b/test/archethic/p2p/messages_test.exs index a518dc4c4..d6b4d3d0d 100644 --- a/test/archethic/p2p/messages_test.exs +++ b/test/archethic/p2p/messages_test.exs @@ -381,12 +381,18 @@ defmodule Archethic.P2P.MessageTest do test "AcknowledgeStorage message" do signature = :crypto.strong_rand_bytes(32) + tx_address = <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> + node_public_key = <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> assert %AcknowledgeStorage{ - signature: signature + signature: signature, + address: tx_address, + node_public_key: node_public_key } == %AcknowledgeStorage{ - signature: signature + signature: signature, + address: tx_address, + node_public_key: node_public_key } |> Message.encode() |> Message.decode() diff --git a/test/archethic/replication_test.exs b/test/archethic/replication_test.exs index 3b7437ecd..f73487662 100644 --- a/test/archethic/replication_test.exs +++ b/test/archethic/replication_test.exs @@ -70,7 +70,7 @@ defmodule Archethic.ReplicationTest do tx = create_valid_transaction(unspent_outputs) MockDB - |> expect(:write_transaction, fn ^tx -> + |> expect(:write_transaction_chain, fn [^tx] -> send(me, :replicated) :ok end) @@ -106,7 +106,7 @@ defmodule Archethic.ReplicationTest do {:ok, %NotFound{}} end) - assert :ok = Replication.validate_and_store_transaction_chain(tx, []) + assert :ok = Replication.validate_and_store_transaction_chain(tx) Process.sleep(200) diff --git a/test/archethic/self_repair/sync/transaction_handler_test.exs b/test/archethic/self_repair/sync/transaction_handler_test.exs index 777231e93..8cb5a228e 100644 --- a/test/archethic/self_repair/sync/transaction_handler_test.exs +++ b/test/archethic/self_repair/sync/transaction_handler_test.exs @@ -160,7 +160,7 @@ defmodule Archethic.SelfRepair.Sync.TransactionHandlerTest do end) MockDB - |> stub(:write_transaction, fn ^tx -> + |> stub(:write_transaction_chain, fn [^tx] -> send(me, :transaction_replicated) :ok end) diff --git a/test/archethic/self_repair/sync_test.exs b/test/archethic/self_repair/sync_test.exs index 25bee655a..ac4a088bf 100644 --- a/test/archethic/self_repair/sync_test.exs +++ b/test/archethic/self_repair/sync_test.exs @@ -169,7 +169,7 @@ defmodule Archethic.SelfRepair.SyncTest do me = self() MockDB - |> stub(:write_transaction, fn ^tx -> + |> stub(:write_transaction_chain, fn [^tx] -> send(me, :storage) :ok end) @@ -309,7 +309,7 @@ defmodule Archethic.SelfRepair.SyncTest do me = self() MockDB - |> stub(:write_transaction, fn ^transfer_tx -> + |> stub(:write_transaction_chain, fn [^transfer_tx] -> send(me, :transaction_stored) :ok end)