Add support for syncing chain headers in parallel
The pace is controlled by the header_sync_jobs command-line and config-file parameter.

Syncing historical headers is slow, mostly because there are many historical blocks and
transactions to download, the serialization format is not very compact, and the API does not
batch them. This commit provides a straightforward way to speed it up.
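
The pattern behind the change, reduced to a minimal standalone sketch (the module and function names here are hypothetical, not part of the commit): the gen_server seeds one process_item cast per configured job at startup, and every job re-arms itself once its work completes, so N casts keep circulating and N downloads run concurrently.

-module(parallel_jobs_sketch).
-behaviour(gen_server).
-export([start_link/1]).
-export([init/1, handle_cast/2, handle_call/3, handle_info/2]).

start_link(NumJobs) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, NumJobs, []).

init(NumJobs) ->
    %% Seed one process_item cast per job, mirroring what
    %% ar_header_sync:init/1 does with header_sync_jobs.
    lists:foreach(
        fun(_) -> gen_server:cast(?MODULE, process_item) end,
        lists:seq(1, NumJobs)
    ),
    {ok, #{}}.

handle_cast(process_item, State) ->
    %% Run the slow work in a monitored worker so the server stays
    %% responsive; the worker re-arms its job when it finishes.
    monitor(process, spawn(
        fun() ->
            do_slow_work(),
            gen_server:cast(?MODULE, process_item)
        end
    )),
    {noreply, State}.

handle_call(_Msg, _From, State) ->
    {reply, not_implemented, State}.

handle_info({'DOWN', _, process, _, normal}, State) ->
    {noreply, State};
handle_info({'DOWN', _, process, _, _Reason}, State) ->
    %% A worker crashed before re-arming itself; start a replacement
    %% so the number of concurrent jobs stays constant.
    gen_server:cast(?MODULE, process_item),
    {noreply, State}.

do_slow_work() ->
    timer:sleep(100).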
ldmberman committed Jun 7, 2021
1 parent dfbe778 commit 935e798
Showing 9 changed files with 114 additions and 59 deletions.
7 changes: 6 additions & 1 deletion apps/arweave/include/ar_config.hrl
@@ -20,6 +20,10 @@
 %% and downloads it from peers.
 -define(DEFAULT_SYNC_JOBS, 20).
 
+%% The number of header sync jobs to run. Each job picks the latest not synced
+%% block header and downloads it from peers.
+-define(DEFAULT_HEADER_SYNC_JOBS, 1).
+
 %% The default expiration time for a data root in the disk pool.
 -define(DEFAULT_DISK_POOL_DATA_ROOT_EXPIRATION_TIME_S, 2 * 60 * 60).
 
@@ -34,7 +38,7 @@
 -endif.
 
 %% The default frequency of checking for the available disk space.
--define(DISK_SPACE_CHECK_FREQUENCY_MS, 5 * 60 * 1000).
+-define(DISK_SPACE_CHECK_FREQUENCY_MS, 30 * 1000).
 
 -define(NUM_STAGE_ONE_HASHING_PROCESSES,
     max(1, (erlang:system_info(schedulers_online) div 2))).
@@ -74,6 +78,7 @@
     max_emitters = ?NUM_EMITTER_PROCESSES,
     tx_propagation_parallelization = ?TX_PROPAGATION_PARALLELIZATION,
     sync_jobs = ?DEFAULT_SYNC_JOBS,
+    header_sync_jobs = ?DEFAULT_HEADER_SYNC_JOBS,
     load_key = not_set,
     disk_space,
     disk_space_check_frequency = ?DISK_SPACE_CHECK_FREQUENCY_MS,
17 changes: 9 additions & 8 deletions apps/arweave/include/ar_header_sync.hrl
@@ -1,7 +1,7 @@
-%% @doc The number of recent blocks tracked, used for erasing the orphans.
+%% The number of recent blocks tracked, used for erasing the orphans.
 -define(HEADER_SYNC_TRACK_CONFIRMATIONS, 100).
 
-%% @doc The size in bytes of a portion of the disk space reserved for recent block
+%% The size in bytes of a portion of the disk space reserved for recent block
 %% and transaction headers. The node stops syncing historical headers when the remaining
 %% disk space is smaller than this amount.
 -ifdef(DEBUG).
@@ -10,7 +10,7 @@
 -define(DISK_HEADERS_BUFFER_SIZE, 10 * 1024 * 1024 * 1024).
 -endif.
 
-%% @doc The threshold in bytes for the remaining disk space. When reached, the node
+%% The threshold in bytes for the remaining disk space. When reached, the node
 %% removes the oldest of the stored block and transaction headers until the space
 %% exceeding this threshold is available.
 -ifdef(DEBUG).
@@ -19,14 +19,14 @@
 -define(DISK_HEADERS_CLEANUP_THRESHOLD, 2 * 1024 * 1024 * 1024).
 -endif.
 
-%% @doc The frequency of processing items in the queue.
+%% The frequency of processing items in the queue.
 -ifdef(DEBUG).
 -define(PROCESS_ITEM_INTERVAL_MS, 1000).
 -else.
 -define(PROCESS_ITEM_INTERVAL_MS, 100).
 -endif.
 
-%% @doc The frequency of checking if there are headers to sync after everything
+%% The frequency of checking if there are headers to sync after everything
 %% is synced. Also applies to a fresh node without any data waiting for a block index.
 %% Another case is when the process misses a few blocks (e.g. blocks were sent while the
 %% supervisor was restarting it after a crash).
@@ -36,10 +36,11 @@
 -define(CHECK_AFTER_SYNCED_INTERVAL_MS, 5000).
 -endif.
 
-%% @doc The initial value for the exponential backoff for failing requests.
+%% The initial value for the exponential backoff for failing requests.
 -define(INITIAL_BACKOFF_INTERVAL_S, 30).
-%% @doc The maximum exponential backoff interval for failing requests.
+
+%% The maximum exponential backoff interval for failing requests.
 -define(MAX_BACKOFF_INTERVAL_S, 2 * 60 * 60).
 
-%% @doc The frequency of storing the server state on disk.
+%% The frequency of storing the server state on disk.
 -define(STORE_HEADER_STATE_FREQUENCY_MS, 30000).
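
The two backoff macros parameterize a standard doubling-with-a-cap scheme. A sketch of the arithmetic, assuming the {Timestamp, Interval} backoff shape implied by the {BackoffTimestamp, _} = Backoff match in ar_header_sync:process_item/1 below (the helper names and the exact doubling rule are illustrative, not copied from the source):

-module(backoff_sketch).
-export([new_backoff/0, update_backoff/1]).

-define(INITIAL_BACKOFF_INTERVAL_S, 30).
-define(MAX_BACKOFF_INTERVAL_S, 2 * 60 * 60).

%% A backoff is {NotBeforeTimestamp, IntervalSeconds}; queue items whose
%% timestamp lies in the future are re-enqueued without a retry.
new_backoff() ->
    {0, ?INITIAL_BACKOFF_INTERVAL_S}.

%% Double the interval on every failure, capped at two hours.
update_backoff({_Timestamp, Interval}) ->
    Interval2 = min(?MAX_BACKOFF_INTERVAL_S, Interval * 2),
    {os:system_time(second) + Interval2, Interval2}.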
9 changes: 9 additions & 0 deletions apps/arweave/src/ar.erl
@@ -163,6 +163,13 @@ show_help() ->
             " Each job periodically picks a range and downloads it from peers.",
             [?DEFAULT_SYNC_JOBS]
         )},
+    {"header_sync_jobs (num)",
+        io_lib:format(
+            "The number of header syncing jobs to run. Default: ~B."
+            " Each job periodically picks the latest not synced block header"
+            " and downloads it from peers.",
+            [?DEFAULT_HEADER_SYNC_JOBS]
+        )},
     {"load_mining_key (file)",
         "Load the address that mining rewards should be credited to from file."},
     {"ipfs_pin", "Pin incoming IPFS tagged transactions on your local IPFS node."},
@@ -328,6 +335,8 @@ parse_cli_args(["max_propagation_peers", Num|Rest], C) ->
     parse_cli_args(Rest, C#config { max_propagation_peers = list_to_integer(Num) });
 parse_cli_args(["sync_jobs", Num|Rest], C) ->
     parse_cli_args(Rest, C#config { sync_jobs = list_to_integer(Num) });
+parse_cli_args(["header_sync_jobs", Num|Rest], C) ->
+    parse_cli_args(Rest, C#config { header_sync_jobs = list_to_integer(Num) });
 parse_cli_args(["tx_propagation_parallelization", Num|Rest], C) ->
     parse_cli_args(Rest, C#config { tx_propagation_parallelization = list_to_integer(Num) });
 parse_cli_args(["max_connections", Num | Rest], C) ->
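
The new clause slots into the bare key-value convention the argument parser already uses. A reduced, self-contained model of that convention (this is not the real ar.erl, which handles many more keys):

-module(cli_args_sketch).
-export([parse/1]).

-record(config, { sync_jobs = 20, header_sync_jobs = 1 }).

%% Arguments arrive as a flat list of key-value pairs, e.g.
%% parse(["sync_jobs", "40", "header_sync_jobs", "4"]) yields
%% #config{ sync_jobs = 40, header_sync_jobs = 4 }.
parse(Args) ->
    parse(Args, #config{}).

parse([], C) ->
    C;
parse(["sync_jobs", Num | Rest], C) ->
    parse(Rest, C#config{ sync_jobs = list_to_integer(Num) });
parse(["header_sync_jobs", Num | Rest], C) ->
    parse(Rest, C#config{ header_sync_jobs = list_to_integer(Num) });
parse([Arg | _], _) ->
    erlang:error({unknown_argument, Arg}).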
6 changes: 6 additions & 0 deletions apps/arweave/src/ar_config.erl
@@ -149,6 +149,12 @@ parse_options([{<<"sync_jobs">>, Value} | Rest], Config)
parse_options([{<<"sync_jobs">>, Value} | _], _) ->
{error, {bad_type, sync_jobs, number}, Value};

parse_options([{<<"header_sync_jobs">>, Value} | Rest], Config)
when is_integer(Value) ->
parse_options(Rest, Config#config{ header_sync_jobs = Value });
parse_options([{<<"header_sync_jobs">>, Value} | _], _) ->
{error, {bad_type, header_sync_jobs, number}, Value};

parse_options([{<<"load_mining_key">>, DataDir} | Rest], Config) when is_binary(DataDir) ->
parse_options(Rest, Config#config{ load_key = binary_to_list(DataDir) });
parse_options([{<<"load_mining_key">>, DataDir} | _], _) ->
120 changes: 75 additions & 45 deletions apps/arweave/src/ar_header_sync.erl
@@ -51,6 +51,7 @@ request_tx_removal(TXID) ->
 init([]) ->
     ?LOG_INFO([{event, ar_header_sync_start}]),
     process_flag(trap_exit, true),
+    {ok, Config} = application:get_env(arweave, config),
     {ok, DB} = ar_kv:open("ar_header_sync_db"),
     {SyncRecord, LastHeight, CurrentBI} =
         case ar_storage:read_term(header_sync_state) of
@@ -60,7 +61,13 @@
             StoredState
     end,
     gen_server:cast(?MODULE, check_space_alarm),
-    gen_server:cast(?MODULE, check_space_process_item),
+    gen_server:cast(?MODULE, check_space),
+    lists:foreach(
+        fun(_) ->
+            gen_server:cast(?MODULE, process_item)
+        end,
+        lists:seq(1, Config#config.header_sync_jobs)
+    ),
     gen_server:cast(?MODULE, store_sync_state),
     {ok,
         #{
@@ -70,8 +77,8 @@
             block_index => CurrentBI,
             queue => queue:new(),
             last_picked => LastHeight,
-            cleanup_started => false,
-            disk_full => false
+            disk_full => false,
+            sync_disk_space => have_free_space()
         }}.
 
 handle_cast({join, BI, Blocks}, State) ->
@@ -164,31 +171,38 @@ handle_cast(check_space_alarm, State) ->
     cast_after(?DISK_SPACE_WARNING_FREQUENCY, check_space_alarm),
     {noreply, State};
 
-handle_cast(check_space_process_item, #{ cleanup_started := CleanupStarted } = State) ->
+handle_cast(check_space, State) ->
     case have_free_space() of
         true ->
-            gen_server:cast(self(), process_item),
-            {noreply, State#{ cleanup_started => false }};
+            cast_after(ar_disksup:get_disk_space_check_frequency(), check_space),
+            {noreply, State#{ sync_disk_space => true }};
         false ->
             case should_cleanup() of
                 true ->
-                    case CleanupStarted of
-                        true ->
-                            ok;
-                        false ->
-                            ?LOG_INFO([
-                                {event, ar_header_sync_removing_oldest_headers},
-                                {reason, little_disk_space_left}
-                            ])
-                    end,
-                    gen_server:cast(?MODULE, check_space_process_item),
-                    {noreply, remove_oldest_headers(State#{ cleanup_started => true })};
+                    ?LOG_INFO([
+                        {event, ar_header_sync_removing_oldest_headers},
+                        {reason, little_disk_space_left}
+                    ]),
+                    gen_server:cast(?MODULE, cleanup_oldest);
                 false ->
-                    cast_after(?CHECK_AFTER_SYNCED_INTERVAL_MS, check_space_process_item),
-                    {noreply, State#{ cleanup_started => false }}
-            end
+                    cast_after(ar_disksup:get_disk_space_check_frequency(), check_space)
+            end,
+            {noreply, State#{ sync_disk_space => false }}
     end;
 
+handle_cast(cleanup_oldest, State) ->
+    case have_free_space() of
+        true ->
+            cast_after(ar_disksup:get_disk_space_check_frequency(), check_space),
+            {noreply, State#{ sync_disk_space => true }};
+        false ->
+            gen_server:cast(?MODULE, cleanup_oldest),
+            {noreply, remove_oldest_headers(State)}
+    end;
+
+handle_cast(process_item, #{ sync_disk_space := false } = State) ->
+    cast_after(?CHECK_AFTER_SYNCED_INTERVAL_MS, process_item),
+    {noreply, State};
 handle_cast(process_item, State) ->
     #{
         queue := Queue,
@@ -200,7 +214,6 @@ handle_cast(process_item, State) ->
     UpdatedQueue = process_item(Queue),
     case pick_unsynced_block(LastPicked, SyncRecord) of
         nothing_to_sync ->
-            cast_after(?CHECK_AFTER_SYNCED_INTERVAL_MS, check_space_process_item),
             LastPicked2 =
                 case queue:is_empty(UpdatedQueue) of
                     true ->
@@ -210,19 +223,9 @@
                 end,
             {noreply, State#{ queue => UpdatedQueue, last_picked => LastPicked2 }};
         Height ->
-            cast_after(?PROCESS_ITEM_INTERVAL_MS, check_space_process_item),
             case ar_node:get_block_index_entry(Height) of
-                true ->
-                    {noreply, State#{ queue => UpdatedQueue }};
                 not_joined ->
                     {noreply, State#{ queue => UpdatedQueue }};
-                not_found ->
-                    ?LOG_ERROR([
-                        {event, ar_header_sync_block_index_entry_not_found},
-                        {height, Height},
-                        {sync_record, SyncRecord}
-                    ]),
-                    {noreply, State#{ queue => UpdatedQueue }};
                 {H, _WeaveSize, TXRoot} ->
                     %% Before 2.0, to compute a block hash, the complete wallet list
                     %% and all the preceding hashes were required. Getting a wallet list
@@ -243,6 +246,12 @@
             end
     end;
 
+handle_cast({failed_to_get_block, H, H2, TXRoot, Backoff}, #{ queue := Queue } = State) ->
+    Backoff2 = update_backoff(Backoff),
+    Queue2 = enqueue({block, {H, H2, TXRoot}}, Backoff2, Queue),
+    gen_server:cast(?MODULE, process_item),
+    {noreply, State#{ queue => Queue2 }};
+
 handle_cast({remove_tx, TXID}, State) ->
     {ok, _Size} = ar_storage:delete_tx(TXID),
     ar_tx_blacklist:notify_about_removed_tx(TXID),
@@ -264,6 +273,19 @@ handle_cast(Msg, State) ->
 handle_call(_Msg, _From, State) ->
     {reply, not_implemented, State}.
 
+handle_info({'DOWN', _, process, _, normal}, State) ->
+    {noreply, State};
+handle_info({'DOWN', _, process, _, noproc}, State) ->
+    {noreply, State};
+handle_info({'DOWN', _, process, _, Reason}, State) ->
+    ?LOG_WARNING([
+        {event, header_sync_job_failed},
+        {reason, io_lib:format("~p", [Reason])},
+        {action, spawning_another_one}
+    ]),
+    gen_server:cast(?MODULE, process_item),
+    {noreply, State};
+
 handle_info({_Ref, _Atom}, State) ->
     %% Some older versions of Erlang OTP have a bug where gen_tcp:close may leak
     %% a message. https://github.com/ninenines/gun/issues/193,
@@ -356,7 +378,9 @@ remove_oldest_headers(#{ db := DB, sync_record := SyncRecord } = State) ->
 end.
 
 cast_after(Delay, Message) ->
-    timer:apply_after(Delay, gen_server, cast, [self(), Message]).
+    %% Not using timer:apply_after here because send_after is more efficient:
+    %% http://erlang.org/doc/efficiency_guide/commoncaveats.html#timer-module.
+    erlang:send_after(Delay, ?MODULE, {'$gen_cast', Message}).
 
 %% @doc Pick the biggest height smaller than LastPicked from outside the sync record.
 pick_unsynced_block(LastPicked, SyncRecord) ->
@@ -391,19 +415,25 @@ process_item(Queue) ->
     Now = os:system_time(second),
     case queue:out(Queue) of
         {empty, _Queue} ->
+            cast_after(?PROCESS_ITEM_INTERVAL_MS, process_item),
             Queue;
         {{value, {Item, {BackoffTimestamp, _} = Backoff}}, UpdatedQueue}
                 when BackoffTimestamp > Now ->
+            cast_after(?PROCESS_ITEM_INTERVAL_MS, process_item),
             enqueue(Item, Backoff, UpdatedQueue);
         {{value, {{block, {H, H2, TXRoot}}, Backoff}}, UpdatedQueue} ->
-            case download_block(H, H2, TXRoot) of
-                {error, _Reason} ->
-                    UpdatedBackoff = update_backoff(Backoff),
-                    enqueue({block, {H, H2, TXRoot}}, UpdatedBackoff, UpdatedQueue);
-                {ok, B} ->
-                    gen_server:cast(self(), {add_block, B}),
-                    UpdatedQueue
-            end
+            monitor(process, spawn(
+                fun() ->
+                    case download_block(H, H2, TXRoot) of
+                        {error, _Reason} ->
+                            gen_server:cast(?MODULE, {failed_to_get_block, H, H2, TXRoot, Backoff});
+                        {ok, B} ->
+                            gen_server:cast(?MODULE, {add_block, B}),
+                            cast_after(?PROCESS_ITEM_INTERVAL_MS, process_item)
+                    end
+                end
+            )),
+            UpdatedQueue
     end.
 
 enqueue(Item, Backoff, Queue) ->
@@ -483,11 +513,11 @@ download_txs(Peers, B, TXRoot) ->
             end
         end;
     _ ->
-            ?LOG_WARNING([
-                {event, ar_header_sync_block_tx_root_mismatch},
-                {block, ar_util:encode(B#block.indep_hash)}
-            ]),
-            {error, block_tx_root_mismatch}
+        ?LOG_WARNING([
+            {event, ar_header_sync_block_tx_root_mismatch},
+            {block, ar_util:encode(B#block.indep_hash)}
+        ]),
+        {error, block_tx_root_mismatch}
     end;
 {error, txs_exceed_block_size_limit} ->
     ?LOG_WARNING([
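
Two details of this file are worth a note. First, the cast_after/2 rewrite schedules a delayed cast by handing erlang:send_after/3 the {'$gen_cast', Message} tuple that gen_server:cast/2 would otherwise wrap, avoiding the single timer_server process behind the timer module; this leans on an OTP message format that is internal but long-stable. A standalone sketch of the trick (module name hypothetical):

-module(cast_after_sketch).
-behaviour(gen_server).
-export([start_link/0]).
-export([init/1, handle_cast/2, handle_call/3]).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

init([]) ->
    %% The delayed message arrives exactly as if gen_server:cast/2 had
    %% been called, because a cast is a plain {'$gen_cast', Msg} message
    %% under the hood.
    erlang:send_after(1000, ?MODULE, {'$gen_cast', tick}),
    {ok, 0}.

handle_cast(tick, N) ->
    io:format("tick ~B~n", [N]),
    erlang:send_after(1000, ?MODULE, {'$gen_cast', tick}),
    {noreply, N + 1}.

handle_call(_Msg, _From, N) ->
    {reply, not_implemented, N}.

Second, note the spawn-plus-monitor shape in process_item/1: a worker that downloads successfully re-arms its job via cast_after, while a crashed worker is replaced by the new 'DOWN' clause in handle_info/2, so the configured number of jobs is preserved either way.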
1 change: 1 addition & 0 deletions apps/arweave/test/ar_config_tests.erl
@@ -38,6 +38,7 @@ parse_config() ->
     max_propagation_peers = 8,
     tx_propagation_parallelization = 4,
     sync_jobs = 10,
+    header_sync_jobs = 1,
     load_key = "some_key_file",
     disk_space = 44 * 1024 * 1024 * 1024,
     disk_space_check_frequency = 10 * 1000,
1 change: 1 addition & 0 deletions apps/arweave/test/ar_config_tests_config_fixture.json
@@ -27,6 +27,7 @@
"max_propagation_peers": 8,
"tx_propagation_parallelization": 4,
"sync_jobs": 10,
"header_sync_jobs": 1,
"load_mining_key": "some_key_file",
"transaction_blacklists": [
"some_blacklist_1",
10 changes: 5 additions & 5 deletions apps/arweave/test/ar_header_sync_tests.erl
@@ -7,7 +7,7 @@
 -include_lib("kernel/include/file.hrl").
 
 -import(ar_test_node, [
-    start/3,
+    start/1,
     join_on_master/0,
     slave_call/3,
     sign_tx/3, assert_post_tx_to_master/1,
@@ -21,8 +21,7 @@
 test_syncs_headers() ->
     Wallet = {_, Pub} = ar_wallet:new(),
     [B0] = ar_weave:init([{ar_wallet:to_address(Pub), ?AR(200), <<>>}]),
-    {ok, Config} = application:get_env(arweave, config),
-    {_Master, _} = start(B0, unclaimed, Config#config{ disk_space_check_frequency = 1 }),
+    {_Master, _} = start(B0),
     post_random_blocks(Wallet, ?MAX_TX_ANCHOR_DEPTH + 5, B0),
     join_on_master(),
     BI = assert_slave_wait_until_height(?MAX_TX_ANCHOR_DEPTH + 5),
@@ -49,6 +48,7 @@ test_syncs_headers() ->
         lists:reverse(lists:seq(0, 5))
     ),
     B1 = ar_storage:read_block(1, BI),
+    {ok, Config} = application:get_env(arweave, config),
     application:set_env(
         arweave,
         config,
@@ -64,7 +64,7 @@
             end
         end,
         200,
-        ?CHECK_AFTER_SYNCED_INTERVAL_MS * 2
+        Config#config.disk_space_check_frequency * 2
     ),
     ?assertEqual([unavailable || _ <- B0#block.txs], ar_storage:read_tx(B0#block.txs)),
     application:set_env(arweave, config, Config),
@@ -88,7 +88,7 @@
             end
         end,
         200,
-        ?CHECK_AFTER_SYNCED_INTERVAL_MS * 2
+        Config#config.disk_space_check_frequency * 2
     ),
     ?assertEqual([unavailable || _ <- B1#block.txs], ar_storage:read_tx(B1#block.txs)),
     application:set_env(arweave, config, Config).
2 changes: 2 additions & 0 deletions apps/arweave/test/ar_test_node.erl
@@ -62,6 +62,8 @@ start(B0, RewardAddr, Config) ->
         start_from_block_index = true,
         peers = [],
         mining_addr = RewardAddr,
+        disk_space_check_frequency = 1000,
+        header_sync_jobs = 4,
         enable = [search_in_rocksdb_when_mining, serve_arql, serve_wallet_txs,
             serve_wallet_deposits]
     }),
