Merge branch 'buildpackization'

2 parents 182c317 + b780b1d, commit d92b056b6a9709b98745b22385f00b24e4c1d539, @archaelus committed May 2, 2012
Showing with 270 additions and 280 deletions.
  1. +0 −7 Makefile
  2. +1 −1 Procfile
  3. +0 −3 bin/api_test
  4. +3 −3 bin/connect
  5. +0 −11 bin/console
  6. +0 −13 bin/get-deps
  7. +0 −27 bin/load
  8. +0 −52 bin/load.js
  9. +3 −0 bin/logplex
  10. +0 −7 bin/logplex.local
  11. +0 −9 bin/logplex.sh
  12. +0 −14 bin/recover_db
  13. +0 −23 bin/test
  14. +0 −7 bin/weight
  15. +1 −1 rebar.config
  16. +0 −98 release/build_rel.escript
  17. +23 −0 src/logplex_app.erl
  18. +3 −1 src/logplex_queue.erl
  19. +211 −1 src/logplex_shard.erl
  20. +23 −1 src/logplex_shard_info.erl
  21. +2 −1 src/tcp_proxy.erl
7 Makefile
@@ -1,7 +0,0 @@
-all:
- @rm -rf deps
- @./rebar get-deps update-deps compile
- @ERL_LIBS=`pwd`/deps escript release/build_rel.escript boot logplex `pwd`/ebin
-
-clean:
- @./rebar clean
2 Procfile
@@ -1 +1 @@
-logplex: bin/logplex.local
+logplex: bin/logplex
3 bin/api_test
@@ -1,3 +0,0 @@
-#!/bin/sh
-
-envdir keys erl -name logplex_api_test@$hostname -pa ebin -pa deps/*/ebin -boot start_sasl -boot crypto -noshell -eval 'eunit:test({application,logplex})' -s init stop
6 bin/connect
@@ -1,7 +1,7 @@
#!/bin/bash
+. /home/logplex/keys.sh
+
HOST=`hostname --fqdn`
-COOKIE=`awk -F"'" '/LOGPLEX_COOKIE/ { print $2 }' /home/logplex/keys.sh`
-HOME=/home/logplex
-erl -name remsh@$HOST -hidden -setcookie $COOKIE -remsh logplex@$HOST
+erl -name remsh@$HOST -hidden -setcookie $LOGPLEX_COOKIE -remsh logplex@$HOST
11 bin/console
@@ -1,11 +0,0 @@
-#!/bin/sh
-
-unamestr=`uname`
-if [[ "$unamestr" == 'Darwin' ]]; then
- hostname=`hostname`
-else
- hostname=`hostname --fqdn`
-fi
-
-ulimit -n 65535
-erl +K true +A30 +P500000 -env ERL_MAX_PORTS 65535 -env HTTP_PORT 8002 -env INSTANCE_NAME localhost -env LOGPLEX_AUTH_KEY secret -env LOGPLEX_WORKERS 1 -env LOGPLEX_DRAIN_WRITERS 1 -env LOGPLEX_REDIS_WRITERS 1 -env LOGPLEX_READERS 1 -name logplex_console@$hostname -pa ebin -pa deps/*/ebin -boot release/logplex-1.0
13 bin/get-deps
@@ -1,13 +0,0 @@
-#!/bin/bash
-
-if [[ $1 = "development" ]]; then
- git config -f .gitmodules submodule.deps/redis_pool.url git://github.com/JacobVorreuter/redis_pool.git
- git config -f .gitmodules submodule.deps/pagerduty.url git://github.com/JacobVorreuter/pagerduty.git
- git config -f .gitmodules submodule.deps/mochiweb.url git://github.com/mochi/mochiweb.git
- git config -f .gitmodules submodule.deps/redgrid.url git://github.com/JacobVorreuter/redgrid.git
- git config -f .gitmodules submodule.deps/redo.url git://github.com/JacobVorreuter/redo.git
- git config -f .gitmodules submodule.deps/nsync.url git://github.com/JacobVorreuter/nsync.git
- git submodule sync
-fi
-
-git submodule update --init
27 bin/load
@@ -1,27 +0,0 @@
-#!/usr/bin/env escript
-%% -*- erlang -*-
-%%! -pa ebin -pa deps/mochiweb/ebin
-
-main([]) ->
- io:format("USAGE: bin/load [module1 module2 ...]~n");
-
-main(Modules) ->
- application:start(inets),
- JSON = iolist_to_binary(mochijson2:encode([list_to_binary(Module) || Module <- Modules])),
- {ok, File} = file:read_file("keys.sh"),
- {match, [Auth]} = re:run(File, "export LOGPLEX_AUTH_KEY='(\\S+)'", [{capture, all_but_first, binary}]),
- Headers = [
- {"Authorization", Auth}
- ],
- case httpc:request(post, {"http://localhost:8001/load", Headers, "application/json", JSON}, [], []) of
- {ok, {{_, 200, _}, _, Body}} ->
- io:format("~p~n", [Body]),
- ok;
- {ok, {{_, 400, _}, _, Body}} ->
- io:format("~p~n", [Body]),
- halt(1);
- {error, Reason} ->
- io:format("~p~n", [Reason]),
- halt(1)
- end.
-
52 bin/load.js
@@ -1,52 +0,0 @@
-var dgram = require('dgram');
-var net = require('net');
-var sys = require('sys');
-
-var token;
-var host;
-var port;
-var rate = 100;
-var proto = 'tcp';
-
-for (i = 2; i < process.argv.length; i++) {
- switch (process.argv[i]) {
- case "--token":
- token = process.argv[++i];
- break;
- case "--host":
- host = process.argv[++i];
- break;
- case "--port":
- port = parseInt(process.argv[++i]);
- break;
- case "--rate":
- rate = parseInt(process.argv[++i]);
- break;
- case "--proto":
- proto = process.argv[++i];
- break;
- }
-}
-
-var message = new Buffer("<40>1 2010-11-10T17:16:33-08:00 domU-12-31-39-13-74-02 " + token + " web.1 - - State changed from created to starting");
-
-var tick;
-
-if (proto == 'tcp') {
- var client = net.createConnection(port, host);
- tick = function(counter) {
- client.write(message);
- if (counter == rate) setTimeout(tick, 1, 0);
- else tick(counter+1);
- };
-} else if (proto == 'udp') {
- var client = dgram.createSocket("udp4");
- tick = function(counter) {
- client.send(message, 0, message.length, port, host);
- if (counter == rate) setTimeout(tick, 1, 0);
- else tick(counter+1);
- };
-}
-
-setTimeout(tick, 1, 0);
-
3 bin/logplex
@@ -0,0 +1,3 @@
+#!/bin/sh
+
+erl +K true +A100 +P500000 -env ERL_MAX_PORTS 65535 -kernel inet_dist_listen_min 9100 -kernel inet_dist_listen_max 9200 -name logplex@`hostname --fqdn` -pa ebin -env ERL_LIBS deps -noshell -noinput -s logplex_app -setcookie ${LOGPLEX_COOKIE}
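The new launcher leans on two pieces of this commit: the cookie now comes from the LOGPLEX_COOKIE environment variable (buildpack-style config) instead of being parsed out of keys.sh, and `-s logplex_app` makes init call start/0 on that module, the convenience export added to src/logplex_app.erl below. A sketch of what the flag resolves to at boot:

    %% Equivalent call init makes for `-s logplex_app`
    %% (erl's -s flag defaults to a zero-arity start function):
    logplex_app:start().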
7 bin/logplex.local
@@ -1,7 +0,0 @@
-#!/bin/sh
-
-export HTTP_PORT="$PORT"
-export SERVER_UID=`id -u $USER`
-export SERVER_GID=`id -g $USER`
-
-erl +K true +A100 +P500000 -kernel inet_dist_listen_min 9100 -kernel inet_dist_listen_max 9200 -env ERL_FULLSWEEP_AFTER 0 -env ERL_MAX_PORTS 65535 -name logplex@`hostname --fqdn` -pa ebin -pa deps/*/ebin -noshell -boot release/logplex-1.0
9 bin/logplex.sh
@@ -1,9 +0,0 @@
-#!/bin/sh
-
-export USER=logplex
-export SERVER_UID=`id -u $USER`
-export SERVER_GID=`id -g $USER`
-export HOME=/home/logplex
-
-ulimit -n 65535
-erl +K true +A100 +P500000 -kernel inet_dist_listen_min 9100 -kernel inet_dist_listen_max 9200 -env ERL_FULLSWEEP_AFTER 0 -env ERL_MAX_PORTS 65535 -name logplex@`hostname --fqdn` -pa ebin -pa deps/*/ebin -noshell -boot release/logplex-1.0
14 bin/recover_db
@@ -1,14 +0,0 @@
-#!/bin/sh
-
-if [ $# -ne 1 ]
-then
- echo "Usage: `basename $0` {node}"
- exit 1
-fi
-
-HOSTNAME=`hostname --fqdn`
-COOKIE=`awk -F"'" '/LOGPLEX_COOKIE/ { print $2 }' /home/logplex/keys.sh`
-HOME=/home/logplex
-
-erl -name recover@$HOSTNAME -pa ebin -pa deps/*/ebin -setcookie $COOKIE -noinput -eval "logplex_utils:rpc('logplex@$HOSTNAME', 'logplex_db', 'recover_from', ['$1'])" -s init stop
-
23 bin/test
@@ -1,23 +0,0 @@
-#!/bin/sh
-
-unamestr=`uname`
-if [[ "$unamestr" == 'Darwin' ]]; then
- hostname=`hostname`
-else
- hostname=`hostname --fqdn`
-fi
-
-if [[ "$HTTP_PORT" == '' ]]; then
- export HTTP_PORT=8002
-fi
-
-if [[ "$INSTANCE_NAME" == '' ]]; then
- export INSTANCE_NAME=localhost
-fi
-
-if [[ "$LOGPLEX_AUTH_KEY" == '' ]]; then
- export LOGPLEX_AUTH_KEY=secret
-fi
-
-ulimit -n 65535
-erl +K true +A30 +P500000 -env ERL_MAX_PORTS 65535 -env LOGPLEX_WORKERS 1 -env LOGPLEX_DRAIN_WRITERS 1 -env LOGPLEX_REDIS_WRITERS 1 -env LOGPLEX_READERS 1 -name logplex_test@$hostname -pa ebin -pa deps/*/ebin -boot release/logplex-1.0 -noshell -eval 'eunit:test({application,logplex})' -s init stop
7 bin/weight
@@ -1,7 +0,0 @@
-#!/bin/sh
-
-HOSTNAME=`hostname --fqdn`
-COOKIE=`awk -F"'" '/LOGPLEX_COOKIE/ { print $2 }' /home/logplex/keys.sh`
-HOME=/home/logplex
-
-erl -name nodes@$HOSTNAME -pa ebin -pa deps/*/ebin -hidden -setcookie $COOKIE -noinput -eval "logplex_utils:rpc('logplex@$HOSTNAME', 'logplex_utils', 'set_weight', [$1])" -s init stop
2 rebar.config
@@ -5,7 +5,7 @@
,{mochiweb, "", {git, "git@git.herokai.com:mochiweb.git", "master"}}
,{pagerduty, "", {git, "git@git.herokai.com:erlang_pagerduty.git", "master"}}
,{redgrid, "", {git, "git@git.herokai.com:redgrid.git", "stable"}}
- ,{redo, "", {git, "git@git.herokai.com:redo.git", "master"}}
+ ,{redo, "", {git, "git@git.herokai.com:redo.git", "redo_clean_shutdown"}}
,{nsync, "", {git, "git@git.herokai.com:nsync.git", "master"}}
,{cowboy, "", {git, "git@git.herokai.com:cowboy.git", "master"}}
,{quoted, "", {git, "git@git.herokai.com:quoted.git", "master"}}
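Pinning redo to the redo_clean_shutdown branch lines up with the new teardown path in src/logplex_shard.erl below, where stop_pool/1 calls redo:shutdown/1, presumably the call that branch exists to provide (an inference from the branch name; nothing else in this commit confirms it):

    %% From src/logplex_shard.erl in this commit:
    stop_pool(Pid) ->
        redo:shutdown(Pid).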
98 release/build_rel.escript
@@ -1,98 +0,0 @@
-#!/usr/bin/env escript
-
-main(["target", AppName, EbinDir]) ->
- code:add_patha(EbinDir),
- AppProps = app_props(AppName),
- AppDeps = proplists:get_value(applications, AppProps, []),
- AppVsn = proplists:get_value(vsn, AppProps),
- ok = write_rel_file(AppName, AppDeps, AppVsn),
- file:write_file("sys.config", <<"[].">>),
- file:make_dir("releases"),
- systools:make_script("release/" ++ AppName ++ "-" ++ AppVsn, [no_module_tests]),
- file:copy(filename:join([code:root_dir(), "bin", "start_clean.boot"]), "bin/start.boot"),
- file:write_file("releases/start_erl.data", iolist_to_binary([erlang:system_info(version), " ", AppVsn, "\n"])),
- systools:make_tar("release/" ++ AppName ++ "-" ++ AppVsn, [{erts, code:root_dir()}, {dirs, ['releases' | tar_dirs()]}]),
- file:delete("sys.config"),
- file:delete("bin/start.boot"),
- file:delete("releases/start_erl.data"),
- file:del_dir("releases"),
- file:delete("release/" ++ AppName ++ "-" ++ AppVsn ++ ".boot"),
- file:delete("release/" ++ AppName ++ "-" ++ AppVsn ++ ".rel"),
- file:delete("release/" ++ AppName ++ "-" ++ AppVsn ++ ".script"),
- ok;
-
-main(["release", AppName, EbinDir]) ->
- case filelib:is_regular("release/" ++ AppName ++ ".appup") of
- true ->
- ok;
- false ->
- io:format("*** missing release/~s.appup~n", [AppName]),
- halt(1)
- end,
- code:add_patha(EbinDir),
- AppProps = app_props(AppName),
- AppDeps = proplists:get_value(applications, AppProps, []),
- AppVsn = proplists:get_value(vsn, AppProps),
- ok = write_rel_file(AppName, AppDeps, AppVsn),
- systools:make_script("release/" ++ AppName ++ "-" ++ AppVsn, [{exref, AppDeps}, {outdir, "release"}]),
- systools:make_tar("release/" ++ AppName ++ "-" ++ AppVsn, [{dirs, tar_dirs()}, {outdir, "release"}]),
- file:delete("release/" ++ AppName ++ "-" ++ AppVsn ++ ".boot"),
- file:delete("release/" ++ AppName ++ "-" ++ AppVsn ++ ".rel"),
- file:delete("release/" ++ AppName ++ "-" ++ AppVsn ++ ".script"),
- ok;
-
-main(["boot", AppName, EbinDir]) ->
- code:add_patha(EbinDir),
- AppProps = app_props(AppName),
- AppDeps = proplists:get_value(applications, AppProps, []),
- AppVsn = proplists:get_value(vsn, AppProps),
- ok = write_rel_file(AppName, AppDeps, AppVsn),
- systools:make_script("release/" ++ AppName ++ "-" ++ AppVsn, [{exref, AppDeps}, {outdir, "release"}]),
- file:delete("release/" ++ AppName ++ "-" ++ AppVsn ++ ".rel"),
- file:delete("release/" ++ AppName ++ "-" ++ AppVsn ++ ".script"),
- ok.
-
-write_rel_file(AppName, AppDeps, AppVsn) ->
- {ok, FD} = file:open("release/" ++ AppName ++ "-" ++ AppVsn ++ ".rel", [write]),
- RelInfo = {release,
- {AppName, AppVsn},
- {erts, erts_vsn()},
- [{Pkg, lib_vsn(Pkg)} || Pkg <- AppDeps] ++
- [{list_to_atom(AppName), AppVsn}]
- },
- io:format(FD, "~p.", [RelInfo]),
- file:close(FD),
- ok.
-
-app_props(AppName) ->
- {ok, [{application,_,AppProps}]} = file:consult("ebin/" ++ AppName ++ ".app"),
- AppProps.
-
-erts_vsn() ->
- erlang:system_info(version).
-
-lib_vsn(App) ->
- load(App),
- {ok, Vsn} = application:get_key(App, vsn),
- Vsn.
-
-load(App) ->
- case application:load(App) of
- ok ->
- ok;
- {error, {already_loaded, _}} ->
- ok;
- E ->
- io:format(standard_error, "Warning - can't load ~p (~p)~n", [App, E]),
- erlang:exit(E)
- end.
-
-tar_dirs() ->
- {ok, Files} = file:list_dir("."),
- [list_to_atom(Dir) || Dir <- lists:filter(
- fun ("." ++ _) -> false;
- (File) ->
- filelib:is_dir(File) andalso not lists:member(File, ["bin"])
- end, Files)].
-
-
23 src/logplex_app.erl
@@ -23,6 +23,9 @@
-module(logplex_app).
-behaviour(application).
+-define(APP, logplex).
+
+
%% Application callbacks
-export([start/2, stop/1]).
@@ -31,11 +34,20 @@
,config/0
,config/1
,config/2
+ ,start/0
+ ,a_start/2
]).
-include("logplex.hrl").
-include("logplex_logging.hrl").
+%%%===================================================================
+%%% Convenience Functions
+%%%===================================================================
+
+start() ->
+ a_start(?APP, permanent).
+
%% ===================================================================
%% Application callbacks
%% ===================================================================
@@ -145,3 +157,14 @@ config(Key) ->
config() ->
application:get_all_env(logplex).
+
+a_start(App, Type) ->
+ start_ok(App, Type, application:start(App, Type)).
+
+start_ok(_App, _Type, ok) -> ok;
+start_ok(_App, _Type, {error, {already_started, _App}}) -> ok;
+start_ok(App, Type, {error, {not_started, Dep}}) ->
+ ok = a_start(Dep, Type),
+ a_start(App, Type);
+start_ok(App, _Type, {error, Reason}) ->
+ erlang:error({app_start_failed, App, Reason}).
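a_start/2 and start_ok/3 implement the usual start-an-application-with-its-dependencies recursion: on {error, {not_started, Dep}} the missing dependency is started first with the same restart type, then the original start is retried; any other error crashes the caller. A hypothetical trace (the dependency name is illustrative, not taken from this commit):

    %% logplex_app:start() when a dependency is not yet running:
    %%   application:start(logplex, permanent)
    %%       -> {error, {not_started, crypto}}
    %%   a_start(crypto, permanent)    %% start the missing dep first
    %%   a_start(logplex, permanent)   %% retry the original; returns ok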
4 src/logplex_queue.erl
@@ -191,7 +191,9 @@ handle_cast({in, Packet}, #state{queue=Queue, length=Length, waiting=Waiting}=St
handle_cast({max_length, MaxLength}, State) ->
{noreply, State#state{max_length=MaxLength}};
-handle_cast(stop, State) ->
+handle_cast(stop, State = #state{workers=Workers}) ->
+ [ exit(Worker, shutdown) ||
+ Worker <- Workers ],
{stop, normal, State};
handle_cast({register, WorkerPid}, #state{workers=Workers}=State) ->
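The stop clause now explicitly exits every registered worker with reason shutdown before the gen_server terminates, so workers no longer outlive their queue. The shard-migration teardown below relies on this via stop_buffer/1. Assumed usage (QueuePid is illustrative, and stop/1 casting `stop` is inferred from the clause above):

    %% Tearing down a redis buffer and its workers in one call:
    logplex_queue:stop(QueuePid).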
212 src/logplex_shard.erl
@@ -29,9 +29,24 @@
-export([lookup/3, lookup_urls/0, urls/0]).
+%% Redis Migration API
+-export([prepare_new_urls/1,
+ update_redis/1,
+ prepare_url_update/2,
+ attempt_to_commit_url_update/1,
+ make_update_permanent/1
+ ]).
+
-include("logplex.hrl").
-include("logplex_logging.hrl").
+-define(NEW_READ_MAP, new_logplex_read_pool_map).
+-define(CURRENT_READ_MAP, logplex_read_pool_map).
+-define(BACKUP_READ_MAP, backup_logplex_read_pool_map).
+-define(NEW_WRITE_MAP, new_logplex_redis_buffer_map).
+-define(CURRENT_WRITE_MAP, logplex_redis_buffer_map).
+-define(BACKUP_WRITE_MAP, backup_logplex_redis_buffer_map).
+
-record(state, {urls}).
-define(TIMEOUT, 30000).
@@ -84,12 +99,58 @@ init([]) ->
%% Description: Handling call messages
%% @hidden
%%--------------------------------------------------------------------
+handle_call({commit, new_shard_info}, _From, State) ->
+ backup_shard_info(),
+ try
+ make_new_shard_info_permanent(),
+ {reply, ok, State}
+ catch C:E ->
+ revert_shard_info(),
+ {reply, {error, {C, E}}, State}
+ end;
+
+handle_call({abort, new_shard_info}, _From, State) ->
+ try
+ true = have_backup(),
+ {reply, revert_shard_info(), State}
+ catch C:E ->
+ {reply, {error, {C, E}}, State}
+ end;
+
+handle_call({prepare, {new_shard_info, OldNewMap}}, _From, State) ->
+ {reply, prepare_new_shard_info(OldNewMap), State};
+
+handle_call({make_permanent, new_shard_info}, _From, State) ->
+ try
+ [ stop_buffer(B)
+ || B <- logplex_shard_info:pid_list(?BACKUP_WRITE_MAP) ],
+ logplex_shard_info:delete(?BACKUP_WRITE_MAP),
+ logplex_shard_info:delete(?NEW_WRITE_MAP),
+ [ stop_pool(P)
+ || P <- logplex_shard_info:pid_list(?BACKUP_READ_MAP) ],
+ logplex_shard_info:delete(?BACKUP_READ_MAP),
+ logplex_shard_info:delete(?NEW_READ_MAP),
+ {reply, ok, State}
+ catch
+ C:E ->
+ {reply, {error, {C,E}}, State}
+ end;
+
handle_call(consistency_check, _From, State = #state{urls = Urls}) ->
{reply, try consistent(Urls)
catch C:E ->
{error, {C, E, erlang:get_stacktrace()}}
end, State};
+handle_call({state_apply, F}, _From, State)
+ when is_function(F, 1) ->
+ case catch F(State) of
+ NewState = #state{} ->
+ {reply, ok, NewState};
+ Else ->
+ {reply, {error, Else}, State}
+ end;
+
handle_call(urls, _From, State) ->
{reply, State#state.urls, State};
@@ -227,7 +288,9 @@ handle_child_death(Pid) ->
NewMap = dict:store(Shard, {Url, NewPid}, Map),
logplex_shard_info:save(logplex_redis_buffer_map, NewMap, V),
?INFO("at=write_pool_restart oldpid=~p newpid=~p",
- [Pid, NewPid])
+ [Pid, NewPid]);
+ undefined ->
+ ?WARN("at=trap_exit err=unknown_pid pid=~p", [Pid])
end,
ok.
@@ -241,3 +304,150 @@ consistent(URLs) ->
lists:member(U,URLs)],
true = length(Correct) =:= length(URLs),
consistent.
+
+%%--------------------------------------------------------------------
+%%% Redis cluster move code
+%%--------------------------------------------------------------------
+
+
+%% Update the boot-time list of redis servers
+update_redis(OldNewMap) ->
+ {OldUrls, NewUrls} = lists:unzip(OldNewMap),
+ [redo:cmd(config, [<<"SADD">>, <<"redis:shard:urls">>, list_to_binary(New)])
+ || New <- NewUrls] ++
+ [redo:cmd(config, [<<"SREM">>, <<"redis:shard:urls">>, list_to_binary(Old)])
+ || Old <- OldUrls].
+
+%% Attempt to create new shard maps with new redo processes. Catch
+%% errors and destroy any created processes.
+prepare_new_shard_info(OldNewMap) ->
+ {links, OldLinks} = process_info(self(), links),
+ try
+ new_shard_info(OldNewMap)
+ catch
+ C:E ->
+ {links, NewLinks} = process_info(self(), links),
+ %% Clean up any new processes we started
+ [ erlang:exit(P, kill)
+ || P <- (NewLinks -- OldLinks),
+ P > self()],
+ delete_new_shard_info(),
+ {error, {C,E}}
+ end.
+
+delete_new_shard_info() ->
+ logplex_shard_info:delete(?NEW_READ_MAP),
+ logplex_shard_info:delete(?NEW_WRITE_MAP),
+ ok.
+
+backup_shard_info() ->
+ logplex_shard_info:copy(?CURRENT_WRITE_MAP, ?BACKUP_WRITE_MAP),
+ logplex_shard_info:copy(?CURRENT_READ_MAP, ?BACKUP_READ_MAP),
+ ok.
+
+have_backup() ->
+ logplex_shard_info:read(?BACKUP_WRITE_MAP) =/= no_such_key
+ andalso logplex_shard_info:read(?BACKUP_READ_MAP) =/= no_such_key.
+
+revert_shard_info() ->
+ logplex_shard_info:copy(?BACKUP_WRITE_MAP, ?CURRENT_WRITE_MAP),
+ logplex_shard_info:copy(?BACKUP_READ_MAP, ?CURRENT_READ_MAP),
+ ok.
+
+make_new_shard_info_permanent() ->
+ logplex_shard_info:copy(?NEW_WRITE_MAP, ?CURRENT_WRITE_MAP),
+ logplex_shard_info:copy(?NEW_READ_MAP, ?CURRENT_READ_MAP),
+ ok.
+
+new_shard_info(OldNewMap) ->
+ {RM, RI, _} = logplex_shard_info:read(?CURRENT_READ_MAP),
+ NewReadMap = dict:map(fun (_Slice, {OldUrl, _OldPid}) ->
+ NewUrl = proplists:get_value(OldUrl, OldNewMap),
+ NewPid = add_pool(NewUrl),
+ {NewUrl, NewPid}
+ end,
+ RM),
+ {WM, WI, _} = logplex_shard_info:read(?CURRENT_WRITE_MAP),
+ NewWriteMap = dict:map(fun (_Slice, {OldUrl, _OldPid}) ->
+ NewUrl = proplists:get_value(OldUrl, OldNewMap),
+ NewPid = add_buffer(NewUrl),
+ {NewUrl, NewPid}
+ end,
+ WM),
+ logplex_shard_info:save(?NEW_READ_MAP,
+ NewReadMap, RI),
+ logplex_shard_info:save(?NEW_WRITE_MAP,
+ NewWriteMap, WI),
+ ok.
+
+prepare_new_urls(NewIps) ->
+ NewIpsSorted = lists:sort(NewIps),
+ OldUrls = lists:sort([binary_to_list(Url)
+ || Url <- redis_helper:shard_urls()]),
+ length(OldUrls) =:= length(NewIpsSorted)
+ orelse erlang:error({invalid_ip_list, different_length_to_existing}),
+ NewUrls = [ begin
+ OldInfo = redo_uri:parse(OldUrl),
+ NewInfo = lists:keyreplace(host, 1, OldInfo, {host, NewIp}),
+ to_redis_url(NewInfo)
+ end
+ || {OldUrl, NewIp} <- lists:zip(OldUrls, NewIpsSorted)],
+ lists:zip(OldUrls, NewUrls).
+
+to_redis_url(Info) ->
+ Host = proplists:get_value(host, Info),
+ Port = proplists:get_value(port, Info),
+ Pass = proplists:get_value(pass, Info),
+ lists:flatten(["redis://", Pass, "@", Host,
+ ":", integer_to_list(Port), "/"]).
+
+prepare_url_update(Nodes, OldNewMap) ->
+ lists:foldl(fun (Node, {good, Acc}) ->
+ try gen_server:call({?MODULE, Node},
+ {prepare, {new_shard_info, OldNewMap}}) of
+ ok ->
+ {good, [Node | Acc]};
+ Err ->
+ {failed, {Node, Err}, Acc}
+ catch
+ C:E ->
+ {failed, {Node, {C,E}}, Acc}
+ end;
+ (_Node, Acc) -> Acc
+ end,
+ {good,[]},
+ Nodes).
+
+attempt_to_commit_url_update(Nodes) ->
+ lists:foldl(fun (Node, {good, Acc}) ->
+ try gen_server:call({?MODULE, Node},
+ {commit, new_shard_info}) of
+ ok ->
+ {good, [Node | Acc]};
+ Err ->
+ abort_url_update(Acc),
+ {failed, {Node, Err}, Acc}
+ catch
+ C:E ->
+ abort_url_update(Acc),
+ {failed, {Node, {C,E}}, Acc}
+ end;
+ (_Node, Acc) -> Acc
+ end,
+ {good,[]},
+ Nodes).
+
+abort_url_update(Nodes) ->
+ [ {N, catch gen_server:call({?MODULE, N}, {abort, new_shard_info})}
+ || N <- Nodes].
+
+make_update_permanent(Nodes) ->
+ [ {N, catch gen_server:call({?MODULE, N},
+ {make_permanent, new_shard_info})}
+ || N <- Nodes].
+
+stop_pool(Pid) ->
+ redo:shutdown(Pid).
+
+stop_buffer(Pid) ->
+ logplex_queue:stop(Pid).
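Taken together, the new exports form a two-phase, multi-node workflow for moving the shard maps onto replacement redis servers: build an old-to-new URL mapping, prepare fresh pools and buffers on every node (cleaning up automatically on failure), commit the swap node by node (aborting already-committed nodes if any commit fails), then drop the backups and persist the new URL set. A hedged sketch of the sequence an operator might drive from a remsh; the node discovery and the match-crashes on failure are illustrative, not part of this commit:

    migrate_shards(NewIps) ->
        %% 1. Pair each existing shard URL with a replacement IP.
        OldNewMap = logplex_shard:prepare_new_urls(NewIps),
        Nodes = [node() | nodes()],
        %% 2. Start new read pools and write buffers everywhere.
        {good, Prepared} = logplex_shard:prepare_url_update(Nodes, OldNewMap),
        %% 3. Swap the live maps; a failed commit aborts the nodes done so far.
        {good, Committed} = logplex_shard:attempt_to_commit_url_update(Prepared),
        %% 4. Drop the backups and stop the old pools and buffers.
        logplex_shard:make_update_permanent(Committed),
        %% 5. Persist the new URL set for future boots.
        logplex_shard:update_redis(OldNewMap).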
24 src/logplex_shard_info.erl
@@ -13,9 +13,15 @@
,cached_read/2
,map_interval/1
,pid_info/1
+ ,map_list/1
+ ,pid_list/1
+ ,copy/2
+ ,delete/1
]).
--type key() :: 'logplex_read_pool_map' | 'logplex_redis_buffer_map'.
+-type key() :: 'logplex_read_pool_map' | 'logplex_redis_buffer_map' |
+ 'new_logplex_read_pool_map' | 'new_logplex_redis_buffer_map' |
+ 'backup_logplex_read_pool_map' | 'backup_logplex_redis_buffer_map'.
-type map() :: dict().
-type interval() :: pos_integer().
-type shard_info() :: {map(), interval(), erlang:timestamp()}.
@@ -93,3 +99,19 @@ pid_info(Pid, {Map, V, _TS}) ->
{Item, Map, V};
[] -> undefined
end.
+
+map_list(Key) ->
+ {Map, _, _} = read(Key),
+ dict:to_list(Map).
+
+pid_list(Key) ->
+ [ Pid || {_, {_, Pid}} <- map_list(Key) ].
+
+copy(FromKey, ToKey) when FromKey =/= ToKey ->
+ {Map, Interval, _} = read(FromKey),
+ save(ToKey, Map, Interval).
+
+delete('logplex_read_pool_map') -> {error, not_allowed};
+delete('logplex_redis_buffer_map') -> {error, not_allowed};
+delete(Key) ->
+ ets:delete(logplex_shard_info, Key).
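copy/2 duplicates a map and its interval under a new key, and delete/1 refuses to remove the two live keys, which is what lets logplex_shard back up, revert, and promote shard maps purely by key name. Assumed usage, mirroring backup_shard_info/0 and the make_permanent teardown above:

    logplex_shard_info:copy(logplex_read_pool_map,
                            backup_logplex_read_pool_map),   %% back up
    {error, not_allowed} =
        logplex_shard_info:delete(logplex_read_pool_map),    %% live key is guarded
    logplex_shard_info:delete(backup_logplex_read_pool_map). %% clean up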
3 src/tcp_proxy.erl
@@ -103,7 +103,8 @@ handle_info({tcp, Sock, Packet},
Class:Ex ->
?WARN("at=process_msgs class=~p ex=~p "
"msg_len=~p stack=~p",
- [Class, Ex, length(Msgs), erlang:get_stacktrace()])
+ [Class, Ex, length(Msgs), erlang:get_stacktrace()]),
+ State
end,
inet:setopts(Sock, [{active, once}]),
{noreply, NewState#state{buffer=NewBuf}};
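This one-line fix matters because the value of the catch branch is what the surrounding expression binds (to NewState, judging by the final line of the hunk): previously the branch evaluated to the result of the ?WARN macro, so the record update NewState#state{buffer=NewBuf} would fail with badrecord whenever a message batch raised. A reconstruction of the shape, with everything outside the hunk assumed from context:

    %% Reconstructed context; only the catch-branch value changed here:
    NewState = try process_msgs(Msgs, State)
               catch
                   Class:Ex ->
                       ?WARN("at=process_msgs class=~p ex=~p "
                             "msg_len=~p stack=~p",
                             [Class, Ex, length(Msgs), erlang:get_stacktrace()]),
                       State  %% the branch must evaluate to a #state{}
               end,
    %% inet:setopts(Sock, [{active, once}]),
    %% {noreply, NewState#state{buffer=NewBuf}};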
