Skip to content

Commit

Permalink
feat: log open/closed channels together with their locked AE (#840)
Browse files Browse the repository at this point in the history
* feat: log open/closed channels together with their locked AE
* chore: add channel stats migration (~30min RUN)

This migration will avoid requiring a full-sync, but it will take
~30mins to run.
  • Loading branch information
sborrazas committed Aug 19, 2022
1 parent 900636d commit d965275
Show file tree
Hide file tree
Showing 12 changed files with 503 additions and 71 deletions.
52 changes: 52 additions & 0 deletions lib/ae_mdw/channels.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
defmodule AeMdw.Channels do
@moduledoc """
Main channels module.
"""

alias AeMdw.Collection
alias AeMdw.Db.ChannelCloseMutation
alias AeMdw.Db.ChannelOpenMutation
alias AeMdw.Db.ChannelSpendMutation
alias AeMdw.Db.Model
alias AeMdw.Db.State
alias AeMdw.Node
alias AeMdw.Txs

require Model

@typep state() :: State.t()
@type closing_type() ::
:channel_close_solo_tx
| :channel_close_mutual_tx
| :channel_settle_tx

@spec close_mutation(closing_type(), Node.aetx()) :: ChannelCloseMutation.t()
def close_mutation(tx_type, tx), do: ChannelCloseMutation.new(tx_type, tx)

@spec open_mutation(Node.aetx()) :: ChannelOpenMutation.t()
def open_mutation(tx), do: ChannelOpenMutation.new(tx)

@spec deposit_mutation(Node.aetx()) :: ChannelSpendMutation.t()
def deposit_mutation(tx), do: ChannelSpendMutation.new(:aesc_deposit_tx.amount(tx))

@spec withdraw_mutation(Node.aetx()) :: ChannelSpendMutation.t()
def withdraw_mutation(tx), do: ChannelSpendMutation.new(-:aesc_withdraw_tx.amount(tx))

@spec channels_opened_count(state(), Txs.txi(), Txs.txi()) :: non_neg_integer()
def channels_opened_count(state, from_txi, next_txi),
do: type_count(state, :channel_create_tx, from_txi, next_txi)

@spec channels_closed_count(state(), Txs.txi(), Txs.txi()) :: non_neg_integer()
def channels_closed_count(state, from_txi, next_txi) do
type_count(state, :channel_close_solo_tx, from_txi, next_txi) +
type_count(state, :channel_close_mutual_tx, from_txi, next_txi) +
type_count(state, :channel_settle_tx, from_txi, next_txi)
end

defp type_count(state, type, from_txi, next_txi) do
state
|> Collection.stream(Model.Type, {type, from_txi})
|> Stream.take_while(&match?({^type, txi} when txi < next_txi, &1))
|> Enum.count()
end
end
18 changes: 14 additions & 4 deletions lib/ae_mdw/db/model.ex
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,10 @@ defmodule AeMdw.Db.Model do
block_reward: 0,
dev_reward: 0,
locked_in_auctions: 0,
burned_in_auctions: 0
burned_in_auctions: 0,
channels_opened: 0,
channels_closed: 0,
locked_in_channels: 0
]
defrecord :delta_stat, @delta_stat_defaults

Expand All @@ -557,7 +560,10 @@ defmodule AeMdw.Db.Model do
block_reward: integer(),
dev_reward: integer(),
locked_in_auctions: integer(),
burned_in_auctions: integer()
burned_in_auctions: integer(),
channels_opened: non_neg_integer(),
channels_closed: non_neg_integer(),
locked_in_channels: integer()
)

# summarized statistics
Expand All @@ -574,7 +580,9 @@ defmodule AeMdw.Db.Model do
inactive_oracles: 0,
contracts: 0,
locked_in_auctions: 0,
burned_in_auctions: 0
burned_in_auctions: 0,
locked_in_channels: 0,
open_channels: 0
]
defrecord :total_stat, @total_stat_defaults

Expand All @@ -591,7 +599,9 @@ defmodule AeMdw.Db.Model do
inactive_oracles: integer(),
contracts: integer(),
locked_in_auctions: non_neg_integer(),
burned_in_auctions: non_neg_integer()
burned_in_auctions: non_neg_integer(),
locked_in_channels: non_neg_integer(),
open_channels: non_neg_integer()
)

