Skip to content
This repository has been archived by the owner on Sep 27, 2023. It is now read-only.

Commit

Permalink
Merge branch 'v52'
Browse files Browse the repository at this point in the history
  • Loading branch information
archaelus committed Sep 13, 2012
2 parents 366c4ea + 2bdddfe commit 74cbf25
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 4 deletions.
2 changes: 1 addition & 1 deletion rebar.config
Expand Up @@ -5,7 +5,7 @@
,{mochiweb, "", {git, "git@git.herokai.com:mochiweb.git", "master"}}
,{pagerduty, "", {git, "git@git.herokai.com:erlang_pagerduty.git", "master"}}
,{redgrid, "", {git, "git@git.herokai.com:redgrid.git", "stable"}}
,{redo, "", {git, "git@git.herokai.com:redo.git", "redo_clean_shutdown"}}
,{redo, "^1.1", {git, "git@git.herokai.com:redo.git", "v1.1.0"}}
,{nsync, "", {git, "git@git.herokai.com:nsync.git", "master"}}
,{cowboy, "", {git, "git@git.herokai.com:cowboy.git", "client_improvements"}}
,{quoted, "", {git, "git@git.herokai.com:quoted.git", "master"}}
Expand Down
1 change: 1 addition & 0 deletions src/logplex_channel.erl
Expand Up @@ -76,6 +76,7 @@ flags(#channel{flags=Flags}) -> Flags.

register({channel, ChannelId} = C)
when is_integer(ChannelId) ->
put(logplex_channel_id, ChannelId), %% post mortem debug info
gproc:add_local_property(C, true).

whereis({channel, _ChannelId} = Name) ->
Expand Down
4 changes: 4 additions & 0 deletions src/logplex_drain.erl
Expand Up @@ -284,6 +284,10 @@ register(DrainId, ChannelId, Type, Dest)

-spec register(id(), atom(), term()) -> ok.
register(DrainId, Type, Dest) ->
put(logplex_drain_id, DrainId), %% post mortem debug info
put(logplex_drain_dest, Dest), %% post mortem debug info
put(logplex_drain_type, Type), %% post mortem debug info

gproc:reg({n, l, {drain, DrainId}}, undefined),
gproc:mreg(p, l, [{drain_dest, Dest},
{drain_type, Type}]),
Expand Down
18 changes: 18 additions & 0 deletions src/logplex_ops.erl
@@ -0,0 +1,18 @@
%%%-------------------------------------------------------------------
%%% @doc
%%% Provides operational functions for inspecting running logplex.
%%% Only for use in the shell. Do NOT call from other modules.
%%% @end
%%%-------------------------------------------------------------------
-module(logplex_ops).

-export([version/0,
git_commit/0]).

version() ->
os:cmd("git branch 2> /dev/null | sed -e '/^[^*]/d'").

git_commit() ->
os:cmd("git log -1 --format=\"%H\"").


6 changes: 3 additions & 3 deletions src/logplex_redis_writer.erl
Expand Up @@ -50,7 +50,7 @@ loop(BufferPid, Socket, RedisOpts) ->
{ok, _} -> ok;
{error, timeout} -> ok;
{error, closed} ->
error_logger:error_msg("~p event=recv result=closed", [?MODULE]),
?INFO("event=recv result=closed", []),
exit({error, closed})
end,
case catch logplex_queue:out(BufferPid, 100) of
Expand All @@ -63,11 +63,11 @@ loop(BufferPid, Socket, RedisOpts) ->
logplex_stats:incr(message_processed, NumItems),
logplex_realtime:incr(message_processed, NumItems);
{error, closed} ->
error_logger:error_msg("~p event=send result=closed", [?MODULE]),
?INFO("event=send result=closed", []),
timer:sleep(500),
exit(normal);
Err ->
error_logger:error_msg("~p event=send result=~p", [?MODULE, Err]),
?INFO("event=send result=~p", [Err]),
exit(normal)
end
end,
Expand Down
1 change: 1 addition & 0 deletions src/logplex_tail.erl
Expand Up @@ -37,6 +37,7 @@ start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

register(ChannelId) when is_integer(ChannelId) ->
put(logplex_tail, {channel_id, ChannelId}), %% post mortem debug info
Self = self(),
gen_server:abcast([node()|nodes()], ?MODULE, {register, ChannelId, Self}),
ok.
Expand Down
23 changes: 23 additions & 0 deletions upgrades/v50_51/live_upgrade.erl
@@ -0,0 +1,23 @@
f(UpgradeNode).
UpgradeNode = fun () ->
case logplex_app:config(git_branch) of
"v50" ->
io:format(whereis(user), "at=upgrade_start cur_vsn=50~n", []);
"v51" ->
io:format(whereis(user),
"at=upgrade type=retry cur_vsn=51 old_vsn=50~n", []);
Else ->
io:format(whereis(user),
"at=upgrade_start old_vsn=~p abort=wrong_version", [tl(Else)]),
erlang:error({wrong_version, Else})
end,

%% Stateless
l(logplex_drain),
l(logplex_channel),
l(logplex_tail),

io:format(whereis(user), "at=upgrade_end cur_vsn=51~n", []),
application:set_env(logplex, git_branch, "v51"),
ok
end.
30 changes: 30 additions & 0 deletions upgrades/v50_51/rolling_upgrade.erl
@@ -0,0 +1,30 @@
f(NodeVersions).
NodeVersions = fun () ->
F = fun () ->
{node(), logplex_app:config(git_branch),
os:getenv("INSTANCE_NAME")}
end,
{Good, _} = rpc:multicall(erlang, apply, [F, [] ]),
lists:keysort(2, Good)
end.

f(NodesAt).
NodesAt = fun (Vsn) ->
[ N || {N, V, _} <- NodeVersions(), V =:= Vsn ]
end.

f(RollingUpgrade).
RollingUpgrade = fun (Nodes) ->
lists:foldl(fun (N, {good, Upgraded}) ->
case rpc:call(N, erlang, apply, [ UpgradeNode, [] ]) of
ok ->
{good, [N | Upgraded]};
Else ->
{{bad, N, Else}, Upgraded}
end;
(N, {_, _} = Acc) -> Acc
end,
{good, []},
Nodes)
end.

22 changes: 22 additions & 0 deletions upgrades/v51_52/live_upgrade.erl
@@ -0,0 +1,22 @@
f(UpgradeNode).
UpgradeNode = fun () ->
case logplex_app:config(git_branch) of
"v51" ->
io:format(whereis(user), "at=upgrade_start cur_vsn=51~n", []);
"v52" ->
io:format(whereis(user),
"at=upgrade type=retry cur_vsn=51 old_vsn=51~n", []);
Else ->
io:format(whereis(user),
"at=upgrade_start old_vsn=~p abort=wrong_version", [tl(Else)]),
erlang:error({wrong_version, Else})
end,

%% Stateless
l(redo),
l(logplex_redis_writer),

io:format(whereis(user), "at=upgrade_end cur_vsn=52~n", []),
application:set_env(logplex, git_branch, "v52"),
ok
end.
30 changes: 30 additions & 0 deletions upgrades/v51_52/rolling_upgrade.erl
@@ -0,0 +1,30 @@
f(NodeVersions).
NodeVersions = fun () ->
F = fun () ->
{node(), logplex_app:config(git_branch),
os:getenv("INSTANCE_NAME")}
end,
{Good, _} = rpc:multicall(erlang, apply, [F, [] ]),
lists:keysort(2, Good)
end.

f(NodesAt).
NodesAt = fun (Vsn) ->
[ N || {N, V, _} <- NodeVersions(), V =:= Vsn ]
end.

f(RollingUpgrade).
RollingUpgrade = fun (Nodes) ->
lists:foldl(fun (N, {good, Upgraded}) ->
case rpc:call(N, erlang, apply, [ UpgradeNode, [] ]) of
ok ->
{good, [N | Upgraded]};
Else ->
{{bad, N, Else}, Upgraded}
end;
(N, {_, _} = Acc) -> Acc
end,
{good, []},
Nodes)
end.

0 comments on commit 74cbf25

Please sign in to comment.