Skip to content

Commit

Permalink
Ingest inputs in the genesis pool
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelmanzanera committed Sep 13, 2023
1 parent 26e3c89 commit 1e2613f
Show file tree
Hide file tree
Showing 18 changed files with 712 additions and 43 deletions.
1 change: 1 addition & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ config :archethic, :mut_dir, "data_test"
config :archethic, Archethic.Account.MemTablesLoader, enabled: false
config :archethic, Archethic.Account.MemTables.TokenLedger, enabled: false
config :archethic, Archethic.Account.MemTables.UCOLedger, enabled: false
config :archethic, Archethic.Account.MemTables.GenesisInputLedger, enabled: false

config :archethic, Archethic.BeaconChain.Subset, enabled: false

Expand Down
9 changes: 8 additions & 1 deletion lib/archethic/account.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule Archethic.Account do

alias __MODULE__.MemTables.TokenLedger
alias __MODULE__.MemTables.UCOLedger
alias __MODULE__.MemTables.GenesisInputLedger
alias __MODULE__.MemTablesLoader

alias Archethic.Crypto
Expand Down Expand Up @@ -60,6 +61,12 @@ defmodule Archethic.Account do
@doc """
Load the transaction into the Account context filling the memory tables for ledgers
"""
@spec load_transaction(Transaction.t(), boolean()) :: :ok
@spec load_transaction(Transaction.t(), opts :: MemTablesLoader.load_options()) :: :ok
defdelegate load_transaction(transaction, io_transaction?), to: MemTablesLoader

@doc """
Returns the list of all the inputs which have not been consumed for the given chain's address
"""
@spec get_genesis_unspent_inputs(binary()) :: list(TransactionInput.t())
defdelegate get_genesis_unspent_inputs(address), to: GenesisInputLedger, as: :get_unspent_inputs
end
120 changes: 120 additions & 0 deletions lib/archethic/account/mem_tables/genesis_input_ledger.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
defmodule Archethic.Account.MemTables.GenesisInputLedger do
@moduledoc """
Represents a memory table for all the inputs associated to a chain
to give the latest view of unspent inputs based on the consumed inputs
"""

@table_name :archethic_genesis_input_ledger

use GenServer
@vsn Mix.Project.config()[:version]

require Logger

alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.Transaction.ValidationStamp
alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations

alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.TransactionMovement

alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.UnspentOutput
alias Archethic.TransactionChain.TransactionInput

@spec start_link(arg :: list()) :: GenServer.on_start()
def start_link(args \\ []) do
GenServer.start_link(__MODULE__, args)
end

def init(_args) do
Logger.info("Initialize InMemory Genesis Input Ledger...")

:ets.new(@table_name, [:bag, :named_table, :public, read_concurrency: true])

{:ok, %{table_name: @table_name}}
end

@doc """
Add new input in the genesis ledger
"""
@spec add_chain_input(
TransactionMovement.t(),
tx_address :: binary(),
tx_timestamp :: DateTime.t(),
genesis_address :: binary()
) :: :ok
def add_chain_input(
%TransactionMovement{amount: amount, type: type},
tx_address,
tx_timestamp = %DateTime{},
genesis_address
)
when is_binary(tx_address) and is_binary(genesis_address) do
:ets.insert(
@table_name,
{genesis_address,
%TransactionInput{
from: tx_address,
amount: amount,
type: type,
timestamp: tx_timestamp
}}
)

:ok
end

@doc """
Update the chain unspent outputs after reduce of the consumed transaction inputs
"""
@spec update_chain_inputs(Transaction.t(), genesis_address :: binary()) :: :ok
def update_chain_inputs(
%Transaction{
validation_stamp: %ValidationStamp{
ledger_operations: %LedgerOperations{
consumed_inputs: consumed_inputs,
unspent_outputs: unspent_outputs
}
}
},
genesis_address,
phase2? \\ false
)
when is_binary(genesis_address) do
# Filter unspent outputs which have been consumed and updated (required in the AEIP21 Phase 1)
updated_inputs =
Enum.filter(unspent_outputs, fn %UnspentOutput{type: type} ->
phase2? or Enum.any?(consumed_inputs, &(&1.type == type))
end)
|> Enum.map(fn %UnspentOutput{from: from, type: type, timestamp: timestamp, amount: amount} ->
%TransactionInput{from: from, type: type, timestamp: timestamp, amount: amount}
end)

# Remove the consumed inputs
Enum.each(consumed_inputs, fn %UnspentOutput{
from: from,
type: type,
amount: amount,
timestamp: timestamp
} ->
Logger.debug("Consuming #{Base.encode16(from)} - for #{inspect(genesis_address)}")

:ets.delete_object(
@table_name,
{genesis_address,
%TransactionInput{from: from, type: type, amount: amount, timestamp: timestamp}}
)
end)

