Skip to content

Commit

Permalink
eod
Browse files Browse the repository at this point in the history
  • Loading branch information
apoorv-2204 committed Nov 8, 2022
1 parent cfe5b05 commit 03f082d
Show file tree
Hide file tree
Showing 21 changed files with 1,706 additions and 327 deletions.
3 changes: 2 additions & 1 deletion lib/archethic/db.ex
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ defmodule Archethic.DB do
@callback get_nb_transactions() :: non_neg_integer()

@callback transaction_exists?(binary()) :: boolean()

@callback register_p2p_summary(
node_public_key :: Crypto.key(),
date :: DateTime.t(),
Expand All @@ -76,4 +75,6 @@ defmodule Archethic.DB do

@callback get_bootstrap_info(key :: String.t()) :: String.t() | nil
@callback set_bootstrap_info(key :: String.t(), value :: String.t()) :: :ok

@callback stream_genesis_addresses() :: Enumerable.t()
end
8 changes: 8 additions & 0 deletions lib/archethic/db/embedded_impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -366,4 +366,12 @@ defmodule Archethic.DB.EmbeddedImpl do
def scan_chain(genesis_address, limit_address, fields \\ [], paging_state \\ nil) do
ChainReader.scan_chain(genesis_address, limit_address, fields, paging_state, db_path())
end

@doc """
Stream Genesis Addresses from last index ETS Table
"""
@spec stream_genesis_addresses :: Enumerable.t()
def stream_genesis_addresses() do
ChainIndex.list_genesis_addresses()
end
end
47 changes: 26 additions & 21 deletions lib/archethic/db/embedded_impl/chain_index.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,22 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
alias Archethic.DB.EmbeddedImpl.ChainWriter
alias Archethic.TransactionChain.Transaction

@archethic_db_tx_index :archethic_db_tx_index
@archethic_db_chain_stats :archethic_db_chain_stats
@archethic_db_last_index :archethic_db_last_index
@archethic_db_type_stats :archethic_db_type_stats

def start_link(arg \\ []) do
GenServer.start_link(__MODULE__, arg, name: __MODULE__)
end

def init(opts) do
db_path = Keyword.fetch!(opts, :path)

:ets.new(:archethic_db_tx_index, [:set, :named_table, :public, read_concurrency: true])
:ets.new(:archethic_db_chain_stats, [:set, :named_table, :public, read_concurrency: true])
:ets.new(:archethic_db_last_index, [:set, :named_table, :public, read_concurrency: true])
:ets.new(:archethic_db_type_stats, [:set, :named_table, :public, read_concurrency: true])
:ets.new(@archethic_db_tx_index, [:set, :named_table, :public, read_concurrency: true])
:ets.new(@archethic_db_chain_stats, [:set, :named_table, :public, read_concurrency: true])
:ets.new(@archethic_db_last_index, [:set, :named_table, :public, read_concurrency: true])
:ets.new(@archethic_db_type_stats, [:set, :named_table, :public, read_concurrency: true])

fill_tables(db_path)

Expand Down Expand Up @@ -59,12 +64,12 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do

true =
:ets.insert(
:archethic_db_tx_index,
@archethic_db_tx_index,
{current_address, %{size: size, offset: offset, genesis_address: genesis_address}}
)

:ets.update_counter(
:archethic_db_chain_stats,
@archethic_db_chain_stats,
genesis_address,
[
{2, size},
Expand All @@ -88,7 +93,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do

true =
:ets.insert(
:archethic_db_last_index,
@archethic_db_last_index,
{genesis_address, last_address, DateTime.to_unix(timestamp, :millisecond)}
)
end)
Expand All @@ -99,10 +104,10 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
case File.open(type_path(db_path, type), [:read, :binary]) do
{:ok, fd} ->
nb_txs = do_scan_types(fd)
:ets.insert(:archethic_db_type_stats, {type, nb_txs})
:ets.insert(@archethic_db_type_stats, {type, nb_txs})

{:error, _} ->
:ets.insert(:archethic_db_type_stats, {type, 0})
:ets.insert(@archethic_db_type_stats, {type, 0})
end
end)
end
Expand Down Expand Up @@ -141,12 +146,12 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
# Write fast lookup entry for this transaction on memory
true =
:ets.insert(
:archethic_db_tx_index,
@archethic_db_tx_index,
{tx_address, %{size: size, offset: last_offset, genesis_address: genesis_address}}
)

:ets.update_counter(
:archethic_db_chain_stats,
@archethic_db_chain_stats,
genesis_address,
[
{2, size},
Expand All @@ -161,7 +166,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
@spec get_file_stats(binary()) ::
{offset :: non_neg_integer(), nb_transactions :: non_neg_integer()}
def get_file_stats(genesis_address) do
case :ets.lookup(:archethic_db_chain_stats, genesis_address) do
case :ets.lookup(@archethic_db_chain_stats, genesis_address) do
[{_, last_offset, nb_txs}] ->
{last_offset, nb_txs}

Expand Down Expand Up @@ -206,7 +211,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
"""
@spec get_tx_entry(binary(), String.t()) :: {:ok, map()} | {:error, :not_exists}
def get_tx_entry(address, db_path) do
case :ets.lookup(:archethic_db_tx_index, address) do
case :ets.lookup(@archethic_db_tx_index, address) do
[] ->
# If the transaction is not found in the in memory lookup
# we scan the index file for the subset of the transaction to find the relative information
Expand Down Expand Up @@ -334,7 +339,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
"""
@spec count_transactions_by_type(Transaction.transaction_type()) :: non_neg_integer()
def count_transactions_by_type(type) do
case :ets.lookup(:archethic_db_type_stats, type) do
case :ets.lookup(@archethic_db_type_stats, type) do
[] ->
0

Expand All @@ -350,7 +355,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
:ok
def add_tx_type(type, address, db_path) do
File.write!(type_path(db_path, type), address, [:append, :binary])
:ets.update_counter(:archethic_db_type_stats, type, {2, 1}, {type, 0})
:ets.update_counter(@archethic_db_type_stats, type, {2, 1}, {type, 0})
:ok
end

Expand All @@ -370,13 +375,13 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do

filename = chain_addresses_path(db_path, genesis_address)

case :ets.lookup(:archethic_db_last_index, genesis_address) do
case :ets.lookup(@archethic_db_last_index, genesis_address) do
[{_, ^new_address, _}] ->
:ok

_ ->
:ok = File.write!(filename, encoded_data, [:binary, :append])
true = :ets.insert(:archethic_db_last_index, {genesis_address, new_address, unix_time})
true = :ets.insert(@archethic_db_last_index, {genesis_address, new_address, unix_time})
:ok
end
end
Expand All @@ -391,7 +396,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
case get_tx_entry(address, db_path) do
{:ok, %{genesis_address: genesis_address}} ->
# Search in the latest in memory index
case :ets.lookup(:archethic_db_last_index, genesis_address) do
case :ets.lookup(@archethic_db_last_index, genesis_address) do
[] ->
# If not present, the we search in the index file
unix_time = DateTime.utc_now() |> DateTime.to_unix(:millisecond)
Expand All @@ -405,7 +410,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do

{:error, :not_exists} ->
# We try if the request address is the genesis address to fetch the in memory index
case :ets.lookup(:archethic_db_last_index, address) do
case :ets.lookup(@archethic_db_last_index, address) do
[] ->
# If not present, the we search in the index file
unix_time = DateTime.utc_now() |> DateTime.to_unix(:millisecond)
Expand Down Expand Up @@ -605,14 +610,14 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
end

defp stream_genesis_addresses(acc = []) do
case :ets.first(:archethic_db_chain_stats) do
case :ets.first(@archethic_db_chain_stats) do
:"$end_of_table" -> {:halt, acc}
first_key -> {[first_key], first_key}
end
end

defp stream_genesis_addresses(acc) do
case :ets.next(:archethic_db_chain_stats, acc) do
case :ets.next(@archethic_db_chain_stats, acc) do
:"$end_of_table" -> {:halt, acc}
next_key -> {[next_key], next_key}
end
Expand Down
6 changes: 6 additions & 0 deletions lib/archethic/p2p.ex
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ defmodule Archethic.P2P do
@spec list_nodes() :: list(Node.t())
defdelegate list_nodes, to: MemTable

@doc """
Stream the registered nodes.
"""
@spec stream_nodes() :: Enumerable.t()
defdelegate stream_nodes(), to: MemTable

@doc """
Return the list of available nodes
"""
Expand Down
45 changes: 45 additions & 0 deletions lib/archethic/p2p/mem_table.ex
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,51 @@ defmodule Archethic.P2P.MemTable do
)
end

@doc """
Stream List of nodes from heap to stack
"""
@spec stream_nodes() :: Enumerable.t()
def stream_nodes() do
Stream.resource(
fn ->
nil
end,
fn
nil ->
# first iteration
node_from_key(:ets.first(@discovery_table))

prev_ele ->
# next and last iteration
next_key = :ets.next(@discovery_table, prev_ele)
node_from_key(next_key)
end,
fn _ ->
:ok
end
)
end

def node_from_key(:"$end_of_table"), do: {:halt, nil}

def node_from_key(key) when is_binary(key) do
case :ets.lookup(@discovery_table, key) do
[] ->
{:halt, nil}

[object] ->
node =
object
|> Node.cast()
|> toggle_node_authorization
|> toggle_node_availability

{[node], key}
end
end

def node_from_key(_), do: {:halt, nil}

@doc """
List the authorized nodes
Expand Down
Loading

0 comments on commit 03f082d

Please sign in to comment.