Skip to content

Commit

Permalink
Add cowboy:get_protocol_options/1 and cowboy_set_protocol_options/2
Browse files Browse the repository at this point in the history
This allows any application to upgrade the protocol options without
having to restart the listener. This is most useful to update the
dispatch list of HTTP servers, for example.

The upgrade is done at the acceptor level, meaning only new connections
receive the new protocol options.
  • Loading branch information
Loïc Hoguin committed Jan 31, 2012
1 parent 830cfc0 commit e5aef5c
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 41 deletions.
32 changes: 31 additions & 1 deletion src/cowboy.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
%% @doc Cowboy API to start and stop listeners.
-module(cowboy).

-export([start_listener/6, stop_listener/1, child_spec/6, accept_ack/1]).
-export([start_listener/6, stop_listener/1, child_spec/6, accept_ack/1,
get_protocol_options/1, set_protocol_options/2]).

%% @doc Start a listener for the given transport and protocol.
%%
Expand Down Expand Up @@ -83,3 +84,32 @@ child_spec(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts)
-spec accept_ack(pid()) -> ok.
accept_ack(ListenerPid) ->
receive {shoot, ListenerPid} -> ok end.

%% @doc Return the current protocol options for the given listener.
-spec get_protocol_options(any()) -> any().
get_protocol_options(Ref) ->
ListenerPid = ref_to_listener_pid(Ref),
{ok, ProtoOpts} = cowboy_listener:get_protocol_options(ListenerPid),
ProtoOpts.

%% @doc Upgrade the protocol options for the given listener.
%%
%% The upgrade takes place at the acceptor level, meaning that only the
%% newly accepted connections receive the new protocol options. This has
%% no effect on the currently opened connections.
-spec set_protocol_options(any(), any()) -> ok.
set_protocol_options(Ref, ProtoOpts) ->
ListenerPid = ref_to_listener_pid(Ref),
ok = cowboy_listener:set_protocol_options(ListenerPid, ProtoOpts).

%% Internal.

-spec ref_to_listener_pid(any()) -> pid().
ref_to_listener_pid(Ref) ->
Children = supervisor:which_children(cowboy_sup),
{_, ListenerSupPid, _, _} = lists:keyfind(
{cowboy_listener_sup, Ref}, 1, Children),
ListenerSupChildren = supervisor:which_children(ListenerSupPid),
{_, ListenerPid, _, _} = lists:keyfind(
cowboy_listener, 1, ListenerSupChildren),
ListenerPid.
28 changes: 17 additions & 11 deletions src/cowboy_acceptor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
-module(cowboy_acceptor).

-export([start_link/6]). %% API.
-export([acceptor/6]). %% Internal.
-export([acceptor/7]). %% Internal.

%% API.

Expand All @@ -25,27 +25,33 @@
start_link(LSocket, Transport, Protocol, Opts,
ListenerPid, ReqsSup) ->
Pid = spawn_link(?MODULE, acceptor,
[LSocket, Transport, Protocol, Opts, ListenerPid, ReqsSup]),
[LSocket, Transport, Protocol, Opts, 1, ListenerPid, ReqsSup]),
{ok, Pid}.

%% Internal.

-spec acceptor(inet:socket(), module(), module(), any(),
pid(), pid()) -> no_return().
acceptor(LSocket, Transport, Protocol, Opts, ListenerPid, ReqsSup) ->
case Transport:accept(LSocket, 2000) of
non_neg_integer(), pid(), pid()) -> no_return().
acceptor(LSocket, Transport, Protocol, Opts, OptsVsn, ListenerPid, ReqsSup) ->
Res = case Transport:accept(LSocket, 2000) of
{ok, CSocket} ->
{ok, Pid} = supervisor:start_child(ReqsSup,
[ListenerPid, CSocket, Transport, Protocol, Opts]),
Transport:controlling_process(CSocket, Pid),
ok = cowboy_listener:add_connection(ListenerPid,
default, Pid);
cowboy_listener:add_connection(ListenerPid,
default, Pid, OptsVsn);
{error, timeout} ->
ignore;
ok;
{error, _Reason} ->
%% @todo Probably do something here. If the socket was closed,
%% we may want to try and listen again on the port?
ignore
ok
end,
?MODULE:acceptor(LSocket, Transport, Protocol, Opts,
ListenerPid, ReqsSup).
case Res of
ok ->
?MODULE:acceptor(LSocket, Transport, Protocol,
Opts, OptsVsn, ListenerPid, ReqsSup);
{upgrade, Opts2, OptsVsn2} ->
?MODULE:acceptor(LSocket, Transport, Protocol,
Opts2, OptsVsn2, ListenerPid, ReqsSup)
end.
94 changes: 66 additions & 28 deletions src/cowboy_listener.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,21 @@
-module(cowboy_listener).
-behaviour(gen_server).

