Fix bug where syncing slows down after store chunk queues saturate
Lev Berman committed Apr 12, 2023
1 parent 963461f commit c6d1e33
Showing 5 changed files with 55 additions and 36 deletions.
22 changes: 21 additions & 1 deletion apps/arweave/include/ar_data_sync.hrl
@@ -69,6 +69,22 @@
-define(EXCLUDE_MISSING_INTERVAL_TIMEOUT_MS, 10 * 60 * 1000).
-endif.

%% Let at least this many chunks stack up, per storage module, then write them on disk in the
%% ascending order, to reduce out-of-order disk writes causing fragmentation.
-ifdef(DEBUG).
-define(STORE_CHUNK_QUEUE_FLUSH_SIZE_THRESHOLD, 2).
-else.
-define(STORE_CHUNK_QUEUE_FLUSH_SIZE_THRESHOLD, 100). % ~ 25 MB worth of chunks.
-endif.

%% If a chunk spends longer than this in the store queue, write it on disk without waiting
%% for ?STORE_CHUNK_QUEUE_FLUSH_SIZE_THRESHOLD chunks to stack up.
-ifdef(DEBUG).
-define(STORE_CHUNK_QUEUE_FLUSH_TIME_THRESHOLD, 1000).
-else.
-define(STORE_CHUNK_QUEUE_FLUSH_TIME_THRESHOLD, 2_000). % 2 seconds.
-endif.
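%% Editor's sketch, not part of this commit: how the two thresholds above combine.
%% should_flush/2 is a hypothetical helper; the actual check lives in
%% ar_data_sync:process_store_chunk_queue/1 further down in this diff.
should_flush(_Q, 0) ->
	false;
should_flush(Q, Len) ->
	%% As in process_store_chunk_queue/1, the timestamp of the smallest-offset
	%% entry serves as a proxy for how long chunks have been waiting.
	Timestamp = element(2, gb_sets:smallest(Q)),
	Now = os:system_time(millisecond),
	Len > ?STORE_CHUNK_QUEUE_FLUSH_SIZE_THRESHOLD
			orelse Now - Timestamp > ?STORE_CHUNK_QUEUE_FLUSH_TIME_THRESHOLD.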

%% @doc The state of the server managing data synchronization.
-record(sync_data_state, {
%% The last entries of the block index.
@@ -193,5 +209,9 @@
%% them in the ascending order and reduce out-of-order disk writes causing fragmentation.
store_chunk_queue = gb_sets:new(),
%% The length of the store chunk queue.
store_chunk_queue_len = 0
store_chunk_queue_len = 0,
%% The threshold controlling the brief accumulation of the chunks in the queue before
%% the actual disk dump, to reduce the chance of out-of-order writes causing disk
%% fragmentation.
store_chunk_queue_threshold = ?STORE_CHUNK_QUEUE_FLUSH_SIZE_THRESHOLD
}).
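For context, the queue entries are tuples keyed by the absolute chunk offset, so gb_sets orders the set by offset and gb_sets:take_smallest/1 yields chunks in ascending order for the disk dump. A minimal enqueue sketch under that assumption (enqueue_chunk/4 is hypothetical; the visible hunks only show the dequeue side):

enqueue_chunk(Offset, ChunkArgs, Args, #sync_data_state{ store_chunk_queue = Q,
		store_chunk_queue_len = Len } = State) ->
	Entry = {Offset, os:system_time(millisecond), make_ref(), ChunkArgs, Args},
	State#sync_data_state{ store_chunk_queue = gb_sets:add_element(Entry, Q),
			store_chunk_queue_len = Len + 1 }.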
39 changes: 19 additions & 20 deletions apps/arweave/src/ar_data_sync.erl
@@ -26,22 +26,6 @@
-define(COLLECT_SYNC_INTERVALS_FREQUENCY_MS, 300000).
-endif.

%% Let at least this many chunks stack up, per storage module, then write them on disk in the
%% ascending order, to reduce out-of-order disk writes causing fragmentation.
-ifdef(DEBUG).
-define(STORE_CHUNK_QUEUE_FLUSH_SIZE_THRESHOLD, 2).
-else.
-define(STORE_CHUNK_QUEUE_FLUSH_SIZE_THRESHOLD, 200). % ~ 50 MB worth of chunks.
-endif.

%% If a chunk spends longer than this in the store queue, write it on disk without waiting
%% for ?STORE_CHUNK_QUEUE_FLUSH_SIZE_THRESHOLD chunks to stack up.
-ifdef(DEBUG).
-define(STORE_CHUNK_QUEUE_FLUSH_TIME_THRESHOLD, 1000).
-else.
-define(STORE_CHUNK_QUEUE_FLUSH_TIME_THRESHOLD, 15_000). % 15 seconds.
-endif.

%%%===================================================================
%%% Public interface.
%%%===================================================================
@@ -2608,18 +2592,33 @@ pack_and_store_chunk(Args, State) ->
process_store_chunk_queue(#sync_data_state{ store_chunk_queue_len = 0 } = State) ->
State;
process_store_chunk_queue(State) ->
#sync_data_state{ store_chunk_queue = Q, store_chunk_queue_len = Len } = State,
#sync_data_state{ store_chunk_queue = Q, store_chunk_queue_len = Len,
store_chunk_queue_threshold = Threshold } = State,
Timestamp = element(2, gb_sets:smallest(Q)),
Now = os:system_time(millisecond),
case Len > ?STORE_CHUNK_QUEUE_FLUSH_SIZE_THRESHOLD
Threshold2 =
case Threshold < ?STORE_CHUNK_QUEUE_FLUSH_SIZE_THRESHOLD of
true ->
Threshold;
false ->
case Len > Threshold of
true ->
0;
false ->
Threshold
end
end,
case Len > Threshold2
orelse Now - Timestamp > ?STORE_CHUNK_QUEUE_FLUSH_TIME_THRESHOLD of
true ->
{{_Offset, _Timestamp, _Ref, ChunkArgs, Args}, Q2} = gb_sets:take_smallest(Q),
store_chunk2(ChunkArgs, Args, State),
decrement_chunk_cache_size(),
State2 = State#sync_data_state{ store_chunk_queue = Q2,
store_chunk_queue_len = Len - 1 },
State2;
store_chunk_queue_len = Len - 1,
store_chunk_queue_threshold = min(Threshold2 + 1,
?STORE_CHUNK_QUEUE_FLUSH_SIZE_THRESHOLD) },
process_store_chunk_queue(State2);
false ->
State
end.
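The adaptive threshold is the heart of the fix: once the queue length exceeds the configured maximum, the effective threshold drops to 0 and the function now recurses, draining chunks in ascending offset order while the threshold ramps back up by one per write, instead of flushing a single chunk per call as the previous version did. A simplified simulation of the drain (an editor's illustration; it ignores the time-based flush):

drain(Len, Threshold, Max, Flushed) ->
	Threshold2 =
		case Threshold < Max of
			true -> Threshold;
			false -> case Len > Threshold of true -> 0; false -> Threshold end
		end,
	case Len > Threshold2 of
		true -> drain(Len - 1, min(Threshold2 + 1, Max), Max, Flushed + 1);
		false -> {Len, Threshold, Flushed}
	end.
%% drain(150, 100, 100, 0) keeps writing until the queue length no longer exceeds
%% the ramping threshold, rather than writing one chunk and returning.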
22 changes: 11 additions & 11 deletions apps/arweave/src/ar_data_sync_worker.erl
@@ -58,8 +58,7 @@ handle_cast(Cast, State) ->
?LOG_WARNING("event: unhandled_cast, cast: ~p", [Cast]),
{noreply, State}.

handle_info(Message, State) ->
?LOG_WARNING("event: unhandled_info, message: ~p", [Message]),
handle_info(_Message, State) ->
{noreply, State}.

terminate(Reason, _State) ->
@@ -170,13 +169,15 @@ read_range2({Start, End, OriginStoreID, TargetStoreID, SkipSmall}) ->
end
end.

sync_range({Start, End, _Peer, _TargetStoreID}) when Start >= End ->
sync_range({Start, End, _Peer, _TargetStoreID, _RetryCount}) when Start >= End ->
ok;
sync_range({Start, End, Peer, TargetStoreID} = Args) ->
sync_range({_Start, _End, _Peer, _TargetStoreID, 0}) ->
ok;
sync_range({Start, End, Peer, TargetStoreID, RetryCount} = Args) ->
IsChunkCacheFull =
case ar_data_sync:is_chunk_cache_full() of
true ->
ar_util:cast_after(1000, self(), {sync_range, Args}),
ar_util:cast_after(100, self(), {sync_range, Args}),
true;
false ->
false
@@ -210,7 +211,6 @@ sync_range({Start, End, Peer, TargetStoreID} = Args) ->
false ->
fun ar_http_iface_client:get_chunk_json/3
end,
ar_data_sync:increment_chunk_cache_size(),
case Fun(Peer, Start2, any) of
{ok, #{ chunk := Chunk } = Proof, Time, TransferSize} ->
%% In case we fetched a packed small chunk,
@@ -222,17 +222,17 @@ sync_range({Start, End, Peer, TargetStoreID} = Args) ->
gen_server:cast(list_to_atom("ar_data_sync_" ++ TargetStoreID),
{store_fetched_chunk, Peer, Time, TransferSize, Start2 - 1,
Proof}),
sync_range({Start3, End, Peer, TargetStoreID});
ar_data_sync:increment_chunk_cache_size(),
sync_range({Start3, End, Peer, TargetStoreID, RetryCount});
{error, timeout} ->
ar_data_sync:decrement_chunk_cache_size(),
ar_util:cast_after(1000, self(), {sync_range, Args}),
Args2 = {Start, End, Peer, TargetStoreID, RetryCount - 1},
ar_util:cast_after(1000, self(), {sync_range, Args2}),
recast;
{error, Reason} ->
?LOG_DEBUG([{event, failed_to_fetch_chunk},
{peer, ar_util:format_peer(Peer)},
{offset, Start2},
{reason, io_lib:format("~p", [Reason])}]),
ar_data_sync:decrement_chunk_cache_size()
{reason, io_lib:format("~p", [Reason])}])
end
end
end.
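sync_range tasks now carry a retry budget: a task whose RetryCount has reached 0 is dropped, and a timed-out fetch is re-cast with the budget decremented. The chunk cache counter is also only incremented once a chunk has actually been fetched and handed to the ar_data_sync server. A hedged example of scheduling a range with the new 5-tuple shape (the worker, offsets, peer, and store ID are placeholders):

schedule_sync_range(Worker, Start, End, Peer, StoreID) ->
	RetryCount = 3, % the budget ar_data_sync_worker_master now passes
	gen_server:cast(Worker, {sync_range, {Start, End, Peer, StoreID, RetryCount}}).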
6 changes: 3 additions & 3 deletions apps/arweave/src/ar_data_sync_worker_master.erl
@@ -102,7 +102,7 @@ process_task_queue(N, State) ->
case Task of
read_range ->
{Start, End, OriginStoreID, TargetStoreID, SkipSmall} = Args,
End2 = min(Start + 262144, End),
End2 = min(Start + 10 * 262144, End),
{{value, W}, WorkerQ2} = queue:out(WorkerQ),
ets:update_counter(?MODULE, scheduled_tasks, {2, 1}, {scheduled_tasks, 0}),
prometheus_gauge:inc(scheduled_sync_tasks),
@@ -121,11 +121,11 @@
process_task_queue(N - 1, State2);
sync_range ->
{Start, End, Peer, TargetStoreID} = Args,
End2 = min(Start + 262144, End),
End2 = min(Start + 200 * 262144, End),
{{value, W}, WorkerQ2} = queue:out(WorkerQ),
ets:update_counter(?MODULE, scheduled_tasks, {2, 1}, {scheduled_tasks, 0}),
prometheus_gauge:inc(scheduled_sync_tasks),
gen_server:cast(W, {sync_range, {Start, End2, Peer, TargetStoreID}}),
gen_server:cast(W, {sync_range, {Start, End2, Peer, TargetStoreID, 3}}),
WorkerQ3 = queue:in(W, WorkerQ2),
{Q3, Len2} =
case End2 == End of
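The worker master also hands out larger task slices: a read_range task now covers up to 10 chunks and a sync_range task up to 200 chunks (262144 bytes, i.e. 256 KiB, per chunk), and each sync_range task is dispatched with a retry budget of 3. Expressed as hypothetical macros (the diff keeps the literals inline):

-define(READ_RANGE_TASK_SIZE, 10 * 262144). % ~2.5 MiB per read_range task
-define(SYNC_RANGE_TASK_SIZE, 200 * 262144). % 50 MiB per sync_range task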
2 changes: 1 addition & 1 deletion apps/arweave/src/ar_http_iface_client.erl
@@ -335,7 +335,7 @@ get_chunk(Peer, Offset, RequestedPacking, Encoding) ->
peer => Peer,
method => get,
path => get_chunk_path(Offset, Encoding),
timeout => 20 * 1000,
timeout => 30 * 1000,
connect_timeout => 5000,
limit => ?MAX_SERIALIZED_CHUNK_PROOF_SIZE,
headers => p2p_headers() ++ Headers
