Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix recursion of notify last address #688

Merged
merged 1 commit into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 10 additions & 23 deletions lib/archethic/mining/distributed_workflow.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,12 @@ defmodule Archethic.Mining.DistributedWorkflow do
alias Archethic.P2P.Message.AddMiningContext
alias Archethic.P2P.Message.CrossValidate
alias Archethic.P2P.Message.CrossValidationDone
alias Archethic.P2P.Message.NotifyPreviousChain
alias Archethic.P2P.Message.ReplicateTransactionChain
alias Archethic.P2P.Message.ReplicateTransaction
alias Archethic.P2P.Message.ValidationError
alias Archethic.P2P.Node

alias Archethic.Replication

alias Archethic.TransactionChain
alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations
Expand Down Expand Up @@ -749,29 +748,17 @@ defmodule Archethic.Mining.DistributedWorkflow do
:notify_previous_chain,
:replication,
_data = %{
context: %ValidationContext{
transaction: tx,
validation_stamp: %ValidationStamp{timestamp: tx_timestamp}
}
context:
context = %ValidationContext{
transaction: tx
}
}
) do
previous_address = Transaction.previous_address(tx)

genesis_address =
case TransactionChain.fetch_genesis_address_remotely(previous_address) do
{:ok, genesis_address} ->
genesis_address

_ ->
previous_address
end

Replication.acknowledge_previous_storage_nodes(
tx.address,
genesis_address,
previous_address,
tx_timestamp
)
unless Transaction.network_type?(tx.type) do
context
|> ValidationContext.get_confirmed_replication_nodes()
|> P2P.broadcast_message(%NotifyPreviousChain{address: tx.address})
end

:stop
end
Expand Down
35 changes: 11 additions & 24 deletions lib/archethic/mining/standalone_workflow.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,14 @@ defmodule Archethic.Mining.StandaloneWorkflow do
alias Archethic.Mining.WorkflowRegistry

alias Archethic.P2P
alias Archethic.P2P.Message.NotifyPreviousChain
alias Archethic.P2P.Message.ReplicateTransaction
alias Archethic.P2P.Message.ReplicateTransactionChain
alias Archethic.P2P.Message.ValidationError
alias Archethic.P2P.Node

alias Archethic.Replication

alias Archethic.TransactionChain
alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.Transaction.ValidationStamp
alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations
alias Archethic.TransactionChain.TransactionSummary

Expand Down Expand Up @@ -331,26 +329,15 @@ defmodule Archethic.Mining.StandaloneWorkflow do
)
end

defp notify_previous_chain(%ValidationContext{
transaction: tx,
validation_stamp: %ValidationStamp{timestamp: tx_timestamp}
}) do
previous_address = Transaction.previous_address(tx)

genesis_address =
case TransactionChain.fetch_genesis_address_remotely(previous_address) do
{:ok, genesis_address} ->
genesis_address

_ ->
previous_address
end

Replication.acknowledge_previous_storage_nodes(
tx.address,
genesis_address,
previous_address,
tx_timestamp
)
defp notify_previous_chain(
context = %ValidationContext{
transaction: tx
}
) do
unless Transaction.network_type?(tx.type) do
context
|> ValidationContext.get_confirmed_replication_nodes()
|> P2P.broadcast_message(%NotifyPreviousChain{address: tx.address})
end
end
end
13 changes: 13 additions & 0 deletions lib/archethic/mining/validation_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1223,6 +1223,19 @@ defmodule Archethic.Mining.ValidationContext do
|> Enum.count() == nb_confirmed_replications
end

@doc """
Return the list of nodes which confirmed the transaction replication
"""
@spec get_confirmed_replication_nodes(t()) :: list(Node.t())
def get_confirmed_replication_nodes(%__MODULE__{
chain_storage_nodes: chain_storage_nodes,
storage_nodes_confirmations: storage_nodes_confirmations
}) do
Enum.map(storage_nodes_confirmations, fn {index, _} ->
Enum.at(chain_storage_nodes, index)
end)
end

@doc """
Get the list of I/O replication nodes
"""
Expand Down
39 changes: 27 additions & 12 deletions lib/archethic/p2p/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ defmodule Archethic.P2P.Message do
alias __MODULE__.NodeList
alias __MODULE__.NotFound
alias __MODULE__.NotifyEndOfNodeSync
alias __MODULE__.NotifyPreviousChain
alias __MODULE__.NotifyLastTransactionAddress
alias __MODULE__.Ok
alias __MODULE__.P2PView
Expand Down Expand Up @@ -137,6 +138,7 @@ defmodule Archethic.P2P.Message do
| ValidationError.t()
| GetCurrentSummaries.t()
| GetBeaconSummariesAggregate.t()
| NotifyPreviousChain.t()

