Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix regressions #263

Merged
11 commits merged into from
Mar 18, 2022
2 changes: 1 addition & 1 deletion config/prod.exs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ config :archethic, ArchEthic.Bootstrap.NetworkInit,
if(System.get_env("ARCHETHIC_NETWORK_TYPE") == "testnet",
do: %{
address:
"0000EC64107CA604A6B954037CFA91ED18315A77A94FBAFD91275CEE07FA45EAF893"
"00001259AE51A6E63A1E04E308C5E769E0E9D15BFFE4E7880266C8FA10C3ADD7B7A2"
|> Base.decode16!(case: :mixed),
amount: 1_000_000_000_000_000
}
Expand Down
2 changes: 1 addition & 1 deletion lib/archethic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ defmodule ArchEthic do
if P2P.authorized_node?() do
do_send_transaction(tx)
else
P2P.authorized_nodes()
P2P.authorized_nodes(DateTime.utc_now())
|> Enum.filter(&Node.locally_available?/1)
|> P2P.nearest_nodes()
|> forward_transaction(tx)
Expand Down
13 changes: 7 additions & 6 deletions lib/archethic/beacon_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ defmodule ArchEthic.BeaconChain do

## Examples

iex> BeaconChain.subset_from_address(<<0, 44, 242, 77, 186, 95, 176, 163,
iex> BeaconChain.subset_from_address(<<0, 0, 44, 242, 77, 186, 95, 176, 163,
...> 14, 38, 232, 59, 42, 197, 185, 226, 158, 51, 98, 147, 139, 152, 36,
...> 27, 22, 30, 92, 31, 167, 66, 94, 115, 4, >>)
<<44>>
"""
@spec subset_from_address(binary()) :: binary()
def subset_from_address(address) do
:binary.part(address, 1, 1)
def subset_from_address(<<_::8, _::8, first_digit::binary-size(1), _::binary>>) do
first_digit
end

@doc """
Expand Down Expand Up @@ -118,11 +118,12 @@ defmodule ArchEthic.BeaconChain do
) do
with {%Slot{subset: subset} = slot, _} <- Slot.deserialize(content),
:ok <- validate_slot(tx, slot) do
Logger.debug("New beacon transaction loaded - #{inspect(slot)}",
beacon_subset: Base.encode16(subset)
)

SummaryCache.add_slot(subset, slot)
else
true ->
:ok

{:error, _} = e ->
Logger.error("Invalid beacon slot #{inspect(e)}")
:error
Expand Down
9 changes: 7 additions & 2 deletions lib/archethic/beacon_chain/replication_attestation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,18 @@ defmodule ArchEthic.BeaconChain.ReplicationAttestation do
:ok
| {:error, :invalid_confirmations_signatures}
def validate(%__MODULE__{
transaction_summary: tx_summary = %TransactionSummary{address: tx_address, type: tx_type},
transaction_summary:
tx_summary = %TransactionSummary{
address: tx_address,
type: tx_type,
timestamp: timestamp
},
confirmations: confirmations
}) do
tx_summary_payload = TransactionSummary.serialize(tx_summary)

storage_nodes =
Election.chain_storage_nodes_with_type(tx_address, tx_type, P2P.available_nodes())
Election.chain_storage_nodes_with_type(tx_address, tx_type, P2P.authorized_nodes(timestamp))

if valid_confirmations?(confirmations, tx_summary_payload, storage_nodes) do
:ok
Expand Down
74 changes: 73 additions & 1 deletion lib/archethic/beacon_chain/slot.ex
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,78 @@ defmodule ArchEthic.BeaconChain.Slot do
}
]
}

Append transaction attestations

