Skip to content

Commit

Permalink
Do setq async.
Browse files Browse the repository at this point in the history
This is like, too async.  NOOP doesn't do the right thing here and
errors are lost.

Change-Id: I50fdc5b0fde3b29105af314d52d73e69cce254a0
  • Loading branch information
dustin committed Jun 13, 2011
1 parent bc1af47 commit a31c27f
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 11 deletions.
7 changes: 7 additions & 0 deletions src/mc_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ process_message(Socket, StorageServer, {ok, <<?REQ_MAGIC:8, ?STAT:8, KeyLen:16,

% Hand the request off to the server.
gen_server:cast(StorageServer, {?STAT, Extra, Key, Body, CAS, Socket, Opaque});
process_message(Socket, StorageServer, {ok, <<?REQ_MAGIC:8, ?SETQ:8, KeyLen:16,
ExtraLen:8, 0:8, VBucket:16,
BodyLen:32,
Opaque:32,
CAS:64>>}) ->
{Extra, Key, Body} = read_message(Socket, KeyLen, ExtraLen, BodyLen),
gen_server:cast(StorageServer, {?SETQ, VBucket, Extra, Key, Body, CAS, Socket, Opaque});
process_message(Socket, StorageServer, {ok, <<?REQ_MAGIC:8, OpCode:8, KeyLen:16,
ExtraLen:8, 0:8, VBucket:16,
BodyLen:32,
Expand Down
42 changes: 31 additions & 11 deletions src/mc_daemon.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
-include("couch_db.hrl").
-include("mc_constants.hrl").

-record(state, {mc_serv, db, json_mode}).
-record(state, {mc_serv, db, json_mode, setqs=0}).

start_link(DbName, JsonMode) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [DbName, JsonMode],
Expand Down Expand Up @@ -55,6 +55,28 @@ handle_set_call(Db, Key, Flags, Expiration, Value, JsonMode) ->
JsonMode),
#mc_response{cas=NewCas}.

handle_setq_call(VBucket, Key, Flags, Expiration, Value, _CAS, Opaque, Socket, State) ->
spawn_link(fun() ->
with_open_db(fun(Db) ->
case catch(mc_couch_kv:set(Db, Key,
Flags,
Expiration,
Value,
State#state.json_mode)) of
{ok, _} -> ok;
_Error ->
%% TODO: You heard the comment
do_something_about_this
end,
gen_server:cast(?MODULE, {setq_complete,
Opaque, %% opaque
Socket,
0})
end, VBucket, State)
end),
%% TODO: Put the actual opaque here
State#state{setqs=State#state.setqs + 1}.

handle_delete_call(Db, Key) ->
case mc_couch_kv:delete(Db, Key) of
ok -> #mc_response{};
Expand Down Expand Up @@ -83,16 +105,6 @@ handle_call({?SET, VBucket, <<Flags:32, Expiration:32>>, Key, Value, _CAS},
end, VBucket, State);
handle_call({?SET, _, _, _, _, _}, _From, State) ->
{reply, #mc_response{status=?EINVAL}, State};
handle_call({?SETQ, VBucket, <<Flags:32, Expiration:32>>, Key, Value, _CAS},
_From, State) ->
with_open_db(fun(Db) ->
handle_set_call(Db, Key, Flags,
Expiration, Value,
State#state.json_mode),
{reply, quiet, State}
end, VBucket, State);
handle_call({?SETQ, _, _, _, _, _}, _From, State) ->
{reply, #mc_response{status=?EINVAL}, State};
handle_call({?NOOP, _, _, _, _, _}, _From, State) ->
{reply, #mc_response{}, State};
handle_call({?DELETE, VBucket, <<>>, Key, <<>>, _CAS}, _From, State) ->
Expand Down Expand Up @@ -135,6 +147,14 @@ handle_cast({?TAP_CONNECT, Extra, _Key, Body, _CAS, Socket, Opaque}, State) ->
handle_cast({?STAT, _Extra, _Key, _Body, _CAS, Socket, Opaque}, State) ->
mc_couch_stats:stats(Socket, Opaque),
{noreply, State};
handle_cast({?SETQ, VBucket, <<Flags:32, Expiration:32>>, Key, Value,
CAS, Socket, Opaque}, State) ->
{noreply, handle_setq_call(VBucket, Key, Flags, Expiration, Value,
CAS, Opaque, Socket, State)};
%% non-protocol below
handle_cast({setq_complete, Opaque, _Socket, _Status}, State) ->
?LOG_INFO("Completed a setq: ~p, now have ~p", [Opaque, State#state.setqs - 1]),
{noreply, State#state{setqs=State#state.setqs - 1}};
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
Expand Down

0 comments on commit a31c27f

Please sign in to comment.