Skip to content

Commit

Permalink
Replicate transaction chain asynchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
Samuel committed Aug 5, 2022
1 parent 948a677 commit 53845dd
Show file tree
Hide file tree
Showing 17 changed files with 386 additions and 280 deletions.
25 changes: 25 additions & 0 deletions lib/archethic/mining.ex
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,31 @@ defmodule Archethic.Mining do
|> DistributedWorkflow.add_cross_validation_stamp(stamp)
end

@doc """
Confirm the replication from a storage node
"""
@spec confirm_replication(
address :: binary(),
signature :: binary(),
node_public_key :: Crypto.key()
) ::
:ok
def confirm_replication(tx_address, signature, node_public_key) do
pid = get_mining_process!(tx_address)
send(pid, {:ack_replication, signature, node_public_key})
:ok
end

@doc """
Notify replication to the mining process
"""
@spec notify_replication_error(binary(), any()) :: :ok
def notify_replication_error(tx_address, error_reason) do
pid = get_mining_process!(tx_address)
send(pid, {:replication_error, error_reason})
:ok
end

defp get_mining_process!(tx_address, timeout \\ @mining_timeout) do
retry_while with: exponential_backoff(100, 2) |> expiry(timeout) do
case Registry.lookup(WorkflowRegistry, tx_address) do
Expand Down
70 changes: 13 additions & 57 deletions lib/archethic/mining/distributed_workflow.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,16 @@ defmodule Archethic.Mining.DistributedWorkflow do

alias Archethic.P2P
alias Archethic.P2P.Message
alias Archethic.P2P.Message.AcknowledgeStorage
alias Archethic.P2P.Message.AddMiningContext
alias Archethic.P2P.Message.CrossValidate
alias Archethic.P2P.Message.CrossValidationDone
alias Archethic.P2P.Message.Error
alias Archethic.P2P.Message.ValidationError
alias Archethic.P2P.Message.ReplicateTransactionChain
alias Archethic.P2P.Message.ReplicateTransaction
alias Archethic.P2P.Message.ValidationError
alias Archethic.P2P.Node

alias Archethic.Replication

alias Archethic.TaskSupervisor

alias Archethic.TransactionChain
alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations
Expand Down Expand Up @@ -673,7 +669,7 @@ defmodule Archethic.Mining.DistributedWorkflow do

def handle_event(
:info,
{:add_ack_storage, node_public_key, signature},
{:ack_replication, signature, node_public_key},
:replication,
data = %{start_time: start_time, context: context = %ValidationContext{transaction: tx}}
) do
Expand Down Expand Up @@ -739,14 +735,16 @@ defmodule Archethic.Mining.DistributedWorkflow do
beacon_storage_nodes = ValidationContext.get_beacon_replication_nodes(context)
P2P.broadcast_message(P2P.distinct_nodes([welcome_node | beacon_storage_nodes]), message)

validated_tx = ValidationContext.get_validated_transaction(context)

context
|> ValidationContext.get_io_replication_nodes()
|> P2P.broadcast_message(
with tx <- ValidationContext.get_validated_transaction(context) do
if Transaction.network_type?(tx.type),
do: %ReplicateTransactionChain{transaction: tx},
else: %ReplicateTransaction{transaction: tx}
end
if Transaction.network_type?(validated_tx.type),
do: %ReplicateTransactionChain{
transaction: validated_tx
},
else: %ReplicateTransaction{transaction: validated_tx}
)

:keep_state_and_data
Expand Down Expand Up @@ -986,8 +984,7 @@ defmodule Archethic.Mining.DistributedWorkflow do

defp request_replication(
context = %ValidationContext{
transaction: tx,
previous_transaction: previous_transaction
transaction: tx
}
) do
storage_nodes = ValidationContext.get_chain_replication_nodes(context)
Expand All @@ -998,54 +995,13 @@ defmodule Archethic.Mining.DistributedWorkflow do
transaction_type: tx.type
)

# Get transaction chain size to calculate timeout
chain_size =
case previous_transaction do
nil ->
1

previous_transaction ->
# Get transaction chain size to calculate timeout
case Archethic.get_transaction_chain_length(previous_transaction.address) do
{:ok, chain_size} ->
chain_size

_ ->
1
end
end

timeout = Message.get_max_timeout() + Message.get_max_timeout() * chain_size

validated_tx = ValidationContext.get_validated_transaction(context)

message = %ReplicateTransactionChain{
transaction: validated_tx
transaction: validated_tx,
replying_node: Crypto.first_node_public_key()
}

me = self()

Task.Supervisor.async_stream_nolink(
TaskSupervisor,
storage_nodes,
fn node ->
{P2P.send_message(node, message, timeout), node}
end,
ordered: false,
on_timeout: :kill_task,
timeout: timeout + 2000
)
|> Stream.filter(&match?({:ok, {{:ok, _}, _}}, &1))
|> Stream.map(fn {:ok, {{:ok, response}, node}} -> {response, node} end)
|> Stream.each(fn
{%Error{reason: reason}, _node} ->
send(me, {:replication_error, reason})

{%AcknowledgeStorage{
signature: signature
}, %Node{first_public_key: node_public_key}} ->
send(me, {:add_ack_storage, node_public_key, signature})
end)
|> Stream.run()
P2P.broadcast_message(storage_nodes, message)
end
end
Loading

0 comments on commit 53845dd

Please sign in to comment.