Skip to content

Commit

Permalink
Merge branch 'master' into ab-batch-token-updates
Browse files Browse the repository at this point in the history
  • Loading branch information
ayrat555 committed Oct 2, 2019
2 parents a30e65f + dc81f92 commit 3855be4
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 88 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,6 +1,7 @@
## Current

### Features
- [#2726](https://github.com/poanetwork/blockscout/pull/2726) - Remove internal_transaction block_number setting from blocks runner
- [#2717](https://github.com/poanetwork/blockscout/pull/2717) - Improve speed of nonconsensus data removal
- [#2679](https://github.com/poanetwork/blockscout/pull/2679) - added fixed height for card chain blocks and card chain transactions
- [#2678](https://github.com/poanetwork/blockscout/pull/2678) - fixed dashboard banner height bug
Expand Down
68 changes: 19 additions & 49 deletions apps/explorer/lib/explorer/chain/import/runner/blocks.ex
Expand Up @@ -56,7 +56,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
# Note, needs to be executed after `lose_consensus` for lock acquisition
insert(repo, changes_list, insert_options)
end)
|> Multi.run(:uncle_fetched_block_second_degree_relations, fn repo, %{blocks: blocks} when is_list(blocks) ->
|> Multi.run(:uncle_fetched_block_second_degree_relations, fn repo, _ ->
update_block_second_degree_relations(repo, hashes, %{
timeout:
options[Runner.Block.SecondDegreeRelations.option_key()][:timeout] ||
Expand Down Expand Up @@ -86,15 +86,9 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
|> Multi.run(:remove_nonconsensus_logs, fn repo, %{derive_transaction_forks: transactions} ->
remove_nonconsensus_logs(repo, transactions, insert_options)
end)
|> Multi.run(:acquire_internal_transactions, fn repo, %{derive_transaction_forks: transactions} ->
acquire_internal_transactions(repo, hashes, transactions)
end)
|> Multi.run(:remove_nonconsensus_internal_transactions, fn repo, %{derive_transaction_forks: transactions} ->
remove_nonconsensus_internal_transactions(repo, transactions, insert_options)
end)
|> Multi.run(:internal_transaction_transaction_block_number, fn repo, _ ->
update_internal_transaction_block_number(repo, hashes)
end)
|> Multi.run(:acquire_contract_address_tokens, fn repo, _ ->
acquire_contract_address_tokens(repo, consensus_block_numbers)
end)
Expand Down Expand Up @@ -139,26 +133,6 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
Tokens.acquire_contract_address_tokens(repo, contract_address_hashes)
end

defp acquire_internal_transactions(repo, hashes, forked_transaction_hashes) do
query =
from(internal_transaction in InternalTransaction,
join: transaction in Transaction,
on: internal_transaction.transaction_hash == transaction.hash,
where: transaction.block_hash in ^hashes,
or_where: transaction.hash in ^forked_transaction_hashes,
select: {internal_transaction.transaction_hash, internal_transaction.index},
# Enforce InternalTransaction ShareLocks order (see docs: sharelocks.md)
order_by: [
internal_transaction.transaction_hash,
internal_transaction.index
],
# NOTE: find a better way to know the alias that ecto gives to token
lock: "FOR UPDATE OF i0"
)

{:ok, repo.all(query)}
end

defp fork_transactions(%{
repo: repo,
timeout: timeout,
Expand Down Expand Up @@ -379,13 +353,27 @@ defmodule Explorer.Chain.Import.Runner.Blocks do

defp remove_nonconsensus_internal_transactions(repo, forked_transaction_hashes, %{timeout: timeout}) do
query =
from(internal_transaction in InternalTransaction,
from(
internal_transaction in InternalTransaction,
where: internal_transaction.transaction_hash in ^forked_transaction_hashes,
select: map(internal_transaction, [:transaction_hash, :index])
select: %{transaction_hash: internal_transaction.transaction_hash},
# Enforce InternalTransaction ShareLocks order (see docs: sharelocks.md)
order_by: [
internal_transaction.transaction_hash,
internal_transaction.index
],
lock: "FOR UPDATE"
)

delete_query =
from(
i in InternalTransaction,
join: s in subquery(query),
on: i.transaction_hash == s.transaction_hash,
select: map(i, [:transaction_hash, :index])
)

# ShareLocks order already enforced by `acquire_internal_transactions` (see docs: sharelocks.md)
{_count, deleted_internal_transactions} = repo.delete_all(query, timeout: timeout)
{_count, deleted_internal_transactions} = repo.delete_all(delete_query, timeout: timeout)

{:ok, deleted_internal_transactions}
rescue
Expand Down Expand Up @@ -645,24 +633,6 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
end
end

defp update_internal_transaction_block_number(repo, blocks_hashes) when is_list(blocks_hashes) do
query =
from(
internal_transaction in InternalTransaction,
join: transaction in Transaction,
on: internal_transaction.transaction_hash == transaction.hash,
join: block in Block,
on: block.hash == transaction.block_hash,
where: block.hash in ^blocks_hashes,
update: [set: [block_number: block.number]]
)

# ShareLocks order already enforced by `acquire_internal_transactions` (see docs: sharelocks.md)
{total, _} = repo.update_all(query, [])

{:ok, total}
end

defp where_forked(blocks_changes) when is_list(blocks_changes) do
initial = from(t in Transaction, where: false)

Expand Down
Expand Up @@ -4,9 +4,10 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do
"""

require Ecto.Query
require Logger

alias Ecto.{Changeset, Multi, Repo}
alias Explorer.Chain.{Hash, Import, InternalTransaction, Transaction}
alias Explorer.Chain.{Block, Hash, Import, InternalTransaction, Transaction}
alias Explorer.Chain.Import.Runner

import Ecto.Query, only: [from: 2]
Expand Down Expand Up @@ -52,32 +53,43 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do
|> Multi.run(:acquire_transactions, fn repo, _ ->
acquire_transactions(repo, changes_list)
end)
|> Multi.run(:internal_transactions, fn repo, _ ->
insert(repo, changes_list, insert_options)
|> Multi.run(:internal_transactions, fn repo, %{acquire_transactions: transactions} ->
insert(repo, changes_list, transactions, insert_options)
end)
|> Multi.run(:internal_transactions_indexed_at_transactions, fn repo, %{acquire_transactions: transaction_hashes} ->
update_transactions(repo, transaction_hashes, update_transactions_options)
|> Multi.run(:internal_transactions_indexed_at_transactions, fn repo, %{acquire_transactions: transactions} ->
update_transactions(repo, transactions, update_transactions_options)
end)
|> Multi.run(
:remove_consensus_of_missing_transactions_blocks,
fn repo, %{internal_transactions: inserted} = results_map ->
# NOTE: for this to work it has to follow the runner `internal_transactions_indexed_at_blocks`
block_hashes = Map.get(results_map, :internal_transactions_indexed_at_blocks, [])
remove_consensus_of_missing_transactions_blocks(repo, block_hashes, changes_list, inserted)
end
)
end

@impl Runner
def timeout, do: @timeout

@spec insert(Repo.t(), [map], %{
@spec insert(Repo.t(), [map], [Transaction.t()], %{
optional(:on_conflict) => Runner.on_conflict(),
required(:timeout) => timeout,
required(:timestamps) => Import.timestamps()
}) ::
{:ok, [%{index: non_neg_integer, transaction_hash: Hash.t()}]}
| {:error, [Changeset.t()]}
defp insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps} = options)
defp insert(repo, changes_list, transactions, %{timeout: timeout, timestamps: timestamps} = options)
when is_list(changes_list) do
on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0)

# Enforce InternalTransaction ShareLocks order (see docs: sharelocks.md)
ordered_changes_list = Enum.sort_by(changes_list, &{&1.transaction_hash, &1.index})
transactions_map = Map.new(transactions, &{&1.hash, &1})

final_changes_list = reject_pending_transactions(ordered_changes_list, repo)
final_changes_list =
changes_list
# Enforce InternalTransaction ShareLocks order (see docs: sharelocks.md)
|> Enum.sort_by(&{&1.transaction_hash, &1.index})
|> reject_missing_transactions(transactions_map)

{:ok, internal_transactions} =
Import.insert_changes_list(
Expand All @@ -86,16 +98,12 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do
conflict_target: [:transaction_hash, :index],
for: InternalTransaction,
on_conflict: on_conflict,
returning: [:transaction_hash, :index],
returning: true,
timeout: timeout,
timestamps: timestamps
)

{:ok,
for(
internal_transaction <- internal_transactions,
do: Map.take(internal_transaction, [:id, :index, :transaction_hash])
)}
{:ok, internal_transactions}
end

defp default_on_conflict do
Expand Down Expand Up @@ -158,26 +166,28 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do
from(
t in Transaction,
where: t.hash in ^transaction_hashes,
# do not consider pending transactions
where: not is_nil(t.block_hash),
select: t.hash,
select: map(t, [:hash, :block_hash, :block_number]),
# Enforce Transaction ShareLocks order (see docs: sharelocks.md)
order_by: t.hash,
lock: "FOR UPDATE"
)

hashes = repo.all(query)

{:ok, hashes}
{:ok, repo.all(query)}
end

defp update_transactions(repo, transaction_hashes, %{
defp update_transactions(repo, transactions, %{
timeout: timeout,
timestamps: timestamps
})
when is_list(transaction_hashes) do
when is_list(transactions) do
transaction_hashes = Enum.map(transactions, & &1.hash)

update_query =
from(
t in Transaction,
# pending transactions are already excluded by `acquire_transactions`
where: t.hash in ^transaction_hashes,
# ShareLocks order already enforced by `acquire_transactions` (see docs: sharelocks.md)
update: [
Expand Down Expand Up @@ -214,22 +224,51 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do
end
end

defp reject_pending_transactions(ordered_changes_list, repo) do
transaction_hashes =
ordered_changes_list
|> Enum.map(& &1.transaction_hash)
|> Enum.dedup()
# If not using Parity this is not relevant
defp remove_consensus_of_missing_transactions_blocks(_, [], _, _), do: {:ok, []}

query =
from(t in Transaction,
where: t.hash in ^transaction_hashes,
where: is_nil(t.block_hash),
select: t.hash
defp remove_consensus_of_missing_transactions_blocks(repo, block_hashes, changes_list, inserted) do
inserted_block_numbers = MapSet.new(inserted, & &1.block_number)

missing_transactions_block_numbers =
changes_list
|> MapSet.new(& &1.block_number)
|> MapSet.difference(inserted_block_numbers)
|> MapSet.to_list()

update_query =
from(
b in Block,
where: b.number in ^missing_transactions_block_numbers,
where: b.hash in ^block_hashes,
# ShareLocks order already enforced by `internal_transactions_indexed_at_blocks` (see docs: sharelocks.md)
update: [set: [consensus: false, internal_transactions_indexed_at: nil]]
)

pending_transactions = repo.all(query)
try do
{_num, result} = repo.update_all(update_query, [])

Logger.debug(fn ->
[
"consensus removed from blocks with numbers: ",
inspect(missing_transactions_block_numbers),
" because of missing transactions"
]
end)

{:ok, result}
rescue
postgrex_error in Postgrex.Error ->
{:error, %{exception: postgrex_error, missing_transactions_block_numbers: missing_transactions_block_numbers}}
end
end

ordered_changes_list
|> Enum.reject(fn %{transaction_hash: hash} -> Enum.member?(pending_transactions, hash) end)
defp reject_missing_transactions(ordered_changes_list, transactions_map) do
Enum.reject(ordered_changes_list, fn %{transaction_hash: hash} ->
transactions_map
|> Map.get(hash, %{})
|> Map.get(:block_hash)
|> is_nil()
end)
end
end
Expand Up @@ -68,7 +68,7 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactionsIndexedAtBlocks do
try do
{_, result} =
repo.update_all(
from(b in Block, join: s in subquery(query), on: b.hash == s.hash),
from(b in Block, join: s in subquery(query), on: b.hash == s.hash, select: b.hash),
[set: [internal_transactions_indexed_at: timestamps.updated_at]],
timeout: timeout
)
Expand Down
Expand Up @@ -2,7 +2,7 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactionsTest do
use Explorer.DataCase

alias Ecto.Multi
alias Explorer.Chain.{Data, Wei, Transaction, InternalTransaction}
alias Explorer.Chain.{Block, Data, Wei, Transaction, InternalTransaction}
alias Explorer.Chain.Import.Runner.InternalTransactions

describe "run/1" do
Expand Down Expand Up @@ -42,10 +42,78 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactionsTest do

assert is_nil(Repo.get(Transaction, pending.hash).block_hash)
end

test "removes consensus to blocks where transactions are missing" do
empty_block = insert(:block)
pending = insert(:transaction)

assert is_nil(pending.block_hash)

full_block = insert(:block)
inserted = insert(:transaction) |> with_block(full_block)

assert full_block.hash == inserted.block_hash

index = 0

pending_transaction_changes =
pending.hash
|> make_internal_transaction_changes(index, nil)
|> Map.put(:block_number, empty_block.number)

transaction_changes =
inserted.hash
|> make_internal_transaction_changes(index, nil)
|> Map.put(:block_number, full_block.number)

multi =
Multi.new()
|> Multi.run(:internal_transactions_indexed_at_blocks, fn _, _ -> {:ok, [empty_block.hash, full_block.hash]} end)

assert {:ok, _} = run_internal_transactions([pending_transaction_changes, transaction_changes], multi)

assert from(i in InternalTransaction, where: i.transaction_hash == ^pending.hash) |> Repo.one() |> is_nil()

assert %{consensus: false} = Repo.get(Block, empty_block.hash)

assert from(i in InternalTransaction, where: i.transaction_hash == ^inserted.hash) |> Repo.one() |> is_nil() ==
false

assert %{consensus: true} = Repo.get(Block, full_block.hash)
end

test "does not remove consensus when block is empty and no transactions are missing" do
empty_block = insert(:block)

full_block = insert(:block)
inserted = insert(:transaction) |> with_block(full_block)

assert full_block.hash == inserted.block_hash

index = 0

transaction_changes =
inserted.hash
|> make_internal_transaction_changes(index, nil)
|> Map.put(:block_number, full_block.number)

multi =
Multi.new()
|> Multi.run(:internal_transactions_indexed_at_blocks, fn _, _ -> {:ok, [empty_block.hash, full_block.hash]} end)

assert {:ok, _} = run_internal_transactions([transaction_changes], multi)

assert %{consensus: true} = Repo.get(Block, empty_block.hash)

assert from(i in InternalTransaction, where: i.transaction_hash == ^inserted.hash) |> Repo.one() |> is_nil() ==
false

assert %{consensus: true} = Repo.get(Block, full_block.hash)
end
end

defp run_internal_transactions(changes_list) when is_list(changes_list) do
Multi.new()
defp run_internal_transactions(changes_list, multi \\ Multi.new()) when is_list(changes_list) do
multi
|> InternalTransactions.run(changes_list, %{
timeout: :infinity,
timestamps: %{inserted_at: DateTime.utc_now(), updated_at: DateTime.utc_now()}
Expand Down

0 comments on commit 3855be4

Please sign in to comment.