Permalink
Browse files

Force each worker to write items into a single vbucket database

This change allows each writer to write items into a single
vbucket database only by spawning a new worker whenever an item
with the new vbucket Id is received. Consequently, there are no
more than one writer for a given database at anytime. In addition,
batch-related parameters (i.e., batch size and number of workers)
are removed from the config file and now configured and controlled
by ep-engine.

Change-Id: I6c38d7d872699ddf6c4abfad1628256a27709ecc
Reviewed-on: http://review.couchbase.org/10567
Reviewed-by: Steve Yen <steve.yen@gmail.com>
Tested-by: Chiyoung Seo <chiyoung.seo@gmail.com>
  • Loading branch information...
1 parent 34514d9 commit b605aa1713d2ed26189b9c4f0acd9f94fbf767a7 @chiyoung chiyoung committed Nov 1, 2011
Showing with 114 additions and 142 deletions.
  1. +0 −5 etc/couchdb/local.d/mccouch.ini
  2. +3 −0 include/mc_constants.hrl
  3. +59 −63 src/mc_batch_sup.erl
  4. +10 −0 src/mc_connection.erl
  5. +42 −74 src/mc_daemon.erl
@@ -6,10 +6,5 @@ mc_daemon={mc_sup, start_link, [11213]}
max_dbs_open=10000
[mc_couch]
-; Number of workers to concurrently store data
-write_workers = 8
-; write out batches as soon as they get this size (or when an explicit
-; request comes in)
-write_worker_batch_size = 1000
optimistic_writes = true
presorted = true
View
@@ -56,6 +56,9 @@
-define(DELETE_VBUCKET, 16#3f).
-define(SNAPSHOT_VB_STATES, 16#50).
+%% vbucket batch stuff
+-define(VBUCKET_BATCH_COUNT, 16#51).
+
%% VBucket States
-define(VB_STATE_ACTIVE, 16#1).
View
@@ -2,10 +2,10 @@
-behaviour(supervisor).
--export([start_link/0, start_worker/4]).
+-export([start_link/0, start_worker/5]).
%% Internal junk that needs to be exported
--export([start_link_worker/3, sync_update_docs/3]).
+-export([start_link_worker/4, sync_update_docs/4]).
%% Supervisor callbacks
-export([init/1]).
@@ -24,68 +24,64 @@ init([]) ->
temporary, 3600000, worker, []}]}}.
%% {ok, Ref}
-start_worker(Sup, Batch, BucketName, Socket) ->
- {ok, Pid} = supervisor:start_child(Sup, [Batch, BucketName, Socket]),
+start_worker(Sup, CurrentVBucket, CurrentList, BucketName, Socket) ->
+ {ok, Pid} = supervisor:start_child(Sup, [CurrentVBucket, CurrentList, BucketName, Socket]),
Ref = monitor(process, Pid),
{ok, Ref}.
-start_link_worker(Batch, BucketName, Socket) ->
- {ok, proc_lib:spawn_link(?MODULE, sync_update_docs, [Batch, BucketName, Socket])}.
+start_link_worker(CurrentVBucket, CurrentList, BucketName, Socket) ->
+ {ok, proc_lib:spawn_link(?MODULE, sync_update_docs,
+ [CurrentVBucket, CurrentList, BucketName, Socket])}.
-sync_update_docs(Batch, BucketName, Socket) ->
- UpdateOptions = [clobber, return_errors_only] ++ case couch_config:get("mc_couch", "optimistic_writes", "true") of
- "true" ->
- [optimistic];
- _ ->
- []
- end ++ case couch_config:get("mc_couch", "presorted", "true") of
- "true" ->
- [presorted];
- _ ->
- []
- end,
- dict:fold(
- fun(VBucketId, Jobs, _Acc) ->
- Docs = lists:map(
- fun({Opaque, Op, {set, Key, Flags, Expiration, Value, JsonMode}}) ->
- {Opaque, Op,
- mc_couch_kv:mk_doc(Key, Flags, Expiration, Value, JsonMode)};
- ({Opaque, Op, {set, Key, Flags, Expiration, Value, MetaData, JsonMode}}) ->
- {Opaque, Op,
- mc_couch_kv:mk_doc(Key, Flags, Expiration, Value, MetaData, JsonMode)};
- ({Opaque, Op, {delete, Key}}) ->
- {Opaque, Op, #doc{id = Key, deleted = true, body = {[]}}}
- end, Jobs),
- DbName = iolist_to_binary([<<BucketName/binary, $/>>,
- integer_to_list(VBucketId)]),
- case couch_db:open_int(DbName, []) of
- {ok, Db} ->
- {ok, Results} = couch_db:update_docs(
- Db, [Doc || {_Opaque, _Op, Doc} <- Docs], UpdateOptions),
- lists:foreach(
- fun({Id, Error}) ->
- [{Opaque, Op}] = [{Opaque1, Op1} || {Opaque1, Op1, #doc{id=Id1}} <- Docs, Id == Id1],
- ErrorResp = #mc_response{
- status = ?EINTERNAL,
- body = io_lib:format(
- "Error persisting key ~s in database ~s: ~p",
- [Id, DbName, Error])
- },
- mc_connection:respond(Socket, Op, Opaque, ErrorResp)
- end,
- Results),
- couch_db:close(Db);
- Error ->
- ErrorResp = #mc_response{
- status = ?EINVAL,
- body = io_lib:format("Error opening database ~s: ~s",
- [DbName, couch_util:to_binary(Error)])
- },
- lists:foreach(
- fun({Opaque, Op, _Doc}) ->
- mc_connection:respond(Socket, Op, Opaque, ErrorResp)
- end,
- Docs)
- end
- end,
- [], Batch).
+sync_update_docs(CurrentVBucket, CurrentList, BucketName, Socket) ->
+ UpdateOptions =
+ [clobber, return_errors_only] ++
+ case couch_config:get("mc_couch", "optimistic_writes", "true") of
+ "true" ->
+ [optimistic];
+ _ ->
+ []
+ end ++
+ case couch_config:get("mc_couch", "presorted", "true") of
+ "true" ->
+ [presorted];
+ _ ->
+ []
+ end,
+ Docs = lists:map(
+ fun({Opaque, Op, {set, Key, Flags, Expiration, Value, JsonMode}}) ->
+ {Opaque, Op, mc_couch_kv:mk_doc(Key, Flags, Expiration, Value, JsonMode)};
+ ({Opaque, Op, {set, Key, Flags, Expiration, Value, MetaData, JsonMode}}) ->
+ {Opaque, Op,mc_couch_kv:mk_doc(Key, Flags, Expiration, Value, MetaData, JsonMode)};
+ ({Opaque, Op, {delete, Key}}) ->
+ {Opaque, Op, #doc{id = Key, deleted = true, body = {[]}}}
+ end, CurrentList),
+ DbName = iolist_to_binary([<<BucketName/binary, $/>>, integer_to_list(CurrentVBucket)]),
+ case couch_db:open_int(DbName, []) of
+ {ok, Db} ->
+ {ok, Results} = couch_db:update_docs(Db, [Doc || {_Opaque, _Op, Doc} <- Docs],
+ UpdateOptions),
+ lists:foreach(
+ fun({Id, Error}) ->
+ [{Opaque, Op}] = [{Opaque1, Op1} ||
+ {Opaque1, Op1, #doc{id=Id1}} <- Docs, Id == Id1],
+ ErrorResp = #mc_response{
+ status = ?EINTERNAL,
+ body = io_lib:format(
+ "Error persisting key ~s in database ~s: ~p",
+ [Id, DbName, Error])
+ },
+ mc_connection:respond(Socket, Op, Opaque, ErrorResp)
+ end, Results),
+ couch_db:close(Db);
+ Error ->
+ ErrorResp = #mc_response{
+ status = ?EINVAL,
+ body = io_lib:format("Error opening database ~s: ~s",
+ [DbName, couch_util:to_binary(Error)])
+ },
+ lists:foreach(
+ fun({Opaque, Op, _Doc}) ->
+ mc_connection:respond(Socket, Op, Opaque, ErrorResp)
+ end, Docs)
+ end.
View
@@ -117,6 +117,16 @@ process_message(Socket, StorageServer, <<?REQ_MAGIC:8, ?SNAPSHOT_VB_STATES:8,
gen_fsm:sync_send_all_state_event(StorageServer,
{?SNAPSHOT_VB_STATES, Body, BodyLen},
infinity));
+process_message(Socket, StorageServer, <<?REQ_MAGIC:8, ?VBUCKET_BATCH_COUNT:8,
+ KeyLen:16, ExtraLen:8, 0:8, _VBucket:16,
+ BodyLen:32,
+ Opaque:32,
+ _CAS:64>>) ->
+ {_Extra, _Key, Body} = read_message(Socket, KeyLen, ExtraLen, BodyLen),
+ respond(Socket, ?VBUCKET_BATCH_COUNT, Opaque,
+ gen_fsm:sync_send_all_state_event(StorageServer,
+ {?VBUCKET_BATCH_COUNT, Body},
+ infinity));
process_message(Socket, StorageServer, <<?REQ_MAGIC:8, OpCode:8, KeyLen:16,
ExtraLen:8, 0:8, VBucket:16,
BodyLen:32,
View
@@ -26,12 +26,10 @@
batch_ops = 0,
terminal_opaque = nil,
errors = [],
- worker_batch_size,
+ next_vb_batch,
current_vbucket,
current_vbucket_list,
- whole_batch,
- batch_size = 0,
- max_workers,
+ max_workers = 4,
worker_sup,
caller = nil,
worker_refs = [],
@@ -42,15 +40,9 @@ start_link(Socket) ->
gen_fsm:start_link(?MODULE, Socket, []).
init(Socket) ->
- WorkerBatchSize = list_to_integer(
- couch_config:get("mc_couch", "write_worker_batch_size", "100")),
- MaxWorkers = list_to_integer(
- couch_config:get("mc_couch", "write_workers", "8")),
{ok, WorkerSup} = mc_batch_sup:start_link(),
{ok, processing, #state{db = <<"default">>,
socket = Socket,
- worker_batch_size = WorkerBatchSize,
- max_workers = MaxWorkers,
worker_sup = WorkerSup
}}.
@@ -123,8 +115,6 @@ create_async_batch(State, VBucket, Opaque, Op, Job) ->
State#state{
current_vbucket = VBucket,
current_vbucket_list = [{Opaque, Op, Job}],
- whole_batch = dict:new(),
- batch_size = 1,
caller = nil,
terminal_opaque = nil
}.
@@ -234,45 +224,27 @@ add_async_job(State, From, VBucket, Opaque, Op, Job) ->
#state{
current_vbucket = CurrentVBucket,
current_vbucket_list = CurrentList,
- whole_batch = Batch, batch_size = BatchSize,
- worker_batch_size = WorkerBatchSize,
max_workers = MaxWorkers
} = State,
- case (BatchSize < WorkerBatchSize) orelse (num_workers(State) < MaxWorkers) of
- true ->
- gen_fsm:reply(From, ok);
- false ->
- ok
- end,
+
if VBucket == CurrentVBucket ->
+ gen_fsm:reply(From, ok),
CurrentList2 = [{Opaque, Op, Job} | CurrentList],
- CurrentVBucket2 = CurrentVBucket,
- Batch2 = Batch;
+ NewState = State#state{current_vbucket_list = CurrentList2},
+ NewState;
true ->
- CurrentList2 = [{Opaque, Op, Job}],
- CurrentVBucket2 = VBucket,
- case CurrentList of
- [] ->
- Batch2 = Batch;
- _ ->
- Batch2 = dict:append_list(CurrentVBucket,
- CurrentList, Batch)
+ State2 = State#state{
+ next_vb_batch = {CurrentVBucket, CurrentList},
+ current_vbucket = VBucket,
+ current_vbucket_list = [{Opaque, Op, Job}]
+ },
+ case (num_workers(State) >= MaxWorkers) of
+ true ->
+ State2#state{caller = From};
+ false ->
+ gen_fsm:reply(From, ok),
+ maybe_start_worker(State2)
end
- end,
- BatchSize2 = BatchSize + 1,
- State2 = State#state{whole_batch = Batch2, batch_size = BatchSize2,
- current_vbucket = CurrentVBucket2,
- current_vbucket_list = CurrentList2},
- case BatchSize2 >= WorkerBatchSize of
- true ->
- case (num_workers(State) >= MaxWorkers) of
- true ->
- State2#state{caller = From};
- false ->
- maybe_start_worker(State2)
- end;
- false ->
- State2
end.
batching({?SETQ = Op, VBucket, <<Flags:32, Expiration:32>>, Key, Value,
@@ -300,30 +272,18 @@ batching({?DELETEQ = Op, VBucket, <<>>, Key, <<>>, _CAS, Opaque}, From, State) -
batching({?NOOP, Opaque}, From, State) ->
#state{
- whole_batch = Batch, batch_size = BatchSize, socket = Socket,
+ socket = Socket,
current_vbucket = CurrentVBucket, current_vbucket_list = CurrentList
} = State,
- case BatchSize > 0 of
- true ->
- case CurrentList of
- [] ->
- Batch2 = Batch;
- _ ->
- Batch2 = dict:append_list(CurrentVBucket,
- CurrentList, Batch)
- end,
- mc_batch_sup:sync_update_docs(Batch2, State#state.db, Socket);
- false ->
- ok
- end,
+ mc_batch_sup:sync_update_docs(CurrentVBucket, CurrentList, State#state.db, Socket),
+ State2 = State#state{current_vbucket = 0, current_vbucket_list = []},
case num_workers(State) of
0 ->
mc_connection:respond(Socket, ?NOOP, Opaque, #mc_response{}),
gen_fsm:reply(From, ok),
- {next_state, processing, State};
+ {next_state, processing, State2};
_ ->
- NewState = State#state{terminal_opaque = Opaque, caller = From},
- {next_state, batch_ending, NewState}
+ {next_state, batch_ending, State2#state{terminal_opaque = Opaque, caller = From}}
end.
%% Everything else
@@ -352,28 +312,36 @@ handle_sync_event({?SNAPSHOT_VB_STATES, Body, BodyLen},
[byte_size(Body), BodyLen]),
{reply, #mc_response{status=?EINVAL}, processing, State}
end;
+handle_sync_event({?VBUCKET_BATCH_COUNT, <<>>} = Msg, _From, StateName, State) ->
+ ?LOG_INFO("Error: Missing batch count value in VBUCKET_BATCH_COUNT command: ~p", [Msg]),
+ {reply, #mc_response{status=?EINVAL}, StateName, State};
+handle_sync_event({?VBUCKET_BATCH_COUNT, <<BatchCounter:32>>},
+ _From, StateName, State) ->
+ case BatchCounter =< 0 of
+ true ->
+ ?LOG_INFO("Error: Invalid batch count value in VBUCKET_BATCH_COUNT command: ~p",
+ [BatchCounter]),
+ {reply, #mc_response{status=?EINVAL}, StateName, State};
+ false ->
+ NewState = State#state{max_workers=BatchCounter},
+ {reply, #mc_response{}, StateName, NewState}
+ end;
handle_sync_event(_Event, _From, StateName, State) ->
Reply = ok,
{reply, Reply, StateName, State}.
-maybe_start_worker(#state{batch_size = 0} = State) ->
- State;
maybe_start_worker(State) ->
#state{
- worker_sup = WorkerSup, whole_batch = Batch, socket = Socket,
- worker_refs = WorkerRefs, current_vbucket = CurrentVBucket,
- current_vbucket_list = CurrentList
+ worker_sup = WorkerSup, socket = Socket,
+ worker_refs = WorkerRefs,
+ next_vb_batch = {VBucket, ItemList}
} = State,
- Batch2 = dict:append_list(CurrentVBucket, CurrentList, Batch),
- {ok, WorkerRef} = mc_batch_sup:start_worker(WorkerSup, Batch2,
+ {ok, WorkerRef} = mc_batch_sup:start_worker(WorkerSup, VBucket, ItemList,
State#state.db, Socket),
State#state{
- current_vbucket = 0,
- current_vbucket_list = [],
- whole_batch = dict:new(),
- batch_size = 0,
+ next_vb_batch = {},
worker_refs = [WorkerRef|WorkerRefs]
- }.
+ }.
handle_info({'DOWN', Ref, process, _Pid, normal}, batching, State) ->
#state{caller = From, worker_refs = WorkerRefs} = State,

0 comments on commit b605aa1

Please sign in to comment.