Skip to content

Commit

Permalink
MB-52988: Hooks and validation for pause and resume OPs
Browse files Browse the repository at this point in the history
Change-Id: I595af46f813b6a5b3d67ae189219c9007c46544e
Reviewed-on: https://review.couchbase.org/c/ns_server/+/182856
Tested-by: Navdeep S Boparai <navdeep.boparai@couchbase.com>
Reviewed-by: Artem Stemkovski <artem@couchbase.com>
  • Loading branch information
boparai11 committed Jan 23, 2023
1 parent 62adce7 commit 13ad4f9
Show file tree
Hide file tree
Showing 6 changed files with 291 additions and 132 deletions.
119 changes: 43 additions & 76 deletions src/hibernation_manager.erl
Expand Up @@ -29,51 +29,36 @@
-define(RESUME_BUCKET_DRY_RUN_TIMEOUT,
?get_timeout({dry_run, resume_bucket}, 5 * 60 * 1000)).

-export([pause_bucket/2,
resume_bucket/2]).
-export([pause_bucket/3,
resume_bucket/4]).

-type supported_services() :: index | fts.

-spec build_remote_path(
For :: data | supported_services(),
RemotePath :: string()) -> string().
build_remote_path(For, RemotePath) ->
filename:join(RemotePath, atom_to_list(For)).

get_data_remote_path(RemotePath) ->
KvRemotePath = build_remote_path(data, RemotePath),
"s3:" ++ Rest = KvRemotePath,
Rest ++ "/".

-spec register_worker(For :: kv | supported_services()) -> true.
-spec register_worker(For :: service()) -> true.
register_worker(For) ->
WorkerName = list_to_atom(?MODULE_STRING ++ "-worker-" ++
atom_to_list(For)),
erlang:register(WorkerName, self()).

supported_services() ->
[index, fts].

%% Stubbed to test just kv service for now
build_service_workers_params(_RemotePath, _Snapshot) ->
SupportedServices = supported_services(),
SupportedServices = hibernation_utils:supported_services(),
?log_debug("Supported services stubbed out: ~p", [SupportedServices]),
[].

build_kv_worker_params(RemotePath, KvNodes) ->
{kv, KvNodes, get_data_remote_path(RemotePath)}.
{kv, KvNodes, hibernation_utils:get_data_remote_path(RemotePath)}.

build_workers_params(RemotePath, KvNodes, Snapshot) ->
[build_kv_worker_params(RemotePath, KvNodes) |
build_service_workers_params(RemotePath, Snapshot)].

pause_bucket(Bucket, RemotePath) ->
pause_bucket(Bucket, Snapshot, RemotePath) ->
spawn_link_hibernation_manager(
pause_bucket, ?cut(do_pause_bucket(Bucket, RemotePath))).
pause_bucket, ?cut(do_pause_bucket(Bucket, Snapshot, RemotePath))).

resume_bucket(Bucket, RemotePath) ->
resume_bucket(Bucket, NewBucketConfig, Metadata, RemotePath) ->
spawn_link_hibernation_manager(
resume_bucket, ?cut(do_resume_bucket(Bucket, RemotePath))).
resume_bucket, ?cut(do_resume_bucket(Bucket, NewBucketConfig, Metadata,
RemotePath))).