@stat_defaults [:index, :payload]
Expand Down
40 changes: 40 additions & 0 deletions lib/ae_mdw/db/mutations/channel_close_mutation.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
defmodule AeMdw.Db.ChannelCloseMutation do
@moduledoc """
Increases channels_closed stat and refund locked AE.
"""

alias AeMdw.Channels
alias AeMdw.Db.State
alias AeMdw.Node

@derive AeMdw.Db.Mutation
defstruct [:tx_type, :tx]

@typep tx_type() :: Channels.closing_type()

@opaque t() :: %__MODULE__{tx_type: tx_type(), tx: Node.aetx()}

@spec new(tx_type(), Node.aetx()) :: t()
def new(tx_type, tx), do: %__MODULE__{tx_type: tx_type, tx: tx}

@spec execute(t(), State.t()) :: State.t()
def execute(%__MODULE__{tx_type: tx_type, tx: tx}, state) do
state
|> State.inc_stat(:channels_closed)
|> State.inc_stat(:locked_in_channels, -released_amount(tx_type, tx))
end

defp released_amount(:channel_close_solo_tx, _tx), do: 0

defp released_amount(:channel_close_mutual_tx, tx) do
:aesc_close_mutual_tx.initiator_amount_final(tx) +
:aesc_close_mutual_tx.responder_amount_final(tx)
end

defp released_amount(:channel_settle_tx, tx) do
%{"initiator_amount_final" => initiator_amount, "responder_amount_final" => responder_amount} =
:aesc_settle_tx.for_client(tx)

initiator_amount + responder_amount
end
end
28 changes: 28 additions & 0 deletions lib/ae_mdw/db/mutations/channel_open_mutation.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
defmodule AeMdw.Db.ChannelOpenMutation do
@moduledoc """
Increases channels_opened stat.
"""

alias AeMdw.Db.State
alias AeMdw.Node

@derive AeMdw.Db.Mutation
defstruct [:tx]

@opaque t() :: %__MODULE__{
tx: Node.aetx()
}

@spec new(Node.aetx()) :: t()
def new(tx), do: %__MODULE__{tx: tx}

@spec execute(t(), State.t()) :: State.t()
def execute(%__MODULE__{tx: tx}, state) do
initiator_amount = :aesc_create_tx.initiator_amount(tx)
responder_amount = :aesc_create_tx.responder_amount(tx)

state
|> State.inc_stat(:channels_opened)
|> State.inc_stat(:locked_in_channels, initiator_amount + responder_amount)
end
end
20 changes: 20 additions & 0 deletions lib/ae_mdw/db/mutations/channel_spend_mutation.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
defmodule AeMdw.Db.ChannelSpendMutation do
@moduledoc """
Logs withdraws/deposists from channels.
"""

alias AeMdw.Db.State

@derive AeMdw.Db.Mutation
defstruct [:amount]

@opaque t() :: %__MODULE__{amount: integer()}

@spec new(integer()) :: t()
def new(amount), do: %__MODULE__{amount: amount}

@spec execute(t(), State.t()) :: State.t()
def execute(%__MODULE__{amount: amount}, state) do
State.inc_stat(state, :locked_in_channels, amount)
end
end
57 changes: 44 additions & 13 deletions lib/ae_mdw/db/mutations/stats_mutation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,43 +6,58 @@ defmodule AeMdw.Db.StatsMutation do
alias AeMdw.Collection
alias AeMdw.Db.Model
alias AeMdw.Blocks
alias AeMdw.Channels
alias AeMdw.Db.IntTransfer
alias AeMdw.Db.Model
alias AeMdw.Db.Name
alias AeMdw.Db.Oracle
alias AeMdw.Db.Origin
alias AeMdw.Db.State
alias AeMdw.Stats
alias AeMdw.Txs
alias AeMdw.Util

require Model

@derive AeMdw.Db.Mutation
defstruct [:height, :key_hash, :tps, :all_cached?]
defstruct [:height, :key_hash, :from_txi, :next_txi, :tps, :all_cached?]

