Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize ack of previous shards #672

Merged
merged 1 commit into from
Nov 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion lib/archethic/mining/distributed_workflow.ex
Original file line number Diff line number Diff line change
Expand Up @@ -755,9 +755,21 @@ defmodule Archethic.Mining.DistributedWorkflow do
}
}
) do
previous_address = Transaction.previous_address(tx)

genesis_address =
case TransactionChain.fetch_genesis_address_remotely(previous_address) do
{:ok, genesis_address} ->
genesis_address

_ ->
previous_address
end

Replication.acknowledge_previous_storage_nodes(
tx.address,
Transaction.previous_address(tx),
genesis_address,
previous_address,
tx_timestamp
)

Expand Down
27 changes: 27 additions & 0 deletions lib/archethic/mining/standalone_workflow.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ defmodule Archethic.Mining.StandaloneWorkflow do
alias Archethic.P2P.Message.ValidationError
alias Archethic.P2P.Node

alias Archethic.Replication

alias Archethic.TransactionChain
alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.Transaction.ValidationStamp
alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations
alias Archethic.TransactionChain.TransactionSummary

Expand Down Expand Up @@ -277,6 +280,7 @@ defmodule Archethic.Mining.StandaloneWorkflow do
defp notify(%{context: context}) do
notify_attestation(context)
notify_io_nodes(context)
notify_previous_chain(context)
end

defp notify_attestation(
Expand Down Expand Up @@ -326,4 +330,27 @@ defmodule Archethic.Mining.StandaloneWorkflow do
else: %ReplicateTransaction{transaction: validated_tx}
)
end

defp notify_previous_chain(%ValidationContext{
transaction: tx,
validation_stamp: %ValidationStamp{timestamp: tx_timestamp}
}) do
previous_address = Transaction.previous_address(tx)

genesis_address =
case TransactionChain.fetch_genesis_address_remotely(previous_address) do
{:ok, genesis_address} ->
genesis_address

_ ->
previous_address
end

Replication.acknowledge_previous_storage_nodes(
tx.address,
genesis_address,
previous_address,
tx_timestamp
)
end
end
22 changes: 16 additions & 6 deletions lib/archethic/p2p/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -380,11 +380,12 @@ defmodule Archethic.P2P.Message do
end

def encode(%NotifyLastTransactionAddress{
address: address,
last_address: last_address,
genesis_address: genesis_address,
previous_address: previous_address,
timestamp: timestamp
}) do
<<22::8, address::binary, previous_address::binary,
<<22::8, last_address::binary, genesis_address::binary, previous_address::binary,
DateTime.to_unix(timestamp, :millisecond)::64>>
end

Expand Down Expand Up @@ -881,11 +882,13 @@ defmodule Archethic.P2P.Message do
end

def decode(<<22::8, rest::bitstring>>) do
{address, rest} = Utils.deserialize_address(rest)
{last_address, rest} = Utils.deserialize_address(rest)
{genesis_address, rest} = Utils.deserialize_address(rest)
{previous_address, <<timestamp::64, rest::bitstring>>} = Utils.deserialize_address(rest)

{%NotifyLastTransactionAddress{
address: address,
last_address: last_address,
genesis_address: genesis_address,
previous_address: previous_address,
timestamp: DateTime.from_unix!(timestamp, :millisecond)
}, rest}
Expand Down Expand Up @@ -1625,11 +1628,18 @@ defmodule Archethic.P2P.Message do
end

def process(%NotifyLastTransactionAddress{
address: address,
last_address: last_address,
genesis_address: genesis_address,
previous_address: previous_address,
timestamp: timestamp
}) do
Replication.acknowledge_previous_storage_nodes(address, previous_address, timestamp)
Replication.acknowledge_previous_storage_nodes(
last_address,
genesis_address,
previous_address,
timestamp
)

%Ok{}
end

Expand Down
7 changes: 4 additions & 3 deletions lib/archethic/p2p/message/notify_last_transaction_address.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ defmodule Archethic.P2P.Message.NotifyLastTransactionAddress do
@moduledoc """
Represents a message with to notify a pool of the last address of a previous address
"""
@enforce_keys [:address, :previous_address, :timestamp]
defstruct [:address, :previous_address, :timestamp]
@enforce_keys [:last_address, :genesis_address, :previous_address, :timestamp]
defstruct [:last_address, :genesis_address, :previous_address, :timestamp]

