Skip to content

Commit

Permalink
INDEXER_CATCHUP_BLOCKS_BATCH_SIZE and INDEXER_CATCHUP_BLOCKS_CONCURRE…
Browse files Browse the repository at this point in the history
…NCY env varaibles
  • Loading branch information
vbaranov committed Sep 30, 2022
1 parent 0827e21 commit 13f4072
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -2,6 +2,7 @@

### Features

- [#6196](https://github.com/blockscout/blockscout/pull/6196) - INDEXER_CATCHUP_BLOCKS_BATCH_SIZE and INDEXER_CATCHUP_BLOCKS_CONCURRENCY env varaibles
- [#6187](https://github.com/blockscout/blockscout/pull/6187) - Filter by created time of verified contracts in listcontracts API endpoint
- [#6092](https://github.com/blockscout/blockscout/pull/6092) - Blockscout Account functionality
- [#6073](https://github.com/blockscout/blockscout/pull/6073) - Add vyper support for rust verifier microservice integration
Expand Down
52 changes: 36 additions & 16 deletions apps/indexer/lib/indexer/block/catchup/fetcher.ex
Expand Up @@ -33,18 +33,14 @@ defmodule Indexer.Block.Catchup.Fetcher do
# These are all the *default* values for options.
# DO NOT use them directly in the code. Get options from `state`.

@blocks_batch_size 10
@blocks_concurrency 10
@blocks_batch_size_default 10
@blocks_concurrency_default 10

@sequence_name :block_catchup_sequencer

defstruct blocks_batch_size: @blocks_batch_size,
blocks_concurrency: @blocks_concurrency,
block_fetcher: nil,
defstruct block_fetcher: nil,
memory_monitor: nil

@doc false
def default_blocks_batch_size, do: @blocks_batch_size

@doc """
Required named arguments
Expand All @@ -54,19 +50,18 @@ defmodule Indexer.Block.Catchup.Fetcher do
The follow options can be overridden:
* `:blocks_batch_size` - The number of blocks to request in one call to the JSONRPC. Defaults to
`#{@blocks_batch_size}`. Block requests also include the transactions for those blocks. *These transactions
`#{@blocks_batch_size_default}`. Block requests also include the transactions for those blocks. *These transactions
are not paginated.*
* `:blocks_concurrency` - The number of concurrent requests of `:blocks_batch_size` to allow against the JSONRPC.
Defaults to #{@blocks_concurrency}. So, up to `blocks_concurrency * block_batch_size` (defaults to
`#{@blocks_concurrency * @blocks_batch_size}`) blocks can be requested from the JSONRPC at once over all
Defaults to #{@blocks_concurrency_default}. So, up to `blocks_concurrency * block_batch_size` (defaults to
`#{@blocks_concurrency_default * @blocks_batch_size_default}`) blocks can be requested from the JSONRPC at once over all
connections. Up to `block_concurrency * receipts_batch_size * receipts_concurrency` (defaults to
`#{@blocks_concurrency * Block.Fetcher.default_receipts_batch_size() * Block.Fetcher.default_receipts_batch_size()}`
`#{@blocks_concurrency_default * Block.Fetcher.default_receipts_batch_size() * Block.Fetcher.default_receipts_batch_size()}`
) receipts can be requested from the JSONRPC at once over all connections.
"""
def task(
%__MODULE__{
blocks_batch_size: blocks_batch_size,
block_fetcher: %Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments}
} = state
) do
Expand Down Expand Up @@ -111,7 +106,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
false

_ ->
step = step(first, last, blocks_batch_size)
step = step(first, last, blocks_batch_size())
sequence_opts = put_memory_monitor([ranges: missing_ranges, step: step], state)
gen_server_opts = [name: @sequence_name]
{:ok, sequence} = Sequence.start_link(sequence_opts, gen_server_opts)
Expand All @@ -132,6 +127,31 @@ defmodule Indexer.Block.Catchup.Fetcher do
end
end

def blocks_batch_size() do
blocks_batch_size_str = System.get_env("INDEXER_CATCHUP_BLOCKS_BATCH_SIZE", to_string(@blocks_batch_size_default))

case Integer.parse(blocks_batch_size_str) do
{batch_size, ""} ->
batch_size

_ ->
@blocks_batch_size_default
end
end

def blocks_concurrency() do
blocks_concurrency_str =
System.get_env("INDEXER_CATCHUP_BLOCKS_CONCURRENCY", to_string(@blocks_concurrency_default))

case Integer.parse(blocks_concurrency_str) do
{concurrency, ""} ->
concurrency

_ ->
@blocks_concurrency_default
end
end

defp fetch_last_block(json_rpc_named_arguments) do
case latest_block() do
nil ->
Expand Down Expand Up @@ -184,13 +204,13 @@ defmodule Indexer.Block.Catchup.Fetcher do
async_import_token_instances(imported)
end

defp stream_fetch_and_import(%__MODULE__{blocks_concurrency: blocks_concurrency} = state, sequence)
defp stream_fetch_and_import(state, sequence)
when is_pid(sequence) do
sequence
|> Sequence.build_stream()
|> Task.async_stream(
&fetch_and_import_range_from_sequence(state, &1, sequence),
max_concurrency: blocks_concurrency,
max_concurrency: blocks_concurrency(),
timeout: :infinity
)
|> Stream.run()
Expand Down
Expand Up @@ -207,7 +207,7 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do

{:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments)

default_blocks_batch_size = Catchup.Fetcher.default_blocks_batch_size()
default_blocks_batch_size = Catchup.Fetcher.blocks_batch_size()

assert latest_block_number > default_blocks_batch_size

Expand Down
2 changes: 2 additions & 0 deletions docker-compose/envs/common-blockscout.env
Expand Up @@ -81,6 +81,8 @@ DISABLE_INDEXER=false
DISABLE_REALTIME_INDEXER=false
INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER=false
INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=false
# INDEXER_CATCHUP_BLOCKS_BATCH_SIZE=
# INDEXER_CATCHUP_BLOCKS_CONCURRENCY=
# WEBAPP_URL=
# API_URL=
WOBSERVER_ENABLED=false
Expand Down
30 changes: 18 additions & 12 deletions docker/Makefile
Expand Up @@ -370,30 +370,18 @@ endif
ifdef API_RATE_LIMIT_WHITELISTED_IPS
BLOCKSCOUT_CONTAINER_PARAMS += -e 'API_RATE_LIMIT_WHITELISTED_IPS=$(API_RATE_LIMIT_WHITELISTED_IPS)'
endif
ifdef INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER=$(INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER)'
endif
ifdef INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=$(INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER)'
endif
ifdef TOKEN_BALANCE_ON_DEMAND_FETCHER_THRESHOLD_MINUTES
BLOCKSCOUT_CONTAINER_PARAMS += -e 'TOKEN_BALANCE_ON_DEMAND_FETCHER_THRESHOLD_MINUTES=$(TOKEN_BALANCE_ON_DEMAND_FETCHER_THRESHOLD_MINUTES)'
endif
ifdef COIN_BALANCE_ON_DEMAND_FETCHER_THRESHOLD_MINUTES
BLOCKSCOUT_CONTAINER_PARAMS += -e 'COIN_BALANCE_ON_DEMAND_FETCHER_THRESHOLD_MINUTES=$(COIN_BALANCE_ON_DEMAND_FETCHER_THRESHOLD_MINUTES)'
endif
ifdef INDEXER_MEMORY_LIMIT
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_MEMORY_LIMIT=$(INDEXER_MEMORY_LIMIT)'
endif
ifdef ETHEREUM_JSONRPC_DEBUG_TRACE_TRANSACTION_TIMEOUT
BLOCKSCOUT_CONTAINER_PARAMS += -e 'ETHEREUM_JSONRPC_DEBUG_TRACE_TRANSACTION_TIMEOUT=$(ETHEREUM_JSONRPC_DEBUG_TRACE_TRANSACTION_TIMEOUT)'
endif
ifdef FETCH_REWARDS_WAY
BLOCKSCOUT_CONTAINER_PARAMS += -e 'FETCH_REWARDS_WAY=$(FETCH_REWARDS_WAY)'
endif
ifdef INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE=$(INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE)'
endif
ifdef FOOTER_CHAT_LINK
BLOCKSCOUT_CONTAINER_PARAMS += -e 'FOOTER_CHAT_LINK=$(FOOTER_CHAT_LINK)'
endif
Expand Down Expand Up @@ -436,6 +424,15 @@ endif
ifdef ENABLE_TXS_STATS
BLOCKSCOUT_CONTAINER_PARAMS += -e 'ENABLE_TXS_STATS=$(ENABLE_TXS_STATS)'
endif
ifdef INDEXER_MEMORY_LIMIT
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_MEMORY_LIMIT=$(INDEXER_MEMORY_LIMIT)'
endif
ifdef INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER=$(INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER)'
endif
ifdef INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=$(INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER)'
endif
ifdef INDEXER_DISABLE_BLOCK_REWARD_FETCHER
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_DISABLE_BLOCK_REWARD_FETCHER=$(INDEXER_DISABLE_BLOCK_REWARD_FETCHER)'
endif
Expand All @@ -448,6 +445,15 @@ endif
ifdef INDEXER_DISABLE_EMPTY_BLOCK_SANITIZER
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_DISABLE_EMPTY_BLOCK_SANITIZER=$(INDEXER_DISABLE_EMPTY_BLOCK_SANITIZER)'
endif
ifdef INDEXER_CATCHUP_BLOCKS_BATCH_SIZE
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_CATCHUP_BLOCKS_BATCH_SIZE=$(INDEXER_CATCHUP_BLOCKS_BATCH_SIZE)'
endif
ifdef INDEXER_CATCHUP_BLOCKS_CONCURRENCY
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_CATCHUP_BLOCKS_CONCURRENCY=$(INDEXER_CATCHUP_BLOCKS_CONCURRENCY)'
endif
ifdef INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE=$(INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE)'
endif
ifdef SECRET_KEY_BASE
BLOCKSCOUT_CONTAINER_PARAMS += -e 'SECRET_KEY_BASE=$(SECRET_KEY_BASE)'
endif
Expand Down

0 comments on commit 13f4072

Please sign in to comment.