@typep txi() :: Txs.txi()

@type t() :: %__MODULE__{
height: Blocks.height(),
key_hash: Blocks.block_hash(),
from_txi: txi(),
next_txi: txi(),
tps: Stats.tps(),
all_cached?: boolean()
}

@spec new(Blocks.height(), Blocks.block_hash(), Stats.tps(), boolean()) :: t()
def new(height, key_hash, tps, all_cached?) do
@spec new(Blocks.height(), Blocks.block_hash(), txi(), txi(), Stats.tps(), boolean()) :: t()
def new(height, key_hash, from_txi, next_txi, tps, all_cached?) do
%__MODULE__{
height: height,
key_hash: key_hash,
from_txi: from_txi,
next_txi: next_txi,
tps: tps,
all_cached?: all_cached?
}
end

@spec execute(t(), State.t()) :: State.t()
def execute(
%__MODULE__{height: height, key_hash: key_hash, tps: tps, all_cached?: all_cached?},
%__MODULE__{
height: height,
key_hash: key_hash,
from_txi: from_txi,
next_txi: next_txi,
tps: tps,
all_cached?: all_cached?
},
state
) do
m_delta_stat = make_delta_stat(state, height, all_cached?)
m_delta_stat = make_delta_stat(state, height, from_txi, next_txi, all_cached?)
# delta/transitions are only reflected on total stats at height + 1
m_total_stat = make_total_stat(state, height + 1, m_delta_stat)

Expand All @@ -65,8 +80,8 @@ defmodule AeMdw.Db.StatsMutation do
#
# Private functions
#
@spec make_delta_stat(State.t(), Blocks.height(), boolean()) :: Model.delta_stat()
defp make_delta_stat(state, height, true = _all_cached?) do
@spec make_delta_stat(State.t(), Blocks.height(), txi(), txi(), boolean()) :: Model.delta_stat()
defp make_delta_stat(state, height, _from_txi, _next_txi, true = _all_cached?) do
Model.delta_stat(
index: height,
auctions_started: get(state, :auctions_started, 0),
Expand All @@ -79,11 +94,14 @@ defmodule AeMdw.Db.StatsMutation do
block_reward: get(state, :block_reward, 0),
dev_reward: get(state, :dev_reward, 0),
locked_in_auctions: get(state, :locked_in_auctions, 0),
burned_in_auctions: get(state, :burned_in_auctions, 0)
burned_in_auctions: get(state, :burned_in_auctions, 0),
channels_opened: get(state, :channels_opened, 0),
channels_closed: get(state, :channels_closed, 0),
locked_in_channels: get(state, :locked_in_channels, 0)
)
end

defp make_delta_stat(state, height, false = _all_cached?) do
defp make_delta_stat(state, height, from_txi, next_txi, false = _all_cached?) do
Model.total_stat(
active_auctions: prev_active_auctions,
active_names: prev_active_names,
Expand All @@ -94,6 +112,8 @@ defmodule AeMdw.Db.StatsMutation do
current_active_names = State.count_keys(state, Model.ActiveName)
current_active_auctions = State.count_keys(state, Model.AuctionExpiration)
current_active_oracles = State.count_keys(state, Model.ActiveOracle)
channels_opened = Channels.channels_opened_count(state, from_txi, next_txi)
channels_closed = Channels.channels_closed_count(state, from_txi, next_txi)

{height_revoked_names, height_expired_names} =
state
Expand Down Expand Up @@ -123,6 +143,7 @@ defmodule AeMdw.Db.StatsMutation do
spent_in_auctions = height_int_amount(state, height, :spend_name)
refund_in_auctions = height_int_amount(state, height, :refund_name)
locked_in_auctions = spent_in_auctions - refund_in_auctions
locked_in_channels = height_int_amount(state, height, :lock_channel)

Model.delta_stat(
index: height,
Expand All @@ -136,7 +157,10 @@ defmodule AeMdw.Db.StatsMutation do
block_reward: current_block_reward,
dev_reward: current_dev_reward,
locked_in_auctions: locked_in_auctions,
burned_in_auctions: burned_in_auctions
burned_in_auctions: burned_in_auctions,
channels_opened: channels_opened,
channels_closed: channels_closed,
locked_in_channels: locked_in_channels
)
end

Expand All @@ -155,7 +179,10 @@ defmodule AeMdw.Db.StatsMutation do
block_reward: inc_block_reward,
dev_reward: inc_dev_reward,
locked_in_auctions: locked_in_auctions,
burned_in_auctions: burned_in_auctions
burned_in_auctions: burned_in_auctions,
channels_opened: channels_opened,
channels_closed: channels_closed,
locked_in_channels: locked_in_channels
)
) do
Model.total_stat(
Expand All @@ -169,7 +196,9 @@ defmodule AeMdw.Db.StatsMutation do
inactive_oracles: prev_inactive_oracles,
contracts: prev_contracts,
locked_in_auctions: prev_locked_in_auctions,
burned_in_auctions: prev_burned_in_acutions
burned_in_auctions: prev_burned_in_acutions,
open_channels: prev_open_channels,
locked_in_channels: prev_locked_in_channels
) = fetch_total_stat(state, height - 1)

