Skip to content

Commit

Permalink
INDEXER_CATCHUP_BLOCKS_BATCH_SIZE and INDEXER_CATCHUP_BLOCKS_CONCURRE…
Browse files Browse the repository at this point in the history
…NCY env varaibles
  • Loading branch information
vbaranov committed Sep 30, 2022
1 parent 0827e21 commit 5e284b4
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 41 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -2,6 +2,7 @@

### Features

- [#6196](https://github.com/blockscout/blockscout/pull/6196) - INDEXER_CATCHUP_BLOCKS_BATCH_SIZE and INDEXER_CATCHUP_BLOCKS_CONCURRENCY env varaibles
- [#6187](https://github.com/blockscout/blockscout/pull/6187) - Filter by created time of verified contracts in listcontracts API endpoint
- [#6092](https://github.com/blockscout/blockscout/pull/6092) - Blockscout Account functionality
- [#6073](https://github.com/blockscout/blockscout/pull/6073) - Add vyper support for rust verifier microservice integration
Expand Down
53 changes: 28 additions & 25 deletions apps/indexer/lib/indexer/block/catchup/fetcher.ex
Expand Up @@ -33,40 +33,22 @@ defmodule Indexer.Block.Catchup.Fetcher do
# These are all the *default* values for options.
# DO NOT use them directly in the code. Get options from `state`.

@blocks_batch_size 10
@blocks_concurrency 10
@blocks_batch_size_default Application.get_env(:indexer, __MODULE__)[:batch_size_default]
@blocks_concurrency_default Application.get_env(:indexer, __MODULE__)[:concurrency_default]

@sequence_name :block_catchup_sequencer

defstruct blocks_batch_size: @blocks_batch_size,
blocks_concurrency: @blocks_concurrency,
block_fetcher: nil,
defstruct block_fetcher: nil,
memory_monitor: nil

@doc false
def default_blocks_batch_size, do: @blocks_batch_size

@doc """
Required named arguments
* `:json_rpc_named_arguments` - `t:EthereumJSONRPC.json_rpc_named_arguments/0` passed to
`EthereumJSONRPC.json_rpc/2`.
The follow options can be overridden:
* `:blocks_batch_size` - The number of blocks to request in one call to the JSONRPC. Defaults to
`#{@blocks_batch_size}`. Block requests also include the transactions for those blocks. *These transactions
are not paginated.*
* `:blocks_concurrency` - The number of concurrent requests of `:blocks_batch_size` to allow against the JSONRPC.
Defaults to #{@blocks_concurrency}. So, up to `blocks_concurrency * block_batch_size` (defaults to
`#{@blocks_concurrency * @blocks_batch_size}`) blocks can be requested from the JSONRPC at once over all
connections. Up to `block_concurrency * receipts_batch_size * receipts_concurrency` (defaults to
`#{@blocks_concurrency * Block.Fetcher.default_receipts_batch_size() * Block.Fetcher.default_receipts_batch_size()}`
) receipts can be requested from the JSONRPC at once over all connections.
"""
def task(
%__MODULE__{
blocks_batch_size: blocks_batch_size,
block_fetcher: %Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments}
} = state
) do
Expand Down Expand Up @@ -111,7 +93,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
false

_ ->
step = step(first, last, blocks_batch_size)
step = step(first, last, blocks_batch_size())
sequence_opts = put_memory_monitor([ranges: missing_ranges, step: step], state)
gen_server_opts = [name: @sequence_name]
{:ok, sequence} = Sequence.start_link(sequence_opts, gen_server_opts)
Expand All @@ -132,6 +114,27 @@ defmodule Indexer.Block.Catchup.Fetcher do
end
end

@doc """
The number of blocks to request in one call to the JSONRPC. Defaults to
10. Block requests also include the transactions for those blocks. *These transactions
are not paginated.
"""
def blocks_batch_size do
Application.get_env(:indexer, __MODULE__)[:batch_size]
end

@doc """
The number of concurrent requests of `blocks_batch_size` to allow against the JSONRPC.
Defaults to 10. So, up to `blocks_concurrency * block_batch_size` (defaults to
`10 * 10`) blocks can be requested from the JSONRPC at once over all
connections. Up to `block_concurrency * receipts_batch_size * receipts_concurrency` (defaults to
`#{10 * Block.Fetcher.default_receipts_batch_size() * Block.Fetcher.default_receipts_batch_size()}`
) receipts can be requested from the JSONRPC at once over all connections.
"""
def blocks_concurrency do
Application.get_env(:indexer, __MODULE__)[:concurrency]
end

defp fetch_last_block(json_rpc_named_arguments) do
case latest_block() do
nil ->
Expand Down Expand Up @@ -184,13 +187,13 @@ defmodule Indexer.Block.Catchup.Fetcher do
async_import_token_instances(imported)
end

defp stream_fetch_and_import(%__MODULE__{blocks_concurrency: blocks_concurrency} = state, sequence)
defp stream_fetch_and_import(state, sequence)
when is_pid(sequence) do
sequence
|> Sequence.build_stream()
|> Task.async_stream(
&fetch_and_import_range_from_sequence(state, &1, sequence),
max_concurrency: blocks_concurrency,
max_concurrency: blocks_concurrency(),
timeout: :infinity
)
|> Stream.run()
Expand Down
Expand Up @@ -207,7 +207,7 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do

{:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments)

default_blocks_batch_size = Catchup.Fetcher.default_blocks_batch_size()
default_blocks_batch_size = Catchup.Fetcher.blocks_batch_size()

assert latest_block_number > default_blocks_batch_size

Expand Down
6 changes: 3 additions & 3 deletions apps/indexer/test/indexer/block/catchup/fetcher_test.exs
Expand Up @@ -126,6 +126,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do
test "ignores fetched beneficiaries with different hash for same number", %{
json_rpc_named_arguments: json_rpc_named_arguments
} do
Application.put_env(:indexer, Indexer.Block.Catchup.Fetcher, batch_size: 1, concurrency: 10)
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
Expand Down Expand Up @@ -264,7 +265,6 @@ defmodule Indexer.Block.Catchup.FetcherTest do

assert %{first_block_number: ^block_number, last_block_number: 0, missing_block_count: 2, shrunk: false} =
Fetcher.task(%Fetcher{
blocks_batch_size: 1,
block_fetcher: %Block.Fetcher{
callback_module: Fetcher,
json_rpc_named_arguments: json_rpc_named_arguments
Expand All @@ -280,6 +280,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do
test "async fetches beneficiaries when individual responses error out", %{
json_rpc_named_arguments: json_rpc_named_arguments
} do
Application.put_env(:indexer, Indexer.Block.Catchup.Fetcher, batch_size: 1, concurrency: 10)
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
Expand Down Expand Up @@ -413,7 +414,6 @@ defmodule Indexer.Block.Catchup.FetcherTest do

assert %{first_block_number: ^block_number, last_block_number: 0, missing_block_count: 2, shrunk: false} =
Fetcher.task(%Fetcher{
blocks_batch_size: 1,
block_fetcher: %Block.Fetcher{
callback_module: Fetcher,
json_rpc_named_arguments: json_rpc_named_arguments
Expand All @@ -431,6 +431,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do
test "async fetches beneficiaries when entire call errors out", %{
json_rpc_named_arguments: json_rpc_named_arguments
} do
Application.put_env(:indexer, Indexer.Block.Catchup.Fetcher, batch_size: 1, concurrency: 10)
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
Expand Down Expand Up @@ -557,7 +558,6 @@ defmodule Indexer.Block.Catchup.FetcherTest do

assert %{first_block_number: ^block_number, last_block_number: 0, missing_block_count: 2, shrunk: false} =
Fetcher.task(%Fetcher{
blocks_batch_size: 1,
block_fetcher: %Block.Fetcher{
callback_module: Fetcher,
json_rpc_named_arguments: json_rpc_named_arguments
Expand Down
17 changes: 17 additions & 0 deletions config/runtime.exs
Expand Up @@ -422,6 +422,23 @@ config :indexer, Indexer.Supervisor, enabled: System.get_env("DISABLE_INDEXER")

config :indexer, Indexer.Block.Realtime.Supervisor, enabled: System.get_env("DISABLE_REALTIME_INDEXER") != "true"

blocks_catchup_fetcher_batch_size_default_str = "10"
blocks_catchup_fetcher_concurrency_default_str = "10"
{blocks_catchup_fetcher_batch_size_default, _} = Integer.parse(blocks_catchup_fetcher_batch_size_default_str)
{blocks_catchup_fetcher_concurrency_default, _} = Integer.parse(blocks_catchup_fetcher_concurrency_default_str)

{blocks_catchup_fetcher_batch_size, _} =
Integer.parse(System.get_env("INDEXER_CATCHUP_BLOCKS_BATCH_SIZE", blocks_catchup_fetcher_batch_size_default_str))

{blocks_catchup_fetcher_concurrency, _} =
Integer.parse(System.get_env("INDEXER_CATCHUP_BLOCKS_CONCURRENCY", blocks_catchup_fetcher_concurrency_default_str))

config :indexer, Indexer.Block.Catchup.Fetcher,
batch_size_default: blocks_catchup_fetcher_batch_size_default,
concurrency_default: blocks_catchup_fetcher_concurrency_default,
batch_size: blocks_catchup_fetcher_batch_size,
concurrency: blocks_catchup_fetcher_concurrency

Code.require_file("#{config_env()}.exs", "config/runtime")

for config <- "../apps/*/config/runtime/#{config_env()}.exs" |> Path.expand(__DIR__) |> Path.wildcard() do
Expand Down
2 changes: 2 additions & 0 deletions docker-compose/envs/common-blockscout.env
Expand Up @@ -81,6 +81,8 @@ DISABLE_INDEXER=false
DISABLE_REALTIME_INDEXER=false
INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER=false
INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=false
# INDEXER_CATCHUP_BLOCKS_BATCH_SIZE=
# INDEXER_CATCHUP_BLOCKS_CONCURRENCY=
# WEBAPP_URL=
# API_URL=
WOBSERVER_ENABLED=false
Expand Down
30 changes: 18 additions & 12 deletions docker/Makefile
Expand Up @@ -370,30 +370,18 @@ endif
ifdef API_RATE_LIMIT_WHITELISTED_IPS
BLOCKSCOUT_CONTAINER_PARAMS += -e 'API_RATE_LIMIT_WHITELISTED_IPS=$(API_RATE_LIMIT_WHITELISTED_IPS)'
endif
ifdef INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER=$(INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER)'
endif
ifdef INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=$(INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER)'
endif
ifdef TOKEN_BALANCE_ON_DEMAND_FETCHER_THRESHOLD_MINUTES
BLOCKSCOUT_CONTAINER_PARAMS += -e 'TOKEN_BALANCE_ON_DEMAND_FETCHER_THRESHOLD_MINUTES=$(TOKEN_BALANCE_ON_DEMAND_FETCHER_THRESHOLD_MINUTES)'
endif
ifdef COIN_BALANCE_ON_DEMAND_FETCHER_THRESHOLD_MINUTES
BLOCKSCOUT_CONTAINER_PARAMS += -e 'COIN_BALANCE_ON_DEMAND_FETCHER_THRESHOLD_MINUTES=$(COIN_BALANCE_ON_DEMAND_FETCHER_THRESHOLD_MINUTES)'
endif
ifdef INDEXER_MEMORY_LIMIT
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_MEMORY_LIMIT=$(INDEXER_MEMORY_LIMIT)'
endif
ifdef ETHEREUM_JSONRPC_DEBUG_TRACE_TRANSACTION_TIMEOUT
BLOCKSCOUT_CONTAINER_PARAMS += -e 'ETHEREUM_JSONRPC_DEBUG_TRACE_TRANSACTION_TIMEOUT=$(ETHEREUM_JSONRPC_DEBUG_TRACE_TRANSACTION_TIMEOUT)'
endif
ifdef FETCH_REWARDS_WAY
BLOCKSCOUT_CONTAINER_PARAMS += -e 'FETCH_REWARDS_WAY=$(FETCH_REWARDS_WAY)'
endif
ifdef INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE=$(INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE)'
endif
ifdef FOOTER_CHAT_LINK
BLOCKSCOUT_CONTAINER_PARAMS += -e 'FOOTER_CHAT_LINK=$(FOOTER_CHAT_LINK)'
endif
Expand Down Expand Up @@ -436,6 +424,15 @@ endif
ifdef ENABLE_TXS_STATS
BLOCKSCOUT_CONTAINER_PARAMS += -e 'ENABLE_TXS_STATS=$(ENABLE_TXS_STATS)'
endif
ifdef INDEXER_MEMORY_LIMIT
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_MEMORY_LIMIT=$(INDEXER_MEMORY_LIMIT)'
endif
ifdef INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER=$(INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER)'
endif
ifdef INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=$(INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER)'
endif
ifdef INDEXER_DISABLE_BLOCK_REWARD_FETCHER
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_DISABLE_BLOCK_REWARD_FETCHER=$(INDEXER_DISABLE_BLOCK_REWARD_FETCHER)'
endif
Expand All @@ -448,6 +445,15 @@ endif
ifdef INDEXER_DISABLE_EMPTY_BLOCK_SANITIZER
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_DISABLE_EMPTY_BLOCK_SANITIZER=$(INDEXER_DISABLE_EMPTY_BLOCK_SANITIZER)'
endif
ifdef INDEXER_CATCHUP_BLOCKS_BATCH_SIZE
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_CATCHUP_BLOCKS_BATCH_SIZE=$(INDEXER_CATCHUP_BLOCKS_BATCH_SIZE)'
endif
ifdef INDEXER_CATCHUP_BLOCKS_CONCURRENCY
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_CATCHUP_BLOCKS_CONCURRENCY=$(INDEXER_CATCHUP_BLOCKS_CONCURRENCY)'
endif
ifdef INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE=$(INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE)'
endif
ifdef SECRET_KEY_BASE
BLOCKSCOUT_CONTAINER_PARAMS += -e 'SECRET_KEY_BASE=$(SECRET_KEY_BASE)'
endif
Expand Down

0 comments on commit 5e284b4

Please sign in to comment.