Skip to content

Commit

Permalink
Sequentially replicates to new servers before setting up continuous r…
Browse files Browse the repository at this point in the history
…eplication to each of them. The status information about resharding is now tracking correctly instead of showing ready when replication is initiated like it used to be when setting up continuous replication directly.
  • Loading branch information
khellan committed Jun 2, 2010
1 parent 4e9640a commit f210289
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 60 deletions.
119 changes: 68 additions & 51 deletions src/pillow_routing_table.erl
Expand Up @@ -23,6 +23,9 @@
% Functions for general use
-export([update_routing_table/0, to_list/0, get_server/1, reshard/0, flip/0]).

% Functions meant for spawning
-export([execute_reshard/1]).

-vsn(0.4).

%%--------------------------------------------------------------------
Expand Down Expand Up @@ -61,7 +64,7 @@ terminate(_Reason, _RoutingTable) ->
%%--------------------------------------------------------------------
init(_) ->
pillow_monitor:add_status(pillow_reshard_status, ok),
set_routing_table(couch_config:get("routing", "routing_table")).
set_routing_table(couch_config:get("routing", "routing_table"), true).

%%--------------------------------------------------------------------
%% Function: to_list/0
Expand Down Expand Up @@ -90,7 +93,7 @@ update_routing_table() ->
%%--------------------------------------------------------------------
%% Function: reshard/0
%% Description: Currently sets up validation filters for resharding the databases
%% Returns: The new dict
%% Returns: ok
%%--------------------------------------------------------------------
reshard() ->
gen_server:call(?MODULE, reshard).
Expand All @@ -107,7 +110,7 @@ flip() ->
%%--------------------------------------------------------------------
%% Function: get_status/0
%% Description: Toggles the resharding in progress flag
%% Returns: {CurrentRoutingTableAsList, NewRoutingTableAsList}
%% Returns: {CurrentRoutingTableAsList, NewRoutingTableAsList, Databases}
%%--------------------------------------------------------------------
get_status() ->
gen_server:call(?MODULE, status).
Expand All @@ -123,22 +126,20 @@ handle_call(to_list, _From, RoutingTable) ->
{reply, dict:to_list(RoutingTable), RoutingTable};
handle_call(update_routing_table, _From, _OldRoutingTable) ->
{upgrade, PreVersion, PostVersion} = reload_routing_table(),
{reply, {upgrade, PreVersion, PostVersion}, set_routing_table(couch_config:get("routing", "routing_table"))};
{reply, {upgrade, PreVersion, PostVersion}, set_routing_table(couch_config:get("routing", "routing_table"), true)};
handle_call(reshard, _From, RoutingTable) ->
pillow_monitor:update_status(pillow_reshard_status, resharding),
Result = execute_reshard(RoutingTable),
pillow_monitor:update_status(pillow_reshard_status, ready),
{reply, Result, RoutingTable};
spawn(?MODULE, execute_reshard, [RoutingTable]),
{reply, ok, RoutingTable};
handle_call(flip, _From, RoutingTable) ->
{ok, NewRoutingTable} = execute_flip(RoutingTable),
pillow_monitor:update_status(pillow_reshard_status, ok),
pillow_monitor:update_status(pillow_reshard_status, ok),
{reply, ok, NewRoutingTable};
handle_call({set_routing_table, NewRoutingTable}, _From, _RoutingTable) ->
{ok, RoutingTable} = set_routing_table(NewRoutingTable),
{ok, RoutingTable} = set_routing_table(NewRoutingTable, true),
{reply, ok, RoutingTable};
handle_call(status, _From, RoutingTable) ->
{ok, NewRoutingTable} = safely_get_resharding_routing_table(),
{reply, {dict:to_list(RoutingTable), dict:to_list(NewRoutingTable)}, RoutingTable}.
{ok, NewRoutingTable} = safely_get_resharding_routing_table(false),
{reply, {dict:to_list(RoutingTable), dict:to_list(NewRoutingTable), get_databases()}, RoutingTable}.

