Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

infinite_looping_in_forward_transaction_#931 #937

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import Config
# Print only errors during test
config :logger, level: :error

config :archethic, Archethic.TaskSupervisor, enabled: true
apoorv-2204 marked this conversation as resolved.
Show resolved Hide resolved

config :archethic, :mut_dir, "data_test"

config :archethic, Archethic.Account.MemTablesLoader, enabled: false
Expand Down
62 changes: 41 additions & 21 deletions lib/archethic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,15 @@ defmodule Archethic do
Provides high level functions serving the API and the Explorer
"""

alias Archethic.SharedSecrets
alias __MODULE__.{Account, BeaconChain, Crypto, Election, P2P, P2P.Node, P2P.Message}
alias __MODULE__.{SelfRepair, TransactionChain}
alias __MODULE__.Contracts.Interpreter
alias __MODULE__.Contracts.Contract
alias __MODULE__.{SelfRepair, TransactionChain, SharedSecrets, TaskSupervisor}
alias __MODULE__.Contracts.{Interpreter, Contract}

alias Message.{NewTransaction, NotFound, StartMining}
alias Message.{Balance, GetBalance, GetTransactionSummary}
alias Message.{StartMining, Ok, TransactionSummaryMessage}

alias TransactionChain.{
Transaction,
TransactionInput,
TransactionSummary
}
alias TransactionChain.{Transaction, TransactionInput, TransactionSummary}

require Logger

Expand Down Expand Up @@ -57,7 +51,7 @@ defmodule Archethic do
@doc """
Send a new transaction in the network to be mined. The current node will act as welcome node
"""
@spec send_new_transaction(Transaction.t()) :: :ok | {:error, :network_issue}
@spec send_new_transaction(Transaction.t()) :: :ok
def send_new_transaction(
tx = %Transaction{},
welcome_node_key \\ Crypto.first_node_public_key()
Expand All @@ -80,25 +74,51 @@ defmodule Archethic do
end
end

defp forward_transaction(
tx,
welcome_node_key,
nodes \\ P2P.authorized_and_available_nodes()
|> Enum.filter(&Node.locally_available?/1)
|> P2P.nearest_nodes()
)
@spec forward_transaction(tx :: Transaction.t(), welcome_node_key :: Crypto.key()) :: :ok
defp forward_transaction(tx, welcome_node_key) do
%Node{network_patch: welcome_node_patch} = P2P.get_node_info!(welcome_node_key)

nodes =
P2P.authorized_and_available_nodes()
|> Enum.reject(&(&1.first_public_key == welcome_node_key))
|> Enum.sort_by(& &1.first_public_key)
apoorv-2204 marked this conversation as resolved.
Show resolved Hide resolved
|> P2P.nearest_nodes(welcome_node_patch)
|> Enum.filter(&Node.locally_available?/1)

this_node = Crypto.first_node_public_key()

nodes =
if this_node != welcome_node_key do
# if this node is not the welcome node then select
# next node from the this node position in nodes list
index = Enum.find_index(nodes, &(&1.first_public_key == this_node))
{_l, r} = Enum.split(nodes, index + 1)
r
else
nodes
end

TaskSupervisor
apoorv-2204 marked this conversation as resolved.
Show resolved Hide resolved
|> Task.Supervisor.start_child(fn ->
:ok =
%NewTransaction{transaction: tx, welcome_node: welcome_node_key}
|> do_forward_transaction(nodes)
end)

:ok
apoorv-2204 marked this conversation as resolved.
Show resolved Hide resolved
end

defp forward_transaction(tx, welcome_node_key, [node | rest]) do
case P2P.send_message(node, %NewTransaction{transaction: tx, welcome_node: welcome_node_key}) do
defp do_forward_transaction(msg, [node | rest]) do
case P2P.send_message(node, msg) do
{:ok, %Ok{}} ->
:ok

{:error, _} ->
forward_transaction(tx, welcome_node_key, rest)
do_forward_transaction(msg, rest)
end
end

defp forward_transaction(_, _, []), do: {:error, :network_issue}
defp do_forward_transaction(_, []), do: {:error, :network_issue}

defp do_send_transaction(tx = %Transaction{type: tx_type}, welcome_node_key) do
current_date = DateTime.utc_now()
Expand Down
28 changes: 14 additions & 14 deletions lib/archethic/p2p.ex
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,20 @@ defmodule Archethic.P2P do

defdelegate do_send_message(node, message, timeout), to: Client, as: :send_message

@doc """
Return the nearest storages nodes from the local node
"""
@spec nearest_nodes(list(Node.t())) :: list(Node.t())
def nearest_nodes(storage_nodes) when is_list(storage_nodes) do
case get_node_info(Crypto.first_node_public_key()) do
{:ok, %Node{network_patch: network_patch}} ->
nearest_nodes(storage_nodes, network_patch)

{:error, :not_found} ->
storage_nodes
end
end

@doc """
Get the nearest nodes from a specified node and a list of nodes to compare with