spawn_link_hibernation_manager(Op, Body) ->
proc_lib:spawn_link(
Expand All @@ -90,8 +75,7 @@ get_pause_kv_nodes(Bucket, Snapshot) ->
{ok, BucketCfg} = ns_bucket:get_bucket(Bucket, Snapshot),
ns_bucket:get_servers(BucketCfg).

do_pause_bucket(Bucket, RemotePath) ->
Snapshot = hibernation_utils:get_snapshot(Bucket),
do_pause_bucket(Bucket, Snapshot, RemotePath) ->
KvNodes = get_pause_kv_nodes(Bucket, Snapshot),
WorkersParams = build_workers_params(RemotePath, KvNodes, Snapshot),

Expand All @@ -110,7 +94,7 @@ do_pause_bucket(Bucket, RemotePath) ->
kv_hibernation_agent:unprepare_pause_bucket(Bucket, KvNodes).

-spec pause_bucket_body(For, Bucket, Snapshot, RemotePath, Nodes) -> ok
when For :: kv | supported_services(),
when For :: service(),
Bucket :: bucket_name(),
Snapshot :: map(),
RemotePath :: string(),
Expand All @@ -121,22 +105,9 @@ pause_bucket_body(For, Bucket, Snapshot, RemotePath, Nodes) ->
service_manager:with_trap_exit_spawn_monitor_pause_bucket(
For, Bucket, Snapshot, RemotePath, Nodes, ProgressCallback, #{}).

get_filtered_bucket_cfg(BucketCfg) ->
Filter = [servers, desired_servers, map],
lists:filter(fun ({K, _V}) ->
not lists:member(K, Filter)
end, BucketCfg).

get_new_map(OldMap, ServerMapping) ->
[[maps:get(Item, ServerMapping) || Item <- VbChain] || VbChain <- OldMap].

get_new_bucket_config(Bucket, PausedBucketCfg,
BucketVersion) when BucketVersion =:= ?VERSION_ELIXIR ->
NewConfig = get_filtered_bucket_cfg(PausedBucketCfg) ++
[{servers, []}, {hibernation_state, resuming}],
{ok, PlacedConfig} = bucket_placer:place_bucket(Bucket, NewConfig),
PlacedConfig.

validate_server_lists(DesiredServers, OldServerList) ->
case length(DesiredServers) =:= length(OldServerList) of
true ->
Expand All @@ -162,21 +133,9 @@ activate_restored_bucket(Bucket, PausedBucketCfg, DesiredServers,
NewMap = get_new_map(OldMap, ServerMapping),
ns_bucket:set_restored_attributes(Bucket, NewMap, DesiredServers).

get_metadata(RemotePath) ->
KvRemotePath = get_data_remote_path(RemotePath),
hibernation_utils:get_bucket_metadata_from_s3(KvRemotePath).

get_paused_bucket_cfg(Metadata) ->
BucketVersion = hibernation_utils:get_bucket_version(Metadata),
PausedBucketCfg = hibernation_utils:get_bucket_config(Metadata),
{BucketVersion, PausedBucketCfg}.

do_resume_bucket(Bucket, RemotePath) ->
do_resume_bucket(Bucket, NewBucketConfig, Metadata, RemotePath) ->
Snapshot = ns_cluster_membership:get_snapshot(),
Metadata = get_metadata(RemotePath),
{Version, PausedBucketCfg} = get_paused_bucket_cfg(Metadata),

NewBucketConfig = get_new_bucket_config(Bucket, PausedBucketCfg, Version),
PausedBucketCfg = hibernation_utils:get_bucket_config(Metadata),
DesiredServers = proplists:get_value(desired_servers, NewBucketConfig),
ServerMapping = get_server_mapping(PausedBucketCfg, DesiredServers),

Expand Down Expand Up @@ -228,7 +187,7 @@ do_resume_bucket(Bucket, RemotePath) ->

-spec resume_bucket_body(For, Bucket, ServerMapping, RemotePath, DryRun,
Nodes) -> ok
when For :: kv | supported_services(),
when For :: service(),
Bucket :: bucket_name(),
ServerMapping :: #{node() => node()},
RemotePath :: string(),
Expand Down Expand Up @@ -266,7 +225,7 @@ meck_expect_base() ->
fun (_) ->
#{}
end),
meck:expect(hibernation_utils, get_bucket_metadata_from_s3,
meck:expect(hibernation_utils, get_metadata_from_s3,
fun (_) ->
[{bucket_cfg, [{map, []}, {servers, []}]},
{version, ?VERSION_ELIXIR}, {bucket_manifest, []},
Expand Down Expand Up @@ -377,7 +336,7 @@ hibernation_manager_test() ->
end),

run_test_and_assert(
?cut(do_pause_bucket("foo", "s3://foo-remote-path")),
?cut(do_pause_bucket("foo", #{}, "s3://foo-remote-path")),
exit_normal, exit_not_normal)
end
},
Expand All @@ -391,21 +350,24 @@ hibernation_manager_test() ->
end),

run_test_and_assert(
?cut(do_pause_bucket("foo", "s3://foo-remote-path")),
?cut(do_pause_bucket("foo", #{}, "s3://foo-remote-path")),
exit_not_normal, exit_normal)
end},
{"Resume Bucket Success",
fun () ->
meck:expect(service_manager,
with_trap_exit_spawn_monitor_resume_bucket,
fun (_Service, _Bucket, _ServerMapping, _RemotePath,
_DryRun, _Nodes, _ProgressCallback, _Opts) ->
hibernation_op_success()
end),

run_test_and_assert(
?cut(do_resume_bucket("foo", "s3://foo-remote-path")),
exit_normal, exit_not_normal)
meck:expect(service_manager,
with_trap_exit_spawn_monitor_resume_bucket,
fun (_Service, _Bucket, _ServerMapping, _RemotePath,
_DryRun, _Nodes, _ProgressCallback, _Opts) ->
hibernation_op_success()
end),
{ok, {NewBucketConfig, Metadata}} =
hibernation_utils:check_allow_resume_op(
"foo", "s3://foo-remote-path"),
run_test_and_assert(
?cut(do_resume_bucket("foo", NewBucketConfig, Metadata,
"s3://foo-remote-path")),
exit_normal, exit_not_normal)
end},
{"Resume Bucket Failure",
fun () ->
Expand All @@ -415,9 +377,12 @@ hibernation_manager_test() ->
_DryRun, _Nodes, _ProgressCallback, _Opts) ->
hibernation_op_fail(Service)
end),

{ok, {NewBucketConfig, Metadata}} =
hibernation_utils:check_allow_resume_op(
"foo", "s3://foo-remote-path"),
run_test_and_assert(
?cut(do_resume_bucket("foo", "s3://foo-remote-path")),
?cut(do_resume_bucket("foo", NewBucketConfig, Metadata,
"s3://foo-remote-path")),
exit_not_normal, exit_normal)
end}]}.

Expand Down Expand Up @@ -488,7 +453,7 @@ force_unpause_via_calling_process_failure_test() ->
meck:unload(Modules).

resume_helpers_test_body() ->
meck:expect(hibernation_utils, get_bucket_metadata_from_s3,
meck:expect(hibernation_utils, get_metadata_from_s3,
fun (_) ->
[{bucket_cfg, [{map, [['n_0@127.0.0.1','n_1@127.0.0.1'],
['n_0@127.0.0.1','n_1@127.0.0.1'],
Expand All @@ -513,10 +478,12 @@ resume_helpers_test_body() ->
end),

Bucket = "BucketName",
Metadata = hibernation_utils:get_bucket_metadata_from_s3("/metadata"),
{Version, PausedStubCfg} = get_paused_bucket_cfg(Metadata),
NewBucketCfg = get_new_bucket_config(Bucket, PausedStubCfg, Version),
DesiredServers = proplists:get_value(desired_servers, NewBucketCfg),
Metadata = hibernation_utils:get_metadata_from_s3("s3://foo-remote-path"),
PausedStubCfg = hibernation_utils:get_bucket_config(Metadata),
NewConfig = [{map, proplists:get_value(map, PausedStubCfg)}] ++
[{servers, []}, {hibernation_state, resuming}],
{ok, NewBucketCfg} = bucket_placer:place_bucket(Bucket, NewConfig),
DesiredServers= proplists:get_value(desired_servers, NewBucketCfg),
ServerMap = get_server_mapping(PausedStubCfg, DesiredServers),
OldToNewServerMap = maps:fold(fun(Key, Value, Acc) ->
maps:put(Value, Key, Acc)
Expand Down

0 comments on commit 13ad4f9

Please sign in to comment.