%%--------------------------------------------------------------------
%% Function: handle_cast/2
Expand All @@ -163,7 +164,7 @@ handle_info(Info, RoutingTable) ->
%% Returns: {ok, RoutingTable}
%%--------------------------------------------------------------------
code_change(_OldVsn, _RoutingTable, _Extra) ->
{ok, set_routing_table(couch_config:get("routing", "routing_table"))}.
{ok, set_routing_table(couch_config:get("routing", "routing_table"), true)}.

%%--------------------------------------------------------------------
%% INTERNAL FUNCTIONS
Expand Down Expand Up @@ -235,35 +236,45 @@ create_shard_validators(Db, Dict) ->
create_shard_validator(Db, 1, Dict).

%%--------------------------------------------------------------------
%% Function: init_replication/3
%% Description: Sets up a continuous pull replication
%% Function: init_replication/4
%% Description: Sets up a pull replication
%% Returns: The ibrowse response
%%--------------------------------------------------------------------
init_replication(Db, Source, Target) ->
init_replication(Db, Source, Target, Continuous) ->
Url = Source ++ "_replicate",
TargetDb = Target ++ Db,
io:format("init_replication from ~s/~s to ~s~n", [Source, Db, TargetDb]),
Message = mochijson2:encode(
{struct, [
{<<"source">>, list_to_binary(Db)},
{<<"target">>, list_to_binary(TargetDb)},
{<<"continuous">>, true}
]}
),
ReplicationDefinition = [
{<<"source">>, list_to_binary(Db)},
{<<"target">>, list_to_binary(TargetDb)}
],
ReplicationSpecification = case Continuous of
true -> lists:flatten(ReplicationDefinition, [{<<"continuous">>, Continuous}]);
_ -> ReplicationDefinition
end,
Message = mochijson2:encode({struct, ReplicationSpecification}),
io:format("~s: ~s~n", [Url, Message]),
ibrowse:send_req(Url, [], post, Message).
% {ok, "202", "Headers", "Response"}.
% Status = case Continuous of
% true -> "202";
% _ -> "200"
% end,
% {ok, Status, "Headers", "Response"}.

%%--------------------------------------------------------------------
%% Function: init_all_replication/5
%% Function: init_all_replication/6
%% Description: Sets up a continuous push replication for all shards
%% Returns: ok or error depending on result
%%--------------------------------------------------------------------
init_all_replication(Db, SourceShard, TargetShard, SourceDict, TargetDict) ->
init_all_replication(Db, SourceShard, TargetShard, SourceDict, TargetDict, Continuous) ->
Status = case Continuous of
true -> "202";
_ -> "200"
end,
CurrentResult = case dict:find(SourceShard, SourceDict) of
{ok, SourceUrl} ->
case dict:find(TargetShard, TargetDict) of
{ok, TargetUrl} -> init_replication(Db, SourceUrl, TargetUrl);
{ok, TargetUrl} -> init_replication(Db, SourceUrl, TargetUrl, Continuous);
_ ->
io:format("Tried to find ~i of ~i target shards", [TargetShard, dict:size(TargetDict)]),
error
Expand All @@ -273,17 +284,17 @@ init_all_replication(Db, SourceShard, TargetShard, SourceDict, TargetDict) ->
error
end,
NextResult = case CurrentResult of
{ok, "202", _Headers1, _Response1} ->
{ok, Status, _Headers1, _Response1} ->
case dict:size(TargetDict) of
TargetShard -> ok;
_ -> init_all_replication(Db, SourceShard, TargetShard + 1, SourceDict, TargetDict)
_ -> init_all_replication(Db, SourceShard, TargetShard + 1, SourceDict, TargetDict, Continuous)
end
end,
case NextResult of
ok ->
case dict:size(SourceDict) of
SourceShard -> ok;
_ -> init_all_replication(Db, SourceShard + 1, TargetShard, SourceDict, TargetDict)
_ -> init_all_replication(Db, SourceShard + 1, TargetShard, SourceDict, TargetDict, Continuous)
end
end.

