Skip to content

Commit

Permalink
Prioritize local fetch of transaction in the replication
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelmanzanera committed Dec 13, 2022
1 parent e8bca36 commit eff0635
Showing 1 changed file with 20 additions and 55 deletions.
75 changes: 20 additions & 55 deletions lib/archethic/replication.ex
Original file line number Diff line number Diff line change
Expand Up @@ -208,71 +208,36 @@ defmodule Archethic.Replication do
end
end

defp fetch_context(tx = %Transaction{type: type}, self_repair?, download_nodes) do
if Transaction.network_type?(type) do
fetch_context_for_network_transaction(tx, self_repair?, download_nodes)
else
fetch_context_for_regular_transaction(tx, self_repair?, download_nodes)
end
end

defp fetch_context_for_network_transaction(tx = %Transaction{}, self_repair?, download_nodes) do
defp fetch_context(tx = %Transaction{}, self_repair?, download_nodes) do
previous_address = Transaction.previous_address(tx)

Logger.debug(
"Try to fetch network previous transaction locally (#{Base.encode16(previous_address)})",
transaction_address: Base.encode16(tx.address),
transaction_type: tx.type
)

# If the transaction is missing (orphan) and the previous chain has not been synchronized
# We request other nodes to give us the information
previous_transaction =
case TransactionChain.get_transaction(previous_address) do
{:ok, tx = %Transaction{}} ->
tx

{:error, :transaction_not_exists} ->
Logger.debug(
"Try to fetch network previous transaction (#{Base.encode16(previous_address)}) from remote nodes (possibility of an orphan state)",
transaction_address: Base.encode16(tx.address),
transaction_type: tx.type
)

TransactionContext.fetch_transaction(previous_address, download_nodes)
end

Logger.debug("Previous transaction #{inspect(previous_transaction)}",
"Try to fetch previous transaction (#{Base.encode16(previous_address)})",
transaction_address: Base.encode16(tx.address),
transaction_type: tx.type
)

inputs = if self_repair?, do: [], else: fetch_inputs(tx, download_nodes)

{previous_transaction, inputs}
end

defp fetch_context_for_regular_transaction(tx = %Transaction{}, self_repair?, download_nodes) do
previous_address = Transaction.previous_address(tx)

t1 =
# First, we check locally if the transaction exists
# Otherwise we check others nodes
previous_transaction_task =
Task.Supervisor.async(TaskSupervisor, fn ->
Logger.debug(
"Fetch previous transaction #{Base.encode16(previous_address)}",
transaction_address: Base.encode16(tx.address),
transaction_type: tx.type
)

TransactionContext.fetch_transaction(previous_address, download_nodes)
case TransactionChain.get_transaction(previous_address) do
{:ok, tx = %Transaction{}} ->
tx

{:error, :transaction_not_exists} ->
Logger.debug(
"Try to fetch previous transaction (#{Base.encode16(previous_address)}) from remote nodes",
transaction_address: Base.encode16(tx.address),
transaction_type: tx.type
)

TransactionContext.fetch_transaction(previous_address, download_nodes)
end
end)

{previous_transaction, inputs} =
if self_repair? do
{Task.await(t1, Message.get_max_timeout() + 1000), []}
else
inputs = fetch_inputs(tx, download_nodes)
{Task.await(t1, Message.get_max_timeout() + 1000), inputs}
end
inputs = if self_repair?, do: [], else: fetch_inputs(tx, download_nodes)
previous_transaction = Task.await(previous_transaction_task, Message.get_max_timeout() + 1000)

Logger.debug("Previous transaction #{inspect(previous_transaction)}",
transaction_address: Base.encode16(tx.address),
Expand Down

0 comments on commit eff0635

Please sign in to comment.