Skip to content

Commit

Permalink
chore: track internal calls for wealth (#1369)
Browse files Browse the repository at this point in the history
  • Loading branch information
jyeshe committed May 31, 2023
1 parent 83eaf77 commit ca2fff8
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 75 deletions.
2 changes: 1 addition & 1 deletion lib/ae_mdw/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ defmodule AeMdw.Application do
end

def start_phase(:dedup_accounts, _start_type, []) do
AeMdw.Sync.AsyncTasks.WealthRank.dedup_pending_accounts()
AeMdw.Sync.AsyncTasks.WealthRankAccounts.dedup_pending_accounts()
:ok
end

Expand Down
10 changes: 5 additions & 5 deletions lib/ae_mdw/db/mutations/int_calls_mutation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ defmodule AeMdw.Db.IntCallsMutation do

@typep int_call() ::
{Contract.local_idx(), Contract.fname(), Node.tx_type(), Node.tx(), Node.aetx()}
@opaque t() :: %__MODULE__{
contract_pk: Db.pubkey(),
call_txi: Txs.txi(),
int_calls: [int_call()]
}
@type t() :: %__MODULE__{
contract_pk: Db.pubkey(),
call_txi: Txs.txi(),
int_calls: [int_call()]
}

@spec new(Db.pubkey(), Txs.txi(), [int_call()]) :: t()
def new(contract_pk, call_txi, int_calls) do
Expand Down
50 changes: 0 additions & 50 deletions lib/ae_mdw/sync/async_tasks/wealth_rank.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ defmodule AeMdw.Sync.AsyncTasks.WealthRank do
alias AeMdw.Db.AsyncStore
alias AeMdw.Db.Model
alias AeMdw.Db.State
alias AeMdw.Db.DeleteKeysMutation
alias AeMdw.Db.WriteMutation

require Model

Expand Down Expand Up @@ -81,54 +79,6 @@ defmodule AeMdw.Sync.AsyncTasks.WealthRank do
:ok
end

@spec dedup_pending_accounts() :: :ok
def dedup_pending_accounts do
state = State.new()

{delete_keys, pubkeys_set} =
state
|> Collection.stream(Model.AsyncTask, nil)
|> Stream.filter(fn {_ts, type} -> type == :store_acc_balance end)
|> Stream.map(&State.fetch!(state, Model.AsyncTask, &1))
|> Enum.map_reduce(MapSet.new(), fn
Model.async_task(index: index, extra_args: []), acc ->
{index, acc}

Model.async_task(index: index, extra_args: [extra_args]), acc ->
{index, MapSet.union(acc, extra_args)}
end)

with args when args != [] <- last_mb(state, nil) do
task_index = {System.system_time(), :store_acc_balance}
m_task = Model.async_task(index: task_index, args: args, extra_args: [pubkeys_set])

State.commit_db(
state,
[
DeleteKeysMutation.new(%{Model.AsyncTask => delete_keys}),
WriteMutation.new(Model.AsyncTask, m_task)
],
false
)
end

:ok
end

defp last_mb(state, key) do
case State.prev(state, Model.Block, key) do
{:ok, {_height, -1} = prev_key} ->
last_mb(state, prev_key)

{:ok, block_index} ->
Model.block(hash: mb_hash) = State.fetch!(state, Model.Block, block_index)
[mb_hash, block_index]

:none ->
[]
end
end

defp init_wealth_store do
async_store = AsyncStore.instance()

Expand Down
110 changes: 110 additions & 0 deletions lib/ae_mdw/sync/async_tasks/wealth_rank_accounts.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
defmodule AeMdw.Sync.AsyncTasks.WealthRankAccounts do
@moduledoc false

@typep micro_block :: term()

alias AeMdw.Collection
alias AeMdw.Db.Model
alias AeMdw.Db.State
alias AeMdw.Db.Mutation
alias AeMdw.Db.IntCallsMutation
alias AeMdw.Db.DeleteKeysMutation
alias AeMdw.Db.WriteMutation
alias AeMdw.Sync.Transaction

require Model

@spec dedup_pending_accounts() :: :ok
def dedup_pending_accounts do
state = State.new()

{delete_keys, pubkeys_set} =
state
|> Collection.stream(Model.AsyncTask, nil)
|> Stream.filter(fn {_ts, type} -> type == :store_acc_balance end)
|> Stream.map(&State.fetch!(state, Model.AsyncTask, &1))
|> Enum.map_reduce(MapSet.new(), fn
Model.async_task(index: index, extra_args: []), acc ->
{index, acc}

Model.async_task(index: index, extra_args: [extra_args]), acc ->
{index, MapSet.union(acc, extra_args)}
end)

with args when args != [] <- last_mb(state, nil) do
task_index = {System.system_time(), :store_acc_balance}
m_task = Model.async_task(index: task_index, args: args, extra_args: [pubkeys_set])

State.commit_db(
state,
[
DeleteKeysMutation.new(%{Model.AsyncTask => delete_keys}),
WriteMutation.new(Model.AsyncTask, m_task)
],
false
)
end

:ok
end

@spec micro_block_accounts(micro_block(), [Mutation.t()]) :: MapSet.t()
def micro_block_accounts(micro_block, mutations) do
txs_pubkeys =
micro_block
|> :aec_blocks.txs()
|> Enum.flat_map(fn signed_tx ->
signed_tx
|> Transaction.get_ids_from_tx()
|> Enum.flat_map(fn
{:id, :account, pubkey} -> [pubkey]
_other -> []
end)
end)

txs_pubkeys
|> MapSet.new()
|> MapSet.union(int_calls_accounts(mutations))
end

defp int_calls_accounts(mutations) do
mutations
|> Enum.filter(&is_struct(&1, IntCallsMutation))
|> Enum.flat_map(fn %IntCallsMutation{int_calls: int_calls} ->
int_calls
|> Enum.filter(fn {_idx, fname, _type, _aetx, _tx} ->
fname in ["Chain.spend", "Call.amount"]
end)
|> Enum.flat_map(fn {_idx, _fname, _type, aetx, _tx} ->
aetx_accounts(aetx)
end)
end)
|> MapSet.new()
end

defp aetx_accounts(aetx) do
{tx_type, tx_rec} = :aetx.specialize_type(aetx)

tx_type
|> AeMdw.Node.tx_ids_positions()
|> Enum.map(&elem(tx_rec, &1))
|> Enum.flat_map(fn
{:id, :account, pubkey} -> [pubkey]
_other -> []
end)
end

defp last_mb(state, key) do
case State.prev(state, Model.Block, key) do
{:ok, {_height, -1} = prev_key} ->
last_mb(state, prev_key)

{:ok, block_index} ->
Model.block(hash: mb_hash) = State.fetch!(state, Model.Block, block_index)
[mb_hash, block_index]

:none ->
[]
end
end
end
22 changes: 3 additions & 19 deletions lib/ae_mdw/sync/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ defmodule AeMdw.Sync.Server do
alias AeMdw.Db.Status
alias AeMdw.Db.Sync.Block
alias AeMdw.Log
alias AeMdw.Sync.Transaction
alias AeMdw.Sync.AsyncTasks.WealthRankAccounts
alias AeMdwWeb.Websocket.Broadcaster

require Logger
Expand Down Expand Up @@ -317,8 +317,8 @@ defmodule AeMdw.Sync.Server do
{{_height, -1}, _block, _mutations}, set ->
set

{_block_index, mblock, _mutations}, set ->
MapSet.union(set, micro_block_accounts(mblock))
{_block_index, mblock, mutations}, set ->
MapSet.union(set, WealthRankAccounts.micro_block_accounts(mblock, mutations))
end)

