Skip to content

Commit

Permalink
Some test
Browse files Browse the repository at this point in the history
  • Loading branch information
apoorv-2204 committed Nov 7, 2022
1 parent e36dc4e commit 8f7c363
Show file tree
Hide file tree
Showing 7 changed files with 650 additions and 206 deletions.
31 changes: 23 additions & 8 deletions lib/archethic/p2p/message/shard_repair.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule Archethic.P2P.Message.ShardRepair do
@moduledoc """
Ask a new shard to start a repair for a given Transaction.
Inform a shard to start repair.
"""
@enforce_keys [:last_address, :genesis_address]
defstruct [:last_address, :genesis_address]
Expand All @@ -16,11 +16,15 @@ defmodule Archethic.P2P.Message.ShardRepair do
Serialize ShardRepair Struct
iex> %ShardRepair{
...> address:
...>
...> node
...>
...> genesis_address: <<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172,
...> 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251>>,
...> last_address: <<0, 0, 106, 248, 193, 217, 112, 140, 200, 141, 33, 81, 243, 92, 207, 242, 72,
...> 2, 92, 236, 236, 100, 121, 250, 105, 12, 90, 240, 221, 108, 1, 171, 108, 130>>
...> } |> ShardRepair.serialize()
<<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172,
3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251,
0, 0, 106, 248, 193, 217, 112, 140, 200, 141, 33, 81, 243, 92, 207, 242, 72,
2, 92, 236, 236, 100, 121, 250, 105, 12, 90, 240, 221, 108, 1, 171, 108, 130>>
"""
def serialize(%__MODULE__{
genesis_address: genesis_address,
Expand All @@ -30,9 +34,20 @@ defmodule Archethic.P2P.Message.ShardRepair do
end

@doc """
DESerialize ShardRepair Struct
DeSerialize ShardRepair Struct
iex>
iex> <<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172,
...> 3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251,
...> 0, 0, 106, 248, 193, 217, 112, 140, 200, 141, 33, 81, 243, 92, 207, 242, 72,
...> 2, 92, 236, 236, 100, 121, 250, 105, 12, 90, 240, 221, 108, 1, 171, 108, 130>>
...> |> ShardRepair.deserialize()
{
%ShardRepair{
genesis_address: <<0, 0, 94, 5, 249, 103, 126, 31, 43, 57, 25, 14, 187, 133, 59, 234, 201, 172,
3, 195, 43, 81, 81, 146, 164, 202, 147, 218, 207, 204, 31, 185, 73, 251>>,
last_address: <<0, 0, 106, 248, 193, 217, 112, 140, 200, 141, 33, 81, 243, 92, 207, 242, 72,
2, 92, 236, 236, 100, 121, 250, 105, 12, 90, 240, 221, 108, 1, 171, 108, 130>>
}, ""}
"""
def deserialize(bin) do
Expand Down
247 changes: 178 additions & 69 deletions lib/archethic/self_repair/notifier.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ defmodule Archethic.SelfRepair.Notifier do
H -->|Replicate Transaction| D[Node3]
```
Different nodes holding same chain, but with different length of txn chain
node | node-A| node-B| node-C| node-D | node-E
txn_chain | 1 | 1 2 | 1 2 3 | 1 2 3 4 5| 1 2 3 4 5
Sharding on single txn results in holding that txn chain upto that txn
txn_chain => A -> B -> C -> D
A Sharded tranction replication mean tx from genesis addr to that tx address.
"""
use GenServer
require Logger
Expand All @@ -29,8 +36,7 @@ defmodule Archethic.SelfRepair.Notifier do
P2P.Node,
P2P.Message.ShardRepair,
TransactionChain,
TransactionChain.Transaction,
Utils
TransactionChain.Transaction
}

@network_type_transactions Transaction.list_network_type()
Expand Down Expand Up @@ -100,110 +106,213 @@ defmodule Archethic.SelfRepair.Notifier do
{:noreply, state}
end

def repair_transactions(disconnected_node_key, current_node_public_key) do
@doc """
Determines whether a genesis address belongs to a network chain.
"""
@spec network_chain?(binary()) :: boolean()
def network_chain?(genesis_address) do
case TransactionChain.get_transaction(genesis_address, [:type]) do
{:ok, %Transaction{type: type}} when type in @network_type_transactions ->
true

_ ->
false
end
end

@doc """
For each txn chain in db. Load its genesis address, load its
chain, recompute shards , notifiy nodes. Network txns are excluded,
dependent bootstrap operations.
"""
@spec repair_transactions(Crypto.key(), Crypto.key()) :: :ok
def repair_transactions(unavailable_node_key, current_node_public_key) do
Logger.debug("Trying to repair transactions due to a topology change",
node: Base.encode16(disconnected_node_key)
node: Base.encode16(unavailable_node_key)
)

