Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move inputs from ETS to disk #684

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 50 additions & 21 deletions lib/archethic/account/mem_tables/token_ledger.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ defmodule Archethic.Account.MemTables.TokenLedger do
@ledger_table :archethic_token_ledger
@unspent_output_index_table :archethic_token_unspent_output_index

alias Archethic.DB

alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.UnspentOutput

alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.VersionedUnspentOutput
Expand All @@ -17,8 +19,11 @@ defmodule Archethic.Account.MemTables.TokenLedger do

@doc """
Initialize the Token ledger tables:
- Main Token ledger as ETS set ({token, to, from, token_id}, amount, spent?, timestamp, protocol_version)
- Token Unspent Output Index as ETS bag (to, {from, token, token_id})
- Main Token ledger as ETS set ({to, from, token_address, token_id}, amount, spent?, timestamp, protocol_version)
- Token Unspent Output Index as ETS bag (to, from, token_address, token_id)

The ETS ledger and index caches the unspent UTXO
Once a UTXO is spent, it is removed from the ETS and written to disk to reduce memory footprint
"""
@spec start_link(args :: list()) :: GenServer.on_start()
def start_link(args \\ []) do
Expand Down Expand Up @@ -130,34 +135,58 @@ defmodule Archethic.Account.MemTables.TokenLedger do
"""
@spec spend_all_unspent_outputs(binary()) :: :ok
def spend_all_unspent_outputs(address) do
{:ok, pid} = DB.start_inputs_writer(:token, address)

@unspent_output_index_table
|> :ets.lookup(address)
|> Enum.each(fn {_, from, token_address, token_id} ->
:ets.update_element(@ledger_table, {address, from, token_address, token_id}, {3, true})
|> Enum.each(fn {to, from, token_address, token_id} ->
[{_, amount, _, timestamp, protocol_version}] =
:ets.lookup(@ledger_table, {to, from, token_address, token_id})

DB.append_input(pid, %VersionedTransactionInput{
protocol_version: protocol_version,
input: %TransactionInput{
from: from,
amount: amount,
spent?: true,
timestamp: timestamp,
type: {:token, token_address, token_id}
}
})

:ets.delete(@ledger_table, {address, from, token_address, token_id})
end)

:ets.delete(@unspent_output_index_table, address)

DB.stop_inputs_writer(pid)
end

@doc """
Retrieve the entire inputs for a given address (spent or unspent)
"""
@spec get_inputs(binary()) :: list(VersionedTransactionInput.t())
def get_inputs(address) when is_binary(address) do
@unspent_output_index_table
|> :ets.lookup(address)
|> Enum.map(fn {_, from, token_address, token_id} ->
[{_, amount, spent?, timestamp, protocol_version}] =
:ets.lookup(@ledger_table, {address, from, token_address, token_id})

%VersionedTransactionInput{
input: %TransactionInput{
from: from,
amount: amount,
type: {:token, token_address, token_id},
spent?: spent?,
timestamp: timestamp
},
protocol_version: protocol_version
}
end)
case :ets.lookup(@unspent_output_index_table, address) do
[] ->
DB.get_inputs(:token, address)

inputs ->
Enum.map(inputs, fn {_, from, token_address, token_id} ->
[{_, amount, spent?, timestamp, protocol_version}] =
:ets.lookup(@ledger_table, {address, from, token_address, token_id})

%VersionedTransactionInput{
input: %TransactionInput{
from: from,
amount: amount,
type: {:token, token_address, token_id},
spent?: spent?,
timestamp: timestamp
},
protocol_version: protocol_version
}
end)
end
end
end
74 changes: 52 additions & 22 deletions lib/archethic/account/mem_tables/uco_ledger.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ defmodule Archethic.Account.MemTables.UCOLedger do
@ledger_table :archethic_uco_ledger
@unspent_output_index_table :archethic_uco_unspent_output_index

alias Archethic.DB

alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.UnspentOutput

alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.VersionedUnspentOutput
Expand All @@ -19,6 +21,9 @@ defmodule Archethic.Account.MemTables.UCOLedger do
Initialize the UCO ledger tables:
- Main UCO ledger as ETS set ({{to, from}, amount, spent?, timestamp, reward?, protocol_version})
- UCO Unspent Output Index as ETS bag (to, from)

The ETS ledger and index caches the unspent UTXO
Once a UTXO is spent, it is removed from the ETS and written to disk to reduce memory footprint
"""
@spec start_link(args :: list()) :: GenServer.on_start()
def start_link(args \\ []) do
Expand Down Expand Up @@ -113,46 +118,71 @@ defmodule Archethic.Account.MemTables.UCOLedger do
| acc
]

