Skip to content

Commit

Permalink
Refactor handoff configuration (#998)
Browse files Browse the repository at this point in the history
* Refactor handoff configuration

Extend defaults, and allow for re-configuration mid-handoff
  • Loading branch information
martinsumner committed Mar 8, 2023
1 parent 10cad32 commit b1696b3
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 76 deletions.
15 changes: 15 additions & 0 deletions priv/riak_core.schema
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@
end}.

%% @doc Number of concurrent node-to-node transfers allowed.
%% It is generally safe to raise this number to increase concurrency. A
%% common value to use would be 8. However, it is necessary to monitor for
%% handoff timeouts. If handoff timeouts occur, reduce the
%% handoff_batch_threshold_count. If timeouts continue to occur, even with a
%% low value for the handoff_batch_threshold_count (e.g. 200), then raise the
%% handoff_timeout. Further, if using the leveled backend in Riak KV
%% investigate raising the backend_pause.
{mapping, "transfer_limit", "riak_core.handoff_concurrency", [
{datatype, integer},
{default, 2},
Expand All @@ -54,6 +61,14 @@
{commented, 500}
]}.

%% @doc Handoff timeout (milliseconds).
%% The time the handoff sender will wait for the receiver to acknowledge
%% the previous handoff batch. If handoff timeouts continue to occur even
%% with a low handoff_batch_threshold_count, raise this value.
{mapping, "handoff_timeout", "riak_core.handoff_timeout", [
{datatype, integer},
{default, 120000},
{commented, 120000}
]}.