-export([start_link/1, stop/1,
add_connection/3, move_connection/3, remove_connection/2]). %% API.
-export([start_link/2, stop/1,
add_connection/4, move_connection/3, remove_connection/2,
get_protocol_options/1, set_protocol_options/2]). %% API.
-export([init/1, handle_call/3, handle_cast/2,
handle_info/2, terminate/2, code_change/3]). %% gen_server.

-type pools() :: [{atom(), non_neg_integer()}].

-record(state, {
req_pools = [] :: [{atom(), non_neg_integer()}],
req_pools = [] :: pools(),
reqs_table :: ets:tid(),
queue = [] :: [{pid(), reference()}],
max_conns = undefined :: non_neg_integer()
max_conns = undefined :: non_neg_integer(),
proto_opts :: any(),
proto_opts_vsn = 1 :: non_neg_integer()
}).

%% API.
Expand All @@ -37,9 +42,9 @@
%% Setting the process priority to high ensures the connection-related code
%% will always be executed when a connection needs it, allowing Cowboy to
%% scale far beyond what it would with a normal priority.
-spec start_link(non_neg_integer()) -> {ok, pid()}.
start_link(MaxConns) ->
gen_server:start_link(?MODULE, [MaxConns],
-spec start_link(non_neg_integer(), any()) -> {ok, pid()}.
start_link(MaxConns, ProtoOpts) ->
gen_server:start_link(?MODULE, [MaxConns, ProtoOpts],
[{spawn_opt, [{priority, high}]}]).

%% @private
Expand All @@ -59,9 +64,15 @@ stop(ServerPid) ->
%% pool. If the socket has been sent to another process, it is up to the
%% protocol code to inform the listener of the new <em>ConnPid</em> by removing
%% the previous and adding the new one.
-spec add_connection(pid(), atom(), pid()) -> ok.
add_connection(ServerPid, Pool, ConnPid) ->
gen_server:call(ServerPid, {add_connection, Pool, ConnPid}, infinity).
%%
%% This function also returns whether the protocol options have been modified.
%% If so, then an {upgrade, ProtoOpts, OptsVsn} will be returned instead of
%% the atom 'ok'. The acceptor can then continue with the new protocol options.
-spec add_connection(pid(), atom(), pid(), non_neg_integer())
-> ok | {upgrade, any(), non_neg_integer()}.
add_connection(ServerPid, Pool, ConnPid, OptsVsn) ->
gen_server:call(ServerPid, {add_connection, Pool, ConnPid, OptsVsn},
infinity).

%% @doc Move a connection from one pool to another.
-spec move_connection(pid(), atom(), pid()) -> ok.
Expand All @@ -73,35 +84,46 @@ move_connection(ServerPid, DestPool, ConnPid) ->
remove_connection(ServerPid, ConnPid) ->
gen_server:cast(ServerPid, {remove_connection, ConnPid}).

%% @doc Return the current protocol options.
-spec get_protocol_options(pid()) -> {ok, any()}.
get_protocol_options(ServerPid) ->
gen_server:call(ServerPid, get_protocol_options).

%% @doc Upgrade the protocol options.
-spec set_protocol_options(pid(), any()) -> ok.
set_protocol_options(ServerPid, ProtoOpts) ->
gen_server:call(ServerPid, {set_protocol_options, ProtoOpts}).

%% gen_server.

%% @private
-spec init(list()) -> {ok, #state{}}.
init([MaxConns]) ->
ReqsTablePid = ets:new(requests_table, [set, private]),
{ok, #state{reqs_table=ReqsTablePid, max_conns=MaxConns}}.
init([MaxConns, ProtoOpts]) ->
ReqsTable = ets:new(requests_table, [set, private]),
{ok, #state{reqs_table=ReqsTable, max_conns=MaxConns,
proto_opts=ProtoOpts}}.

%% @private
-spec handle_call(_, _, State)
-> {reply, ignored, State} | {stop, normal, stopped, State}.
handle_call({add_connection, Pool, ConnPid}, From, State=#state{
handle_call({add_connection, Pool, ConnPid, AccOptsVsn}, From, State=#state{
req_pools=Pools, reqs_table=ReqsTable,
queue=Queue, max_conns=MaxConns}) ->
MonitorRef = erlang:monitor(process, ConnPid),
ConnPid ! {shoot, self()},
{NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of
false ->
{1, [{Pool, 1}|Pools]};
{Pool, NbConns} ->
NbConns2 = NbConns + 1,
{NbConns2, [{Pool, NbConns2}|lists:keydelete(Pool, 1, Pools)]}
end,
ets:insert(ReqsTable, {ConnPid, {MonitorRef, Pool}}),
if NbConnsRet > MaxConns ->
{noreply, State#state{req_pools=Pools2, queue=[From|Queue]}};
queue=Queue, max_conns=MaxConns,
proto_opts=ProtoOpts, proto_opts_vsn=LisOptsVsn}) ->
{NbConns, Pools2} = add_pid(ConnPid, Pool, Pools, ReqsTable),
State2 = State#state{req_pools=Pools2},
if AccOptsVsn =/= LisOptsVsn ->
{reply, {ugprade, ProtoOpts, LisOptsVsn}, State2};
NbConns > MaxConns ->
{noreply, State2#state{queue=[From|Queue]}};
true ->
{reply, ok, State#state{req_pools=Pools2}}
{reply, ok, State2}
end;
handle_call(get_protocol_options, _From, State=#state{proto_opts=ProtoOpts}) ->
{reply, {ok, ProtoOpts}, State};
handle_call({set_protocol_options, ProtoOpts}, _From,
State=#state{proto_opts_vsn=OptsVsn}) ->
{reply, ok, State#state{proto_opts=ProtoOpts, proto_opts_vsn=OptsVsn + 1}};
handle_call(stop, _From, State) ->
{stop, normal, stopped, State};
handle_call(_Request, _From, State) ->
Expand Down Expand Up @@ -147,6 +169,22 @@ code_change(_OldVsn, State, _Extra) ->

%% Internal.

%% @private
-spec add_pid(pid(), atom(), pools(), ets:tid())
-> {non_neg_integer(), pools()}.
add_pid(ConnPid, Pool, Pools, ReqsTable) ->
MonitorRef = erlang:monitor(process, ConnPid),
ConnPid ! {shoot, self()},
{NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of
false ->
{1, [{Pool, 1}|Pools]};
{Pool, NbConns} ->
NbConns2 = NbConns + 1,
{NbConns2, [{Pool, NbConns2}|lists:keydelete(Pool, 1, Pools)]}
end,
ets:insert(ReqsTable, {ConnPid, {MonitorRef, Pool}}),
{NbConnsRet, Pools2}.

%% @private
-spec remove_pid(pid(), State) -> State.
remove_pid(Pid, State=#state{
Expand Down
2 changes: 1 addition & 1 deletion src/cowboy_listener_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ start_link(NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) ->
MaxConns = proplists:get_value(max_connections, TransOpts, 1024),
{ok, SupPid} = supervisor:start_link(?MODULE, []),
{ok, ListenerPid} = supervisor:start_child(SupPid,
{cowboy_listener, {cowboy_listener, start_link, [MaxConns]},
{cowboy_listener, {cowboy_listener, start_link, [MaxConns, ProtoOpts]},
permanent, 5000, worker, [cowboy_listener]}),
{ok, ReqsPid} = supervisor:start_child(SupPid,
{cowboy_requests_sup, {cowboy_requests_sup, start_link, []},
Expand Down

0 comments on commit e5aef5c

Please sign in to comment.