Skip to content

Commit

Permalink
Limit fetchers init tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
Qwerty5Uiop committed Jun 15, 2023
1 parent 81c7fed commit fd78aaf
Show file tree
Hide file tree
Showing 20 changed files with 142 additions and 79 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -11,6 +11,7 @@
- [#7532](https://github.com/blockscout/blockscout/pull/7532) - Handle empty id in json rpc responses
- [#7544](https://github.com/blockscout/blockscout/pull/7544) - Add ERC-1155 signatures to uncataloged_token_transfer_block_numbers
- [#7363](https://github.com/blockscout/blockscout/pull/7363) - CSV export filters
- [#7697](https://github.com/blockscout/blockscout/pull/7697) - Limit fetchers init tasks

### Fixes

Expand Down
83 changes: 59 additions & 24 deletions apps/explorer/lib/explorer/chain.ex
Expand Up @@ -2616,8 +2616,9 @@ defmodule Explorer.Chain do
@doc """
Calls `reducer` on a stream of `t:Explorer.Chain.Block.t/0` without `t:Explorer.Chain.Block.Reward.t/0`.
"""
def stream_blocks_without_rewards(initial, reducer) when is_function(reducer, 2) do
def stream_blocks_without_rewards(initial, reducer, limited? \\ false) when is_function(reducer, 2) do
Block.blocks_without_reward_query()
|> add_fetcher_limit(limited?)
|> Repo.stream_reduce(initial, reducer)
end

Expand Down Expand Up @@ -2827,30 +2828,35 @@ defmodule Explorer.Chain do
@spec stream_unfetched_balances(
initial :: accumulator,
reducer ::
(entry :: %{address_hash: Hash.Address.t(), block_number: Block.block_number()}, accumulator -> accumulator)
(entry :: %{address_hash: Hash.Address.t(), block_number: Block.block_number()}, accumulator -> accumulator),
limited? :: boolean()
) :: {:ok, accumulator}
when accumulator: term()
def stream_unfetched_balances(initial, reducer) when is_function(reducer, 2) do
def stream_unfetched_balances(initial, reducer, limited? \\ false) when is_function(reducer, 2) do
query =
from(
balance in CoinBalance,
where: is_nil(balance.value_fetched_at),
select: %{address_hash: balance.address_hash, block_number: balance.block_number}
)

Repo.stream_reduce(query, initial, reducer)
query
|> add_fetcher_limit(limited?)
|> Repo.stream_reduce(initial, reducer)
end

@doc """
Returns a stream of all token balances whose values haven't been fetched yet.
"""
@spec stream_unfetched_token_balances(
initial :: accumulator,
reducer :: (entry :: TokenBalance.t(), accumulator -> accumulator)
reducer :: (entry :: TokenBalance.t(), accumulator -> accumulator),
limited? :: boolean()
) :: {:ok, accumulator}
when accumulator: term()
def stream_unfetched_token_balances(initial, reducer) when is_function(reducer, 2) do
def stream_unfetched_token_balances(initial, reducer, limited? \\ false) when is_function(reducer, 2) do
TokenBalance.unfetched_token_balances()
|> add_fetcher_limit(limited?)
|> Repo.stream_reduce(initial, reducer)
end

Expand All @@ -2872,18 +2878,22 @@ defmodule Explorer.Chain do
"""
@spec stream_blocks_with_unfetched_internal_transactions(
initial :: accumulator,
reducer :: (entry :: term(), accumulator -> accumulator)
reducer :: (entry :: term(), accumulator -> accumulator),
limited? :: boolean()
) :: {:ok, accumulator}
when accumulator: term()
def stream_blocks_with_unfetched_internal_transactions(initial, reducer) when is_function(reducer, 2) do
def stream_blocks_with_unfetched_internal_transactions(initial, reducer, limited? \\ false)
when is_function(reducer, 2) do
query =
from(
po in PendingBlockOperation,
where: not is_nil(po.block_number),
select: po.block_number
)

Repo.stream_reduce(query, initial, reducer)
query
|> add_fetcher_limit(limited?)
|> Repo.stream_reduce(initial, reducer)
end

def remove_nonconsensus_blocks_from_pending_ops(block_hashes) do
Expand Down Expand Up @@ -2930,10 +2940,11 @@ defmodule Explorer.Chain do
| :value
],
initial :: accumulator,
reducer :: (entry :: term(), accumulator -> accumulator)
reducer :: (entry :: term(), accumulator -> accumulator),
limited? :: boolean()
) :: {:ok, accumulator}
when accumulator: term()
def stream_transactions_with_unfetched_created_contract_codes(fields, initial, reducer)
def stream_transactions_with_unfetched_created_contract_codes(fields, initial, reducer, limited? \\ false)
when is_function(reducer, 2) do
query =
from(t in Transaction,
Expand All @@ -2943,7 +2954,9 @@ defmodule Explorer.Chain do
select: ^fields
)

Repo.stream_reduce(query, initial, reducer)
query
|> add_fetcher_limit(limited?)
|> Repo.stream_reduce(initial, reducer)
end

@spec stream_mined_transactions(
Expand Down Expand Up @@ -2995,14 +3008,16 @@ defmodule Explorer.Chain do
| :value
],
initial :: accumulator,
reducer :: (entry :: term(), accumulator -> accumulator)
reducer :: (entry :: term(), accumulator -> accumulator),
limited? :: boolean()
) :: {:ok, accumulator}
when accumulator: term()
def stream_pending_transactions(fields, initial, reducer) when is_function(reducer, 2) do
def stream_pending_transactions(fields, initial, reducer, limited? \\ false) when is_function(reducer, 2) do
query =
Transaction
|> pending_transactions_query()
|> select(^fields)
|> add_fetcher_limit(limited?)

Repo.stream_reduce(query, initial, reducer)
end
Expand All @@ -3017,17 +3032,20 @@ defmodule Explorer.Chain do
"""
@spec stream_unfetched_uncles(
initial :: accumulator,
reducer :: (entry :: term(), accumulator -> accumulator)
reducer :: (entry :: term(), accumulator -> accumulator),
limited? :: boolean()
) :: {:ok, accumulator}
when accumulator: term()
def stream_unfetched_uncles(initial, reducer) when is_function(reducer, 2) do
def stream_unfetched_uncles(initial, reducer, limited? \\ false) when is_function(reducer, 2) do
query =
from(bsdr in Block.SecondDegreeRelation,
where: is_nil(bsdr.uncle_fetched_at) and not is_nil(bsdr.index),
select: [:nephew_hash, :index]
)

Repo.stream_reduce(query, initial, reducer)
query
|> add_fetcher_limit(limited?)
|> Repo.stream_reduce(initial, reducer)
end

@doc """
Expand Down Expand Up @@ -5017,18 +5035,22 @@ defmodule Explorer.Chain do
"""
@spec stream_uncataloged_token_contract_address_hashes(
initial :: accumulator,
reducer :: (entry :: Hash.Address.t(), accumulator -> accumulator)
reducer :: (entry :: Hash.Address.t(), accumulator -> accumulator),
limited? :: boolean()
) :: {:ok, accumulator}
when accumulator: term()
def stream_uncataloged_token_contract_address_hashes(initial, reducer) when is_function(reducer, 2) do
def stream_uncataloged_token_contract_address_hashes(initial, reducer, limited? \\ false)
when is_function(reducer, 2) do
query =
from(
token in Token,
where: token.cataloged == false,
select: token.contract_address_hash
)

Repo.stream_reduce(query, initial, reducer)
query
|> add_fetcher_limit(limited?)
|> Repo.stream_reduce(initial, reducer)
end

@spec stream_unfetched_token_instances(
Expand Down Expand Up @@ -5080,17 +5102,19 @@ defmodule Explorer.Chain do

@spec stream_token_instances_with_error(
initial :: accumulator,
reducer :: (entry :: map(), accumulator -> accumulator)
reducer :: (entry :: map(), accumulator -> accumulator),
limited? :: boolean()
) :: {:ok, accumulator}
when accumulator: term()
def stream_token_instances_with_error(initial, reducer) when is_function(reducer, 2) do
def stream_token_instances_with_error(initial, reducer, limited? \\ false) when is_function(reducer, 2) do
Instance
|> where([instance], not is_nil(instance.error))
|> select([instance], %{
contract_address_hash: instance.token_contract_address_hash,
token_id: instance.token_id,
updated_at: instance.updated_at
})
|> add_fetcher_limit(limited?)
|> Repo.stream_reduce(initial, reducer)
end

Expand All @@ -5099,13 +5123,16 @@ defmodule Explorer.Chain do
"""
@spec stream_cataloged_token_contract_address_hashes(
initial :: accumulator,
reducer :: (entry :: Hash.Address.t(), accumulator -> accumulator)
reducer :: (entry :: Hash.Address.t(), accumulator -> accumulator),
some_time_ago_updated :: integer(),
limited? :: boolean()
) :: {:ok, accumulator}
when accumulator: term()
def stream_cataloged_token_contract_address_hashes(initial, reducer, some_time_ago_updated \\ 2880)
def stream_cataloged_token_contract_address_hashes(initial, reducer, some_time_ago_updated \\ 2880, limited? \\ false)
when is_function(reducer, 2) do
some_time_ago_updated
|> Token.cataloged_tokens()
|> add_fetcher_limit(limited?)
|> order_by(asc: :updated_at)
|> Repo.stream_reduce(initial, reducer)
end
Expand Down Expand Up @@ -6913,4 +6940,12 @@ defmodule Explorer.Chain do
def count_withdrawals_from_cache(options \\ []) do
"withdrawals_count" |> get_last_fetched_counter(options) |> Decimal.add(1)
end

@doc """
Optionally caps `query` for fetcher init tasks.

When `limited?` is `false` the query is returned untouched. When `true`,
the `:fetcher_init_limit` value from the `:indexer` application environment
is applied via Ecto's `limit/2`.
"""
def add_fetcher_limit(query, limited?)

def add_fetcher_limit(query, false), do: query

def add_fetcher_limit(query, true) do
  # Read the cap at call time so runtime configuration changes take effect.
  limit(query, ^Application.get_env(:indexer, :fetcher_init_limit))
end
end
21 changes: 4 additions & 17 deletions apps/indexer/lib/indexer/buffered_task.ex
Expand Up @@ -71,7 +71,7 @@ defmodule Indexer.BufferedTask do
flush_interval: nil,
max_batch_size: nil,
max_concurrency: nil,
poll: false,
poll: true,
metadata: [],
current_buffer: [],
bound_queue: %BoundQueue{},
Expand Down Expand Up @@ -231,7 +231,7 @@ defmodule Indexer.BufferedTask do
state = %BufferedTask{
callback_module: callback_module,
callback_module_state: Keyword.fetch!(opts, :state),
poll: Keyword.get(opts, :poll, false),
poll: Keyword.get(opts, :poll, true),
task_supervisor: Keyword.fetch!(opts, :task_supervisor),
flush_interval: Keyword.fetch!(opts, :flush_interval),
max_batch_size: Keyword.fetch!(opts, :max_batch_size),
Expand Down Expand Up @@ -442,21 +442,8 @@ defmodule Indexer.BufferedTask do
end

# get more work from `init/2`
defp schedule_next(%BufferedTask{poll: true, bound_queue: %BoundQueue{size: 0}} = state) do
do_initial_stream(state)
end

# was shrunk and was out of work, get more work from `init/2`
defp schedule_next(%BufferedTask{bound_queue: %BoundQueue{size: 0, maximum_size: maximum_size}} = state)
when maximum_size != nil do
Logger.info(fn ->
[
"BufferedTask ",
process(self()),
" ran out of work, but work queue was shrunk to save memory, so restoring lost work from `c:init/2`."
]
end)

defp schedule_next(%BufferedTask{poll: true, bound_queue: %BoundQueue{size: 0}, task_ref_to_batch: tasks} = state)
when tasks == %{} do
do_initial_stream(state)
end

Expand Down
10 changes: 7 additions & 3 deletions apps/indexer/lib/indexer/fetcher/block_reward.ex
Expand Up @@ -62,9 +62,13 @@ defmodule Indexer.Fetcher.BlockReward do
@impl BufferedTask
def init(initial, reducer, _) do
{:ok, final} =
Chain.stream_blocks_without_rewards(initial, fn %{number: number}, acc ->
reducer.(number, acc)
end)
Chain.stream_blocks_without_rewards(
initial,
fn %{number: number}, acc ->
reducer.(number, acc)
end,
true
)

final
end
Expand Down
14 changes: 9 additions & 5 deletions apps/indexer/lib/indexer/fetcher/coin_balance.ex
Expand Up @@ -61,11 +61,15 @@ defmodule Indexer.Fetcher.CoinBalance do
@impl BufferedTask
def init(initial, reducer, _) do
{:ok, final} =
Chain.stream_unfetched_balances(initial, fn address_fields, acc ->
address_fields
|> entry()
|> reducer.(acc)
end)
Chain.stream_unfetched_balances(
initial,
fn address_fields, acc ->
address_fields
|> entry()
|> reducer.(acc)
end,
true
)

final
end
Expand Down
3 changes: 2 additions & 1 deletion apps/indexer/lib/indexer/fetcher/contract_code.ex
Expand Up @@ -64,7 +64,8 @@ defmodule Indexer.Fetcher.ContractCode do
transaction_fields
|> entry()
|> reducer.(acc)
end
end,
true
)

final
Expand Down
11 changes: 7 additions & 4 deletions apps/indexer/lib/indexer/fetcher/internal_transaction.ex
Expand Up @@ -71,9 +71,13 @@ defmodule Indexer.Fetcher.InternalTransaction do
@impl BufferedTask
def init(initial, reducer, _json_rpc_named_arguments) do
{:ok, final} =
Chain.stream_blocks_with_unfetched_internal_transactions(initial, fn block_number, acc ->
reducer.(block_number, acc)
end)
Chain.stream_blocks_with_unfetched_internal_transactions(
initial,
fn block_number, acc ->
reducer.(block_number, acc)
end,
true
)

final
end
Expand Down Expand Up @@ -349,7 +353,6 @@ defmodule Indexer.Fetcher.InternalTransaction do
flush_interval: :timer.seconds(3),
max_concurrency: Application.get_env(:indexer, __MODULE__)[:concurrency] || @default_max_concurrency,
max_batch_size: Application.get_env(:indexer, __MODULE__)[:batch_size] || @default_max_batch_size,
poll: true,
task_supervisor: Indexer.Fetcher.InternalTransaction.TaskSupervisor,
metadata: [fetcher: :internal_transaction]
]
Expand Down
3 changes: 2 additions & 1 deletion apps/indexer/lib/indexer/fetcher/replaced_transaction.ex
Expand Up @@ -61,7 +61,8 @@ defmodule Indexer.Fetcher.ReplacedTransaction do
transaction_fields
|> pending_entry()
|> reducer.(acc)
end
end,
true
)

final
Expand Down
10 changes: 7 additions & 3 deletions apps/indexer/lib/indexer/fetcher/token.ex
Expand Up @@ -42,9 +42,13 @@ defmodule Indexer.Fetcher.Token do
@impl BufferedTask
def init(initial_acc, reducer, _) do
{:ok, acc} =
Chain.stream_uncataloged_token_contract_address_hashes(initial_acc, fn address, acc ->
reducer.(address, acc)
end)
Chain.stream_uncataloged_token_contract_address_hashes(
initial_acc,
fn address, acc ->
reducer.(address, acc)
end,
true
)

acc
end
Expand Down
14 changes: 9 additions & 5 deletions apps/indexer/lib/indexer/fetcher/token_balance.ex
Expand Up @@ -69,11 +69,15 @@ defmodule Indexer.Fetcher.TokenBalance do
@impl BufferedTask
def init(initial, reducer, _) do
{:ok, final} =
Chain.stream_unfetched_token_balances(initial, fn token_balance, acc ->
token_balance
|> entry()
|> reducer.(acc)
end)
Chain.stream_unfetched_token_balances(
initial,
fn token_balance, acc ->
token_balance
|> entry()
|> reducer.(acc)
end,
true
)

final
end
Expand Down

0 comments on commit fd78aaf

Please sign in to comment.