# We fetch all the transactions existing and check if the disconnecting node was a storage node

TransactionChain.stream_genesis_addresses()
|> Stream.filter(&(network_chain?(&1) == false))
|> Stream.each(&sync_chain_by_chain(&1, disconnected_node_key, current_node_public_key))
|> tap(fn x -> IO.inspect(x) end)
|> Stream.each(&sync_chain_by_chain(&1, unavailable_node_key, current_node_public_key))
|> Stream.run()
end
|> tap(fn x -> IO.inspect(x) end)

def network_chain?(genesis_address) do
case TransactionChain.get_transaction(genesis_address, [:type]) do
{:ok, %Transaction{type: type}} when type in @network_type_transactions ->
true

_ ->
false
end
:ok
end

@doc """
Loads a Txn Chain by a genesis address, Allocate new shards for the txn went down with down node.
"""
@spec sync_chain_by_chain(binary(), Crypto.key(), Crypto.key()) :: :ok
def sync_chain_by_chain(
genesis_address,
disconnected_node_key,
unavailable_node_key,
current_node_public_key
) do
genesis_address
|> TransactionChain.stream([:address, validation_stamp: [:timestamp]])
|> Stream.scan(
%{genesis_address: genesis_address},
&list_shards(&1, &2, disconnected_node_key, current_node_public_key)
)
|> Stream.map(&list_previous_shards(&1))
|> Stream.filter(&with_down_shard?(&1, unavailable_node_key))
|> Stream.filter(&current_node_in_node_list?(&1, current_node_public_key))
|> Stream.map(&new_storage_nodes(&1, unavailable_node_key))
|> Stream.scan(%{}, &map_node_and_address(&1, _acc = &2))
|> Stream.take(-1)
|> notify_nodes()
|> notify_nodes(genesis_address)

:ok
end

def list_shards(txn, acc, disconnected_node_key, current_node_public_key) do
@doc """
Repair txns that was stored by currenltyy unavailable nodes
For re-election and repair.
"""
@spec list_previous_shards(Transaction.t()) :: {binary(), list(Crypto.key())}
def list_previous_shards(txn) do
node_list = get_nodes_list(txn.validation_stamp.timestamp)

prev_storage_nodes =
Election.chain_storage_nodes(txn.address, node_list)
|> Enum.map(& &1.first_public_key)

{txn.address, prev_storage_nodes}
end

@doc """
Returns a node list that have been authorized before a given DateTime
"""
@spec get_nodes_list(DateTime.t()) :: list(Crypto.key())
def get_nodes_list(timestamp) do
P2P.stream_nodes()
|> Stream.filter(fn
%P2P.Node{authorization_date: auth_date} when not is_nil(auth_date) ->
DateTime.compare(auth_date, timestamp) == :lt

_ ->
false
end)
|> Enum.to_list()
end

@doc """
Does the currently unavailable_node_key is in previously elected shards
"""
@spec with_down_shard?({binary(), list(Crypto.key())}, Crypto.key()) :: boolean()
def with_down_shard?({_address, node_list}, unavailable_node_key) do
Enum.any?(node_list, &(&1 == unavailable_node_key))
end

@doc """
Is current node key in the list of previous nodes/shards
"""
@spec current_node_in_node_list?({binary(), list(Crypto.key())}, Crypto.key()) :: boolean()
def current_node_in_node_list?({_address, node_list}, current_node_key) do
Enum.any?(node_list, &(&1 == current_node_key))
end

@doc """
New election is carried out on the set of all authorized omiting unavailable_node_key.
The set of previous storage nodes is subtracted from the set of new storage nodes.
"""
@spec new_storage_nodes({binary(), list(Crypto.key())}, Crypto.key()) ::
{binary(), list(Crypto.key())}
def new_storage_nodes({address, prev_storage_node}, unavailable_node_key) do
node_list =
P2P.stream_nodes()
|> Stream.filter(fn node ->
DateTime.compare(node.authorization_date, txn.validation_stamp.timestamp) ==
:lt and
node.authorization_date != nil
end)
|> Enum.to_list()

prev_storage_node = Election.chain_storage_nodes(txn.address, node_list)
prev = {txn.address, prev_storage_node}

with true <- Utils.key_in_node_list?(prev_storage_node, disconnected_node_key),
true <- Utils.key_in_node_list?(prev_storage_node, current_node_public_key),
{:ok, node_list} <- new_storage_nodes(prev, disconnected_node_key) do
# accumulate nodes
Enum.reduce(node_list, acc, fn
%Node{first_public_key: first_public_key}, acc ->
Map.put(acc, first_public_key, nil)
end)
else
_e ->
# disconnected node is not in election no need to resync
# current node not in old storage nodes, not responsible for enforce resync
# empty new storage node list
acc
end
Election.chain_storage_nodes(
address,
P2P.authorized_nodes()
|> Enum.reject(&(&1.first_public_key == unavailable_node_key))
)