@type response ::
Ok.t()
Expand Down Expand Up @@ -382,10 +384,9 @@ defmodule Archethic.P2P.Message do
def encode(%NotifyLastTransactionAddress{
last_address: last_address,
genesis_address: genesis_address,
previous_address: previous_address,
timestamp: timestamp
}) do
<<22::8, last_address::binary, genesis_address::binary, previous_address::binary,
<<22::8, last_address::binary, genesis_address::binary,
DateTime.to_unix(timestamp, :millisecond)::64>>
end

Expand Down Expand Up @@ -430,6 +431,10 @@ defmodule Archethic.P2P.Message do
<<33::8, DateTime.to_unix(date)::32>>
end

def encode(%NotifyPreviousChain{address: address}) do
<<34::8, address::binary>>
end

def encode(aggregate = %SummaryAggregate{}) do
<<231::8, SummaryAggregate.serialize(aggregate)::bitstring>>
end
Expand Down Expand Up @@ -883,13 +888,11 @@ defmodule Archethic.P2P.Message do

def decode(<<22::8, rest::bitstring>>) do
{last_address, rest} = Utils.deserialize_address(rest)
{genesis_address, rest} = Utils.deserialize_address(rest)
{previous_address, <<timestamp::64, rest::bitstring>>} = Utils.deserialize_address(rest)
{genesis_address, <<timestamp::64, rest::bitstring>>} = Utils.deserialize_address(rest)

{%NotifyLastTransactionAddress{
last_address: last_address,
genesis_address: genesis_address,
previous_address: previous_address,
timestamp: DateTime.from_unix!(timestamp, :millisecond)
}, rest}
end
Expand Down Expand Up @@ -965,6 +968,11 @@ defmodule Archethic.P2P.Message do
{%GetBeaconSummariesAggregate{date: DateTime.from_unix!(timestamp)}, rest}
end

def decode(<<34::8, rest::bitstring>>) do
{address, rest} = Utils.deserialize_address(rest)
{%NotifyPreviousChain{address: address}, rest}
end

def decode(<<231::8, rest::bitstring>>) do
SummaryAggregate.deserialize(rest)
end
Expand Down Expand Up @@ -1630,15 +1638,17 @@ defmodule Archethic.P2P.Message do
def process(%NotifyLastTransactionAddress{
last_address: last_address,
genesis_address: genesis_address,
previous_address: previous_address,
timestamp: timestamp
}) do
Replication.acknowledge_previous_storage_nodes(
last_address,
genesis_address,
previous_address,
timestamp
)
with {local_last_address, local_last_timestamp} <-
TransactionChain.get_last_address(genesis_address),
true <- local_last_address != last_address,
:gt <- DateTime.compare(timestamp, local_last_timestamp) do
TransactionChain.register_last_address(genesis_address, last_address, timestamp)

# Stop potential previous smart contract
Contracts.stop_contract(local_last_address)
end

%Ok{}
end
Expand Down Expand Up @@ -1756,4 +1766,9 @@ defmodule Archethic.P2P.Message do
%NotFound{}
end
end

def process(%NotifyPreviousChain{address: address}) do
Replication.acknowledge_previous_storage_nodes(address)
%Ok{}
end
end
5 changes: 2 additions & 3 deletions lib/archethic/p2p/message/notify_last_transaction_address.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@ defmodule Archethic.P2P.Message.NotifyLastTransactionAddress do
@moduledoc """
Represents a message with to notify a pool of the last address of a previous address
"""
@enforce_keys [:last_address, :genesis_address, :previous_address, :timestamp]
defstruct [:last_address, :genesis_address, :previous_address, :timestamp]
@enforce_keys [:last_address, :genesis_address, :timestamp]
defstruct [:last_address, :genesis_address, :timestamp]

alias Archethic.Crypto

@type t :: %__MODULE__{
last_address: Crypto.versioned_hash(),
genesis_address: Crypto.versioned_hash(),
previous_address: Crypto.versioned_hash(),
timestamp: DateTime.t()
}
end
11 changes: 11 additions & 0 deletions lib/archethic/p2p/message/notify_previous_chain.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
defmodule Archethic.P2P.Message.NotifyPreviousChain do
@moduledoc """
Represents a message used to notify previous chain storage nodes about the last transaction address
"""

