Skip to content

Commit

Permalink
Self repair on transfert transaction (#325)
Browse files Browse the repository at this point in the history
* Fix timestamp size to millisecond
* Correct Transaction already exist
* ChainReader return transaction chain up to address passed in parameter
* Remove after_time from GetTransactionChain message
  • Loading branch information
Neylix committed May 19, 2022
1 parent 2b9bf18 commit df062cc
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 76 deletions.
18 changes: 9 additions & 9 deletions lib/archethic/db/embedded_impl/chain_index.ex
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,9 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
datetime = %DateTime{},
db_path
) do
unix_time = DateTime.to_unix(datetime)
unix_time = DateTime.to_unix(datetime, :millisecond)

encoded_data = <<unix_time::32, new_address::binary>>
encoded_data = <<unix_time::64, new_address::binary>>

{filename, genesis_address} =
case get_tx_entry(previous_address, db_path) do
Expand Down Expand Up @@ -353,7 +353,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
case :ets.lookup(:archethic_db_last_index, genesis_address) do
[] ->
# If not present, the we search in the index file
unix_time = DateTime.utc_now() |> DateTime.to_unix()
unix_time = DateTime.utc_now() |> DateTime.to_unix(:millisecond)

search_last_address_until(genesis_address, unix_time, db_path) || address

Expand All @@ -379,7 +379,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
@spec get_last_chain_address(address :: binary(), until :: DateTime.t(), db_path :: String.t()) ::
binary()
def get_last_chain_address(address, datetime = %DateTime{}, db_path) do
unix_time = DateTime.to_unix(datetime)
unix_time = DateTime.to_unix(datetime, :millisecond)

# We get the genesis address of this given transaction address
case get_tx_entry(address, db_path) do
Expand All @@ -406,7 +406,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
end

