From f2102893d9a346e9fa459aa55f460fbceb84c1b5 Mon Sep 17 00:00:00 2001 From: khellan Date: Wed, 2 Jun 2010 11:47:01 +0200 Subject: [PATCH] Sequentially replicates to new servers before setting up continuous replication to each of them. The status information about resharding is now tracking correctly instead of showing ready when replication is initiated like it used to be when setting up continuous replication directly. --- src/pillow_routing_table.erl | 119 ++++++++++++++++++++--------------- src/pillow_status.erl | 38 ++++++++--- 2 files changed, 97 insertions(+), 60 deletions(-) diff --git a/src/pillow_routing_table.erl b/src/pillow_routing_table.erl index a9d8301..c761f5d 100644 --- a/src/pillow_routing_table.erl +++ b/src/pillow_routing_table.erl @@ -23,6 +23,9 @@ % Functions for general use -export([update_routing_table/0, to_list/0, get_server/1, reshard/0, flip/0]). +% Functions meant for spawning +-export([execute_reshard/1]). + -vsn(0.4). %%-------------------------------------------------------------------- @@ -61,7 +64,7 @@ terminate(_Reason, _RoutingTable) -> %%-------------------------------------------------------------------- init(_) -> pillow_monitor:add_status(pillow_reshard_status, ok), - set_routing_table(couch_config:get("routing", "routing_table")). + set_routing_table(couch_config:get("routing", "routing_table"), true). %%-------------------------------------------------------------------- %% Function: to_list/0 @@ -90,7 +93,7 @@ update_routing_table() -> %%-------------------------------------------------------------------- %% Function: reshard/0 %% Description: Currently sets up validation filters for resharding the databases -%% Returns: The new dict +%% Returns: ok %%-------------------------------------------------------------------- reshard() -> gen_server:call(?MODULE, reshard). @@ -107,7 +110,7 @@ flip() -> %%-------------------------------------------------------------------- %% Function: get_status/0 %% Description: Toggles the resharding in progress flag -%% Returns: {CurrentRoutingTableAsList, NewRoutingTableAsList} +%% Returns: {CurrentRoutingTableAsList, NewRoutingTableAsList, Databases} %%-------------------------------------------------------------------- get_status() -> gen_server:call(?MODULE, status). @@ -123,22 +126,20 @@ handle_call(to_list, _From, RoutingTable) -> {reply, dict:to_list(RoutingTable), RoutingTable}; handle_call(update_routing_table, _From, _OldRoutingTable) -> {upgrade, PreVersion, PostVersion} = reload_routing_table(), - {reply, {upgrade, PreVersion, PostVersion}, set_routing_table(couch_config:get("routing", "routing_table"))}; + {reply, {upgrade, PreVersion, PostVersion}, set_routing_table(couch_config:get("routing", "routing_table"), true)}; handle_call(reshard, _From, RoutingTable) -> - pillow_monitor:update_status(pillow_reshard_status, resharding), - Result = execute_reshard(RoutingTable), - pillow_monitor:update_status(pillow_reshard_status, ready), - {reply, Result, RoutingTable}; + spawn(?MODULE, execute_reshard, [RoutingTable]), + {reply, ok, RoutingTable}; handle_call(flip, _From, RoutingTable) -> {ok, NewRoutingTable} = execute_flip(RoutingTable), - pillow_monitor:update_status(pillow_reshard_status, ok), + pillow_monitor:update_status(pillow_reshard_status, ok), {reply, ok, NewRoutingTable}; handle_call({set_routing_table, NewRoutingTable}, _From, _RoutingTable) -> - {ok, RoutingTable} = set_routing_table(NewRoutingTable), + {ok, RoutingTable} = set_routing_table(NewRoutingTable, true), {reply, ok, RoutingTable}; handle_call(status, _From, RoutingTable) -> - {ok, NewRoutingTable} = safely_get_resharding_routing_table(), - {reply, {dict:to_list(RoutingTable), dict:to_list(NewRoutingTable)}, RoutingTable}. + {ok, NewRoutingTable} = safely_get_resharding_routing_table(false), + {reply, {dict:to_list(RoutingTable), dict:to_list(NewRoutingTable), get_databases()}, RoutingTable}. %%-------------------------------------------------------------------- %% Function: handle_cast/2 @@ -163,7 +164,7 @@ handle_info(Info, RoutingTable) -> %% Returns: {ok, RoutingTable} %%-------------------------------------------------------------------- code_change(_OldVsn, _RoutingTable, _Extra) -> - {ok, set_routing_table(couch_config:get("routing", "routing_table"))}. + {ok, set_routing_table(couch_config:get("routing", "routing_table"), true)}. %%-------------------------------------------------------------------- %% INTERNAL FUNCTIONS @@ -235,35 +236,45 @@ create_shard_validators(Db, Dict) -> create_shard_validator(Db, 1, Dict). %%-------------------------------------------------------------------- -%% Function: init_replication/3 -%% Description: Sets up a continuous pull replication +%% Function: init_replication/4 +%% Description: Sets up a pull replication %% Returns: The ibrowse response %%-------------------------------------------------------------------- -init_replication(Db, Source, Target) -> +init_replication(Db, Source, Target, Continuous) -> Url = Source ++ "_replicate", TargetDb = Target ++ Db, io:format("init_replication from ~s/~s to ~s~n", [Source, Db, TargetDb]), - Message = mochijson2:encode( - {struct, [ - {<<"source">>, list_to_binary(Db)}, - {<<"target">>, list_to_binary(TargetDb)}, - {<<"continuous">>, true} - ]} - ), + ReplicationDefinition = [ + {<<"source">>, list_to_binary(Db)}, + {<<"target">>, list_to_binary(TargetDb)} + ], + ReplicationSpecification = case Continuous of + true -> lists:flatten(ReplicationDefinition, [{<<"continuous">>, Continuous}]); + _ -> ReplicationDefinition + end, + Message = mochijson2:encode({struct, ReplicationSpecification}), io:format("~s: ~s~n", [Url, Message]), ibrowse:send_req(Url, [], post, Message). -% {ok, "202", "Headers", "Response"}. +% Status = case Continuous of +% true -> "202"; +% _ -> "200" +% end, +% {ok, Status, "Headers", "Response"}. %%-------------------------------------------------------------------- -%% Function: init_all_replication/5 +%% Function: init_all_replication/6 %% Description: Sets up a continuous push replication for all shards %% Returns: ok or error depending on result %%-------------------------------------------------------------------- -init_all_replication(Db, SourceShard, TargetShard, SourceDict, TargetDict) -> +init_all_replication(Db, SourceShard, TargetShard, SourceDict, TargetDict, Continuous) -> + Status = case Continuous of + true -> "202"; + _ -> "200" + end, CurrentResult = case dict:find(SourceShard, SourceDict) of {ok, SourceUrl} -> case dict:find(TargetShard, TargetDict) of - {ok, TargetUrl} -> init_replication(Db, SourceUrl, TargetUrl); + {ok, TargetUrl} -> init_replication(Db, SourceUrl, TargetUrl, Continuous); _ -> io:format("Tried to find ~i of ~i target shards", [TargetShard, dict:size(TargetDict)]), error @@ -273,17 +284,17 @@ init_all_replication(Db, SourceShard, TargetShard, SourceDict, TargetDict) -> error end, NextResult = case CurrentResult of - {ok, "202", _Headers1, _Response1} -> + {ok, Status, _Headers1, _Response1} -> case dict:size(TargetDict) of TargetShard -> ok; - _ -> init_all_replication(Db, SourceShard, TargetShard + 1, SourceDict, TargetDict) + _ -> init_all_replication(Db, SourceShard, TargetShard + 1, SourceDict, TargetDict, Continuous) end end, case NextResult of ok -> case dict:size(SourceDict) of SourceShard -> ok; - _ -> init_all_replication(Db, SourceShard + 1, TargetShard, SourceDict, TargetDict) + _ -> init_all_replication(Db, SourceShard + 1, TargetShard, SourceDict, TargetDict, Continuous) end end. @@ -292,11 +303,11 @@ init_all_replication(Db, SourceShard, TargetShard, SourceDict, TargetDict) -> %% Description: Initiates replication for all databases %% Returns: ok or error depending on result %%-------------------------------------------------------------------- -init_all_databases_replication([], _, _) -> ok; -init_all_databases_replication([Db | Tail], RoutingTable, NewRoutingTable) -> +init_all_databases_replication([], _, _, _) -> ok; +init_all_databases_replication([Db | Tail], RoutingTable, NewRoutingTable, Continuous) -> io:format("Setting up replication of ~s~n", [Db]), - init_all_replication(Db, 1, 1, RoutingTable, NewRoutingTable), - init_all_databases_replication(Tail, RoutingTable, NewRoutingTable). + init_all_replication(Db, 1, 1, RoutingTable, NewRoutingTable, Continuous), + init_all_databases_replication(Tail, RoutingTable, NewRoutingTable, Continuous). %%-------------------------------------------------------------------- %% Function: create_all_databases_shard_validators/3 @@ -310,22 +321,22 @@ create_all_databases_shard_validators([Db | Tail], NewRoutingTable) -> create_all_databases_shard_validators(Tail, NewRoutingTable). %%-------------------------------------------------------------------- -%% Function: get_resharding_routing_table/0 +%% Function: get_resharding_routing_table/1 %% Description: Gets the config stating the new routing Table %% Returns: ok or error depending on result %%-------------------------------------------------------------------- -get_resharding_routing_table() -> - set_routing_table(couch_config:get("resharding", "routing_table")). +get_resharding_routing_table(LogInfo) -> + set_routing_table(couch_config:get("resharding", "routing_table"), LogInfo). %%-------------------------------------------------------------------- -%% Function: safely_get_resharding_routing_table/0 +%% Function: safely_get_resharding_routing_table/1 %% Description: Same as above, but doesn't fail if not found %% Returns: The resharding routing table or an empty dict %%-------------------------------------------------------------------- -safely_get_resharding_routing_table() -> +safely_get_resharding_routing_table(LogInfo) -> case couch_config:get("resharding", "routing_table") of undefined -> {ok, dict:new()}; - List -> set_routing_table(List) + List -> set_routing_table(List, LogInfo) end. %%-------------------------------------------------------------------- @@ -334,9 +345,12 @@ safely_get_resharding_routing_table() -> %% Returns: ok or error depending on result %%-------------------------------------------------------------------- execute_reshard(RoutingTable) -> - {ok, NewRoutingTable} = get_resharding_routing_table(), + pillow_monitor:update_status(pillow_reshard_status, resharding), + {ok, NewRoutingTable} = get_resharding_routing_table(true), create_all_databases_shard_validators(get_databases(), NewRoutingTable), - init_all_databases_replication(get_databases(), RoutingTable, NewRoutingTable). + init_all_databases_replication(get_databases(), RoutingTable, NewRoutingTable, false), + init_all_databases_replication(get_databases(), RoutingTable, NewRoutingTable, true), + pillow_monitor:update_status(pillow_reshard_status, ready). %%-------------------------------------------------------------------- %% Function: execute_flip/1 @@ -345,7 +359,7 @@ execute_reshard(RoutingTable) -> %% Returns: {ok, NewRoutingTable} or {error, RoutingTable} %%-------------------------------------------------------------------- execute_flip(RoutingTable) -> - case set_routing_table(couch_config:get("resharding", "routing_table")) of + case set_routing_table(couch_config:get("resharding", "routing_table"), true) of {ok, NewRoutingTable} -> ok = couch_config:set("old_routing", "routing_table", couch_config:get("routing", "routing_table"), true), ok = couch_config:set("routing", "routing_table", couch_config:get("resharding", "routing_table"), true), @@ -365,23 +379,26 @@ get_routing(Id, Dict) -> dict:fetch(Key, Dict). %%-------------------------------------------------------------------- -%% Function: set_routing/3 +%% Function: set_routing/4 %% Description: Adds the new route to the routing table %% Returns: The updated routing table %%-------------------------------------------------------------------- -set_routing(_NextNum, [], Dict) -> Dict; -set_routing(NextNum, [Head|Tail], OldDict) -> - io:format("Route shard: ~B -> ~s~n", [NextNum, Head]), +set_routing(_NextNum, [], Dict, _LogInfo) -> Dict; +set_routing(NextNum, [Head|Tail], OldDict, LogInfo) -> + case LogInfo of + true -> io:format("Route shard: ~B -> ~s~n", [NextNum, Head]); + _ -> ok + end, NewDict = dict:store(NextNum, Head, OldDict), - set_routing(NextNum + 1, Tail, NewDict). + set_routing(NextNum + 1, Tail, NewDict, LogInfo). %%-------------------------------------------------------------------- -%% Function: set_routing_table/1 +%% Function: set_routing_table/2 %% Description: Purges and reloads the routing table %% Returns: {upgrade, PreVersion, PostVersion} %%-------------------------------------------------------------------- -set_routing_table(NewRoutingTable) -> - FullTable = set_routing(1, re:split(NewRoutingTable, " *, *", [{return, list}]), dict:new()), +set_routing_table(NewRoutingTable, LogInfo) -> + FullTable = set_routing(1, re:split(NewRoutingTable, " *, *", [{return, list}]), dict:new(), LogInfo), {ok, FullTable}. %%-------------------------------------------------------------------- diff --git a/src/pillow_status.erl b/src/pillow_status.erl index 4dd88e4..ae9ea40 100644 --- a/src/pillow_status.erl +++ b/src/pillow_status.erl @@ -49,12 +49,28 @@ to_html(ReqData, Context) -> %% INTERNAL FUNCTIONS %%-------------------------------------------------------------------- +%%-------------------------------------------------------------------- +%% Function: html_list/3 +%% Description: Creates html list from the input information +%% Returns: HTML +%%-------------------------------------------------------------------- +html_list(Type, Class, Items) -> + "<" ++ Type ++ " class=" ++ Class ++ ">" + ++ lists:map(fun(Tuple) -> + "
  • " + ++ case Tuple of + {_, Item} -> Item; + _ -> Tuple + end + ++ "
  • " end, Items) + ++ "". + %%-------------------------------------------------------------------- %% Function: html_encode/1 %% Description: Creates html status from the input information %% Returns: HTML status information %%-------------------------------------------------------------------- -html_encode({Version, ReshardStatus, {CurrentServers, NewServers}}) -> +html_encode({Version, ReshardStatus, {CurrentServers, NewServers, Databases}}) -> "" ++ "" ++ "Pillow Status" @@ -62,15 +78,19 @@ html_encode({Version, ReshardStatus, {CurrentServers, NewServers}}) -> ++ "" ++ "

    Pillow " ++ Version ++ " Status

    " ++ "
    Servers are " ++ io_lib:format("~s", [ReshardStatus]) ++ "
    " - ++ "

    Current Servers

    " - ++ "

    New Servers

    ". + ++ "

    Databases

    " ++ html_list("ul", "", Databases) ++ "
    " + ++ "

    Current Servers

    " ++ html_list("ul", "", CurrentServers) ++ "
    " + ++ "

    New Servers

    " ++ html_list("ul", "", NewServers) ++ "
    " + ++ "". -json_prepare_status({Version, Servers}) -> - {struct, [{version, Version}, {servers, lists:map(fun({_, Server}) -> Server end, Servers)}]}. +json_prepare_status({Version, ReshardStatus, {CurrentServers, NewServers, Databases}}) -> + {struct, [ + {version, Version}, + {reshard_status, ReshardStatus}, + {current_servers, lists:map(fun({_, Server}) -> Server end, CurrentServers)}, + {new_servers, lists:map(fun({_, Server}) -> Server end, NewServers)}, + {databases, lists:map(fun({_, Database}) -> Database end, Databases)} + ]}. %%-------------------------------------------------------------------- %% Function: get_status/1