Rework header syncing for SPoRA
The main goal of the commit is to migrate the data of v1 transactions to the v2 index.
v1 transactions constitute > 400 GiB of the current weave. In SPoRA it is crucial to
access any historical chunk by offset as fast as possible, so it is necessary to move
the historical data to the v2 index and also store fresh v1 data there.

The header syncing process is extended to manage the migration - it traverses the historical
headers, builds and records the data roots (so that we can reuse the existing interface for
submitting data chunks without knowing their absolute offsets - ar_data_sync:add_chunk_async/1),
and moves the data to the v2 index, including some early v2 data stored in the tx_data files.
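
To make the flow concrete, here is a minimal Erlang sketch of the per-transaction migration
step. Only ar_data_sync:add_chunk_async/1 is taken from the commit message; the chunk size,
the shape of the argument map, and the helper functions are illustrative assumptions, not the
actual codebase API.

    %% A sketch only: the argument map and the helpers are assumptions.
    -module(v1_migration_sketch).
    -export([migrate_v1_tx/1]).

    %% 256 KiB - assumed to match the v2 chunk granularity.
    -define(DATA_CHUNK_SIZE, 262144).

    migrate_v1_tx(#{ data := Data }) ->
        Chunks = split_into_chunks(Data),
        %% Placeholder for the real Merkle data root built over the chunks.
        DataRoot = crypto:hash(sha256, Chunks),
        lists:foldl(
            fun(Chunk, Offset) ->
                EndOffset = Offset + byte_size(Chunk),
                %% Submit the chunk relative to its data root, without
                %% knowing its absolute weave offset.
                ar_data_sync:add_chunk_async(#{
                    data_root => DataRoot,
                    chunk => Chunk,
                    offset => EndOffset
                }),
                EndOffset
            end,
            0,
            Chunks
        ),
        ok.

    split_into_chunks(Data) when byte_size(Data) =< ?DATA_CHUNK_SIZE ->
        [Data];
    split_into_chunks(<<Chunk:?DATA_CHUNK_SIZE/binary, Rest/binary>>) ->
        [Chunk | split_into_chunks(Rest)].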

Block and transaction headers and the data of the abandoned forks are now cached in memory.
The change was motivated by the need to cache chunks of v1 data from the orphaned blocks while
retaining access to them (previously, this data was part of the tx header cached on disk).
Naturally, it also reduces space amplification - the orphans no longer take up
disk space (and there was no cleanup process in place)! It comes at the cost of extra
RAM usage for the cache, which should be acceptable considering SPoRA is going to be
very RAM-heavy anyway.
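
A minimal sketch of such an in-memory orphan cache, assuming an ETS table; the table name and
the API are illustrative, not the actual cache introduced by the commit.

    -module(fork_cache_sketch).
    -export([init/0, add_orphan/2, get_orphan/1]).

    %% Orphaned headers live in RAM: they vanish with the node and never
    %% consume disk space.
    init() ->
        ets:new(orphaned_headers, [set, public, named_table]).

    add_orphan(BH, Header) ->
        ets:insert(orphaned_headers, {BH, Header}).

    get_orphan(BH) ->
        case ets:lookup(orphaned_headers, BH) of
            [{_, Header}] -> {ok, Header};
            [] -> not_found
        end.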

Sync block headers from latest to oldest. Syncing in random order was an artifact of
the times when the protocol used recall blocks instead of recall data chunks for proving access.
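
For illustration, the tip-first order might be expressed as the following fragment, assuming
the block index is a tip-first list of {BH, WeaveSize, TXRoot} entries and Fetch is whatever
downloads and stores one header; both are assumptions, not the commit's actual code.

    %% Assumed sketch: walk the tip-first block index in order, so headers
    %% are fetched from latest to oldest.
    sync_headers([{BH, _WeaveSize, _TXRoot} | Rest], Fetch) ->
        Fetch(BH),
        sync_headers(Rest, Fetch);
    sync_headers([], _Fetch) ->
        ok.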

Record synced block headers - this removes the need to traverse the block index on every startup.
Moreover, the recorded block headers will make it easy to clean up old headers to free up space
for new ones when the limited disk space feature is introduced.
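
A sketch of the startup shortcut this enables; read_record/0 and rebuild_from_block_index/0
are hypothetical stand-ins for whatever storage the commit actually uses.

    init_sync_state() ->
        case read_record() of
            {ok, SyncRecord} ->
                %% The recorded headers make the block-index scan unnecessary.
                SyncRecord;
            not_found ->
                %% First run (or a lost record): fall back to the slow scan.
                rebuild_from_block_index()
        end.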

The initialization of the data syncing process and the process managing the wallet trees is
streamlined. The change is motivated by the desire to make the header syncing process
initialization work the same way as the data syncing initialization, as these two processes
are similar in nature.

Data roots of the transactions fetched during joining are recorded in the v2 index so that
the fetched v1 data can be uploaded via ar_data_sync:add_chunk_async/1.
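
For instance, the join-time step could be sketched as below; tx_data_root/1 and
record_data_root/2 are hypothetical helpers, with only ar_data_sync:add_chunk_async/1 being
the real interface mentioned above.

    %% Assumed sketch: record each fetched transaction's data root so its
    %% v1 data can later flow through ar_data_sync:add_chunk_async/1.
    record_join_data_roots(TXs) ->
        lists:foreach(
            fun(TX) -> record_data_root(tx_data_root(TX), TX) end,
            TXs
        ).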

Additionally, the 2.0 hashes for all 1.0 blocks are checked out so that we can safely and
quickly verify the inclusion of the pre-2.0 blocks.
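
The resulting check can be sketched as a set-membership test; compute_v2_hash/1 stands in for
the real 2.0 hashing routine and Recorded for the set of checked-out hashes, both assumptions.

    %% Assumed sketch: a pre-2.0 block is accepted if its 2.0-style hash is
    %% among the recorded ones.
    is_included(PreForkBlock, Recorded) ->
        sets:is_element(compute_v2_hash(PreForkBlock), Recorded).
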
ldmberman committed Dec 14, 2020
1 parent 93323da commit 60de0fc
Showing 30 changed files with 1,299 additions and 957 deletions.
24 changes: 15 additions & 9 deletions apps/arweave/src/ar.erl
@@ -375,6 +375,9 @@ start(normal, _Args) ->
ar_meta_db:put(max_disk_pool_buffer_mb, MaxDiskPoolBuffer),
ar_meta_db:put(max_disk_pool_data_root_buffer_mb, MaxDiskPoolDataRootBuffer),
ar_meta_db:put(randomx_bulk_hashing_iterations, RandomXBulkHashingIterations),
+%% Store enabled features.
+lists:foreach(fun(Feature) -> ar_meta_db:put(Feature, true) end, Enable),
+lists:foreach(fun(Feature) -> ar_meta_db:put(Feature, false) end, Disable),
%% Prepare the storage for operation.
ar_storage:start(),
%% Optionally clear the block cache.
@@ -437,6 +440,18 @@ start(normal, _Args) ->
)
end,
ar_randomx_state:start(),
+{ok, _} = supervisor:start_child(Supervisor, #{
+id => ar_data_sync,
+start => {ar_data_sync_sup, start_link, [[]]},
+type => supervisor,
+shutdown => infinity
+}),
+{ok, _} = supervisor:start_child(Supervisor, #{
+id => ar_header_sync,
+start => {ar_header_sync_sup, start_link, [[]]},
+type => supervisor,
+shutdown => infinity
+}),
{ok, _} = supervisor:start_child(Supervisor, #{
id => ar_node,
shutdown => infinity,
@@ -482,9 +497,6 @@ start(normal, _Args) ->
}
),
ar_node:add_peers(Node, Bridge),
-%% Store enabled features.
-lists:foreach(fun(Feature) -> ar_meta_db:put(Feature, true) end, Enable),
-lists:foreach(fun(Feature) -> ar_meta_db:put(Feature, false) end, Disable),
PrintMiningAddress = case MiningAddress of
unclaimed -> "unclaimed";
_ -> binary_to_list(ar_util:encode(MiningAddress))
@@ -528,12 +540,6 @@ start(normal, _Args) ->
type => supervisor,
shutdown => infinity
}),
-{ok, _} = supervisor:start_child(Supervisor, #{
-id => ar_header_sync_sup,
-start => {ar_header_sync_sup, start_link, [[]]},
-type => supervisor,
-shutdown => infinity
-}),
if Mine -> ar_node:automine(Node); true -> do_nothing end,
case IPFSPin of
false -> ok;
21 changes: 1 addition & 20 deletions apps/arweave/src/ar_cleanup.erl
@@ -1,31 +1,12 @@
-module(ar_cleanup).

--export([rewrite/0, rewrite/1, remove_old_wallet_lists/0]).
+-export([remove_old_wallet_lists/0]).

-include("ar.hrl").
-include_lib("eunit/include/eunit.hrl").

-define(KEEP_WALLET_LISTS, 1000).

-%% @doc Rewrite every block in the hash list using the latest format.
-%% In the case of upgrading a node from 1.1 to 1.5, this dramatically reduces
-%% the size of the weave on disk (and on the wire).
-rewrite() ->
-rewrite(lists:reverse(ar_node:get_block_index(whereis(http_entrypoint_node)))).
-rewrite(BI) -> rewrite(BI, BI).
-rewrite([], _BI) -> [];
-rewrite([{H, _, _} | Rest], BI) ->
-try ar_storage:read_block(H) of
-B when ?IS_BLOCK(B) ->
-ar_storage:write_block(B),
-ar:report([{rewrote_block, ar_util:encode(H)}]);
-unavailable ->
-do_nothing
-catch _:_ ->
-ar:report([{error_rewriting_block, ar_util:encode(H)}])
-end,
-rewrite(Rest, BI).
-
remove_old_wallet_lists() ->
DataDir = rpc:call('arweave@127.0.0.1', ar_meta_db, get, [data_dir], 5000),
WalletListDir = filename:join(DataDir, ?WALLET_LIST_DIR),
