Skip to content

Commit

Permalink
worker_should_not_be_executed_while_replicating_a_transaction_from_se…
Browse files Browse the repository at this point in the history
…lf_repair_#852
  • Loading branch information
apoorv-2204 committed Jan 25, 2023
1 parent e1cf03d commit ecbfebf
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 22 deletions.
4 changes: 2 additions & 2 deletions lib/archethic/contracts.ex
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ defmodule Archethic.Contracts do
@doc """
Load transaction into the Smart Contract context leveraging the interpreter
"""
@spec load_transaction(Transaction.t()) :: :ok
defdelegate load_transaction(tx), to: Loader
@spec load_transaction(Transaction.t(), list()) :: :ok
defdelegate load_transaction(tx, opts \\ []), to: Loader

@spec accept_new_contract?(Transaction.t() | nil, Transaction.t(), DateTime.t()) :: boolean()
def accept_new_contract?(nil, _, _), do: true
Expand Down
34 changes: 22 additions & 12 deletions lib/archethic/contracts/loader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ defmodule Archethic.Contracts.Loader do
_ -> true
end)
|> Stream.map(fn {:ok, tx} -> tx end)
|> Stream.each(&load_transaction(&1, true))
|> Stream.each(&load_transaction(&1, from_db: true))
|> Stream.run()

{:ok, []}
Expand All @@ -43,16 +43,22 @@ defmodule Archethic.Contracts.Loader do
@doc """
Load the smart contracts based on transaction involving smart contract code
"""
@spec load_transaction(Transaction.t()) :: :ok
def load_transaction(_tx, from_db \\ false)
@spec load_transaction(Transaction.t(), list()) :: :ok
def load_transaction(tx, opts \\ []) do
from_db = Keyword.get(opts, :from_db, false)
from_self_repair = Keyword.get(opts, :from_self_repair, false)

def load_transaction(
do_load_transaction(tx, from_db, from_self_repair)
end

def do_load_transaction(
tx = %Transaction{
address: address,
type: type,
data: %TransactionData{code: code}
},
_from_db
_from_db,
_from_self_repair
)
when code != "" do
stop_contract(Transaction.previous_address(tx))
Expand All @@ -77,7 +83,7 @@ defmodule Archethic.Contracts.Loader do
end
end

def load_transaction(
def do_load_transaction(
tx = %Transaction{
address: tx_address,
type: tx_type,
Expand All @@ -87,7 +93,8 @@ defmodule Archethic.Contracts.Loader do
protocol_version: protocol_version
}
},
false
_from_db = false,
from_self_repair?
)
when recipients != [] do
Enum.each(recipients, fn contract_address ->
Expand All @@ -96,8 +103,10 @@ defmodule Archethic.Contracts.Loader do
transaction_type: tx_type
)

# execute asynchronously the contract
Worker.execute(contract_address, tx)
unless from_self_repair? do
# execute asynchronously the contract
Worker.execute(contract_address, tx)
end

TransactionLookup.add_contract_transaction(
contract_address,
Expand All @@ -113,7 +122,7 @@ defmodule Archethic.Contracts.Loader do
end)
end

def load_transaction(
def do_load_transaction(
%Transaction{
address: address,
type: type,
Expand All @@ -123,7 +132,8 @@ defmodule Archethic.Contracts.Loader do
protocol_version: protocol_version
}
},
true
_from_db = true,
false
)
when recipients != [] do
Enum.each(
Expand All @@ -137,7 +147,7 @@ defmodule Archethic.Contracts.Loader do
)
end

def load_transaction(_tx, _), do: :ok
def do_load_transaction(_tx, _, _), do: :ok

@doc """
Termine a contract execution
Expand Down
18 changes: 10 additions & 8 deletions lib/archethic/replication.ex
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ defmodule Archethic.Replication do
type: type,
validation_stamp: %ValidationStamp{timestamp: timestamp}
},
download_nodes \\ P2P.authorized_and_available_nodes()
download_nodes \\ P2P.authorized_and_available_nodes(),
self_repair? \\ false
) do
start_time = System.monotonic_time()

Expand All @@ -155,13 +156,13 @@ defmodule Archethic.Replication do
|> Election.chain_storage_nodes(download_nodes)
|> Utils.key_in_node_list?(Crypto.first_node_public_key())

if storage_node?, do: ingest_transaction(tx, false)
if storage_node?, do: ingest_transaction(tx, false, self_repair?)
end)
|> Stream.run()

TransactionChain.write_transaction(tx)

:ok = ingest_transaction(tx, false)
:ok = ingest_transaction(tx, false, self_repair?)

Logger.info("Replication finished",
transaction_address: Base.encode16(address),
Expand Down Expand Up @@ -202,7 +203,7 @@ defmodule Archethic.Replication do
) do
case validate_transaction(tx, self_repair?, download_nodes) do
:ok ->
sync_transaction_chain(tx, download_nodes)
sync_transaction_chain(tx, download_nodes, self_repair?)

{:error, reason} ->
Logger.warning("Invalid transaction for replication - #{inspect(reason)}",
Expand Down Expand Up @@ -247,7 +248,7 @@ defmodule Archethic.Replication do
case TransactionValidator.validate(tx, self_repair?) do
:ok ->
:ok = TransactionChain.write_transaction(tx, :io)
ingest_transaction(tx, true)
ingest_transaction(tx, true, self_repair?)

Logger.info("Replication finished",
transaction_address: Base.encode16(address),
Expand Down Expand Up @@ -564,14 +565,15 @@ defmodule Archethic.Replication do
- Transactions with smart contract deploy instances of them or can put in pending state waiting approval signatures
- Code approval transactions may trigger the TestNets deployments or hot-reloads
"""
@spec ingest_transaction(Transaction.t(), boolean()) :: :ok
def ingest_transaction(tx = %Transaction{}, io_transaction?) do
@spec ingest_transaction(Transaction.t(), boolean(), boolean()) :: :ok
def ingest_transaction(tx = %Transaction{}, io_transaction?, self_repair? \\ false) do
#also called from bootstrap
TransactionChain.load_transaction(tx)
Crypto.load_transaction(tx)
P2P.load_transaction(tx)
SharedSecrets.load_transaction(tx)
Account.load_transaction(tx, io_transaction?)
Contracts.load_transaction(tx)
Contracts.load_transaction(tx, from_self_repair: self_repair?)
OracleChain.load_transaction(tx)
Reward.load_transaction(tx)
:ok
Expand Down

0 comments on commit ecbfebf

Please sign in to comment.