Skip to content

Commit

Permalink
Merge pull request #362 from aeternity/transfers-account-kind-filters
Browse files Browse the repository at this point in the history
fix: reindex transfers to be able to filter by account + kind
  • Loading branch information
thepiwo committed Nov 30, 2021
2 parents bbf8847 + 1a89b38 commit e9a6fe9
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 54 deletions.
7 changes: 5 additions & 2 deletions lib/ae_mdw/db/int_transfer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ defmodule AeMdw.Db.IntTransfer do
Model.int_transfer_tx(index: {{height, pos_txi}, kind, target_pk, ref_txi}, amount: amount)

kind_tx = Model.kind_int_transfer_tx(index: {kind, {height, pos_txi}, target_pk, ref_txi})
target_tx = Model.target_int_transfer_tx(index: {target_pk, {height, pos_txi}, kind, ref_txi})

target_kind_tx =
Model.target_kind_int_transfer_tx(index: {target_pk, kind, {height, pos_txi}, ref_txi})

:mnesia.write(Model.IntTransferTx, int_tx, :write)
:mnesia.write(Model.KindIntTransferTx, kind_tx, :write)
:mnesia.write(Model.TargetIntTransferTx, target_tx, :write)
:mnesia.write(Model.TargetKindIntTransferTx, target_kind_tx, :write)
end
end
18 changes: 10 additions & 8 deletions lib/ae_mdw/db/model.ex
Original file line number Diff line number Diff line change
Expand Up @@ -358,12 +358,12 @@ defmodule AeMdw.Db.Model do
]
defrecord :kind_int_transfer_tx, @kind_int_transfer_tx_defaults