token_supply_delta = AeMdw.Node.token_supply_delta(height - 1)
Expand All @@ -187,7 +216,9 @@ defmodule AeMdw.Db.StatsMutation do
inactive_oracles: prev_inactive_oracles + oracles_expired,
contracts: prev_contracts + contracts_created,
locked_in_auctions: prev_locked_in_auctions + locked_in_auctions,
burned_in_auctions: prev_burned_in_acutions + burned_in_auctions
burned_in_auctions: prev_burned_in_acutions + burned_in_auctions,
open_channels: prev_open_channels + channels_opened - channels_closed,
locked_in_channels: prev_locked_in_channels + locked_in_channels
)
end

Expand Down
2 changes: 1 addition & 1 deletion lib/ae_mdw/db/sync/block.ex
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ defmodule AeMdw.Db.Sync.Block do
block_rewards_mutation,
NamesExpirationMutation.new(height),
OraclesExpirationMutation.new(height),
Stats.mutation(height, key_block, micro_blocks, starting_from_mb0?),
Stats.mutation(height, key_block, micro_blocks, from_txi, txi, starting_from_mb0?),
next_kb_mutation
]

Expand Down
18 changes: 17 additions & 1 deletion lib/ae_mdw/db/sync/transaction.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule AeMdw.Db.Sync.Transaction do
"

alias AeMdw.Blocks
alias AeMdw.Channels
alias AeMdw.Contract
alias AeMdw.Db.Model
alias AeMdw.Db.ContractCallMutation
Expand All @@ -30,6 +31,8 @@ defmodule AeMdw.Db.Sync.Transaction do

require Model

@channel_closing_types ~w(channel_close_solo_tx channel_close_mutual_tx channel_settle_tx)a

defmodule TxContext do
@moduledoc """
Transaction context struct that contains necessary information to build a transaction mutation.
Expand Down Expand Up @@ -200,13 +203,26 @@ defmodule AeMdw.Db.Sync.Transaction do
type: :channel_create_tx,
signed_tx: signed_tx,
txi: txi,
tx: tx,
tx_hash: tx_hash
}) do
{:ok, channel_pk} = :aesc_utils.channel_pubkey(signed_tx)

Origin.origin_mutations(:channel_create_tx, nil, channel_pk, txi, tx_hash)
[
Channels.open_mutation(tx),
Origin.origin_mutations(:channel_create_tx, nil, channel_pk, txi, tx_hash)
]
end

defp tx_mutations(%TxContext{type: tx_type, tx: tx}) when tx_type in @channel_closing_types,
do: Channels.close_mutation(tx_type, tx)

defp tx_mutations(%TxContext{type: :channel_deposit_tx, tx: tx}),
do: Channels.deposit_mutation(tx)

defp tx_mutations(%TxContext{type: :channel_withdraw_tx, tx: tx}),
do: Channels.withdraw_mutation(tx)

defp tx_mutations(%TxContext{type: :ga_attach_tx, tx: tx, txi: txi, tx_hash: tx_hash}) do
contract_pk = :aega_attach_tx.contract_pubkey(tx)

Expand Down
Loading

0 comments on commit d965275

Please sign in to comment.