iex> %Slot{transaction_attestations: [
...> %ReplicationAttestation{
...> transaction_summary: %TransactionSummary{
...> address: <<0, 0, 11, 4, 226, 118, 242, 59, 165, 128, 69, 40, 228, 121, 127, 37, 154, 199,
...> 168, 212, 53, 82, 220, 22, 56, 222, 223, 127, 16, 172, 142, 218, 41, 247>>,
...> timestamp: ~U[2020-06-25 15:11:53Z],
...> type: :transfer,
...> movements_addresses: [],
...> fee: 10_000_000
...> },
...> confirmations: [{0, <<185, 37, 172, 79, 189, 197, 94, 202, 41, 160, 222, 127, 227, 180, 133, 62, 76,
...> 29, 230, 10, 100, 79, 47, 49, 139, 117, 0, 64, 89, 229, 228, 214, 6, 49, 119,
...> 32, 180, 47, 189, 143, 239, 156, 56, 234, 236, 128, 17, 79, 236, 211, 124,
...> 158, 142, 23, 151, 43, 50, 153, 52, 195, 144, 226, 247, 65>>
...> }]
...> }
...> ]}
...> |> Slot.add_transaction_attestation(%ReplicationAttestation{
...> transaction_summary: %TransactionSummary{
...> address: <<0, 0, 63, 243, 35, 90, 94, 187, 142, 185, 202, 188, 247, 248, 215, 170, 18, 115, 50,
...> 235, 117, 27, 105, 90, 132, 206, 105, 234, 200, 227, 176, 210, 46, 69>>,
...> timestamp: ~U[2020-06-25 15:11:53Z],
...> type: :transfer,
...> movements_addresses: [],
...> fee: 10_000_000
...> },
...> confirmations: [{1, <<89, 98, 246, 6, 202, 116, 247, 88, 69, 148, 188, 173, 34, 0, 194, 108, 169,
...> 155, 63, 197, 200, 6, 31, 148, 57, 152, 195, 154, 181, 14, 77, 9, 161, 38,
...> 239, 151, 241, 35, 93, 254, 65, 201, 152, 57, 187, 225, 86, 235, 56, 206, 134,
...> 141, 174, 141, 29, 28, 173, 17, 4, 78, 129, 33, 68, 4>>}],
...> })
%Slot{
transaction_attestations: [
%ReplicationAttestation{
transaction_summary: %TransactionSummary{
address: <<0, 0, 63, 243, 35, 90, 94, 187, 142, 185, 202, 188, 247, 248, 215, 170, 18, 115, 50,
235, 117, 27, 105, 90, 132, 206, 105, 234, 200, 227, 176, 210, 46, 69>>,
timestamp: ~U[2020-06-25 15:11:53Z],
type: :transfer,
movements_addresses: [],
fee: 10_000_000
},
confirmations: [{1, <<89, 98, 246, 6, 202, 116, 247, 88, 69, 148, 188, 173, 34, 0, 194, 108, 169,
155, 63, 197, 200, 6, 31, 148, 57, 152, 195, 154, 181, 14, 77, 9, 161, 38,
239, 151, 241, 35, 93, 254, 65, 201, 152, 57, 187, 225, 86, 235, 56, 206, 134,
141, 174, 141, 29, 28, 173, 17, 4, 78, 129, 33, 68, 4>>}]
},
%ReplicationAttestation{
transaction_summary: %TransactionSummary{
address: <<0, 0, 11, 4, 226, 118, 242, 59, 165, 128, 69, 40, 228, 121, 127, 37, 154, 199,
168, 212, 53, 82, 220, 22, 56, 222, 223, 127, 16, 172, 142, 218, 41, 247>>,
timestamp: ~U[2020-06-25 15:11:53Z],
type: :transfer,
movements_addresses: [],
fee: 10_000_000
},
confirmations: [
{
0,
<<185, 37, 172, 79, 189, 197, 94, 202, 41, 160, 222, 127, 227, 180, 133, 62, 76,
29, 230, 10, 100, 79, 47, 49, 139, 117, 0, 64, 89, 229, 228, 214, 6, 49, 119,
32, 180, 47, 189, 143, 239, 156, 56, 234, 236, 128, 17, 79, 236, 211, 124,
158, 142, 23, 151, 43, 50, 153, 52, 195, 144, 226, 247, 65>>
}
]
}
]
}