Expand All @@ -292,11 +303,11 @@ init_all_replication(Db, SourceShard, TargetShard, SourceDict, TargetDict) ->
%% Description: Initiates replication for all databases
%% Returns: ok or error depending on result
%%--------------------------------------------------------------------
init_all_databases_replication([], _, _) -> ok;
init_all_databases_replication([Db | Tail], RoutingTable, NewRoutingTable) ->
init_all_databases_replication([], _, _, _) -> ok;
init_all_databases_replication([Db | Tail], RoutingTable, NewRoutingTable, Continuous) ->
io:format("Setting up replication of ~s~n", [Db]),
init_all_replication(Db, 1, 1, RoutingTable, NewRoutingTable),
init_all_databases_replication(Tail, RoutingTable, NewRoutingTable).
init_all_replication(Db, 1, 1, RoutingTable, NewRoutingTable, Continuous),
init_all_databases_replication(Tail, RoutingTable, NewRoutingTable, Continuous).

%%--------------------------------------------------------------------
%% Function: create_all_databases_shard_validators/3
Expand All @@ -310,22 +321,22 @@ create_all_databases_shard_validators([Db | Tail], NewRoutingTable) ->
create_all_databases_shard_validators(Tail, NewRoutingTable).

%%--------------------------------------------------------------------
%% Function: get_resharding_routing_table/0
%% Function: get_resharding_routing_table/1
%% Description: Gets the config stating the new routing Table
%% Returns: ok or error depending on result
%%--------------------------------------------------------------------
get_resharding_routing_table() ->
set_routing_table(couch_config:get("resharding", "routing_table")).
get_resharding_routing_table(LogInfo) ->
set_routing_table(couch_config:get("resharding", "routing_table"), LogInfo).

%%--------------------------------------------------------------------
%% Function: safely_get_resharding_routing_table/0
%% Function: safely_get_resharding_routing_table/1
%% Description: Same as above, but doesn't fail if not found
%% Returns: The resharding routing table or an empty dict
%%--------------------------------------------------------------------
safely_get_resharding_routing_table() ->
safely_get_resharding_routing_table(LogInfo) ->
case couch_config:get("resharding", "routing_table") of
undefined -> {ok, dict:new()};
List -> set_routing_table(List)
List -> set_routing_table(List, LogInfo)
end.

%%--------------------------------------------------------------------
Expand All @@ -334,9 +345,12 @@ safely_get_resharding_routing_table() ->
%% Returns: ok or error depending on result
%%--------------------------------------------------------------------
execute_reshard(RoutingTable) ->
{ok, NewRoutingTable} = get_resharding_routing_table(),
pillow_monitor:update_status(pillow_reshard_status, resharding),
{ok, NewRoutingTable} = get_resharding_routing_table(true),
create_all_databases_shard_validators(get_databases(), NewRoutingTable),
init_all_databases_replication(get_databases(), RoutingTable, NewRoutingTable).
init_all_databases_replication(get_databases(), RoutingTable, NewRoutingTable, false),
init_all_databases_replication(get_databases(), RoutingTable, NewRoutingTable, true),
pillow_monitor:update_status(pillow_reshard_status, ready).

%%--------------------------------------------------------------------
%% Function: execute_flip/1
Expand All @@ -345,7 +359,7 @@ execute_reshard(RoutingTable) ->
%% Returns: {ok, NewRoutingTable} or {error, RoutingTable}
%%--------------------------------------------------------------------
execute_flip(RoutingTable) ->
case set_routing_table(couch_config:get("resharding", "routing_table")) of
case set_routing_table(couch_config:get("resharding", "routing_table"), true) of
{ok, NewRoutingTable} ->
ok = couch_config:set("old_routing", "routing_table", couch_config:get("routing", "routing_table"), true),
ok = couch_config:set("routing", "routing_table", couch_config:get("resharding", "routing_table"), true),
Expand All @@ -365,23 +379,26 @@ get_routing(Id, Dict) ->
dict:fetch(Key, Dict).

%%--------------------------------------------------------------------
%% Function: set_routing/3
%% Function: set_routing/4
%% Description: Adds the new route to the routing table
%% Returns: The updated routing table
%%--------------------------------------------------------------------
set_routing(_NextNum, [], Dict) -> Dict;
set_routing(NextNum, [Head|Tail], OldDict) ->
io:format("Route shard: ~B -> ~s~n", [NextNum, Head]),
set_routing(_NextNum, [], Dict, _LogInfo) -> Dict;
set_routing(NextNum, [Head|Tail], OldDict, LogInfo) ->
case LogInfo of
true -> io:format("Route shard: ~B -> ~s~n", [NextNum, Head]);
_ -> ok
end,
NewDict = dict:store(NextNum, Head, OldDict),
set_routing(NextNum + 1, Tail, NewDict).
set_routing(NextNum + 1, Tail, NewDict, LogInfo).