Expand Down Expand Up @@ -381,20 +395,6 @@ defmodule Archethic.P2P do
int
end

@doc """
Return the nearest storages nodes from the local node
"""
@spec nearest_nodes(list(Node.t())) :: list(Node.t())
def nearest_nodes(storage_nodes) when is_list(storage_nodes) do
case get_node_info(Crypto.first_node_public_key()) do
{:ok, %Node{network_patch: network_patch}} ->
nearest_nodes(storage_nodes, network_patch)

{:error, :not_found} ->
storage_nodes
end
end

@doc """
Return a list of nodes information from a list of public keys
"""
Expand Down
3 changes: 2 additions & 1 deletion lib/archethic/p2p/geo_patch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ defmodule Archethic.P2P.GeoPatch do

@doc """
Get a patch from an IP address
Null island, patch for local host.
"""
@spec from_ip(:inet.ip_address()) :: binary()
def from_ip({127, 0, 0, 1}), do: compute_random_patch()
def from_ip({127, 0, 0, 1}), do: "000"

def from_ip(ip) when is_tuple(ip) do
case GeoIP.get_coordinates(ip) do
Expand Down
9 changes: 2 additions & 7 deletions lib/archethic/p2p/message/new_transaction.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,8 @@ defmodule Archethic.P2P.Message.NewTransaction do

@spec process(__MODULE__.t(), Crypto.key()) :: Ok.t() | Error.t()
def process(%__MODULE__{transaction: tx, welcome_node: node_pbkey}, _) do
case Archethic.send_new_transaction(tx, node_pbkey) do
:ok ->
%Ok{}

{:error, :network_issue} ->
%Error{reason: :network_issue}
end
:ok = Archethic.send_new_transaction(tx, node_pbkey)
%Ok{}
apoorv-2204 marked this conversation as resolved.
Show resolved Hide resolved
end

@spec serialize(t()) :: bitstring()
Expand Down
19 changes: 7 additions & 12 deletions lib/archethic_web/controllers/api/origin_key_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,13 @@ defmodule ArchethicWeb.API.OriginKeyController do
end

defp send_transaction(tx = %Transaction{}) do
case Archethic.send_new_transaction(tx) do
:ok ->
TransactionSubscriber.register(tx.address, System.monotonic_time())
:ok = Archethic.send_new_transaction(tx)
TransactionSubscriber.register(tx.address, System.monotonic_time())

{201,
%{
transaction_address: Base.encode16(tx.address),
status: "pending"
}}

{:error, :network_issue} ->
{422, %{status: "error - may be invalid transaction"}}
end
{201,
%{
transaction_address: Base.encode16(tx.address),
status: "pending"
}}
end
end
25 changes: 9 additions & 16 deletions lib/archethic_web/controllers/api/transaction_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,15 @@ defmodule ArchethicWeb.API.TransactionController do
end

defp send_transaction(conn, tx = %Transaction{}) do
case Archethic.send_new_transaction(tx) do
:ok ->
TransactionSubscriber.register(tx.address, System.monotonic_time())

conn
|> put_status(201)
|> json(%{
transaction_address: Base.encode16(tx.address),
status: "pending"
})

{:error, :network_issue} ->
conn
|> put_status(422)
|> json(%{status: "error - transaction may be invalid"})
end
:ok = Archethic.send_new_transaction(tx)
TransactionSubscriber.register(tx.address, System.monotonic_time())

conn
|> put_status(201)
|> json(%{
transaction_address: Base.encode16(tx.address),
status: "pending"
})
end

def last_transaction_content(conn, params = %{"address" => address}) do
Expand Down
23 changes: 10 additions & 13 deletions lib/archethic_web/controllers/faucet_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -119,19 +119,16 @@ defmodule ArchethicWeb.FaucetController do
tx_address = tx.address
TransactionSubscriber.register(tx_address, System.monotonic_time())

case Archethic.send_new_transaction(tx) do
:ok ->
receive do
{:new_transaction, ^tx_address} ->
FaucetRateLimiter.register(recipient_address, System.monotonic_time())
{:ok, tx_address}
after
5000 ->
{:error, :network_issue}
end

{:error, _} = e ->
e
# case Archethic.send_new_transaction(tx) do
:ok = Archethic.send_new_transaction(tx)

receive do
{:new_transaction, ^tx_address} ->
FaucetRateLimiter.register(recipient_address, System.monotonic_time())
{:ok, tx_address}
after
5000 ->
{:error, :network_issue}
end
end
end
2 changes: 2 additions & 0 deletions test/archethic/reward/scheduler_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ defmodule Archethic.Reward.SchedulerTest do
end

test "should wait for :node_down message to stop the scheduler" do
:persistent_term.put(:archethic_up, nil)

P2P.add_and_connect_node(%Node{
ip: {127, 0, 0, 1},
port: 3002,
Expand Down
Loading