"""
@spec add_transaction_attestation(
__MODULE__.t(),
Expand All @@ -193,7 +265,7 @@ defmodule ArchEthic.BeaconChain.Slot do
&(&1.transaction_summary.address == tx_address)
) do
nil ->
%{slot | transaction_attestations: [attestation]}
Map.update!(slot, :transaction_attestations, &[attestation | &1])

index ->
add_transaction_attestation_confirmations(slot, index, confirmations)
Expand Down
14 changes: 7 additions & 7 deletions lib/archethic/beacon_chain/subset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -145,16 +145,16 @@ defmodule ArchEthic.BeaconChain.Subset do
state = %{current_slot: current_slot, subset: subset, subscribed_nodes: subscribed_nodes}
) do
if ArchEthic.BeaconChain.subset_from_address(address) == subset do
Logger.info("Transaction #{type}@#{Base.encode16(address)} added to the beacon chain",
beacon_subset: Base.encode16(subset)
)

current_slot =
new_slot =
Slot.add_transaction_attestation(
current_slot,
attestation
)

Logger.info("Transaction #{type}@#{Base.encode16(address)} added to the beacon chain",
beacon_subset: Base.encode16(subset)
)

subscribed_nodes
|> P2P.get_nodes_info()
|> Enum.reject(&(&1.first_public_key == Crypto.first_node_public_key()))
Expand All @@ -164,12 +164,12 @@ defmodule ArchEthic.BeaconChain.Subset do
if update_p2p_view?(state) do
new_state =
state
|> Map.put(:current_slot, add_p2p_view(current_slot))
|> Map.put(:current_slot, add_p2p_view(new_slot))
|> Map.put(:sampling_time, DateTime.utc_now())

{:noreply, new_state}
else
{:noreply, %{state | current_slot: current_slot}}
{:noreply, %{state | current_slot: new_slot}}
end
else
{:noreply, state}
Expand Down
56 changes: 37 additions & 19 deletions lib/archethic/db/cassandra_impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -82,26 +82,12 @@ defmodule ArchEthic.DB.CassandraImpl do
def get_transaction_chain(address, fields \\ []) when is_binary(address) and is_list(fields) do
start = System.monotonic_time()

prepared =
Xandra.prepare!(
:xandra_conn,
"SELECT transaction_address FROM archethic.transaction_chains WHERE chain_address=? and bucket=?"
)

addresses_to_fetch =
1..4
|> Task.async_stream(
fn bucket ->
:xandra_conn
|> Xandra.stream_pages!(prepared, [address, bucket], page_size: 10)
|> Stream.flat_map(& &1)
|> Enum.map(fn %{"transaction_address" => tx_address} ->
tx_address
end)
end,
max_concurrency: 4
)
|> Enum.flat_map(fn {:ok, addresses} -> addresses end)
:xandra_conn
|> do_get_transaction_chain(address, [:transaction_address])
|> Enum.map(fn %{"transaction_address" => tx_address} ->
tx_address
end)

# Chunk the reads while leveraging concurrency
# avoiding a connection to be open too long to read many transactions
Expand All @@ -120,6 +106,26 @@ defmodule ArchEthic.DB.CassandraImpl do
chain
end

defp do_get_transaction_chain(conn, address, fields) do
prepared =
Xandra.prepare!(
conn,
"SELECT #{CQL.list_to_cql(fields)} FROM archethic.transaction_chains WHERE chain_address=? and bucket=?"
)

1..4
|> Task.async_stream(
fn bucket ->
:xandra_conn
|> Xandra.stream_pages!(prepared, [address, bucket], page_size: 10)
|> Stream.flat_map(& &1)
|> Enum.to_list()
end,
max_concurrency: 4
)
|> Enum.flat_map(fn {:ok, entries} -> entries end)
end

defp chunk_get_transaction(addresses, fields) do
Xandra.run(:xandra_conn, fn conn ->
Enum.map(addresses, fn address ->
Expand All @@ -143,6 +149,18 @@ defmodule ArchEthic.DB.CassandraImpl do
Xandra.run(:xandra_conn, fn conn ->
do_write_transaction(conn, tx)
add_transaction_to_chain(conn, tx_address, tx_timestamp, tx_address)

# Add transaction to the previous chain
do_get_transaction_chain(conn, Transaction.previous_address(tx), [
:transaction_address,
:transaction_timestamp
])
|> Enum.each(fn %{
"transaction_address" => prev_tx_address,
"transaction_timestamp" => prev_tx_timestamp
} ->
add_transaction_to_chain(conn, prev_tx_address, prev_tx_timestamp, tx_address)
end)
end)
end

Expand Down
38 changes: 24 additions & 14 deletions lib/archethic/mining.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ defmodule ArchEthic.Mining do
alias ArchEthic.P2P
alias ArchEthic.P2P.Node

alias ArchEthic.SelfRepair
# alias ArchEthic.SelfRepair

alias ArchEthic.TransactionChain.Transaction
alias ArchEthic.TransactionChain.Transaction.CrossValidationStamp
Expand Down Expand Up @@ -57,22 +57,32 @@ defmodule ArchEthic.Mining do
"""
@spec transaction_validation_node_list(Transaction.transaction_type(), DateTime.t()) ::
list(Node.t())
def transaction_validation_node_list(tx_type, time = %DateTime{}) do
if Transaction.network_type?(tx_type) do
last_self_repair_date = SelfRepair.get_previous_scheduler_repair_time(time)

