Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate chain and io transaction in DB storage #738

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions lib/archethic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ defmodule Archethic do
alias __MODULE__.P2P
alias __MODULE__.P2P.Node

alias __MODULE__.DB

alias __MODULE__.P2P.Message.Balance
alias __MODULE__.P2P.Message.GetBalance
alias __MODULE__.P2P.Message.NewTransaction
Expand Down Expand Up @@ -275,7 +273,7 @@ defmodule Archethic do
try do
{local_chain, paging_address} =
with true <- paging_address != nil,
true <- DB.transaction_exists?(paging_address),
true <- TransactionChain.transaction_exists?(paging_address),
last_address when last_address != nil <-
TransactionChain.get_last_local_address(address),
true <- last_address != paging_address do
Expand Down
4 changes: 2 additions & 2 deletions lib/archethic/account.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,6 @@ defmodule Archethic.Account do
@doc """
Load the transaction into the Account context filling the memory tables for ledgers
"""
@spec load_transaction(Transaction.t()) :: :ok
defdelegate load_transaction(transaction), to: MemTablesLoader
@spec load_transaction(Transaction.t(), boolean()) :: :ok
defdelegate load_transaction(transaction, io_transaction?), to: MemTablesLoader
end
47 changes: 28 additions & 19 deletions lib/archethic/account/mem_tables_loader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,13 @@ defmodule Archethic.Account.MemTablesLoader do

@spec init(args :: list()) :: {:ok, []}
def init(_args) do
TransactionChain.list_io_transactions(@query_fields)
|> Stream.each(&load_transaction(&1, true))
|> Stream.run()

TransactionChain.list_all(@query_fields)
|> Stream.reject(&(&1.type in @excluded_types))
|> Stream.each(&load_transaction/1)
|> Stream.each(&load_transaction(&1, false))
|> Stream.run()