defp do_search_last_address_until(fd, until, acc \\ nil) do
with {:ok, <<timestamp::32>>} <- :file.read(fd, 4),
with {:ok, <<timestamp::64>>} <- :file.read(fd, 8),
{:ok, <<curve_id::8, hash_id::8>>} <- :file.read(fd, 2),
hash_size <- Crypto.hash_size(hash_id),
{:ok, hash} <- :file.read(fd, hash_size) do
Expand Down Expand Up @@ -456,11 +456,11 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
"""
@spec set_public_key(binary(), Crypto.key(), DateTime.t(), String.t()) :: :ok
def set_public_key(genesis_address, public_key, date = %DateTime{}, db_path) do
unix_time = DateTime.to_unix(date)
unix_time = DateTime.to_unix(date, :millisecond)

File.write!(
chain_keys_path(db_path, genesis_address),
<<unix_time::32, public_key::binary>>,
<<unix_time::64, public_key::binary>>,
[:binary, :append]
)
end
Expand All @@ -482,7 +482,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
{:ok, fd} ->
# We need to extract key metadata information to know how many bytes to decode
# as keys can have different sizes based on the curve used
with {:ok, <<_timestamp::32, curve_id::8, origin_id::8>>} <- :file.read(fd, 6),
with {:ok, <<_timestamp::64, curve_id::8, origin_id::8>>} <- :file.read(fd, 10),
key_size <- Crypto.key_size(curve_id),
{:ok, key} <- :file.read(fd, key_size) do
# We then take the first public key registered
Expand Down Expand Up @@ -542,7 +542,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
end

defp do_scan_chain(fd, acc \\ []) do
with {:ok, <<_timestamp::32>>} <- :file.read(fd, 4),
with {:ok, <<_timestamp::64>>} <- :file.read(fd, 8),
{:ok, <<curve_id::8, hash_id::8>>} <- :file.read(fd, 2),
hash_size <- Crypto.hash_size(hash_id),
{:ok, hash} <- :file.read(fd, hash_size) do
Expand Down
46 changes: 24 additions & 22 deletions lib/archethic/db/embedded_impl/chain_reader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,24 +60,22 @@ defmodule Archethic.DB.EmbeddedImpl.ChainReader do
fd = File.open!(filepath, [:binary, :read])

# Set the file cursor position to the paging state
position =
case Keyword.get(opts, :paging_state) do
nil ->
:file.position(fd, 0)
0

paging_address ->
{:ok, %{offset: offset, size: size}} =
ChainIndex.get_tx_entry(paging_address, db_path)

:file.position(fd, offset + size)
offset + size
end
case Keyword.get(opts, :paging_state) do
nil ->
:file.position(fd, 0)
0

paging_address ->
{:ok, %{offset: offset, size: size}} =
ChainIndex.get_tx_entry(paging_address, db_path)

:file.position(fd, offset + size)
end

column_names = fields_to_column_names(fields)

# Read the transactions until the nb of transactions to fullfil the page (ie. 10 transactions)
{transactions, more?, paging_state} = scan_chain(fd, column_names, position)
{transactions, more?, paging_state} = scan_chain(fd, column_names, address)
:file.close(fd)
{transactions, more?, paging_state}
end
Expand Down Expand Up @@ -115,7 +113,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainReader do
end
end

defp scan_chain(fd, fields, position, acc \\ []) do
defp scan_chain(fd, fields, limit_address, acc \\ []) do
case :file.read(fd, 8) do
{:ok, <<size::32, version::32>>} ->
if length(acc) == @page_size do
Expand All @@ -132,7 +130,11 @@ defmodule Archethic.DB.EmbeddedImpl.ChainReader do
|> Utils.atomize_keys()
|> Transaction.from_map()

scan_chain(fd, fields, position + 8 + size, [tx | acc])
if tx.address == limit_address do
{Enum.reverse([tx | acc]), false, nil}
else
scan_chain(fd, fields, limit_address, [tx | acc])
end
end

:eof ->
Expand All @@ -150,21 +152,21 @@ defmodule Archethic.DB.EmbeddedImpl.ChainReader do
"previous_public_key",
"validation_stamp.timestamp"
]
iex> ChainReader.fields_to_column_names([:address, :previous_public_key, validation_stamp: [ledger_operations: [:fee, :transaction_movements]]])
[
"address",
"previous_public_key",
"validation_stamp.ledger_operations.transaction_movements",
"validation_stamp.ledger_operations.fee",
]
iex> ChainReader.fields_to_column_names([
...> :address,
...> :previous_public_key,
...> data: [:content],
...> :address,
...> :previous_public_key,
...> data: [:content],
...> validation_stamp: [
...> :timestamp,
...> :timestamp,
...> ledger_operations: [ :fee, :transaction_movements ]
...> ]
...> ])
Expand Down
53 changes: 15 additions & 38 deletions lib/archethic/p2p/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -187,29 +187,12 @@ defmodule Archethic.P2P.Message do
<<3::8, tx_address::binary>>
end

def encode(%GetTransactionChain{address: tx_address, after: nil, paging_state: nil}) do
<<4::8, tx_address::binary, 0::32, 0::8>>
def encode(%GetTransactionChain{address: tx_address, paging_state: nil}) do
<<4::8, tx_address::binary, 0::8>>
end

def encode(%GetTransactionChain{
address: tx_address,
after: date = %DateTime{},
paging_state: nil
}) do
<<4::8, tx_address::binary, DateTime.to_unix(date)::32, 0::8>>
end

def encode(%GetTransactionChain{address: tx_address, after: nil, paging_state: paging_state}) do
<<4::8, tx_address::binary, 0::32, byte_size(paging_state)::8, paging_state::binary>>
end

def encode(%GetTransactionChain{
address: tx_address,
after: date = %DateTime{},
paging_state: paging_state
}) do
<<4::8, tx_address::binary, DateTime.to_unix(date)::32, byte_size(paging_state)::8,
paging_state::binary>>
def encode(%GetTransactionChain{address: tx_address, paging_state: paging_state}) do
<<4::8, tx_address::binary, byte_size(paging_state)::8, paging_state::binary>>
end

def encode(%GetUnspentOutputs{address: tx_address}) do
Expand Down Expand Up @@ -317,15 +300,16 @@ defmodule Archethic.P2P.Message do
end

def encode(%GetLastTransactionAddress{address: address, timestamp: timestamp}) do
<<21::8, address::binary, DateTime.to_unix(timestamp)::32>>
<<21::8, address::binary, DateTime.to_unix(timestamp, :millisecond)::64>>
end

def encode(%NotifyLastTransactionAddress{
address: address,
previous_address: previous_address,
timestamp: timestamp
}) do
<<22::8, address::binary, previous_address::binary, DateTime.to_unix(timestamp)::32>>
<<22::8, address::binary, previous_address::binary,
DateTime.to_unix(timestamp, :millisecond)::64>>
end

def encode(%GetTransactionSummary{address: address}) do
Expand Down Expand Up @@ -537,14 +521,8 @@ defmodule Archethic.P2P.Message do
#
def decode(<<4::8, rest::bitstring>>) do
{address,
<<timestamp::32, paging_state_size::8, paging_state::binary-size(paging_state_size),
rest::bitstring>>} = Utils.deserialize_address(rest)

after_time =
case timestamp do
0 -> nil
_ -> DateTime.from_unix!(timestamp)
end
<<paging_state_size::8, paging_state::binary-size(paging_state_size), rest::bitstring>>} =
Utils.deserialize_address(rest)

paging_state =
case paging_state do
Expand All @@ -556,7 +534,7 @@ defmodule Archethic.P2P.Message do
end

{
%GetTransactionChain{address: address, after: after_time, paging_state: paging_state},
%GetTransactionChain{address: address, paging_state: paging_state},
rest
}
end
Expand Down Expand Up @@ -727,22 +705,22 @@ defmodule Archethic.P2P.Message do
end

def decode(<<21::8, rest::bitstring>>) do
{address, <<timestamp::32, rest::bitstring>>} = Utils.deserialize_address(rest)
{address, <<timestamp::64, rest::bitstring>>} = Utils.deserialize_address(rest)

{%GetLastTransactionAddress{
address: address,
timestamp: DateTime.from_unix!(timestamp)
timestamp: DateTime.from_unix!(timestamp, :millisecond)
}, rest}
end

def decode(<<22::8, rest::bitstring>>) do
{address, rest} = Utils.deserialize_address(rest)
{previous_address, <<timestamp::32, rest::bitstring>>} = Utils.deserialize_address(rest)
{previous_address, <<timestamp::64, rest::bitstring>>} = Utils.deserialize_address(rest)

{%NotifyLastTransactionAddress{
address: address,
previous_address: previous_address,
timestamp: DateTime.from_unix!(timestamp)
timestamp: DateTime.from_unix!(timestamp, :millisecond)
}, rest}
end

Expand Down Expand Up @@ -1083,12 +1061,11 @@ defmodule Archethic.P2P.Message do
# paging_state recieved contains binary offset for next page , to be used for query
def process(%GetTransactionChain{
address: tx_address,
after: after_time,
paging_state: paging_state
}) do
{chain, more?, paging_state} =
tx_address
|> TransactionChain.get([], after: after_time, paging_state: paging_state)
|> TransactionChain.get([], paging_state: paging_state)

# empty list for fields/cols to be processed
# new_page_state contains binary offset for the next page
Expand Down
3 changes: 1 addition & 2 deletions lib/archethic/p2p/message/get_transaction_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ defmodule Archethic.P2P.Message.GetTransactionChain do
Represents a message to request an entire transaction chain
"""
@enforce_keys [:address]
defstruct [:address, :after, :paging_state]
defstruct [:address, :paging_state]