# Get the authorized nodes which were authorize before the previous self repair date
case P2P.authorized_nodes(last_self_repair_date) do
def transaction_validation_node_list(_tx_type, time = %DateTime{}) do
case P2P.authorized_nodes(time) do
[] ->
# If there are not nodes from this date, it means a boostrapping time, so we take all the authorized nodes
[] ->
P2P.authorized_nodes()
P2P.authorized_nodes()

authorized_nodes ->
authorized_nodes
end
else
P2P.authorized_nodes(time)
nodes ->
nodes
end

# if Transaction.network_type?(tx_type) do
# #last_self_repair_date = SelfRepair.get_previous_scheduler_repair_time(time)
# #

# ## Get the authorized nodes which were authorize before the previous self repair date
# #case P2P.authorized_nodes(last_self_repair_date) do
# # # If there are not nodes from this date, it means a boostrapping time, so we take all the authorized nodes
# # [] ->
# # P2P.authorized_nodes()

# # authorized_nodes ->
# # authorized_nodes
# #end
# else
# P2P.authorized_nodes(time)
# end
end

@doc """
Expand Down
21 changes: 7 additions & 14 deletions lib/archethic/mining/distributed_workflow.ex
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,14 @@ defmodule ArchEthic.Mining.DistributedWorkflow do
Election.chain_storage_nodes_with_type(
tx.address,
tx.type,
P2P.available_nodes()
P2P.authorized_nodes(DateTime.utc_now())
)

beacon_storage_nodes =
Election.beacon_storage_nodes(
BeaconChain.subset_from_address(tx.address),
BeaconChain.next_slot(DateTime.utc_now()),
P2P.authorized_nodes()
P2P.authorized_nodes(DateTime.utc_now())
)

context =
Expand Down Expand Up @@ -241,14 +241,12 @@ defmodule ArchEthic.Mining.DistributedWorkflow do
:build_transaction_context,
state,
data = %{
start_time: mining_start_time,
timeout: timeout,
context:
context = %ValidationContext{
transaction: tx,
chain_storage_nodes: chain_storage_nodes,
beacon_storage_nodes: beacon_storage_nodes,
cross_validation_nodes: cross_validation_nodes
beacon_storage_nodes: beacon_storage_nodes
}
}
) do
Expand Down Expand Up @@ -301,15 +299,10 @@ defmodule ArchEthic.Mining.DistributedWorkflow do
next_events =
case state do
:coordinator ->
context_retrieval_time =
(now - mining_start_time)
|> :erlang.convert_time_unit(:native, :millisecond)
|> abs()

transmission_delay = 500
nb_cross_validation_nodes = length(cross_validation_nodes)

waiting_time = (context_retrieval_time + transmission_delay) * nb_cross_validation_nodes
# TODO: Provide a better waiting time management
# for example rolling percentile latency could be way to achieve this
# (https://cs.stackexchange.com/a/129178)
waiting_time = 3_000

Logger.debug(
"Coordinator will wait #{waiting_time} ms before continue with the responding nodes",
Expand Down
2 changes: 1 addition & 1 deletion lib/archethic/mining/fee.ex
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ defmodule ArchEthic.Mining.Fee do
defp get_number_replicas(%Transaction{address: address}) do
# TODO: take the nodes at the time of the transaction's timestamp
address
|> Election.chain_storage_nodes(P2P.available_nodes())
|> Election.chain_storage_nodes(P2P.authorized_nodes())
|> length()
end

Expand Down
Loading