_ ->
[] ->
acc
end
end)
end

@doc """
Spend all the unspent outputs for the given address
Spend all the unspent outputs for the given address.
"""
@spec spend_all_unspent_outputs(binary()) :: :ok
def spend_all_unspent_outputs(address) do
{:ok, pid} = DB.start_inputs_writer(:UCO, address)

@unspent_output_index_table
|> :ets.lookup(address)
|> Enum.each(&:ets.update_element(@ledger_table, &1, {3, true}))
|> Enum.each(fn {to, from} ->
[{_, amount, _, timestamp, reward?, protocol_version}] =
:ets.lookup(@ledger_table, {to, from})

:ok
DB.append_input(pid, %VersionedTransactionInput{
protocol_version: protocol_version,
input: %TransactionInput{
from: from,
amount: amount,
spent?: true,
reward?: reward?,
timestamp: timestamp,
type: :UCO
}
})

:ets.delete(@ledger_table, {to, from})
end)

:ets.delete(@unspent_output_index_table, address)

DB.stop_inputs_writer(pid)
end

@doc """
Retrieve the entire inputs for a given address (spent or unspent)
"""
@spec get_inputs(binary()) :: list(VersionedTransactionInput.t())
def get_inputs(address) when is_binary(address) do
@unspent_output_index_table
|> :ets.lookup(address)
|> Enum.map(fn {_, from} ->
[{_, amount, spent?, timestamp, reward?, protocol_version}] =
:ets.lookup(@ledger_table, {address, from})

%VersionedTransactionInput{
input: %TransactionInput{
from: from,
amount: amount,
spent?: spent?,
type: :UCO,
timestamp: timestamp,
reward?: reward?
},
protocol_version: protocol_version
}
end)
case :ets.lookup(@unspent_output_index_table, address) do
[] ->
DB.get_inputs(:UCO, address)

inputs ->
Enum.map(inputs, fn {_, from} ->
[{_, amount, spent?, timestamp, reward?, protocol_version}] =
:ets.lookup(@ledger_table, {address, from})

%VersionedTransactionInput{
input: %TransactionInput{
from: from,
amount: amount,
spent?: spent?,
type: :UCO,
timestamp: timestamp,
reward?: reward?
},
protocol_version: protocol_version
}
end)
end
end
end
8 changes: 8 additions & 0 deletions lib/archethic/db.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ defmodule Archethic.DB do
alias __MODULE__.EmbeddedImpl

alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.VersionedTransactionInput

use Knigge, otp_app: :archethic, default: EmbeddedImpl

Expand Down Expand Up @@ -76,4 +77,11 @@ defmodule Archethic.DB do

@callback get_bootstrap_info(key :: String.t()) :: String.t() | nil
@callback set_bootstrap_info(key :: String.t(), value :: String.t()) :: :ok

@callback start_inputs_writer(ledger :: :UCO | :token, address :: binary()) :: {:ok, pid()}
@callback stop_inputs_writer(pid :: pid()) :: :ok
@callback append_input(pid :: pid(), VersionedTransactionInput.t()) ::
:ok
@callback get_inputs(ledger :: :UCO | :token, address :: binary()) ::
list(VersionedTransactionInput.t())
end
37 changes: 31 additions & 6 deletions lib/archethic/db/embedded_impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ defmodule Archethic.DB.EmbeddedImpl do
alias __MODULE__.ChainIndex
alias __MODULE__.ChainReader
alias __MODULE__.ChainWriter
alias __MODULE__.InputsReader
alias __MODULE__.InputsWriter
alias __MODULE__.P2PView
alias __MODULE__.StatsInfo

alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.VersionedTransactionInput

alias Archethic.Utils