# target_int_transfer_tx
@target_int_transfer_tx_defaults [
index: {<<>>, {-1, -1}, nil, -1},
# target_kind_int_transfer_tx
@target_kind_int_transfer_tx_defaults [
index: {<<>>, <<>>, {-1, -1}, -1},
unused: nil
]
defrecord :target_int_transfer_tx, @target_int_transfer_tx_defaults
defrecord :target_kind_int_transfer_tx, @target_kind_int_transfer_tx_defaults

# statistics
@stat_defaults [
Expand Down Expand Up @@ -416,7 +416,7 @@ defmodule AeMdw.Db.Model do
AeMdw.Db.Model.RevOrigin,
AeMdw.Db.Model.IntTransferTx,
AeMdw.Db.Model.KindIntTransferTx,
AeMdw.Db.Model.TargetIntTransferTx
AeMdw.Db.Model.TargetKindIntTransferTx
]
end

Expand Down Expand Up @@ -527,7 +527,7 @@ defmodule AeMdw.Db.Model do
:oracle,
:int_transfer_tx,
:kind_int_transfer_tx,
:target_int_transfer_tx,
:target_kind_int_transfer_tx,
:stat,
:sum_stat,
:migrations,
Expand Down Expand Up @@ -587,7 +587,8 @@ defmodule AeMdw.Db.Model do
def record(AeMdw.Db.Model.InactiveOracle), do: :oracle
def record(AeMdw.Db.Model.IntTransferTx), do: :int_transfer_tx
def record(AeMdw.Db.Model.KindIntTransferTx), do: :kind_int_transfer_tx
def record(AeMdw.Db.Model.TargetIntTransferTx), do: :target_int_transfer_tx
def record(AeMdw.Db.Model.TargetKindIntTransferTx), do: :target_kind_int_transfer_tx

def record(AeMdw.Db.Model.Stat), do: :stat
def record(AeMdw.Db.Model.SumStat), do: :sum_stat

Expand Down Expand Up @@ -627,6 +628,7 @@ defmodule AeMdw.Db.Model do
def table(:int_transfer_tx), do: AeMdw.Db.Model.IntTransferTx
def table(:kind_int_transfer_tx), do: AeMdw.Db.Model.KindIntTransferTx
def table(:target_int_transfer_tx), do: AeMdw.Db.Model.TargetIntTransferTx
def table(:target_kind_int_transfer_tx), do: AeMdw.Db.Model.TargetKindIntTransferTx
def table(:stat), do: AeMdw.Db.Model.Stat
def table(:sum_stat), do: AeMdw.Db.Model.SumStat

Expand Down Expand Up @@ -672,7 +674,7 @@ defmodule AeMdw.Db.Model do
def defaults(:oracle), do: @oracle_defaults
def defaults(:int_transfer_tx), do: @int_transfer_tx_defaults
def defaults(:kind_int_transfer_tx), do: @kind_int_transfer_tx_defaults
def defaults(:target_int_transfer_tx), do: @target_int_transfer_tx_defaults
def defaults(:target_kind_int_transfer_tx), do: @target_kind_int_transfer_tx_defaults
def defaults(:stat), do: @stat_defaults
def defaults(:sum_stat), do: @sum_stat_defaults

Expand Down
66 changes: 25 additions & 41 deletions lib/ae_mdw/transfers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ defmodule AeMdw.Transfers do
@kinds ~w(fee_lock_name fee_refund_name fee_spend_name reward_block reward_dev reward_oracle)

@int_transfer_table Model.IntTransferTx
@target_int_transfer_tx Model.TargetIntTransferTx
@kind_int_transfer_tx Model.KindIntTransferTx
@target_kind_int_transfer_tx_table Model.TargetKindIntTransferTx
@kind_int_transfer_tx_table Model.KindIntTransferTx

@spec fetch_transfers(direction(), range(), query(), cursor() | nil, limit()) ::
{:ok, [transfer()], cursor() | nil} | {:error, reason()}
Expand Down Expand Up @@ -73,50 +73,34 @@ defmodule AeMdw.Transfers do

defp inside_txi_range?(_range, _ref_txi), do: true

# Retrieves transfers within the {account, kind_prefix_*, gen_txi, X} range
# and then takes transfers until one outside of the scope is reached.
# Retrieves transfers within the {account, kind_prefix_*, gen_txi, X} range.
defp build_stream(%{account_pk: account_pk, kind_prefix: kind_prefix}, scope, cursor, direction) do
{{first_gen_txi, first_kind, _first_account_pk, _first_ref_txi},
{last_gen_txi, last_kind, _last_account_pk, _last_ref_txi}} = scope

scope =
{{account_pk, first_gen_txi, first_kind, nil}, {account_pk, last_gen_txi, last_kind, nil}}

cursor =
case cursor do
nil -> nil
{gen_txi, kind, account_pk, ref_txi} -> {account_pk, gen_txi, kind, ref_txi}
end

@target_int_transfer_tx
|> Collection.stream(direction, scope, cursor)
|> Stream.take_while(fn {_account_pk, _gen_txi, kind, _ref_txi} ->
String.starts_with?(kind, kind_prefix)
end)
|> Stream.map(fn {account_pk, gen_txi, kind, ref_txi} ->
{gen_txi, kind, account_pk, ref_txi}
{{first_gen_txi, _first_kind, _first_account_pk, _first_ref_txi},
{last_gen_txi, _last_kind, _last_account_pk, _last_ref_txi}} = scope

@kinds
|> Enum.filter(&String.starts_with?(&1, kind_prefix))
|> Enum.map(fn kind ->
scope = {{account_pk, kind, first_gen_txi, nil}, {account_pk, kind, last_gen_txi, nil}}

cursor =
case cursor do
nil -> nil
{gen_txi, _kind, account_pk, ref_txi} -> {account_pk, kind, gen_txi, ref_txi}
end

@target_kind_int_transfer_tx_table
|> Collection.stream(direction, scope, cursor)
|> Stream.map(fn {account_pk, kind, gen_txi, ref_txi} ->
{gen_txi, kind, account_pk, ref_txi}
end)
end)
|> Collection.merge(direction)
end

# Retrieves transfers within the {account, gen_txi, X, Y} range.
defp build_stream(%{account_pk: account_pk}, scope, cursor, direction) do
{{first_gen_txi, first_kind, _first_account_pk, _first_ref_txi},
{last_gen_txi, last_kind, _last_account_pk, _last_ref_txi}} = scope

scope =
{{account_pk, first_gen_txi, first_kind, nil}, {account_pk, last_gen_txi, last_kind, nil}}

cursor =
case cursor do
nil -> nil
{gen_txi, kind, account_pk, ref_txi} -> {account_pk, gen_txi, kind, ref_txi}
end

@target_int_transfer_tx
|> Collection.stream(direction, scope, cursor)
|> Stream.map(fn {account_pk, gen_txi, kind, ref_txi} ->
{gen_txi, kind, account_pk, ref_txi}
end)
build_stream(%{account_pk: account_pk, kind_prefix: ""}, scope, cursor, direction)
end

# Retrieves transfers within the {kind_prefix_*, gen_txi, account, X} range.
Expand All @@ -136,7 +120,7 @@ defmodule AeMdw.Transfers do
{gen_txi, _kind, account_pk, ref_txi} -> {kind, gen_txi, account_pk, ref_txi}
end

@kind_int_transfer_tx
@kind_int_transfer_tx_table
|> Collection.stream(direction, scope, cursor)
|> Stream.map(fn {kind, gen_txi, account_pk, ref_txi} ->
{gen_txi, kind, account_pk, ref_txi}
Expand Down
118 changes: 118 additions & 0 deletions priv/migrations/20211124092400_reindex_int_transfers_acc_index.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
defmodule AeMdw.Migrations.ReindexIntTransfersAccIndex do
@moduledoc """
Creates a new index for internal transfers and destroys the previous one
after loading all the data.
Right now, the `TargetIntTransferTx` mnesia table is indexed by
`{account_pk, gen_txi, kind, ref_txi}`. This doesn't allow to filter by both
account and kind with results sorted by gen_txi.
Instead, the new index created on this migration, named
`TargetKindIntTransferTx`, will have `{account_pk, kind, gen_txi, ref_txi}` as
a key, which allows to filter by both account and kind on a given range.
This change, however, won't allow to filter by a kind prefix (sorted by
gen_txi), since it can only be applied for a specific account. But this
features isn't needed anyway, because the values the `kind` field may have
are fixed and known.
"""

alias AeMdw.Db.Model

require Logger

@int_transfer_table Model.IntTransferTx
@target_kind_int_transfer_table Model.TargetKindIntTransferTx
@target_int_transfer_table Model.TargetIntTransferTx
@gen_batches_size 1_000
@min_int -100
@end_token :"$end_of_table"

@doc """
This migration will not store any state - it's either run fully or not - if
it throws an error in the middle of the run it will re-run next time this
migration is run from scratch.
The way to know if this migration was completed or not is to check if the
`TargetIntTransferTx` table does not exist.
Steps:
1. Check if `TargetIntTransferTx` exists. Ignore migration if it doesn't.
2. Split 0..last_gen into batches of 1_000 generations.
3. For each of these generations range, grab the transfers from the
`IntTransferTx` table and create a record for each one in the newly
created `TargetKindIntTransferTx` table.
4. Destroy the `TargetIntTransferTx` table.
"""
@spec run(boolean()) :: {:ok, {non_neg_integer(), pos_integer()}}
def run(from_startup?) do
run(from_startup?, @target_int_transfer_table in :mnesia.system_info(:tables))
end

defp run(_from_startup?, true) do
{{last_gen, _txi}, _kind, _account_pk, _ref_txi} = :mnesia.dirty_last(@int_transfer_table)
batches_count = div(last_gen + @gen_batches_size - 1, @gen_batches_size)

log("processing #{batches_count} batches of #{@gen_batches_size} generations")

{duration_microseconds, reindexed_count} = :timer.tc(&reindex_transfers/1, [batches_count])
duration = div(duration_microseconds, 1_000_000)

log("indexed #{reindexed_count} records in #{duration}s")

:mnesia.delete_table(@target_int_transfer_table)

{:ok, {reindexed_count, duration}}
end

defp run(_from_startup?, false) do
log("aborting: transfers reindex not needed")

{:ok, {0, 0}}
end

defp reindex_transfers(batches_count) do
(0..(batches_count - 1))
|> Enum.map(fn index ->
start_gen = index * @gen_batches_size
next_gen = start_gen + @gen_batches_size

indexed_count = reindex_gen_range_transfers(start_gen, next_gen)

log("batch #{index} done")

{index, indexed_count}
end)
|> Enum.map(fn {_index, indexed_count} -> indexed_count end)
|> Enum.reduce(0, &:erlang.+/2)
end

defp reindex_gen_range_transfers(start_gen, next_gen) do
keys =
{{start_gen, @min_int}, nil, nil, nil}
|> Stream.unfold(fn
@end_token ->
nil
key ->
next_key = :mnesia.dirty_next(@int_transfer_table, key)
{next_key, next_key}
end)
|> Stream.reject(&match?(@end_token, &1))
|> Stream.take_while(fn {{gen, _txi}, _kind, _account_pk, _ref_txi} -> gen < next_gen end)
|> Stream.map(fn {{gen, txi}, kind, account_pk, ref_txi} ->
{:target_kind_int_transfer_tx, {account_pk, kind, {gen, txi}, ref_txi}, nil}
end)

{:atomic, indexed_count} = :mnesia.transaction(fn ->
keys
|> Stream.each(fn target_kind_tx ->
:mnesia.write(@target_kind_int_transfer_table, target_kind_tx, :write)
end)
|> Enum.count()
end)

indexed_count
end

defp log(msg), do: Logger.info("[ReindexIntTransfersAccIndex migration] #{msg}", sync: true)
end
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,8 @@ defmodule Integration.AeMdwWeb.TransferControllerTest do

test "when providing account and kind prefix filters, it returns transfers filtered by account and kind",
%{conn: conn} do
account_pk = "ak_21rna3xrD7p32U3vpXPSmanjsnSGnh6BWFPC9Pe7pYxeAW8PpS"
kind_prefix = "reward"
account_pk = "ak_JFkVmYeY9iP4gmKexBHx7t1aAj1R6FGvBdWaZRBVLuuzWv83j"
kind_prefix = "fee_"

conn = get(conn, "/transfers/forward?account=#{account_pk}&kind=#{kind_prefix}")
response = json_response(conn, 200)
Expand All @@ -324,7 +324,33 @@ defmodule Integration.AeMdwWeb.TransferControllerTest do
conn_next = get(conn, response["next"])
response_next = json_response(conn_next, 200)

assert @default_limit = Enum.count(response_next["data"])
assert Enum.count(response_next["data"]) > 0

assert Enum.all?(response_next["data"], fn %{"account_id" => account_id, "kind" => kind} ->
account_id == account_pk and String.starts_with?(kind, kind_prefix)
end)
end

test "when providing account and kind prefix filters and backwards, it returns transfers accordingly",
%{conn: conn} do
account_pk = "ak_JFkVmYeY9iP4gmKexBHx7t1aAj1R6FGvBdWaZRBVLuuzWv83j"
kind_prefix = "fee_"
from = 500_000
to = 50_000

conn = get(conn, "/transfers/gen/#{from}-#{to}?account=#{account_pk}&kind=#{kind_prefix}")
response = json_response(conn, 200)

assert @default_limit = Enum.count(response["data"])

assert Enum.all?(response["data"], fn %{"account_id" => account_id, "kind" => kind} ->
account_id == account_pk and String.starts_with?(kind, kind_prefix)
end)

conn_next = get(conn, response["next"])
response_next = json_response(conn_next, 200)

assert Enum.count(response_next["data"]) > 0

assert Enum.all?(response_next["data"], fn %{"account_id" => account_id, "kind" => kind} ->
account_id == account_pk and String.starts_with?(kind, kind_prefix)
Expand Down

0 comments on commit e9a6fe9

Please sign in to comment.