Enum.each(updated_inputs, &:ets.insert(@table_name, {genesis_address, &1}))
end

@doc """
Returns the list of all the inputs which have not been consumed for the given chain's address
"""
@spec get_unspent_inputs(binary()) :: list(TransactionInput.t())
def get_unspent_inputs(genesis_address) do
@table_name
|> :ets.lookup(genesis_address)
|> Enum.map(&elem(&1, 1))
end
end
68 changes: 59 additions & 9 deletions lib/archethic/account/mem_tables_loader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@ defmodule Archethic.Account.MemTablesLoader do
use GenServer
@vsn Mix.Project.config()[:version]

alias Archethic.Account.MemTables.GenesisInputLedger
alias Archethic.Account.MemTables.TokenLedger
alias Archethic.Account.MemTables.UCOLedger

alias Archethic.Crypto

alias Archethic.Election

alias Archethic.P2P

alias Archethic.TransactionChain
alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.Transaction.ValidationStamp
Expand All @@ -20,6 +25,8 @@ defmodule Archethic.Account.MemTablesLoader do

alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.VersionedUnspentOutput

alias Archethic.Utils

require Logger

alias Archethic.Reward
Expand All @@ -31,7 +38,7 @@ defmodule Archethic.Account.MemTablesLoader do
validation_stamp: [
:timestamp,
:protocol_version,
ledger_operations: [:fee, :unspent_outputs, :transaction_movements]
ledger_operations: [:fee, :unspent_outputs, :transaction_movements, :consumed_inputs]
]
]

Expand All @@ -40,7 +47,8 @@ defmodule Archethic.Account.MemTablesLoader do
:oracle_summary,
:node_shared_secrets,
:origin,
:on_chain_wallet
:keychain,
:keychain_access
]

@spec start_link(args :: list()) :: GenServer.on_start()
Expand All @@ -50,24 +58,33 @@ defmodule Archethic.Account.MemTablesLoader do

@spec init(args :: list()) :: {:ok, []}
def init(_args) do
authorized_nodes = P2P.authorized_and_available_nodes()

TransactionChain.list_io_transactions(@query_fields)
|> Stream.each(&load_transaction(&1, true))
|> Stream.each(
&load_transaction(&1, io_transaction?: true, authorized_nodes: authorized_nodes)
)
|> Stream.run()

TransactionChain.list_all(@query_fields)
|> Stream.reject(&(&1.type in @excluded_types))
|> Stream.each(&load_transaction(&1, false))
|> Enum.sort_by(& &1.validation_stamp.timestamp, {:asc, DateTime})
|> Stream.each(
&load_transaction(&1, io_transaction?: false, authorized_nodes: authorized_nodes)
)
|> Stream.run()

{:ok, []}
end

@type load_options :: [io_transaction?: boolean(), authorized_nodes: list(Node.t())]

@doc """
Load the transaction into the memory tables
"""
@spec load_transaction(Transaction.t(), boolean()) :: :ok
@spec load_transaction(Transaction.t(), load_options()) :: :ok
def load_transaction(
%Transaction{
tx = %Transaction{
address: address,
type: tx_type,
previous_public_key: previous_public_key,
Expand All @@ -76,19 +93,47 @@ defmodule Archethic.Account.MemTablesLoader do
protocol_version: protocol_version,
ledger_operations: %LedgerOperations{
unspent_outputs: unspent_outputs,
transaction_movements: transaction_movements
transaction_movements: transaction_movements,
consumed_inputs: consumed_inputs
}
}
},
io_transaction?
) do
opts \\ []
)
when is_list(opts) do
io_transaction? = Keyword.get(opts, :io_transaction?, false)
authorized_nodes = Keyword.get(opts, :authorized_nodes, P2P.authorized_and_available_nodes())

unless io_transaction? do
previous_address = Crypto.derive_address(previous_public_key)

UCOLedger.spend_all_unspent_outputs(previous_address)
TokenLedger.spend_all_unspent_outputs(previous_address)
end

Enum.each(transaction_movements, fn mvt = %TransactionMovement{to: to} ->
genesis_address = TransactionChain.get_genesis_address(to)

# We need to determine whether the node is responsible of the transaction movements destination genesis pool
if genesis_node?(genesis_address, authorized_nodes) do
GenesisInputLedger.add_chain_input(mvt, address, timestamp, genesis_address)
end
end)

genesis_address =
tx
|> Transaction.previous_address()
|> TransactionChain.get_genesis_address()

# We need to determine whether the node is responsible of the chain genesis pool as the transaction have been received as an I/O transaction.
chain_transaction? =
(not io_transaction? or TransactionChain.get_size(genesis_address) > 0) and
genesis_node?(genesis_address, authorized_nodes)