if MapSet.size(accounts_set) > 0 do
Expand Down Expand Up @@ -380,20 +380,4 @@ defmodule AeMdw.Sync.Server do
Broadcaster.broadcast_key_block(key_block, :v1, :mdw, mbs_count)
end)
end

defp micro_block_accounts(micro_block) do
pubkeys =
micro_block
|> :aec_blocks.txs()
|> Enum.flat_map(fn signed_tx ->
signed_tx
|> Transaction.get_ids_from_tx()
|> Enum.flat_map(fn
{:id, :account, pubkey} -> [pubkey]
_other -> []
end)
end)

Enum.reduce(pubkeys, MapSet.new(), &MapSet.put(&2, &1))
end
end
78 changes: 78 additions & 0 deletions test/ae_mdw/sync/async_tasks/wealth_rank_accounts_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
defmodule AeMdw.Sync.AsyncTasks.WealthRankAccountsTest do
use ExUnit.Case, async: false

alias AeMdw.Db.IntCallsMutation
alias AeMdw.Sync.AsyncTasks.WealthRankAccounts

describe "micro_block_accounts/2" do
test "returns a set of transaction and internal call accounts" do
account_pk1 = :crypto.strong_rand_bytes(32)
account_pk2 = :crypto.strong_rand_bytes(32)
account_pk3 = :crypto.strong_rand_bytes(32)
account_pk4 = :crypto.strong_rand_bytes(32)
account_pk5 = :crypto.strong_rand_bytes(32)

{:ok, aetx1} =
:aec_spend_tx.new(%{
sender_id: :aeser_id.create(:account, account_pk1),
recipient_id: :aeser_id.create(:account, account_pk2),
amount: 123,
fee: 0,
nonce: 1,
payload: <<>>
})

txs = [
:aetx_sign.new(aetx1, [])
]

micro_block =
:aec_blocks.new_micro(
1,
<<0::256>>,
<<1::256>>,
<<>>,
<<>>,
txs,
System.system_time(),
:no_fraud,
0
)

{:ok, aetx2} =
:aec_spend_tx.new(%{
sender_id: :aeser_id.create(:account, account_pk3),
recipient_id: :aeser_id.create(:account, account_pk4),
amount: 123,
fee: 0,
nonce: 2,
payload: <<>>
})

{:ok, aetx3} =
:aec_spend_tx.new(%{
sender_id: :aeser_id.create(:account, account_pk1),
recipient_id: :aeser_id.create(:account, account_pk5),
amount: 123,
fee: 0,
nonce: 2,
payload: <<>>
})

{tx_type2, tx_rec2} = :aetx.specialize_type(aetx2)
{tx_type3, tx_rec3} = :aetx.specialize_type(aetx3)

int_calls = [
{0, "Chain.spend", tx_type2, aetx2, tx_rec2},
{1, "Call.amount", tx_type3, aetx3, tx_rec3}
]

mutations = [IntCallsMutation.new(:crypto.strong_rand_bytes(32), 1, int_calls)]

assert MapSet.equal?(
MapSet.new([account_pk1, account_pk2, account_pk3, account_pk4, account_pk5]),
WealthRankAccounts.micro_block_accounts(micro_block, mutations)
)
end
end
end

0 comments on commit ca2fff8

Please sign in to comment.