%% @doc Default location of ringstate
{mapping, "ring.state_dir", "riak_core.ring_state_dir", [
{datatype, directory},
Expand Down
175 changes: 99 additions & 76 deletions src/riak_core_handoff_sender.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,13 @@
-include("riak_core_vnode.hrl").
-include("riak_core_handoff.hrl").
-include("stacktrace.hrl").
%% can be set with env riak_core, handoff_timeout
-define(TCP_TIMEOUT, 60000).

%% Default values for the handoff sender's tunables. Each can be overridden
%% through the riak_core application environment under the key named below.

%% Milliseconds to wait for the receiver to acknowledge a sync message;
%% env riak_core, handoff_timeout
-define(HANDOFF_TIMEOUT, 120000).
%% Number of batches sent between sync messages (1 = sync before every
%% batch); env riak_core, handoff_acksync_threshold
-define(HANDOFF_ACKSYNC_THRESHOLD, 1).
%% Number of batches sent between progress log entries;
%% env riak_core, handoff_acklog_threshold
-define(HANDOFF_ACKLOG_THRESHOLD, 100).
%% Batch size threshold in bytes; env riak_core, handoff_batch_threshold.
%% Parenthesised because macros are substituted as tokens: without the
%% parentheses `X div ?HANDOFF_BATCH_THRESHOLD` would be read as
%% `(X div 1024) * 1024`.
-define(HANDOFF_BATCH_THRESHOLD, (1024 * 1024)).
%% Batch size threshold as a count of objects;
%% env riak_core, handoff_batch_threshold_count
-define(HANDOFF_BATCH_THRESHOLD_COUNT, 500).

%% can be set with env riak_core, handoff_status_interval
%% note this is in seconds
-define(STATUS_INTERVAL, 2).
Expand Down Expand Up @@ -65,14 +70,19 @@
item_queue_length :: non_neg_integer(),
item_queue_byte_size :: non_neg_integer(),

acksync_threshold :: pos_integer(),
acklog_threshold :: pos_integer(),
acksync_threshold = ?HANDOFF_ACKSYNC_THRESHOLD
:: pos_integer(),
acklog_threshold = ?HANDOFF_ACKLOG_THRESHOLD
:: pos_integer(),
keepalive_next :: erlang:timestamp(),
acklog_last :: erlang:timestamp(),
handoff_batch_threshold_size :: pos_integer(), % bytes
handoff_batch_threshold_count :: pos_integer(),
handoff_batch_threshold_size = ?HANDOFF_BATCH_THRESHOLD
:: pos_integer(), % bytes
handoff_batch_threshold_count = ?HANDOFF_BATCH_THRESHOLD_COUNT
:: pos_integer(),

rcv_timeout :: pos_integer(), % milliseconds
rcv_timeout = ?HANDOFF_TIMEOUT
:: pos_integer(), % milliseconds

type :: ho_type(),

Expand Down Expand Up @@ -138,36 +148,45 @@ start_fold(TargetNode, Module, {Type, Opts}, ParentPid, SslOpts) ->
ConfigBin = term_to_binary(Config),
ok = TcpMod:send(Socket, <<?PT_MSG_CONFIGURE:8, ConfigBin/binary>>),

RecvTimeout = get_handoff_timeout(),

AckSyncThreshold =
app_helper:get_env(
riak_core, handoff_acksync_threshold, 1),
% This will force a sync before each batch by default - so sending
% the next batch will be delayed until the previous batch has been
% processed. Prior to 3.0.13, this was set to a default of 25
AckLogThreshold =
app_helper:get_env(
riak_core, handoff_acklog_threshold, 100),
% If the batch size is 1MB, this will log progress every 100MB
HandoffBatchThresholdSize =
app_helper:get_env(
riak_core, handoff_batch_threshold, 1024 * 1024),
% Batch threshold is in bytes
HandoffBatchThresholdCount =
app_helper:get_env(
riak_core, handoff_batch_threshold_count, 500),
% Batch threshold as a count of objects. If the handoff_timeout
% is 60s, this requires the receiver vnode to handle each handoff
% item in less than 120ms (assuming the fold to create the batch is
% fast).
StartFoldTime = os:timestamp(),
Stats = #ho_stats{interval_end=future_now(get_status_interval())},
UnsentAcc0 = get_notsent_acc0(Opts),
UnsentFun = get_notsent_fun(Opts),

InitAcc =
#ho_acc{
ack=0,
error=ok,
filter=Filter,
module=Module,
parent=ParentPid,
socket=Socket,
src_target={SrcPartition, TargetPartition},
stats=Stats,
tcp_mod=TcpMod,

total_bytes=0,
total_objects=0,

item_queue=[],
item_queue_length=0,
item_queue_byte_size=0,

acklog_last = os:timestamp(),
keepalive_next = next_keepalive_time(),

type=Type,
notsent_acc=UnsentAcc0,
notsent_fun=UnsentFun},

HandoffAcc = set_timeouts_and_thresholds(InitAcc),

%% Since handoff_concurrency applies to both outbound and inbound
%% connections there is a chance that the receiver may decide to
%% reject the senders attempt to start a handoff.
%% The sender must assume that a closed socket at this point is a
%% rejection by the receiver to enforce handoff_concurrency.
case send_sync(TcpMod, Socket, RecvTimeout) of
case send_sync(TcpMod, Socket, HandoffAcc#ho_acc.rcv_timeout) of
ok ->
ok;
{error, DirectionS, timeout} ->
Expand All @@ -189,45 +208,10 @@ start_fold(TargetNode, Module, {Type, Opts}, ParentPid, SslOpts) ->

M = <<?PT_MSG_INIT:8,TargetPartition:160/integer>>,
ok = TcpMod:send(Socket, M),
StartFoldTime = os:timestamp(),
Stats = #ho_stats{interval_end=future_now(get_status_interval())},
UnsentAcc0 = get_notsent_acc0(Opts),
UnsentFun = get_notsent_fun(Opts),

Req =
riak_core_util:make_fold_req(
fun visit_item/3,
#ho_acc{
ack=0,
error=ok,
filter=Filter,
module=Module,
parent=ParentPid,
socket=Socket,
src_target={SrcPartition, TargetPartition},
stats=Stats,
tcp_mod=TcpMod,

total_bytes=0,
total_objects=0,

item_queue=[],
item_queue_length=0,
item_queue_byte_size=0,

acksync_threshold=AckSyncThreshold,
acklog_threshold=AckLogThreshold,
acklog_last = os:timestamp(),
keepalive_next = next_keepalive_time(),
handoff_batch_threshold_size = HandoffBatchThresholdSize,
handoff_batch_threshold_count = HandoffBatchThresholdCount,
rcv_timeout = RecvTimeout,

type=Type,
notsent_acc=UnsentAcc0,
notsent_fun=UnsentFun},
false,
FoldOpts),
fun visit_item/3, HandoffAcc, false, FoldOpts),

%% IFF the vnode is using an async worker to perform the fold
%% then sync_command will return error on vnode crash,
Expand Down Expand Up @@ -257,7 +241,8 @@ start_fold(TargetNode, Module, {Type, Opts}, ParentPid, SslOpts) ->
total_objects=TotalObjects,
total_bytes=TotalBytes,
stats=FinalStats,
notsent_acc=NotSentAcc} = AccRecord,
notsent_acc=NotSentAcc,
rcv_timeout=RecvTimeout} = AccRecord,

case ErrStatus of
ok ->
Expand Down Expand Up @@ -339,6 +324,44 @@ start_fold(TargetNode, Module, {Type, Opts}, ParentPid, SslOpts) ->
gen_fsm:send_event(ParentPid, {handoff_error, Err, Reason})
end.