{:ok, []}
Expand All @@ -67,25 +71,30 @@ defmodule Archethic.Account.MemTablesLoader do
@doc """
Load the transaction into the memory tables
"""
@spec load_transaction(Transaction.t()) :: :ok
def load_transaction(%Transaction{
address: address,
type: tx_type,
previous_public_key: previous_public_key,
validation_stamp: %ValidationStamp{
timestamp: timestamp,
protocol_version: protocol_version,
ledger_operations: %LedgerOperations{
fee: fee,
unspent_outputs: unspent_outputs,
transaction_movements: transaction_movements
@spec load_transaction(Transaction.t(), boolean()) :: :ok
def load_transaction(
%Transaction{
address: address,
type: tx_type,
previous_public_key: previous_public_key,
validation_stamp: %ValidationStamp{
timestamp: timestamp,
protocol_version: protocol_version,
ledger_operations: %LedgerOperations{
fee: fee,
unspent_outputs: unspent_outputs,
transaction_movements: transaction_movements
}
}
}
}) do
previous_address = Crypto.derive_address(previous_public_key)

UCOLedger.spend_all_unspent_outputs(previous_address)
TokenLedger.spend_all_unspent_outputs(previous_address)
},
io_transaction?
) do
unless io_transaction? do
previous_address = Crypto.derive_address(previous_public_key)

UCOLedger.spend_all_unspent_outputs(previous_address)
TokenLedger.spend_all_unspent_outputs(previous_address)
end

burn_storage_nodes =
Election.storage_nodes(
Expand Down
22 changes: 12 additions & 10 deletions lib/archethic/contracts/loader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,19 @@ defmodule Archethic.Contracts.Loader do
def init(_opts) do
DB.list_last_transaction_addresses()
|> Stream.map(fn address ->
{:ok, tx} =
DB.get_transaction(address, [
:address,
:previous_public_key,
:data,
validation_stamp: [:timestamp]
])

tx
DB.get_transaction(address, [
:address,
:previous_public_key,
:data,
validation_stamp: [:timestamp]
])
end)
|> Stream.filter(&(&1.data.code != ""))
|> Stream.filter(fn
{:ok, %Transaction{data: %TransactionData{code: ""}}} -> false
{:error, _} -> false
_ -> true
end)
|> Stream.map(fn {:ok, tx} -> tx end)
|> Stream.each(&load_transaction(&1, true))
|> Stream.run()

Expand Down
7 changes: 5 additions & 2 deletions lib/archethic/db.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ defmodule Archethic.DB do

use Knigge, otp_app: :archethic, default: EmbeddedImpl

@type storage_type() :: :chain | :io

@callback get_transaction(address :: binary(), fields :: list()) ::
{:ok, Transaction.t()} | {:error, :transaction_not_exists}
@callback get_beacon_summary(summary_address :: binary()) ::
Expand All @@ -24,12 +26,13 @@ defmodule Archethic.DB do
fields :: list(),
opts :: [paging_state: nil | binary(), after: DateTime.t()]
) :: Enumerable.t()
@callback write_transaction(Transaction.t()) :: :ok
@callback write_transaction(Transaction.t(), storage_type()) :: :ok
@callback write_beacon_summary(Summary.t()) :: :ok
@callback clear_beacon_summaries() :: :ok
@callback write_beacon_summaries_aggregate(SummaryAggregate.t()) :: :ok
@callback write_transaction_chain(Enumerable.t()) :: :ok
@callback list_transactions(fields :: list()) :: Enumerable.t()
@callback list_io_transactions(fields :: list()) :: Enumerable.t()
@callback add_last_transaction_address(binary(), binary(), DateTime.t()) :: :ok
@callback list_last_transaction_addresses() :: Enumerable.t()

Expand Down Expand Up @@ -61,7 +64,7 @@ defmodule Archethic.DB do
@callback get_latest_burned_fees() :: non_neg_integer()
@callback get_nb_transactions() :: non_neg_integer()

@callback transaction_exists?(binary()) :: boolean()
@callback transaction_exists?(binary(), storage_type()) :: boolean()

@callback register_p2p_summary(list({Crypto.key(), boolean(), float(), DateTime.t()})) :: :ok

Expand Down
56 changes: 41 additions & 15 deletions lib/archethic/db/embedded_impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ defmodule Archethic.DB.EmbeddedImpl do

alias Archethic.Crypto

alias Archethic.DB

alias __MODULE__.BootstrapInfo
alias __MODULE__.ChainIndex
alias __MODULE__.ChainReader
Expand Down Expand Up @@ -74,38 +76,54 @@ defmodule Archethic.DB.EmbeddedImpl do
Enum.each(sorted_chain, fn tx ->
unless ChainIndex.transaction_exists?(tx.address, db_path()) do
ChainWriter.append_transaction(genesis_address, tx)

# Delete IO transaction if it exists as it is now stored as a chain
delete_io_transaction(tx.address)
end
end)
end

@doc """
Write a single transaction and append it to its chain
"""
@spec write_transaction(Transaction.t()) :: :ok
def write_transaction(tx = %Transaction{}) do
@spec write_transaction(Transaction.t(), DB.storage_type()) :: :ok
def write_transaction(tx, storage_type \\ :chain)

def write_transaction(tx = %Transaction{}, :chain) do
if ChainIndex.transaction_exists?(tx.address, db_path()) do
{:error, :transaction_already_exists}
else
previous_address = Transaction.previous_address(tx)

case ChainIndex.get_tx_entry(previous_address, db_path()) do
{:ok, %{genesis_address: genesis_address}} ->
do_write_transaction(genesis_address, tx)
genesis_address =
case ChainIndex.get_tx_entry(previous_address, db_path()) do
{:ok, %{genesis_address: genesis_address}} ->
genesis_address

{:error, :not_exists} ->
ChainWriter.append_transaction(previous_address, tx)
end
{:error, :not_exists} ->
previous_address
end

ChainWriter.append_transaction(genesis_address, tx)

# Delete IO transaction if it exists as it is now stored as a chain
delete_io_transaction(tx.address)
end
end

defp do_write_transaction(genesis_address, tx) do
if ChainIndex.transaction_exists?(tx.address, db_path()) do
def write_transaction(tx = %Transaction{}, :io) do
if ChainIndex.transaction_exists?(tx.address, :io, db_path()) do
{:error, :transaction_already_exists}
else
ChainWriter.append_transaction(genesis_address, tx)
ChainWriter.write_io_transaction(tx, db_path())
end
end

defp delete_io_transaction(address) do
ChainWriter.io_path(db_path(), address) |> File.rm()
:ok
end

@doc """
Write a beacon summary in DB
"""
Expand Down Expand Up @@ -137,9 +155,9 @@ defmodule Archethic.DB.EmbeddedImpl do
@doc """
Determine if the transaction exists or not
"""
@spec transaction_exists?(address :: binary()) :: boolean()
def transaction_exists?(address) when is_binary(address) do
ChainIndex.transaction_exists?(address, db_path())
@spec transaction_exists?(address :: binary(), storage_type :: DB.storage_type()) :: boolean()
def transaction_exists?(address, storage_type) when is_binary(address) do
ChainIndex.transaction_exists?(address, storage_type, db_path())
end

@doc """
Expand Down Expand Up @@ -278,14 +296,22 @@ defmodule Archethic.DB.EmbeddedImpl do
end

@doc """
List all the transactions
List all the transactions in chain storage
"""
@spec list_transactions(fields :: list()) :: Enumerable.t() | list(Transaction.t())
def list_transactions(fields \\ []) when is_list(fields) do
ChainIndex.list_genesis_addresses()
|> Stream.flat_map(&ChainReader.stream_scan_chain(&1, nil, fields, db_path()))
end

@doc """
List all the transactions in io storage
"""
@spec list_io_transactions(fields :: list()) :: Enumerable.t() | list(Transaction.t())
def list_io_transactions(fields \\ []) do
ChainReader.list_io_transactions(fields, db_path())
end

@doc """
List all the last transaction chain addresses
"""
Expand Down
11 changes: 8 additions & 3 deletions lib/archethic/db/embedded_impl/chain_index.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
@vsn Mix.Project.config()[:version]

alias Archethic.Crypto
alias Archethic.DB
alias Archethic.DB.EmbeddedImpl.ChainWriter
alias Archethic.TransactionChain.Transaction

Expand Down Expand Up @@ -196,14 +197,18 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
@doc """
Determine if a transaction exists
"""
@spec transaction_exists?(binary(), String.t()) :: boolean()
def transaction_exists?(address = <<_::8, _::8, _subset::8, _digest::binary>>, db_path) do
@spec transaction_exists?(binary(), DB.storage_type(), String.t()) :: boolean()
def transaction_exists?(address, storage_type \\ :chain, db_path)

def transaction_exists?(address, storage_type, db_path) do
case get_tx_entry(address, db_path) do
{:ok, _} ->
true

{:error, :not_exists} ->
false
if storage_type == :io,
do: ChainWriter.io_path(db_path, address) |> File.exists?(),
else: false
end
end

Expand Down
54 changes: 54 additions & 0 deletions lib/archethic/db/embedded_impl/chain_reader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,60 @@ defmodule Archethic.DB.EmbeddedImpl.ChainReader do
end
end

@doc """
List all the transactions in io storage
"""
@spec list_io_transactions(fields :: list(), db_path :: String.t()) ::
Enumerable.t() | list(Transaction.t())
def list_io_transactions(fields, db_path) do
io_transactions_path =
ChainWriter.base_io_path(db_path)
|> Path.join("*")
|> Path.wildcard()

Stream.resource(
fn -> io_transactions_path end,
fn
[filepath | rest] -> {[read_io_transaction(filepath, fields)], rest}
[] -> {:halt, nil}
end,
fn _ -> :ok end
)
end

defp read_io_transaction(filepath, fields) do
# Open the file as the position from the transaction in the chain file
fd = File.open!(filepath, [:binary, :read])

{:ok, <<size::32, version::32>>} = :file.read(fd, 8)
column_names = fields_to_column_names(fields)

# Ensure the validation stamp's protocol version is retrieved if we fetch validation stamp fields
has_validation_stamp_fields? =
Enum.any?(column_names, &String.starts_with?(&1, "validation_stamp."))

has_validation_stamp_protocol_field? =
Enum.any?(column_names, &(&1 == "validation_stamp.protocol_version"))

column_names =
if has_validation_stamp_fields? and !has_validation_stamp_protocol_field? do
["validation_stamp.protocol_version" | column_names]
else
column_names
end

# Read the transaction and extract requested columns from the fields arg
tx =
fd
|> read_transaction(column_names, size, 0)
|> Enum.into(%{})
|> decode_transaction_columns(version)

:file.close(fd)

tx
end

defp process_get_chain(fd, address, fields, opts, db_path) do
# Set the file cursor position to the paging state
case Keyword.get(opts, :paging_state) do
Expand Down
Loading