{address, node_list -- prev_storage_node}
end

def notify_nodes(acc = %{genesis_address: genesis_address}) do
last_address = TransactionChain.get_last_address(genesis_address)
@doc """
Acc in map, node key to the last address it should hold for a transaction chain.
"""
@spec map_node_and_address({binary(), list(Crypto.key())}, map()) :: map()
def map_node_and_address({address, node_list}, acc) do
Enum.reduce(node_list, acc, fn first_public_key, acc ->
Map.put(acc, first_public_key, address)
end)
end

@doc """
Deploys the ShardRepair message to the intended nodes.
"""
@spec notify_nodes([map()], binary()) :: :ok
def notify_nodes([], _), do: :ok

def notify_nodes([acc], genesis_address) do
Task.Supervisor.async_stream_nolink(
Archethic.TaskSupervisor,
acc,
fn
{:genesis_address, _} ->
nil

{node_key, nil} ->
{node_key, address} ->
P2P.send_message(node_key, %ShardRepair{
genesis_address: genesis_address,
last_address: last_address
last_address: address
})
end,
ordered: false,
on_timeout: :kill_task
)
|> Stream.run()
end
end

def new_storage_nodes({address, prev_storage_node}, unavailable_node_key) do
node_list =
Election.chain_storage_nodes(
address,
P2P.authorized_nodes()
|> Enum.reject(&(&1.first_public_key == unavailable_node_key))
) -- prev_storage_node
# genesis_address
# |> TransactionChain.stream([:address, validation_stamp: [:timestamp]])
# |> tap(fn x ->
# Enum.each(x, &IO.inspect({&1.address, &1.validation_stamp.timestamp}, label: "1"))
# end)
# # |> tap(fn x -> IO.inspect(x, label: "----") end)
# |> Stream.map(&list_previous_shards(&1))
# |> Enum.to_list()
# |> tap(fn x ->
# IO.inspect(label: "2=======")

case node_list do
[] ->
{:error, :empty}
# Enum.each(
# x,
# &IO.inspect({elem(&1, 0), length(elem(&1, 1))}, label: "post_list_shards==2==")
# )
# end)
# |> Stream.filter(&with_down_shard?(&1, unavailable_node_key))
# |> Enum.to_list()
# |> tap(fn x ->
# IO.inspect(label: "3=======")
# Enum.each(x, &IO.inspect({elem(&1, 0), length(elem(&1, 1))}, label: "with_down_shard==3=="))
# end)
# |> Stream.filter(&current_node_in_node_list?(&1, current_node_public_key))
# |> Enum.to_list()
# |> tap(fn x ->
# IO.inspect(label: "4==")

x when is_list(x) ->
{:ok, x}
end
end
end
# Enum.each(
# x,
# &IO.inspect({elem(&1, 0), length(elem(&1, 1))}, label: " current_node_in_node_list4==")
# )
# end)
# |> Stream.map(&new_storage_nodes(&1, unavailable_node_key))
# |> Enum.to_list()
# |> tap(fn x ->
# IO.inspect(label: "5=======")

# Enum.each(
# x,
# &IO.inspect({elem(&1, 0), length(elem(&1, 1))}, label: "5 new_storage_nodes==")
# )
# end)
# |> Stream.scan(%{}, &map_node_and_address(&1, _acc = &2))
# |> Enum.to_list()
# |> tap(fn x -> IO.inspect(x, label: "6==") end)
# |> Stream.take(-1)
# |> Enum.to_list()
# |> tap(fn x -> IO.inspect(x, label: "7==") end)
# |> notify_nodes(genesis_address)
# |> tap(fn x -> IO.inspect(x, label: "8==") end)

# :ok
# end
2 changes: 2 additions & 0 deletions lib/archethic/self_repair/notifier/repair_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ defmodule Archethic.SelfRepair.Notifier.RepairWorker do
Registry.register(Notifier.Impl.registry_name(), args.genesis_address, [])
Logger.debug("RepairWorker: Repair Started", address: args.genesis_address)

# note Imp
# genserver continue msg to to continue in genserver
{:ok, _next_state = :repair_started,
_data = %{
genesis_address: args.genesis_address,
Expand Down
7 changes: 7 additions & 0 deletions test/archethic/p2p/message/shard_repair_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
defmodule Archethic.P2P.Message.ShardRepairTest do
@moduledoc false
use ExUnit.Case

alias Archethic.P2P.Message.ShardRepair
doctest ShardRepair
end
Loading

0 comments on commit 8f7c363

Please sign in to comment.