Expand Down Expand Up @@ -279,12 +282,8 @@ defmodule Archethic.DB.EmbeddedImpl do
"""
@spec list_transactions(fields :: list()) :: Enumerable.t() | list(Transaction.t())
def list_transactions(fields \\ []) when is_list(fields) do
db_path()
|> ChainIndex.list_all_addresses()
|> Stream.map(fn address ->
{:ok, tx} = get_transaction(address, fields)
tx
end)
ChainIndex.list_genesis_addresses()
|> Stream.flat_map(&ChainReader.stream_scan_chain(&1, nil, fields, db_path()))
end

@doc """
Expand Down Expand Up @@ -366,4 +365,30 @@ defmodule Archethic.DB.EmbeddedImpl do
def scan_chain(genesis_address, limit_address, fields \\ [], paging_state \\ nil) do
ChainReader.scan_chain(genesis_address, limit_address, fields, paging_state, db_path())
end

@doc """
Start a process responsible to write the inputs
"""
@spec start_inputs_writer(ledger :: :UCO | :token, address :: binary()) :: {:ok, pid()}
defdelegate start_inputs_writer(ledger, address), to: InputsWriter, as: :start_link

@doc """
Stop the process responsible to write the inputs
"""
@spec stop_inputs_writer(pid :: pid()) :: :ok
defdelegate stop_inputs_writer(pid), to: InputsWriter, as: :stop

@doc """
Appends one input to existing inputs
"""
@spec append_input(pid :: pid(), VersionedTransactionInput.t()) ::
:ok
defdelegate append_input(pid, input), to: InputsWriter, as: :append_input

@doc """
Read the list of inputs available at address
"""
@spec get_inputs(ledger :: :UCO | :token, address :: binary()) ::
list(VersionedTransactionInput.t())
defdelegate get_inputs(ledger, address), to: InputsReader, as: :get_inputs
end
31 changes: 29 additions & 2 deletions lib/archethic/db/embedded_impl/chain_reader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,8 @@ defmodule Archethic.DB.EmbeddedImpl.ChainReader do
"""
@spec scan_chain(
genesis_address :: binary(),
limit_address :: binary(),
list(),
limit_address :: nil | binary(),
fields :: list(),
paging_address :: nil | binary(),
db_path :: binary()
) ::
Expand Down Expand Up @@ -279,6 +279,33 @@ defmodule Archethic.DB.EmbeddedImpl.ChainReader do
end
end

@doc """
Stream chain tx from the beginning until a given limit address
"""
@spec stream_scan_chain(
genesis_address :: binary(),
limit_address :: nil | binary(),
fields :: list(),
db_path :: binary()
) :: Enumerable.t()
def stream_scan_chain(genesis_address, limit_address, fields, db_path) do
Stream.resource(
fn -> scan_chain(genesis_address, limit_address, fields, nil, db_path) end,
fn
{transactions, true, paging_state} ->
{transactions,
scan_chain(genesis_address, limit_address, fields, paging_state, db_path)}

{transactions, false, _} ->
{transactions, :eof}

:eof ->
{:halt, nil}
end,
fn _ -> :ok end
)
end

@doc """

## Examples
Expand Down
34 changes: 34 additions & 0 deletions lib/archethic/db/embedded_impl/inputs_reader.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
defmodule Archethic.DB.EmbeddedImpl.InputsReader do
@moduledoc """
Inputs are stored by destination address. 1 file per address per ledger
"""

alias Archethic.DB.EmbeddedImpl.InputsWriter
alias Archethic.TransactionChain.VersionedTransactionInput
alias Archethic.Utils

@spec get_inputs(ledger :: InputsWriter.ledger(), address :: binary()) ::
list(VersionedTransactionInput.t())
def get_inputs(ledger, address) do
filename = InputsWriter.address_to_filename(ledger, address)

case File.read(filename) do
{:error, :enoent} ->
[]

{:ok, bin} ->
bin
|> deserialize_inputs_file([])
end
end

defp deserialize_inputs_file(<<>>, acc), do: acc

defp deserialize_inputs_file(bitstring, acc) do
{input_bit_size, rest} = Utils.VarInt.get_value(bitstring)
<<input_bitstring::bitstring-size(input_bit_size), rest::bitstring>> = rest

{input, _padding} = VersionedTransactionInput.deserialize(input_bitstring)
deserialize_inputs_file(rest, [input | acc])
end
end
Loading