%% @doc Load the handoff timeout and the ack/batch thresholds from the
%% riak_core application environment into the fold accumulator.
%%
%% Called when the accumulator is first built, and again by send_objects
%% each time progress is logged, so that a change made to the environment
%% while a handoff is running is picked up by that handoff.
-spec set_timeouts_and_thresholds(ho_acc()) -> ho_acc().
set_timeouts_and_thresholds(HoAcc) ->
    FromEnv =
        fun(EnvKey, Default) ->
            app_helper:get_env(riak_core, EnvKey, Default)
        end,
    HoAcc#ho_acc{
        %% Milliseconds to wait for the receiver to acknowledge a sync
        rcv_timeout = get_handoff_timeout(),

        %% With the default of 1 a sync is forced before each batch, so
        %% sending the next batch is delayed until the previous batch has
        %% been processed. Prior to 3.0.13 the default was 25.
        acksync_threshold =
            FromEnv(handoff_acksync_threshold, ?HANDOFF_ACKSYNC_THRESHOLD),

        %% Counted in batches: if the batch size is 1MB, progress will be
        %% logged every 100MB with the default of 100.
        acklog_threshold =
            FromEnv(handoff_acklog_threshold, ?HANDOFF_ACKLOG_THRESHOLD),

        %% Batch threshold in bytes
        handoff_batch_threshold_size =
            FromEnv(handoff_batch_threshold, ?HANDOFF_BATCH_THRESHOLD),

        %% Batch threshold as a count of objects. With the default
        %% handoff_timeout of 120s and the default count of 500, the
        %% receiver vnode must handle each handoff item in less than 240ms
        %% (assuming the fold to create the batch is fast).
        handoff_batch_threshold_count =
            FromEnv(
                handoff_batch_threshold_count,
                ?HANDOFF_BATCH_THRESHOLD_COUNT)
    }.


visit_item(K, V, Acc0) ->
Acc = maybe_keepalive_receiver(Acc0),
#ho_acc{filter=Filter,
Expand Down Expand Up @@ -468,7 +491,7 @@ send_objects(ItemsReverseList, Acc) ->
_ ->
ok
end,
UpdAckLogLast =
Acc0 =
case {ReceiverInSync, Ack rem AckLogThreshold} of
{ok, 0} ->
lager:info(
Expand All @@ -484,9 +507,10 @@ send_objects(ItemsReverseList, Acc) ->
timer:now_diff(os:timestamp(), SyncClock) div 1000,
timer:now_diff(os:timestamp(), AckLogLast) div 1000]
),
os:timestamp();
UpdConfigAcc = set_timeouts_and_thresholds(Acc),
UpdConfigAcc#ho_acc{acklog_last = os:timestamp()};
{ok, _} ->
AckLogLast;
Acc;
{{error, SyncFailure}, _} ->
throw_error(Acc, {error, SyncFailure})
end,
Expand All @@ -500,10 +524,9 @@ send_objects(ItemsReverseList, Acc) ->

case TcpMod:send(Socket, M) of
ok ->
Acc#ho_acc{ack=Ack+1, error=ok, stats=Stats3,
Acc0#ho_acc{ack=Ack+1, error=ok, stats=Stats3,
total_objects=TotalObjects+NObjects,
total_bytes=TotalBytes+NumBytes,
acklog_last=UpdAckLogLast,
keepalive_next=next_keepalive_time(),
item_queue=[],
item_queue_length=0,
Expand All @@ -517,7 +540,7 @@ send_objects(ItemsReverseList, Acc) ->
SrcPartition, TargetPartition,
Type, Module]
),
throw_error(Acc#ho_acc{stats=Stats3}, {error, SendFailure})
throw_error(Acc0#ho_acc{stats=Stats3}, {error, SendFailure})
end.

-spec throw_error(ho_acc(), {error, term()}) -> ok.
Expand Down Expand Up @@ -580,13 +603,13 @@ get_handoff_timeout() ->
%% reached.
%% A sync message is sent every handoff_ack_sync_threshold batches, as
%% well as when initialising and closing the handoff.
app_helper:get_env(riak_core, handoff_timeout, ?TCP_TIMEOUT).
app_helper:get_env(riak_core, handoff_timeout, ?HANDOFF_TIMEOUT).

-spec get_receiver_handoff_timeout() -> pos_integer().
get_receiver_handoff_timeout() ->
%% If a receiver is idle for this time (seconds), i.e. not either
%% processing or receiving messages, it will timeout. So if the sender
%% is non-performant it must sendd a keepalive (sync) message before this
%% is non-performant it must send a keepalive (sync) message before this
%% timeout is breached
(riak_core_handoff_receiver:get_handoff_timeout() div 1000).

Expand Down

0 comments on commit b1696b3

Please sign in to comment.