Skip to content

Commit

Permalink
Merge pull request #159 from SergeTupchiy/configurable-bootstrap-batc…
Browse files Browse the repository at this point in the history
…h-size

feat: make bootstrap batch size configurable per shard
  • Loading branch information
SergeTupchiy committed Sep 11, 2023
2 parents c06a252 + 2cad3dd commit 267b820
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 2 deletions.
2 changes: 1 addition & 1 deletion src/mria_bootstrapper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ server_loop(St = #server{tables = [], subscriber = Subscriber, iterator = undefi
server_loop(St0 = #server{tables = [Table|Rest], subscriber = Subscriber, iterator = It0, shard = Shard}) ->
{It, Records} = case It0 of
undefined ->
BatchSize = 500, % TODO: make it configurable per shard
BatchSize = mria_config:shard_bootstrap_batch_size(Shard),
?tp(info, start_shard_table_bootstrap,
#{ shard => Shard
, table => Table
Expand Down
27 changes: 26 additions & 1 deletion src/mria_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
, set_shard_transport/2
, shard_transport/1

, set_shard_bootstrap_batch_size/2
, shard_bootstrap_batch_size/1

%% Callbacks
, register_callback/2
, unregister_callback/1
Expand Down Expand Up @@ -78,7 +81,7 @@
-define(shard_config(SHARD), {mria_shard_config, SHARD}).
-define(is_dirty(SHARD), {mria_is_dirty_shard, SHARD}).
-define(shard_transport(SHARD), {mria_shard_transport, SHARD}).
-define(shard_transport, mria_shard_transport).
-define(shard_bootstrap_batch_size(SHARD), {mria_shard_bootstrap_batch_size, SHARD}).

-define(mria(Key), {mria, Key}).

Expand Down Expand Up @@ -157,6 +160,7 @@ load_config() ->
copy_from_env(replay_batch_size),
copy_from_env(shard_transport),
copy_from_env(max_mql),
copy_from_env(bootstrap_batch_size),
consistency_check().

-spec set_dirty_shard(mria_rlog:shard(), boolean()) -> ok.
Expand All @@ -182,6 +186,17 @@ shard_transport(Shard) ->
Default = persistent_term:get(?mria(shard_transport), gen_rpc),
persistent_term:get(?shard_transport(Shard), Default).

-spec set_shard_bootstrap_batch_size(mria_rlog:shard(), non_neg_integer()) -> ok.
set_shard_bootstrap_batch_size(Shard, BatchSize) when is_integer(BatchSize), BatchSize > 0 ->
ok = persistent_term:put(?shard_bootstrap_batch_size(Shard), BatchSize);
set_shard_bootstrap_batch_size(Shard, BatchSize) ->
error({badarg, Shard, BatchSize}).

-spec shard_bootstrap_batch_size(mria_rlog:shard()) -> non_neg_integer().
shard_bootstrap_batch_size(Shard) ->
Default = persistent_term:get(?mria(bootstrap_batch_size), 500),
persistent_term:get(?shard_bootstrap_batch_size(Shard), Default).

-spec load_shard_config(mria_rlog:shard(), [mria:table()]) -> ok.
load_shard_config(Shard, Tables) ->
?tp(info, "Setting RLOG shard config",
Expand Down Expand Up @@ -266,6 +281,10 @@ erase_shard_config(Shard) ->
persistent_term:erase(Key);
({Key = ?shard_config(S), _}) when S =:= Shard ->
persistent_term:erase(Key);
({Key = ?shard_transport(S), _}) when S =:= Shard ->
persistent_term:erase(Key);
({Key = ?shard_bootstrap_batch_size(S), _}) when S =:= Shard ->
persistent_term:erase(Key);
(_) ->
ok
end
Expand All @@ -283,6 +302,12 @@ erase_all_config() ->
persistent_term:erase(Key);
?mria(_) ->
persistent_term:erase(Key);
?is_dirty(_) ->
persistent_term:erase(Key);
?shard_transport(_) ->
persistent_term:erase(Key);
?shard_bootstrap_batch_size(_) ->
persistent_term:erase(Key);
_ ->
ok
end
Expand Down

0 comments on commit 267b820

Please sign in to comment.