%%--------------------------------------------------------------------
%% Function: set_routing_table/1
%% Function: set_routing_table/2
%% Description: Purges and reloads the routing table
%% Returns: {upgrade, PreVersion, PostVersion}
%%--------------------------------------------------------------------
set_routing_table(NewRoutingTable) ->
FullTable = set_routing(1, re:split(NewRoutingTable, " *, *", [{return, list}]), dict:new()),
set_routing_table(NewRoutingTable, LogInfo) ->
FullTable = set_routing(1, re:split(NewRoutingTable, " *, *", [{return, list}]), dict:new(), LogInfo),
{ok, FullTable}.

%%--------------------------------------------------------------------
Expand Down
38 changes: 29 additions & 9 deletions src/pillow_status.erl
Expand Up @@ -49,28 +49,48 @@ to_html(ReqData, Context) ->
%% INTERNAL FUNCTIONS
%%--------------------------------------------------------------------

%%--------------------------------------------------------------------
%% Function: html_list/3
%% Description: Creates html list from the input information
%% Returns: HTML
%%--------------------------------------------------------------------
html_list(Type, Class, Items) ->
"<" ++ Type ++ " class=" ++ Class ++ ">"
++ lists:map(fun(Tuple) ->
"<li>"
++ case Tuple of
{_, Item} -> Item;
_ -> Tuple
end
++ "</li>" end, Items)
++ "</ul>".

%%--------------------------------------------------------------------
%% Function: html_encode/1
%% Description: Creates html status from the input information
%% Returns: HTML status information
%%--------------------------------------------------------------------
html_encode({Version, ReshardStatus, {CurrentServers, NewServers}}) ->
html_encode({Version, ReshardStatus, {CurrentServers, NewServers, Databases}}) ->
"<!DOCTYPE HTML PUBLIC '-//W3C//DTD HTML 4.01 Transitional//EN'>"
++ "<html>"
++ "<head><title>Pillow Status</title>"
++ "<meta http-equiv='refresh' content='5' />"
++ "<meta http-equiv='Content-Type' content='text/html;charset=utf-8'></head>"
++ "<body><h1>Pillow " ++ Version ++ " Status</h1>"
++ "<div class='reshard_status'>Servers are " ++ io_lib:format("~s", [ReshardStatus]) ++ "</div>"
++ "<div class='current_servers'><h2>Current Servers</h2><ul>"
++ lists:map(fun({_, Server}) -> "<li>" ++ Server ++ "</li>" end, CurrentServers)
++ "</ul></div>"
++ "<div class='new_servers'><h2>New Servers</h2><ul>"
++ lists:map(fun({_, Server}) -> "<li>" ++ Server ++ "</li>" end, NewServers)
++ "</ul></div></body></html>".
++ "<div class='databases'><h2>Databases</h2>" ++ html_list("ul", "", Databases) ++ "</div>"
++ "<div class='current_servers'><h2>Current Servers</h2>" ++ html_list("ul", "", CurrentServers) ++ "</div>"
++ "<div class='new_servers'><h2>New Servers</h2>" ++ html_list("ul", "", NewServers) ++ "</div>"
++ "</body></html>".

json_prepare_status({Version, Servers}) ->
{struct, [{version, Version}, {servers, lists:map(fun({_, Server}) -> Server end, Servers)}]}.
json_prepare_status({Version, ReshardStatus, {CurrentServers, NewServers, Databases}}) ->
{struct, [
{version, Version},
{reshard_status, ReshardStatus},
{current_servers, lists:map(fun({_, Server}) -> Server end, CurrentServers)},
{new_servers, lists:map(fun({_, Server}) -> Server end, NewServers)},
{databases, lists:map(fun({_, Database}) -> Database end, Databases)}
]}.

%%--------------------------------------------------------------------
%% Function: get_status/1
Expand Down

0 comments on commit f210289

Please sign in to comment.