if chain_transaction? and length(consumed_inputs) > 0 do
GenesisInputLedger.update_chain_inputs(tx, genesis_address)
end

:ok =
set_transaction_movements(
address,
Expand All @@ -106,6 +151,11 @@ defmodule Archethic.Account.MemTablesLoader do
)
end

defp genesis_node?(genesis_address, nodes) do
genesis_nodes = Election.chain_storage_nodes(genesis_address, nodes)
Utils.key_in_node_list?(genesis_nodes, Crypto.first_node_public_key())
end

defp set_unspent_outputs(address, unspent_outputs, protocol_version) do
unspent_outputs
|> Enum.filter(&(&1.amount > 0))
Expand Down
2 changes: 2 additions & 0 deletions lib/archethic/account/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule Archethic.Account.Supervisor do

use Supervisor

alias Archethic.Account.MemTables.GenesisInputLedger
alias Archethic.Account.MemTables.TokenLedger
alias Archethic.Account.MemTables.UCOLedger
alias Archethic.Account.MemTablesLoader
Expand All @@ -17,6 +18,7 @@ defmodule Archethic.Account.Supervisor do
children = [
TokenLedger,
UCOLedger,
GenesisInputLedger,
MemTablesLoader
]

Expand Down
16 changes: 15 additions & 1 deletion lib/archethic/mining/distributed_workflow.ex
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,14 @@ defmodule Archethic.Mining.DistributedWorkflow do

chain_storage_nodes = Election.chain_storage_nodes(tx.address, authorized_nodes)

previous_address = Transaction.previous_address(tx)
previous_storage_nodes = Election.chain_storage_nodes(previous_address, authorized_nodes)

{:ok, genesis_address} =
TransactionChain.fetch_genesis_address(previous_address, previous_storage_nodes)

genesis_storage_nodes = Election.chain_storage_nodes(genesis_address, authorized_nodes)

beacon_storage_nodes =
Election.beacon_storage_nodes(
BeaconChain.subset_from_address(tx.address),
Expand All @@ -213,6 +221,12 @@ defmodule Archethic.Mining.DistributedWorkflow do
else
resolved_addresses
|> Enum.map(fn {_origin, resolved} -> resolved end)
|> Enum.flat_map(fn address ->
{:ok, genesis_address} =
TransactionChain.fetch_genesis_address(previous_address, previous_storage_nodes)

[genesis_address, address]
end)
|> Election.io_storage_nodes(authorized_nodes)
end

Expand All @@ -226,7 +240,7 @@ defmodule Archethic.Mining.DistributedWorkflow do
cross_validation_nodes: cross_validation_nodes,
chain_storage_nodes: chain_storage_nodes,
beacon_storage_nodes: beacon_storage_nodes,
io_storage_nodes: io_storage_nodes,
io_storage_nodes: P2P.distinct_nodes(io_storage_nodes ++ genesis_storage_nodes),
validation_time: validation_time,
resolved_addresses: resolved_addresses,
contract_context: contract_context
Expand Down
16 changes: 15 additions & 1 deletion lib/archethic/mining/standalone_workflow.ex
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,26 @@ defmodule Archethic.Mining.StandaloneWorkflow do

resolved_addresses = TransactionChain.resolve_transaction_addresses(tx, validation_time)

previous_address = Transaction.previous_address(tx)
previous_storage_nodes = Election.chain_storage_nodes(previous_address, authorized_nodes)

{:ok, genesis_address} =
TransactionChain.fetch_genesis_address(previous_address, previous_storage_nodes)

genesis_storage_nodes = Election.chain_storage_nodes(genesis_address, authorized_nodes)

io_storage_nodes =
if Transaction.network_type?(tx.type) do
P2P.list_nodes()
else
resolved_addresses
|> Enum.map(fn {_origin, resolved} -> resolved end)
|> Enum.flat_map(fn address ->
{:ok, genesis_address} =
TransactionChain.fetch_genesis_address(previous_address, previous_storage_nodes)

[genesis_address, address]
end)
|> Election.io_storage_nodes(authorized_nodes)
end

Expand Down Expand Up @@ -113,7 +127,7 @@ defmodule Archethic.Mining.StandaloneWorkflow do
cross_validation_nodes: [current_node],
chain_storage_nodes: chain_storage_nodes,
beacon_storage_nodes: beacon_storage_nodes,
io_storage_nodes: io_storage_nodes,
io_storage_nodes: P2P.distinct_nodes(io_storage_nodes ++ genesis_storage_nodes),
validation_time: validation_time,
resolved_addresses: resolved_addresses,
contract_context: contract_context
Expand Down
Loading

0 comments on commit 1e2613f

Please sign in to comment.