diff --git a/apps/arweave/src/ar_data_sync.erl b/apps/arweave/src/ar_data_sync.erl index 5b7f59361..b0d97e429 100644 --- a/apps/arweave/src/ar_data_sync.erl +++ b/apps/arweave/src/ar_data_sync.erl @@ -135,7 +135,7 @@ add_chunk(DataRoot, DataPath, Chunk, Offset, TXSize) -> case ar_kv:get(DiskPoolChunksIndex, DiskPoolChunkKey) of {ok, _DiskPoolChunk} -> %% The chunk is already in disk pool. - synced; + {synced_disk_pool, EndOffset2}; not_found -> case DataRootOffsetReply of not_found -> @@ -165,6 +165,13 @@ add_chunk(DataRoot, DataPath, Chunk, Offset, TXSize) -> case CheckSynced of synced -> ok; + {synced_disk_pool, EndOffset4} -> + case is_estimated_long_term_chunk(DataRootOffsetReply, EndOffset4) of + false -> + temporary; + true -> + ok + end; {error, _} = Error4 -> Error4; {ok, {DataPathHash2, DiskPoolChunkKey2, {EndOffset3, PassesBase3, PassesStrict3, @@ -200,11 +207,43 @@ add_chunk(DataRoot, DataPath, Chunk, Offset, TXSize) -> ets:update_counter(ar_data_sync_state, disk_pool_size, {2, ChunkSize}), prometheus_gauge:inc(pending_chunks_size, ChunkSize), - ok + case is_estimated_long_term_chunk(DataRootOffsetReply, EndOffset3) of + false -> + temporary; + true -> + ok + end end end end. +%% @doc Return true if we expect the chunk with the given data root index value and +%% relative end offset to end up in one of the configured storage modules. +is_estimated_long_term_chunk(DataRootOffsetReply, EndOffset) -> + WeaveSize = ar_node:get_current_weave_size(), + case DataRootOffsetReply of + not_found -> + %% A chunk from a pending transaction. + is_offset_vicinity_covered(WeaveSize); + {ok, {TXStartOffset, _}} -> + WeaveSize = ar_node:get_current_weave_size(), + Size = ar_node:get_recent_max_block_size(), + AbsoluteEndOffset = TXStartOffset + EndOffset, + case AbsoluteEndOffset > WeaveSize - Size * 4 of + true -> + %% A relatively recent offset - do not expect this chunk to be + %% persisted unless we have some storage modules configured for + %% the space ahead (the data may be rearranged during after a reorg). + is_offset_vicinity_covered(AbsoluteEndOffset); + false -> + ar_storage_module:has_any(AbsoluteEndOffset) + end + end. + +is_offset_vicinity_covered(Offset) -> + Size = ar_node:get_recent_max_block_size(), + ar_storage_module:has_range(max(0, Offset - Size * 2), Offset + Size * 2). + %% @doc Notify the server about the new pending data root (added to mempool). %% The server may accept pending chunks and store them in the disk pool. add_data_root_to_disk_pool(_, 0, _) -> diff --git a/apps/arweave/src/ar_http_iface_middleware.erl b/apps/arweave/src/ar_http_iface_middleware.erl index 4fc0abc5a..b7181a3d4 100644 --- a/apps/arweave/src/ar_http_iface_middleware.erl +++ b/apps/arweave/src/ar_http_iface_middleware.erl @@ -2190,6 +2190,8 @@ handle_post_chunk(validate_proof, Proof, Req) -> receive ok -> {200, #{}, <<>>, Req}; + temporary -> + {303, #{}, <<>>, Req}; {error, data_root_not_found} -> {400, #{}, jiffy:encode(#{ error => data_root_not_found }), Req}; {error, exceeds_disk_pool_size_limit} -> diff --git a/apps/arweave/src/ar_node.erl b/apps/arweave/src/ar_node.erl index 2f964cc33..12340dd44 100644 --- a/apps/arweave/src/ar_node.erl +++ b/apps/arweave/src/ar_node.erl @@ -14,7 +14,8 @@ get_recent_txs_map/0, get_mempool_size/0, get_block_shadow_from_cache/1, get_recent_partition_upper_bound_by_prev_h/1, get_block_txs_pairs/0, get_partition_upper_bound/1, get_nth_or_last/2, - get_partition_number/1, get_max_partition_number/1]). + get_partition_number/1, get_max_partition_number/1, + get_current_weave_size/0, get_recent_max_block_size/0]). -include_lib("arweave/include/ar.hrl"). -include_lib("arweave/include/ar_config.hrl"). @@ -293,6 +294,18 @@ get_max_partition_number(infinity) -> get_max_partition_number(PartitionUpperBound) -> max(0, PartitionUpperBound div ?PARTITION_SIZE - 1). +%% @doc Return the current weave size. Assume the node has joined the network and +%% initialized the state. +get_current_weave_size() -> + [{_, WeaveSize}] = ets:lookup(node_state, weave_size), + WeaveSize. + +%% @doc Return the maximum block size among the latest ?BLOCK_INDEX_HEAD_LEN blocks. +%% Assume the node has joined the network and initialized the state. +get_recent_max_block_size() -> + [{_, MaxBlockSize}] = ets:lookup(node_state, recent_max_block_size), + MaxBlockSize. + %%%=================================================================== %%% Tests. %%%=================================================================== diff --git a/apps/arweave/src/ar_node_worker.erl b/apps/arweave/src/ar_node_worker.erl index e48e368ff..320a1f079 100644 --- a/apps/arweave/src/ar_node_worker.erl +++ b/apps/arweave/src/ar_node_worker.erl @@ -396,8 +396,10 @@ handle_info({event, nonce_limiter, initialized}, State) -> BlockTXPairs = [block_txs_pair(Block) || Block <- Blocks], {BlockAnchors, RecentTXMap} = get_block_anchors_and_recent_txs_map(BlockTXPairs), {Rate, ScheduledRate} = {B#block.usd_to_ar_rate, B#block.scheduled_usd_to_ar_rate}, + RecentBI2 = lists:sublist(BI, ?BLOCK_INDEX_HEAD_LEN), ets:insert(node_state, [ - {recent_block_index, lists:sublist(BI, ?BLOCK_INDEX_HEAD_LEN)}, + {recent_block_index, RecentBI2}, + {recent_max_block_size, get_max_block_size(RecentBI2)}, {is_joined, true}, {current, Current}, {timestamp, B#block.timestamp}, @@ -1023,6 +1025,17 @@ get_block_anchors_and_recent_txs_map(BlockTXPairs) -> lists:sublist(BlockTXPairs, ?MAX_TX_ANCHOR_DEPTH) ). +get_max_block_size([_SingleElement]) -> + 0; +get_max_block_size([{_BH, WeaveSize, _TXRoot} | BI]) -> + get_max_block_size(BI, WeaveSize, 0). + +get_max_block_size([], _WeaveSize, Max) -> + Max; +get_max_block_size([{_BH, PrevWeaveSize, _TXRoot} | BI], WeaveSize, Max) -> + Max2 = max(Max, WeaveSize - PrevWeaveSize), + get_max_block_size(BI, PrevWeaveSize, Max2). + apply_block(State) -> case ar_block_cache:get_earliest_not_validated_from_longest_chain(block_cache) of not_found -> @@ -1536,6 +1549,7 @@ apply_validated_block2(State, B, PrevBlocks, Orphans, RecentBI, BlockTXPairs) -> ar_storage:store_block_time_history_part(AddedBlocks, lists:last(PrevBlocks)), ets:insert(node_state, [ {recent_block_index, RecentBI2}, + {recent_max_block_size, get_max_block_size(RecentBI2)}, {current, B#block.indep_hash}, {timestamp, B#block.timestamp}, {wallet_list, B#block.wallet_list}, diff --git a/apps/arweave/src/ar_storage.erl b/apps/arweave/src/ar_storage.erl index 35a1e661a..296c2a448 100644 --- a/apps/arweave/src/ar_storage.erl +++ b/apps/arweave/src/ar_storage.erl @@ -680,6 +680,8 @@ write_tx_data(DataRoot, DataTree, Data, SizeTaggedChunks, TXID) -> case ar_data_sync:add_chunk(DataRoot, DataPath, Chunk, Offset - 1, TXSize) of ok -> Acc; + temporary -> + Acc; {error, Reason} -> ?LOG_WARNING([{event, failed_to_write_tx_chunk}, {tx, ar_util:encode(TXID)}, diff --git a/apps/arweave/src/ar_storage_module.erl b/apps/arweave/src/ar_storage_module.erl index 5b5a0a706..49b3f3c29 100644 --- a/apps/arweave/src/ar_storage_module.erl +++ b/apps/arweave/src/ar_storage_module.erl @@ -1,7 +1,10 @@ -module(ar_storage_module). -export([id/1, label/1, address_label/1, packing_label/1, label_by_id/1, - get_by_id/1, get_range/1, get_packing/1, get_size/1, get/2, get_all/1, get_all/2]). + get_by_id/1, get_range/1, get_packing/1, get_size/1, get/2, get_all/1, get_all/2, + has_any/1, has_range/2]). + +-export([get_unique_sorted_intervals/1]). -include_lib("arweave/include/ar.hrl"). -include_lib("arweave/include/ar_consensus.hrl"). @@ -9,6 +12,8 @@ -include_lib("eunit/include/eunit.hrl"). +-include_lib("eunit/include/eunit.hrl"). + %% The overlap makes sure a 100 MiB recall range can always be fetched %% from a single storage module. -ifdef(DEBUG). @@ -159,6 +164,23 @@ get_all(Start, End) -> {ok, Config} = application:get_env(arweave, config), get_all(Start, End, Config#config.storage_modules, []). +%% @doc Return true if the given Offset belongs to at least one storage module. +has_any(Offset) -> + {ok, Config} = application:get_env(arweave, config), + has_any(Offset, Config#config.storage_modules). + +%% @doc Return true if the given range is covered by the configured storage modules. +has_range(Start, End) -> + {ok, Config} = application:get_env(arweave, config), + case ets:lookup(?MODULE, unique_sorted_intervals) of + [] -> + Intervals = get_unique_sorted_intervals(Config#config.storage_modules), + ets:insert(?MODULE, {unique_sorted_intervals, Intervals}), + has_range(Start, End, Intervals); + [{_, Intervals}] -> + has_range(Start, End, Intervals) + end. + %%%=================================================================== %%% Private functions. %%%=================================================================== @@ -200,6 +222,41 @@ get_all(Start, End, [StorageModule | StorageModules], FoundModules) -> get_all(_Start, _End, [], FoundModules) -> FoundModules. +has_any(_Offset, []) -> + false; +has_any(Offset, [{BucketSize, Bucket, _Packing} | StorageModules]) -> + case Offset > Bucket * BucketSize andalso Offset =< (Bucket + 1) * BucketSize + ?OVERLAP of + true -> + true; + false -> + has_any(Offset, StorageModules) + end. + +get_unique_sorted_intervals(StorageModules) -> + get_unique_sorted_intervals(StorageModules, ar_intervals:new()). + +get_unique_sorted_intervals([], Intervals) -> + [{Start, End} || {End, Start} <- ar_intervals:to_list(Intervals)]; +get_unique_sorted_intervals([{BucketSize, Bucket, _Packing} | StorageModules], Intervals) -> + End = (Bucket + 1) * BucketSize, + Start = Bucket * BucketSize, + get_unique_sorted_intervals(StorageModules, ar_intervals:add(Intervals, End, Start)). + +has_range(PartitionStart, PartitionEnd, _Intervals) + when PartitionStart >= PartitionEnd -> + true; +has_range(_PartitionStart, _PartitionEnd, []) -> + false; +has_range(PartitionStart, _PartitionEnd, [{Start, _End} | _Intervals]) + when PartitionStart < Start -> + %% The given intervals are unique and sorted. + false; +has_range(PartitionStart, PartitionEnd, [{_Start, End} | Intervals]) + when PartitionStart >= End -> + has_range(PartitionStart, PartitionEnd, Intervals); +has_range(_PartitionStart, PartitionEnd, [{_Start, End} | Intervals]) -> + has_range(End, PartitionEnd, Intervals). + %%%=================================================================== %%% Tests. %%%=================================================================== @@ -223,6 +280,23 @@ label_test() -> ?assertEqual("storage_module_524288_2_3", label({524288, 2, {spora_2_6, <<"s÷">>}})). - - - +has_any_test() -> + ?assertEqual(false, has_any(0, [])), + ?assertEqual(false, has_any(0, [{10, 1, p}])), + ?assertEqual(false, has_any(10, [{10, 1, p}])), + ?assertEqual(true, has_any(11, [{10, 1, p}])), + ?assertEqual(true, has_any(20 + ?OVERLAP, [{10, 1, p}])), + ?assertEqual(false, has_any(20 + ?OVERLAP + 1, [{10, 1, p}])). + +get_unique_sorted_intervals_test() -> + ?assertEqual([{0, 24}, {90, 120}], + get_unique_sorted_intervals([{10, 0, p}, {30, 3, p}, {20, 0, p}, {12, 1, p}])). + +has_range_test() -> + ?assertEqual(false, has_range(0, 10, [])), + ?assertEqual(false, has_range(0, 10, [{0, 9}])), + ?assertEqual(true, has_range(0, 10, [{0, 10}])), + ?assertEqual(true, has_range(0, 10, [{0, 11}])), + ?assertEqual(true, has_range(0, 10, [{0, 9}, {9, 10}])), + ?assertEqual(true, has_range(5, 10, [{0, 9}, {9, 10}])), + ?assertEqual(true, has_range(5, 10, [{0, 2}, {2, 9}, {9, 10}])). diff --git a/apps/arweave/test/ar_data_sync_tests.erl b/apps/arweave/test/ar_data_sync_tests.erl index f56c0203d..23c3826a5 100644 --- a/apps/arweave/test/ar_data_sync_tests.erl +++ b/apps/arweave/test/ar_data_sync_tests.erl @@ -221,7 +221,7 @@ test_rejects_chunks_exceeding_disk_pool_limit() -> Data1 = crypto:strong_rand_bytes( (?DEFAULT_MAX_DISK_POOL_DATA_ROOT_BUFFER_MB * 1024 * 1024) + 1 ), - Chunks1 = split(?DATA_CHUNK_SIZE, Data1), + Chunks1 = imperfect_split(Data1), {DataRoot1, _} = ar_merkle:generate_tree( ar_tx:sized_chunks_to_sized_chunk_ids( ar_tx:chunks_to_size_tagged_chunks(Chunks1) @@ -249,7 +249,7 @@ test_rejects_chunks_exceeding_disk_pool_limit() -> ?DEFAULT_MAX_DISK_POOL_DATA_ROOT_BUFFER_MB - 1 ) * 1024 * 1024 ), - Chunks2 = split(Data2), + Chunks2 = imperfect_split(Data2), {DataRoot2, _} = ar_merkle:generate_tree( ar_tx:sized_chunks_to_sized_chunk_ids( ar_tx:chunks_to_size_tagged_chunks(Chunks2) @@ -275,7 +275,7 @@ test_rejects_chunks_exceeding_disk_pool_limit() -> byte_size(Data2), ?assert(Left < ?DEFAULT_MAX_DISK_POOL_DATA_ROOT_BUFFER_MB * 1024 * 1024), Data3 = crypto:strong_rand_bytes(Left + 1), - Chunks3 = split(Data3), + Chunks3 = imperfect_split(Data3), {DataRoot3, _} = ar_merkle:generate_tree( ar_tx:sized_chunks_to_sized_chunk_ids( ar_tx:chunks_to_size_tagged_chunks(Chunks3) @@ -303,12 +303,14 @@ test_rejects_chunks_exceeding_disk_pool_limit() -> true = ar_util:do_until( fun() -> %% After a block is mined, the chunks receive their absolute offsets, which - %% end up above the rebase threshold and so the node discovers the very last - %% chunks of the last two transactions are invalid under these offsets and - %% frees up 131072 + 131072 bytes in the disk pool => we can submit a 262144-byte - %% chunk. + %% end up above the strict data split threshold and so the node discovers + %% the very last chunks of the last two transactions are invalid under these + %% offsets and frees up 131072 + 131072 bytes in the disk pool => we can submit + %% a 262144-byte chunk. Also, expect 303 instead of 200 because the last block + %% was large such that the configured partitions do not cover at least two + %% times as much space ahead of the current weave size. case ar_test_node:post_chunk(main, ar_serialize:jsonify(FirstProof3)) of - {ok, {{<<"200">>, _}, _, _, _, _}} -> + {ok, {{<<"303">>, _}, _, _, _, _}} -> true; _ -> false @@ -1012,23 +1014,24 @@ v2_standard_split_get_chunks(<< _:262144/binary, LastChunk/binary >> = Rest, Chu v2_standard_split_get_chunks(<< Chunk:262144/binary, Rest/binary >>, Chunks, MinSize) -> v2_standard_split_get_chunks(Rest, [Chunk | Chunks], MinSize). -split(Data) -> - split(?DATA_CHUNK_SIZE, Data). +imperfect_split(Data) -> + imperfect_split(?DATA_CHUNK_SIZE, Data). -split(_ChunkSize, Bin) when byte_size(Bin) == 0 -> +imperfect_split(_ChunkSize, Bin) when byte_size(Bin) == 0 -> []; -split(ChunkSize, Bin) when byte_size(Bin) < ChunkSize -> +imperfect_split(ChunkSize, Bin) when byte_size(Bin) < ChunkSize -> [Bin]; -split(ChunkSize, Bin) -> +imperfect_split(ChunkSize, Bin) -> <> = Bin, HalfSize = ChunkSize div 2, case byte_size(Rest) < HalfSize of true -> - HalfSize = ChunkSize div 2, <> = Bin, + %% If Rest is <<>>, both chunks are HalfSize - the chunks are invalid + %% after the strict data split threshold. [ChunkBin2, Rest2]; false -> - [ChunkBin | split(ChunkSize, Rest)] + [ChunkBin | imperfect_split(ChunkSize, Rest)] end. build_proofs(B, TX, Chunks) ->