alias Archethic.Crypto

@type t :: %__MODULE__{
address: Crypto.versioned_hash(),
after: nil | DateTime.t(),
paging_state: nil | binary()
}
end
2 changes: 1 addition & 1 deletion lib/archethic/reward.ex
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ defmodule Archethic.Reward do
end

defp get_transaction_chain_after([node | rest], address, date) do
case P2P.send_message(node, %GetTransactionChain{address: address, after: date}) do
case P2P.send_message(node, %GetTransactionChain{address: address}) do
{:ok, %TransactionList{transactions: transactions, more?: false}} ->
transactions

Expand Down
4 changes: 2 additions & 2 deletions test/archethic/db/embedded_impl_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -204,14 +204,14 @@ defmodule Archethic.DB.EmbeddedTest do
EmbeddedImpl.write_transaction_chain(transactions)

{page, true, paging_state} =
EmbeddedImpl.get_transaction_chain(List.first(transactions).address)
EmbeddedImpl.get_transaction_chain(List.last(transactions).address)

assert length(page) == 10
assert page == Enum.take(transactions, 10)
assert paging_state == List.last(page).address

{page2, false, nil} =
EmbeddedImpl.get_transaction_chain(List.first(transactions).address, [],
EmbeddedImpl.get_transaction_chain(List.last(transactions).address, [],
paging_state: paging_state
)

Expand Down
4 changes: 2 additions & 2 deletions test/archethic/p2p/messages_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,7 @@ defmodule Archethic.P2P.MessageTest do
test "GetLastTransactionAddress message" do
msg = %GetLastTransactionAddress{
address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>,
timestamp: DateTime.truncate(DateTime.utc_now(), :second)
timestamp: DateTime.truncate(DateTime.utc_now(), :millisecond)
}

assert msg ==
Expand All @@ -836,7 +836,7 @@ defmodule Archethic.P2P.MessageTest do
msg = %NotifyLastTransactionAddress{
address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>,
previous_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>,
timestamp: DateTime.utc_now() |> DateTime.truncate(:second)
timestamp: DateTime.utc_now() |> DateTime.truncate(:millisecond)
}

assert msg ==
Expand Down

0 comments on commit df062cc

Please sign in to comment.