Skip to content

Commit

Permalink
Add CoinBalanceDailyUpdater
Browse files Browse the repository at this point in the history
  • Loading branch information
Qwerty5Uiop committed Nov 13, 2023
1 parent b70f95c commit 6377c97
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -9,6 +9,7 @@
- [#8750](https://github.com/blockscout/blockscout/pull/8750) - Support new eth-bytecode-db request metadata fields
- [#8634](https://github.com/blockscout/blockscout/pull/8634) - API v2: NFT for address
- [#8609](https://github.com/blockscout/blockscout/pull/8609) - Change logs format to JSON; Add endpoint url to the block_scout_web logging
- [#8558](https://github.com/blockscout/blockscout/pull/8558) - Add CoinBalanceDailyUpdater

### Fixes

Expand Down
7 changes: 0 additions & 7 deletions apps/indexer/lib/indexer/block/fetcher.ex
Expand Up @@ -173,12 +173,6 @@ defmodule Indexer.Block.Fetcher do
withdrawals: withdrawals_params
}
|> AddressCoinBalances.params_set(),
coin_balances_params_daily_set =
%{
coin_balances_params: coin_balances_params_set,
blocks: blocks
}
|> AddressCoinBalancesDaily.params_set(),
beneficiaries_with_gas_payment =
beneficiaries_with_gas_payment(blocks, beneficiary_params_set, transactions_with_receipts),
address_token_balances = AddressTokenBalances.params_set(%{token_transfers_params: token_transfers}),
Expand All @@ -188,7 +182,6 @@ defmodule Indexer.Block.Fetcher do
basic_import_options = %{
addresses: %{params: addresses},
address_coin_balances: %{params: coin_balances_params_set},
address_coin_balances_daily: %{params: coin_balances_params_daily_set},
address_token_balances: %{params: address_token_balances},
address_current_token_balances: %{
params: address_token_balances |> MapSet.to_list() |> TokenBalances.to_address_current_token_balances()
Expand Down
10 changes: 4 additions & 6 deletions apps/indexer/lib/indexer/block/realtime/fetcher.ex
Expand Up @@ -33,7 +33,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
alias Explorer.Utility.MissingRangesManipulator
alias Indexer.{Block, Tracer}
alias Indexer.Block.Realtime.TaskSupervisor
alias Indexer.Fetcher.CoinBalance
alias Indexer.Fetcher.{CoinBalance, CoinBalanceDailyUpdater}
alias Indexer.Fetcher.PolygonEdge.{DepositExecute, Withdrawal}
alias Indexer.Prometheus
alias Indexer.Transform.Addresses
Expand Down Expand Up @@ -195,7 +195,6 @@ defmodule Indexer.Block.Realtime.Fetcher do
block_fetcher,
%{
address_coin_balances: %{params: address_coin_balances_params},
address_coin_balances_daily: %{params: address_coin_balances_daily_params},
address_hash_to_fetched_balance_block_number: address_hash_to_block_number,
addresses: %{params: addresses_params},
block_rewards: block_rewards
Expand All @@ -212,8 +211,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
balances(block_fetcher, %{
address_hash_to_block_number: address_hash_to_block_number,
addresses_params: addresses_params,
balances_params: address_coin_balances_params,
balances_daily_params: address_coin_balances_daily_params
balances_params: address_coin_balances_params
})},
{block_reward_errors, chain_import_block_rewards} = Map.pop(block_rewards, :errors),
chain_import_options =
Expand All @@ -222,8 +220,8 @@ defmodule Indexer.Block.Realtime.Fetcher do
|> put_in([:addresses, :params], balances_addresses_params)
|> put_in([:blocks, :params, Access.all(), :consensus], true)
|> put_in([:block_rewards], chain_import_block_rewards)
|> put_in([Access.key(:address_coin_balances, %{}), :params], balances_params)
|> put_in([Access.key(:address_coin_balances_daily, %{}), :params], balances_daily_params),
|> put_in([Access.key(:address_coin_balances, %{}), :params], balances_params),
CoinBalanceDailyUpdater.add_daily_balances_params(balances_daily_params),
{:import, {:ok, imported} = ok} <- {:import, Chain.import(chain_import_options)} do
async_import_remaining_block_data(
imported,
Expand Down
5 changes: 3 additions & 2 deletions apps/indexer/lib/indexer/fetcher/block_reward.ex
Expand Up @@ -20,7 +20,7 @@ defmodule Indexer.Fetcher.BlockReward do
alias Explorer.Chain.Cache.Accounts
alias Indexer.{BufferedTask, Tracer}
alias Indexer.Fetcher.BlockReward.Supervisor, as: BlockRewardSupervisor
alias Indexer.Fetcher.CoinBalance
alias Indexer.Fetcher.{CoinBalance, CoinBalanceDailyUpdater}
alias Indexer.Transform.{AddressCoinBalances, AddressCoinBalancesDaily, Addresses}

@behaviour BufferedTask
Expand Down Expand Up @@ -292,10 +292,11 @@ defmodule Indexer.Fetcher.BlockReward do
address_coin_balances_daily_params_set =
AddressCoinBalancesDaily.params_set(address_coin_balances_params_with_block_timestamp_set)

CoinBalanceDailyUpdater.add_daily_balances_params(address_coin_balances_daily_params_set)

Chain.import(%{
addresses: %{params: addresses_params},
address_coin_balances: %{params: address_coin_balances_params_set},
address_coin_balances_daily: %{params: address_coin_balances_daily_params_set},
block_rewards: %{params: block_rewards_params}
})
end
Expand Down
68 changes: 68 additions & 0 deletions apps/indexer/lib/indexer/fetcher/coin_balance_daily_updater.ex
@@ -0,0 +1,68 @@
defmodule Indexer.Fetcher.CoinBalanceDailyUpdater do
@moduledoc """
Accumulates and periodically updates daily coin balances
"""

use GenServer

alias Explorer.Chain
alias Explorer.Counters.AverageBlockTime
alias Timex.Duration

@default_update_interval :timer.seconds(10)

def start_link(_) do
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
end

@impl true
def init(_) do
schedule_next_update()

{:ok, %{}}
end

def add_daily_balances_params(daily_balances_params) do
GenServer.cast(__MODULE__, {:add_daily_balances_params, daily_balances_params})
end

@impl true
def handle_cast({:add_daily_balances_params, daily_balances_params}, state) do
{:noreply, Enum.reduce(daily_balances_params, state, &put_new_param/2)}
end

defp put_new_param(%{day: day, address_hash: address_hash, value: value} = param, acc) do
Map.update(acc, {address_hash, day}, param, fn %{value: old_value} = old_param ->
if is_nil(old_value) or value > old_value, do: param, else: old_param
end)
end

@impl true
def handle_info(:update, state) when state == %{} do
schedule_next_update()

{:noreply, %{}}
end

def handle_info(:update, state) do
Chain.import(%{address_coin_balances_daily: %{params: Map.values(state)}})

schedule_next_update()

{:noreply, %{}}
end

def handle_info(_, state) do
{:noreply, state}
end

defp schedule_next_update do
update_interval =
case AverageBlockTime.average_block_time() do
{:error, :disabled} -> @default_update_interval
block_time -> round(Duration.to_milliseconds(block_time))
end

Process.send_after(self(), :update, update_interval)
end
end
2 changes: 2 additions & 0 deletions apps/indexer/lib/indexer/supervisor.ex
Expand Up @@ -21,6 +21,7 @@ defmodule Indexer.Supervisor do
alias Indexer.Fetcher.{
BlockReward,
CoinBalance,
CoinBalanceDailyUpdater,
ContractCode,
EmptyBlocksSanitizer,
InternalTransaction,
Expand Down Expand Up @@ -142,6 +143,7 @@ defmodule Indexer.Supervisor do
{EmptyBlocksSanitizer.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments]]},
{PendingTransactionsSanitizer, [[json_rpc_named_arguments: json_rpc_named_arguments]]},
{TokenTotalSupplyUpdater, [[]]},
{CoinBalanceDailyUpdater, [[]]},

# Temporary workers
{UncatalogedTokenTransfers.Supervisor, [[]]},
Expand Down

0 comments on commit 6377c97

Please sign in to comment.