defstruct [:address]

@type t :: %__MODULE__{
address: binary()
}
end
80 changes: 28 additions & 52 deletions lib/archethic/replication.ex
Original file line number Diff line number Diff line change
Expand Up @@ -331,59 +331,35 @@ defmodule Archethic.Replication do
end

@doc """
Notify the previous storage pool than a new transaction on the chain is present
Notify the previous storage nodes than a new transaction on the chain is present
"""
@spec acknowledge_previous_storage_nodes(
last_address :: binary(),
genesis_address :: binary(),
previous_address :: binary(),
tx_time :: DateTime.t()
) :: :ok
def acknowledge_previous_storage_nodes(
last_address,
genesis_address,
previous_address,
timestamp = %DateTime{}
)
when is_binary(last_address) and is_binary(genesis_address) and is_binary(previous_address) do
storage_nodes =
Election.chain_storage_nodes(previous_address, P2P.authorized_and_available_nodes())

# If the current is node was a previous storage nodes
if Utils.key_in_node_list?(storage_nodes, Crypto.first_node_public_key()) do
TransactionChain.register_last_address(genesis_address, last_address, timestamp)
Contracts.stop_contract(previous_address)

case TransactionChain.get_transaction(previous_address, [:previous_public_key]) do
{:ok, tx} ->
next_previous_address = Transaction.previous_address(tx)

previous_storage_nodes =
Election.chain_storage_nodes(
next_previous_address,
P2P.authorized_and_available_nodes()
)

# Take only the nodes which were not in the previous shard but previously on the chain
diff_nodes = previous_storage_nodes -- storage_nodes

P2P.broadcast_message(diff_nodes, %NotifyLastTransactionAddress{
last_address: last_address,
genesis_address: genesis_address,
previous_address: next_previous_address,
timestamp: timestamp
})

_ ->
:ok
end
else
P2P.broadcast_message(storage_nodes, %NotifyLastTransactionAddress{
last_address: last_address,
genesis_address: genesis_address,
previous_address: previous_address,
timestamp: timestamp
})
@spec acknowledge_previous_storage_nodes(address :: binary()) :: :ok
def acknowledge_previous_storage_nodes(address) when is_binary(address) do
case TransactionChain.get_genesis_address(address) do
^address ->
# No transaction chain found as the DB request return the argument if not found.
:ok

genesis_address ->
last_storage_nodes =
Election.chain_storage_nodes(address, P2P.authorized_and_available_nodes())

{:ok, %Transaction{validation_stamp: %ValidationStamp{timestamp: timestamp}}} =
TransactionChain.get_transaction(address, validation_stamp: [:timestamp])

# Send a message to all the previous storage nodes
address
|> TransactionChain.list_chain_addresses()
|> Stream.flat_map(fn {address, _} ->
Election.chain_storage_nodes(address, P2P.authorized_and_available_nodes())
end)
|> Enum.uniq_by(& &1.first_public_key)
|> Enum.reject(&Utils.key_in_node_list?(last_storage_nodes, &1.first_public_key))
|> P2P.broadcast_message(%NotifyLastTransactionAddress{
last_address: address,
genesis_address: genesis_address,
timestamp: timestamp
})
end
end

Expand Down
14 changes: 13 additions & 1 deletion test/archethic/p2p/messages_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ defmodule Archethic.P2P.MessageTest do
alias Archethic.P2P.Message.NotFound
alias Archethic.P2P.Message.NotifyEndOfNodeSync
alias Archethic.P2P.Message.NotifyLastTransactionAddress
alias Archethic.P2P.Message.NotifyPreviousChain
alias Archethic.P2P.Message.Ok
alias Archethic.P2P.Message.P2PView
alias Archethic.P2P.Message.Ping
Expand Down Expand Up @@ -862,7 +863,6 @@ defmodule Archethic.P2P.MessageTest do
msg = %NotifyLastTransactionAddress{
last_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>,
genesis_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>,
previous_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>,
timestamp: DateTime.utc_now() |> DateTime.truncate(:millisecond)
}

Expand All @@ -873,6 +873,18 @@ defmodule Archethic.P2P.MessageTest do
|> elem(0)
end

test "NotifyPreviousChain message" do
msg = %NotifyPreviousChain{
address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>
}

assert msg ==
msg
|> Message.encode()
|> Message.decode()
|> elem(0)
end

test "GetTransactionSummary message" do
msg = %GetTransactionSummary{
address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>
Expand Down
Loading