alias Archethic.Crypto

@type t :: %__MODULE__{
address: Crypto.versioned_hash(),
last_address: Crypto.versioned_hash(),
genesis_address: Crypto.versioned_hash(),
previous_address: Crypto.versioned_hash(),
timestamp: DateTime.t()
}
Expand Down
61 changes: 37 additions & 24 deletions lib/archethic/replication.ex
Original file line number Diff line number Diff line change
Expand Up @@ -330,43 +330,56 @@ defmodule Archethic.Replication do
Notify the previous storage pool than a new transaction on the chain is present
"""
@spec acknowledge_previous_storage_nodes(
tx_address :: binary(),
last_address :: binary(),
genesis_address :: binary(),
previous_address :: binary(),
tx_time :: DateTime.t()
) :: :ok
def acknowledge_previous_storage_nodes(address, previous_address, timestamp)
when is_binary(address) and is_binary(previous_address) do
TransactionChain.register_last_address(previous_address, address, timestamp)
Contracts.stop_contract(previous_address)
def acknowledge_previous_storage_nodes(
last_address,
genesis_address,
previous_address,
timestamp = %DateTime{}
)
when is_binary(last_address) and is_binary(genesis_address) and is_binary(previous_address) do
storage_nodes =
Election.chain_storage_nodes(previous_address, P2P.authorized_and_available_nodes())

# If the current is node was a previous storage nodes
if Utils.key_in_node_list?(storage_nodes, Crypto.first_node_public_key()) do
TransactionChain.register_last_address(genesis_address, last_address, timestamp)
Contracts.stop_contract(previous_address)

if previous_address != address do
case TransactionChain.get_transaction(previous_address, [:previous_public_key]) do
{:ok, tx} ->
next_previous_address = Transaction.previous_address(tx)

if previous_address != next_previous_address do
previous_storage_nodes =
Election.chain_storage_nodes(
next_previous_address,
P2P.authorized_and_available_nodes()
)

if Utils.key_in_node_list?(previous_storage_nodes, Crypto.first_node_public_key()) do
acknowledge_previous_storage_nodes(address, next_previous_address, timestamp)
else
P2P.broadcast_message(previous_storage_nodes, %NotifyLastTransactionAddress{
address: address,
previous_address: next_previous_address,
timestamp: timestamp
})
end
end
previous_storage_nodes =
Election.chain_storage_nodes(
next_previous_address,
P2P.authorized_and_available_nodes()
)

# Take only the nodes which were not in the previous shard but previously on the chain
diff_nodes = previous_storage_nodes -- storage_nodes

P2P.broadcast_message(diff_nodes, %NotifyLastTransactionAddress{
last_address: last_address,
genesis_address: genesis_address,
previous_address: next_previous_address,
timestamp: timestamp
})

_ ->
:ok
end
else
:ok
P2P.broadcast_message(storage_nodes, %NotifyLastTransactionAddress{
last_address: last_address,
genesis_address: genesis_address,
previous_address: previous_address,
timestamp: timestamp
})
end
end

Expand Down
3 changes: 2 additions & 1 deletion test/archethic/p2p/messages_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,8 @@ defmodule Archethic.P2P.MessageTest do

test "NotifyLastTransactionAddress message" do
msg = %NotifyLastTransactionAddress{
address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>,
last_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>,
genesis_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>,
previous_address: <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>,
timestamp: DateTime.utc_now() |> DateTime.truncate(:millisecond)
}
Expand Down
4 changes: 3 additions & 1 deletion test/archethic/replication_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ defmodule Archethic.ReplicationTest do
assert :ok =
Replication.acknowledge_previous_storage_nodes(
"@Alice2",
"@Alice0",
"@Alice1",
DateTime.utc_now()
)
Expand All @@ -266,7 +267,7 @@ defmodule Archethic.ReplicationTest do
me = self()

MockClient
|> stub(:send_message, fn _, %NotifyLastTransactionAddress{address: _}, _ ->
|> stub(:send_message, fn _, %NotifyLastTransactionAddress{last_address: _}, _ ->
send(me, :notification_sent)
{:ok, %Ok{}}
end)
Expand All @@ -286,6 +287,7 @@ defmodule Archethic.ReplicationTest do
assert :ok =
Replication.acknowledge_previous_storage_nodes(
"@Alice2",
"@Alice0",
"@Alice1",
DateTime.utc_now()
)
Expand Down