Skip to content

Commit

Permalink
Skip replication if there is a cross-validation error (#718)
Browse files Browse the repository at this point in the history
It also includes:
* stop the FSM in case of consensus_not_reached
* Validation will check for sufficient funds
  • Loading branch information
bchamagne committed Dec 2, 2022
1 parent 58ec61e commit 962cfee
Show file tree
Hide file tree
Showing 5 changed files with 500 additions and 149 deletions.
167 changes: 75 additions & 92 deletions lib/archethic/mining/distributed_workflow.ex
Original file line number Diff line number Diff line change
Expand Up @@ -502,11 +502,8 @@ defmodule Archethic.Mining.DistributedWorkflow do
transaction_type: tx.type
)

next_events = [
{:next_event, :internal, {:notify_error, :timeout}}
]

{:keep_state_and_data, next_events}
notify_error(:timeout, data)
:stop

_ ->
new_context =
Expand Down Expand Up @@ -594,11 +591,7 @@ defmodule Archethic.Mining.DistributedWorkflow do
if ValidationContext.atomic_commitment?(new_context) do
{:next_state, :replication, %{data | context: new_context}}
else
next_events = [
{:next_event, :internal, {:notify_error, :consensus_not_reached}}
]

{:next_state, :consensus_not_reached, %{data | context: new_context}, next_events}
{:next_state, :consensus_not_reached, %{data | context: new_context}}
end
else
{:keep_state, %{data | context: new_context}}
Expand All @@ -609,7 +602,7 @@ defmodule Archethic.Mining.DistributedWorkflow do
:enter,
:wait_cross_validation_stamps,
:consensus_not_reached,
_data = %{
data = %{
context:
context = %ValidationContext{
transaction: tx,
Expand All @@ -635,28 +628,38 @@ defmodule Archethic.Mining.DistributedWorkflow do

MaliciousDetection.start_link(context)

:keep_state_and_data
notify_error(:consensus_not_reached, data)
:stop
end

def handle_event(
:enter,
from_state,
:replication,
_data = %{
data = %{
context:
context = %ValidationContext{
transaction: %Transaction{address: tx_address, type: type}
}
}
)
when from_state in [:cross_validator, :wait_cross_validation_stamps] do
Logger.info("Start replication",
transaction_address: Base.encode16(tx_address),
transaction_type: type
)
case ValidationContext.get_first_error(context) do
nil ->
Logger.info("Start replication",
transaction_address: Base.encode16(tx_address),
transaction_type: type
)

request_replication(context)
:keep_state_and_data
request_replication(context)
:keep_state_and_data

err ->
Logger.info("Skipped replication because validation failed", err: err)

notify_error(err, data)
:stop
end
end

def handle_event(
Expand Down Expand Up @@ -766,19 +769,15 @@ defmodule Archethic.Mining.DistributedWorkflow do
:info,
{:replication_error, reason},
:replication,
_data = %{context: %ValidationContext{transaction: tx}}
data = %{context: %ValidationContext{transaction: tx}}
) do
Logger.error("Replication error - #{inspect(reason)}",
transaction_address: Base.encode16(tx.address),
transaction_type: tx.type
)

next_events = [
{:next_event, :internal, {:notify_error, reason}}
]

{:keep_state_and_data, next_events}
# :stop
notify_error(reason, data)
:stop
end

def handle_event(
Expand All @@ -803,13 +802,8 @@ defmodule Archethic.Mining.DistributedWorkflow do
transaction_type: tx.type
)

next_events = [
{:next_event, :internal, {:notify_error, :timeout}}
]

{:keep_state_and_data, next_events}

# :stop
notify_error(:timeout, data)
:stop

_ ->
nb_cross_validation_nodes = length(next_cross_validation_nodes)
Expand Down Expand Up @@ -837,71 +831,14 @@ defmodule Archethic.Mining.DistributedWorkflow do
{:timeout, :stop_timeout},
:any,
_state,
_data = %{context: %ValidationContext{transaction: tx}}
data = %{context: %ValidationContext{transaction: tx}}
) do
Logger.warning("Timeout reached during mining",
transaction_type: tx.type,
transaction_address: Base.encode16(tx.address)
)

next_events = [
{:next_event, :internal, {:notify_error, :timeout}}
]

{:keep_state_and_data, next_events}
end

def handle_event(
:internal,
{:notify_error, reason},
_,
_data = %{
context:
_context = %ValidationContext{
welcome_node: welcome_node = %Node{},
transaction: %Transaction{address: tx_address},
pending_transaction_error_detail: pending_error_detail
}
}
) do
{error_context, error_reason} =
case reason do
:invalid_pending_transaction ->
{:invalid_transaction, pending_error_detail}

:invalid_inherit_constraints ->
{:invalid_transaction, "Inherit constraints not respected"}

:insufficient_funds ->
{:invalid_transaction, "Insufficient funds"}

:invalid_proof_of_work ->
{:invalid_transaction, "Invalid origin signature"}

reason ->
{:network_issue, reason |> Atom.to_string() |> String.replace("_", " ")}
end

Logger.warning("Invalid transaction #{inspect(error_reason)}",
transaction_address: Base.encode16(tx_address)
)

Logger.debug("Notify error back to the welcome node",
transaction_address: Base.encode16(tx_address)
)

# Notify error to the welcome node
message = %ValidationError{context: error_context, reason: error_reason, address: tx_address}

Task.Supervisor.async_nolink(Archethic.TaskSupervisor, fn ->
P2P.send_message(
welcome_node,
message
)

:ok
end)

notify_error(:timeout, data)
:stop
end

Expand Down Expand Up @@ -1024,4 +961,50 @@ defmodule Archethic.Mining.DistributedWorkflow do

P2P.broadcast_message(storage_nodes, message)
end

defp notify_error(reason, %{
context: %ValidationContext{
welcome_node: welcome_node = %Node{},
transaction: %Transaction{address: tx_address},
pending_transaction_error_detail: pending_error_detail
}
}) do
{error_context, error_reason} =
case reason do
:invalid_pending_transaction ->
{:invalid_transaction, pending_error_detail}

:invalid_inherit_constraints ->
{:invalid_transaction, "Inherit constraints not respected"}

:insufficient_funds ->
{:invalid_transaction, "Insufficient funds"}

:invalid_proof_of_work ->
{:invalid_transaction, "Invalid origin signature"}

reason ->
{:network_issue, reason |> Atom.to_string() |> String.replace("_", " ")}
end

Logger.warning("Invalid transaction #{inspect(error_reason)}",
transaction_address: Base.encode16(tx_address)
)

Logger.debug("Notify error back to the welcome node",
transaction_address: Base.encode16(tx_address)
)

# Notify error to the welcome node
message = %ValidationError{context: error_context, reason: error_reason, address: tx_address}

Task.Supervisor.async_nolink(Archethic.TaskSupervisor, fn ->
P2P.send_message(
welcome_node,
message
)

:ok
end)
end
end
Loading

0 comments on commit 962cfee

Please sign in to comment.