Permalink
Browse files

The PBC listener/socket handoff had a race condition with setting act…

…ive_once

before it was the controller.  Changed to add a seperate set_socket call once
the controller has been changed.
  • Loading branch information...
1 parent 05ed245 commit 0a12ca44e6d263f792791066487a9a8995fc0b7a @jonmeredith jonmeredith committed Oct 4, 2010
Showing with 18 additions and 12 deletions.
  1. +2 −1 src/riak_kv_pb_listener.erl
  2. +13 −8 src/riak_kv_pb_socket.erl
  3. +3 −3 src/riak_kv_pb_socket_sup.erl
View
3 src/riak_kv_pb_listener.erl
@@ -52,7 +52,8 @@ terminate(_Reason, _State) -> ok.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
new_connection(Socket, State) ->
- {ok, Pid} = riak_kv_pb_socket_sup:start_socket(Socket),
+ {ok, Pid} = riak_kv_pb_socket_sup:start_socket(),
ok = gen_tcp:controlling_process(Socket, Pid),
+ ok = riak_kv_pb_socket:set_socket(Pid, Socket),
{ok, State}.
View
21 src/riak_kv_pb_socket.erl
@@ -30,7 +30,7 @@
-include_lib("riakc/include/riakc_pb.hrl").
-behaviour(gen_server).
--export([start_link/1]).
+-export([start_link/0, set_socket/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -50,23 +50,28 @@
%% Public API
%% ===================================================================
-start_link(Socket) ->
- gen_server2:start_link(?MODULE, [Socket], []).
+start_link() ->
+ gen_server2:start_link(?MODULE, [], []).
-init([Socket]) ->
+set_socket(Pid, Socket) ->
+ gen_server2:call(Pid, {set_socket, Socket}).
+
+init([]) ->
riak_kv_stat:update(pbc_connect),
- inet:setopts(Socket, [{active, once}, {packet, 4}, {header, 1}]),
{ok, C} = riak:local_client(),
- {ok, #state{sock = Socket, client = C}}.
+ {ok, #state{client = C}}.
-handle_call(_Request, _From, State) ->
- {reply, not_implemented, State}.
+handle_call({set_socket, Socket}, _From, State) ->
+ inet:setopts(Socket, [{active, once}, {packet, 4}, {header, 1}]),
+ {reply, ok, State#state{sock = Socket}}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({tcp_closed, Socket}, State=#state{sock=Socket}) ->
{stop, normal, State};
+handle_info({tcp_error, Socket, _Reason}, State=#state{sock=Socket}) ->
+ {stop, normal, State};
handle_info({tcp, _Sock, Data}, State=#state{sock=Socket}) ->
[MsgCode|MsgData] = Data,
Msg = riakc_pb:decode(MsgCode, MsgData),
View
6 src/riak_kv_pb_socket_sup.erl
@@ -25,10 +25,10 @@
-module(riak_kv_pb_socket_sup).
-behaviour(supervisor).
-export([start_link/0, init/1, stop/1]).
--export([start_socket/1]).
+-export([start_socket/0]).
-start_socket(Socket) ->
- supervisor:start_child(?MODULE, [Socket]).
+start_socket() ->
+ supervisor:start_child(?MODULE, []).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

0 comments on commit 0a12ca4

Please sign in to comment.