Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
Browse files

connection pools, breaks api

  • Loading branch information...
commit 5e897ada8bac7c1f04389db2a08ffd94e85b5f97 1 parent 0d21a72
@SergejJurecko authored
Showing with 300 additions and 172 deletions.
  1. +7 −4 README.rdoc
  2. +1 −1  erlmongo.hrl
  3. +31 −32 mongoapi.erl
  4. +261 −135 mongodb.erl
View
11 README.rdoc
@@ -22,11 +22,11 @@ I haven't used erlmongo in production yet, so all the bugs might not be ironed o
erl
rr("erlmongo.hrl").
application:start(erlmongo).
- % Set mongodb server info. singleServer() is the same as singleServer("localhost:27017")
- mongodb:singleServer().
- mongodb:connect().
+ % Set mongodb server info. singleServer(PoolName) is the same as singleServer(PoolName,"localhost:27017")
+ mongodb:singleServer(def).
+ mongodb:connect(def).
% Create an interface for test database (it has to be a binary)
- Mong = mongoapi:new(<<"test">>).
+ Mong = mongoapi:new(def,<<"test">>).
% Save a new document
Mong:save(#mydoc{name = "MyDocument", i = 10}).
@@ -102,6 +102,7 @@ Collections
* count
* dropCollection
* createCollection
+* group
Search
* find
@@ -110,6 +111,8 @@ Search
* findOne
DB
+* eval
+* stats
* runCmd
* repairDatabase
* cloneDatabase
View
2  erlmongo.hrl
@@ -17,7 +17,7 @@
record_info(fields, gfs_file),
record_info(fields, gfs_chunk)}).
--record(gfs_state,{proc, db, file, collection, length = 0, mode,
+-record(gfs_state,{pool,proc, db, file, collection, length = 0, mode,
nchunk = 0, flush_limit = 1048576, closed = false}).
-export([rec2prop/2, prop2rec/4]).
View
63 mongoapi.erl
@@ -1,4 +1,4 @@
--module(mongoapi, [DB]).
+-module(mongoapi, [Pool,DB]).
% -export([save/1,findOne/2,findOne/1,find/1,find/2,find/3,find/4, update/2, insert/1]).
-compile(export_all).
-include_lib("erlmongo.hrl").
@@ -25,9 +25,9 @@ name(Collection) when is_atom(Collection) ->
% Example: remove(#mydoc{},[{#mydoc.docid,Id}])
remove(Rec, Selector) when is_tuple(Rec) ->
- mongodb:exec_delete(name(element(1,Rec)), #delete{selector = mongodb:encoderec_selector(Rec, Selector)});
+ mongodb:exec_delete(Pool,name(element(1,Rec)), #delete{selector = mongodb:encoderec_selector(Rec, Selector)});
remove(Col, Selector) ->
- mongodb:exec_delete(name(Col), #delete{selector = mongodb:encode(Selector)}).
+ mongodb:exec_delete(Pool,name(Col), #delete{selector = mongodb:encode(Selector)}).
save(Collection, [_|_] = L) ->
@@ -35,14 +35,14 @@ save(Collection, [_|_] = L) ->
case lists:keysearch(<<"_id">>, 1, L) of
false ->
OID = mongodb:create_id(),
- case mongodb:exec_insert(name(Collection), #insert{documents = mongodb:encode([{<<"_id">>, {oid, OID}}|L])}) of
+ case mongodb:exec_insert(Pool,name(Collection), #insert{documents = mongodb:encode([{<<"_id">>, {oid, OID}}|L])}) of
ok ->
{oid, OID};
R ->
R
end;
{value, {_, OID}} ->
- case mongodb:exec_update(name(Collection), #update{selector = mongodb:encode([{<<"_id">>, OID}]), document = mongodb:encode(L)}) of
+ case mongodb:exec_update(Pool,name(Collection), #update{selector = mongodb:encode([{<<"_id">>, OID}]), document = mongodb:encode(L)}) of
ok ->
OID;
R ->
@@ -54,14 +54,14 @@ save(Collection, Rec) ->
case element(Offset, Rec) of
undefined ->
OID = mongodb:create_id(),
- case mongodb:exec_insert(name(Collection), #insert{documents = mongodb:encoderec(setelement(Offset, Rec, {oid, OID}))}) of
+ case mongodb:exec_insert(Pool,name(Collection), #insert{documents = mongodb:encoderec(setelement(Offset, Rec, {oid, OID}))}) of
ok ->
{oid, OID};
R ->
R
end;
OID ->
- case mongodb:exec_update(name(Collection),
+ case mongodb:exec_update(Pool,name(Collection),
#update{selector = mongodb:encode([{<<"_id">>, OID}]), document = mongodb:encoderec(Rec)}) of
ok ->
OID;
@@ -82,11 +82,11 @@ save(Rec) ->
% modifier list: inc, set, push, pushAll, pop, pull, pullAll
% Flags can be: [upsert,multi]
update(Selector, Rec, Flags) ->
- mongodb:exec_update(name(element(1,Rec)), #update{selector = mongodb:encoderec_selector(Rec, Selector),
+ mongodb:exec_update(Pool,name(element(1,Rec)), #update{selector = mongodb:encoderec_selector(Rec, Selector),
upsert = updateflags(Flags,0),
document = mongodb:encoderec(Rec)}).
update(Collection, [_|_] = Selector, [_|_] = Doc, Flags) ->
- mongodb:exec_update(name(Collection), #update{selector = mongodb:encode(Selector), document = mongodb:encode(Doc),
+ mongodb:exec_update(Pool,name(Collection), #update{selector = mongodb:encode(Selector), document = mongodb:encode(Doc),
upsert = updateflags(Flags,0)}).
% batchUpdate is not like batchInsert in that everything is one mongo command. With batchUpdate every document becomes
% a new mongodb command, but they are all encoded and sent at once. So the communication and encoding overhead is smaller.
@@ -94,11 +94,11 @@ update(Collection, [_|_] = Selector, [_|_] = Doc, Flags) ->
% - All documents need to be in the same collection
batchUpdate(Sels,Recs,Flags) ->
[R|_] = Recs,
- mongodb:exec_update(name(element(1,R)), encbu([], Sels,Recs,updateflags(Flags,0))).
+ mongodb:exec_update(Pool,name(element(1,R)), encbu([], Sels,Recs,updateflags(Flags,0))).
% Selector and doc are lists of document lists
batchUpdate(Col, [_|_] = Selector, [_|_] = Doc, Flags) ->
- mongodb:exec_update(name(Col),encbu([],Selector,Doc,updateflags(Flags,0))).
+ mongodb:exec_update(Pool,name(Col),encbu([],Selector,Doc,updateflags(Flags,0))).
encbu(L, [Sel|ST],[[_|_] = Doc|DT],Flags) ->
encbu([#update{selector = mongodb:encode(Sel), document = mongodb:encode(Doc), upsert = Flags}|L],ST,DT,Flags);
@@ -115,17 +115,17 @@ updateflags([], V) ->
V.
insert(Col, [_|_] = L) ->
- mongodb:exec_insert(name(Col), #insert{documents = mongodb:encode(L)}).
+ mongodb:exec_insert(Pool,name(Col), #insert{documents = mongodb:encode(L)}).
insert(Rec) ->
- mongodb:exec_insert(name(element(1,Rec)), #insert{documents = mongodb:encoderec(Rec)}).
+ mongodb:exec_insert(Pool,name(element(1,Rec)), #insert{documents = mongodb:encoderec(Rec)}).
batchInsert(Col, [[_|_]|_] = LRecs) ->
DocBin = lists:foldl(fun(L, Bin) -> <<Bin/binary, (mongodb:encode(L))/binary>> end, <<>>, LRecs),
- mongodb:exec_insert(name(Col), #insert{documents = DocBin}).
+ mongodb:exec_insert(Pool,name(Col), #insert{documents = DocBin}).
batchInsert(LRecs) ->
[FRec|_] = LRecs,
DocBin = lists:foldl(fun(Rec, Bin) -> <<Bin/binary, (mongodb:encoderec(Rec))/binary>> end, <<>>, LRecs),
- mongodb:exec_insert(name(element(1,FRec)), #insert{documents = DocBin}).
+ mongodb:exec_insert(Pool,name(element(1,FRec)), #insert{documents = DocBin}).
% Advanced queries:
@@ -187,7 +187,7 @@ find(#search{} = Q) ->
find(Col, Query, Selector, From, Limit) when is_list(Query) ->
Quer = #search{ndocs = Limit, nskip = From, criteria = mongodb:encode(Query), field_selector = mongodb:encode(Selector)},
- case mongodb:exec_find(name(Col), Quer) of
+ case mongodb:exec_find(Pool,name(Col), Quer) of
not_connected ->
not_connected;
<<>> ->
@@ -197,7 +197,7 @@ find(Col, Query, Selector, From, Limit) when is_list(Query) ->
end;
find(Col, Query, Selector, From, Limit) ->
Quer = #search{ndocs = Limit, nskip = From, criteria = mongodb:encode_findrec(Query), field_selector = mongodb:encoderec_selector(Query, Selector)},
- case mongodb:exec_find(name(Col), Quer) of
+ case mongodb:exec_find(Pool,name(Col), Quer) of
not_connected ->
not_connected;
<<>> ->
@@ -225,7 +225,7 @@ findOpt(Col, Query, Selector, Opts, From, Limit) when is_list(Query) ->
findOpt(Col, Query, Selector, Opts, From, Limit) ->
Quer = #search{ndocs = Limit, nskip = From, field_selector = mongodb:encoderec_selector(Query, Selector),
criteria = mongodb:encode(translateopts(Query, Opts,[{<<"query">>, {bson, mongodb:encode_findrec(Query)}}]))},
- case mongodb:exec_find(name(Col), Quer) of
+ case mongodb:exec_find(Pool,name(Col), Quer) of
not_connected ->
not_connected;
<<>> ->
@@ -251,7 +251,7 @@ cursor(Query, Selector, Opts, From, Limit) ->
Quer = #search{ndocs = Limit, nskip = From, field_selector = mongodb:encoderec_selector(Query, Selector),
criteria = mongodb:encode(translateopts(Query, Opts,[{<<"query">>, {bson, mongodb:encode_findrec(Query)}}])),
opts = ?QUER_OPT_CURSOR},
- case mongodb:exec_cursor(name(element(1,Query)), Quer) of
+ case mongodb:exec_cursor(Pool,name(element(1,Query)), Quer) of
not_connected ->
not_connected;
{done, <<>>} ->
@@ -262,7 +262,7 @@ cursor(Query, Selector, Opts, From, Limit) ->
{ok, Cursor, mongodb:decoderec(Query, Result)}
end.
getMore(Rec, Cursor) ->
- case mongodb:exec_getmore(name(element(1,Rec)), Cursor) of
+ case mongodb:exec_getmore(Pool,name(element(1,Rec)), Cursor) of
not_connected ->
not_connected;
{done, <<>>} ->
@@ -306,23 +306,23 @@ ensureIndex(<<_/binary>> = Collection, Keys) ->
Bin = mongodb:encode([{plaintext, <<"name">>, mongodb:gen_prop_keyname(Keys, <<>>)},
{plaintext, <<"ns">>, name(Collection)},
{<<"key">>, {bson, mongodb:encode(Keys)}}]),
- mongodb:ensureIndex(DB, Bin);
+ mongodb:ensureIndex(Pool,DB, Bin);
% Example: ensureIndex(#mydoc{}, [{#mydoc.name, 1}])
ensureIndex(Rec, Keys) ->
Bin = mongodb:encode([{plaintext, <<"name">>, mongodb:gen_keyname(Rec, Keys)},
{plaintext, <<"ns">>, name(element(1,Rec))},
{<<"key">>, {bson, mongodb:encoderec_selector(Rec, Keys)}}]),
- mongodb:ensureIndex(DB, Bin).
+ mongodb:ensureIndex(Pool,DB, Bin).
deleteIndexes([_|_] = Collection) ->
deleteIndexes(list_to_binary(Collection));
deleteIndexes(<<_/binary>> = Collection) ->
mongodb:clearIndexCache(),
- mongodb:exec_cmd(DB, [{plaintext, <<"deleteIndexes">>, Collection}, {plaintext, <<"index">>, <<"*">>}]).
+ mongodb:exec_cmd(Pool,DB, [{plaintext, <<"deleteIndexes">>, Collection}, {plaintext, <<"index">>, <<"*">>}]).
deleteIndex(Rec, Key) ->
mongodb:clearIndexCache(),
- mongodb:exec_cmd(DB,[{plaintext, <<"deleteIndexes">>, atom_to_binary(element(1,Rec), latin1)},
+ mongodb:exec_cmd(Pool,DB,[{plaintext, <<"deleteIndexes">>, atom_to_binary(element(1,Rec), latin1)},
{plaintext, <<"index">>, mongodb:gen_keyname(Rec,Key)}]).
% How many documents in mydoc collection: Mong:count("mydoc").
@@ -349,7 +349,7 @@ count(ColIn, Query) ->
_ when Query == undefined ->
Cmd = [{plaintext, <<"count">>, Col}, {plaintext, <<"ns">>, DB}]
end,
- case mongodb:exec_cmd(DB, Cmd) of
+ case mongodb:exec_cmd(Pool,DB, Cmd) of
[{<<"n">>, Val}|_] ->
round(Val);
_ ->
@@ -387,7 +387,7 @@ eval(Code) ->
runCmd({_,_} = T) ->
runCmd([T]);
runCmd([{_,_}|_] = L) ->
- mongodb:exec_cmd(DB, L);
+ mongodb:exec_cmd(Pool,DB, L);
runCmd([_|_] = L) ->
runCmd([{L,1}]);
runCmd(<<_/binary>> = L) ->
@@ -451,10 +451,9 @@ gfsNew(Filename, Opts) ->
gfsNew([_|_] = Collection, Filename, Opts) ->
gfsNew(list_to_binary(Collection), Filename, Opts);
gfsNew(<<_/binary>> = Collection, Filename, Opts) ->
- mongodb:startgfs(gfsopts(Opts,#gfs_state{file = #gfs_file{filename = Filename, length = 0, chunkSize = 262144,
+ mongodb:startgfs(gfsopts(Opts,#gfs_state{pool = Pool,file = #gfs_file{filename = Filename, length = 0, chunkSize = 262144,
docid = mongodb:create_id(), uploadDate = now()},
collection = name(Collection), db = DB, mode = write})).
- % Name = name(<<Collection/binary, ".file">>).
gfsopts([{meta, Rec}|T], S) ->
gfsopts(T, S#gfs_state{file = (S#gfs_state.file)#gfs_file{metadata = Rec}});
@@ -490,18 +489,18 @@ gfsOpen(Collection, R) ->
case R#gfs_file.docid of
undefined ->
Quer = #search{ndocs = 1, nskip = 0, criteria = mongodb:encode_findrec(R)},
- case mongodb:exec_find(name(<<Collection/binary, ".files">>), Quer) of
+ case mongodb:exec_find(Pool,name(<<Collection/binary, ".files">>), Quer) of
not_connected ->
not_connected;
<<>> ->
[];
Result ->
[DR] = mongodb:decoderec(R, Result),
- gfsOpen(DR)
+ gfsOpen(Pool,DR)
end;
_ ->
% R
- mongodb:startgfs(#gfs_state{file = R, collection = name(Collection), db = DB, mode = read})
+ mongodb:startgfs(#gfs_state{pool = Pool,file = R, collection = name(Collection), db = DB, mode = read})
end.
gfsRead(PID, N) ->
@@ -521,7 +520,7 @@ gfsDelete(Collection, R) ->
case R#gfs_file.docid of
undefined ->
Quer = #search{ndocs = 1, nskip = 0, criteria = mongodb:encode_findrec(R)},
- case mongodb:exec_find(name(<<Collection/binary, ".files">>), Quer) of
+ case mongodb:exec_find(Pool,name(<<Collection/binary, ".files">>), Quer) of
not_connected ->
not_connected;
<<>> ->
View
396 mongodb.erl
@@ -2,12 +2,12 @@
-export([deser_prop/1,reload/0, print_info/0, start/0, stop/0, init/1, handle_call/3,
handle_cast/2, handle_info/2, terminate/2, code_change/3]).
% API
--export([connect/0, connect/1, is_connected/0, singleServer/1, singleServer/0, masterSlave/2,masterMaster/2, replicaPairs/2,
+-export([connect/1, connect/2, is_connected/1, singleServer/2, singleServer/1, masterSlave/3,masterMaster/3, replicaPairs/3,
datetime_to_now/1]).
% Internal
--export([exec_cursor/2, exec_delete/2, exec_cmd/2, exec_insert/2, exec_find/2, exec_update/2, exec_getmore/2,
+-export([exec_cursor/3, exec_delete/3, exec_cmd/3, exec_insert/3, exec_find/3, exec_update/3, exec_getmore/3,
encoderec/1, encode_findrec/1, encoderec_selector/2, gen_keyname/2, gen_prop_keyname/2, rec/0, recoffset/1, recfields/1,
- decoderec/2, encode/1, decode/1, ensureIndex/2, clearIndexCache/0, create_id/0, startgfs/1, dec2hex/2, hex2dec/2]).
+ decoderec/2, encode/1, decode/1, ensureIndex/3, clearIndexCache/0, create_id/0, startgfs/1, dec2hex/2, hex2dec/2]).
-include_lib("erlmongo.hrl").
% -compile(export_all).
@@ -72,52 +72,52 @@ print_info() ->
% API
%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-connect() ->
- gen_server:cast(?MODULE, {start_connection, undefined}).
+connect(Pool) ->
+ gen_server:cast(?MODULE, {start_connection, Pool, undefined}).
% For when connection is established. Parameter can be:
% - {Module,Function,Params}
% - PID, that gets a {mongodb_connected} message
-connect(Callback) when is_pid(Callback); is_tuple(Callback), tuple_size(Callback) == 3 ->
- gen_server:cast(?MODULE, {start_connection, Callback}).
+connect(Pool, Callback) when is_pid(Callback); is_tuple(Callback), tuple_size(Callback) == 3 ->
+ gen_server:cast(?MODULE, {start_connection, Pool, Callback}).
-is_connected() ->
- case gen_server:call(?MODULE, {getread}) of
+is_connected(Pool) ->
+ case gen_server:call(?MODULE, {getread,Pool}) of
undefined ->
false;
_ ->
true
end.
-singleServer() ->
- gen_server:cast(?MODULE, {conninfo, {replicaPairs, {"localhost",?MONGO_PORT}, {"localhost",?MONGO_PORT}}}).
-singleServer(Addr) ->
+singleServer(Pool) ->
+ gen_server:cast(?MODULE, {conninfo,Pool, {replicaPairs, {"localhost",?MONGO_PORT}, {"localhost",?MONGO_PORT}}}).
+singleServer(Pool,Addr) ->
[IP,Port] = string:tokens(Addr,":"),
% gen_server:cast(?MODULE, {conninfo, {single, {Addr,Port}}}).
- gen_server:cast(?MODULE, {conninfo, {replicaPairs, {IP,Port}, {IP,Port}}}).
-masterSlave(MasterAddr, SlaveAddr) ->
+ gen_server:cast(?MODULE, {conninfo,Pool, {replicaPairs, {IP,Port}, {IP,Port}}}).
+masterSlave(Pool,MasterAddr, SlaveAddr) ->
[IP1,Port1] = string:tokens(MasterAddr,":"),
[IP2,Port2] = string:tokens(SlaveAddr,":"),
- gen_server:cast(?MODULE, {conninfo, {masterSlave, {IP1,Port1}, {IP2,Port2}}}).
-masterMaster(Addr1,Addr2) ->
+ gen_server:cast(?MODULE, {conninfo,Pool, {masterSlave, {IP1,Port1}, {IP2,Port2}}}).
+masterMaster(Pool,Addr1,Addr2) ->
[IP1,Port1] = string:tokens(Addr1,":"),
[IP2,Port2] = string:tokens(Addr2,":"),
- gen_server:cast(?MODULE, {conninfo, {masterMaster, {IP1,Port1}, {IP2,Port2}}}).
-replicaPairs(Addr1,Addr2) ->
+ gen_server:cast(?MODULE, {conninfo,Pool, {masterMaster, {IP1,Port1}, {IP2,Port2}}}).
+replicaPairs(Pool,Addr1,Addr2) ->
[IP1,Port1] = string:tokens(Addr1,":"),
[IP2,Port2] = string:tokens(Addr2,":"),
- gen_server:cast(?MODULE, {conninfo, {replicaPairs, {IP1,Port1}, {IP2,Port2}}}).
+ gen_server:cast(?MODULE, {conninfo,Pool, {replicaPairs, {IP1,Port1}, {IP2,Port2}}}).
datetime_to_now(Loctime) ->
Secs = calendar:datetime_to_gregorian_seconds(Loctime) - 719528 * 24 * 60 * 60,
{Secs div 1000000, Secs rem 1000000,0}.
-ensureIndex(DB,Bin) ->
- gen_server:cast(?MODULE, {ensure_index, DB, Bin}).
+ensureIndex(Pool,DB,Bin) ->
+ gen_server:cast(?MODULE, {ensure_index,Pool, DB, Bin}).
clearIndexCache() ->
gen_server:cast(?MODULE, {clear_indexcache}).
-exec_cursor(Col, Quer) ->
- case gen_server:call(?MODULE, {getread}) of
+exec_cursor(Pool,Col, Quer) ->
+ case gen_server:call(?MODULE, {getread,Pool}) of
undefined ->
not_connected;
PID ->
@@ -139,12 +139,12 @@ exec_cursor(Col, Quer) ->
not_connected
end
end.
-exec_getmore(Col, C) ->
+exec_getmore(Pool,Col, C) ->
case erlang:is_process_alive(C#cursor.pid) of
false ->
{done, <<>>};
true ->
- case gen_server:call(?MODULE, {getread}) of
+ case gen_server:call(?MODULE, {getread,Pool}) of
undefined ->
not_connected;
PID ->
@@ -166,16 +166,16 @@ exec_getmore(Col, C) ->
end
end
end.
-exec_delete(Collection, D) ->
- case gen_server:call(?MODULE, {getwrite}) of
+exec_delete(Pool,Collection, D) ->
+ case gen_server:call(?MODULE, {getwrite,Pool}) of
undefined ->
not_connected;
PID ->
PID ! {delete, Collection, D},
ok
end.
-exec_find(Collection, Quer) ->
- case gen_server:call(?MODULE, {getread}) of
+exec_find(Pool,Collection, Quer) ->
+ case gen_server:call(?MODULE, {getread,Pool}) of
undefined ->
not_connected;
PID ->
@@ -190,25 +190,25 @@ exec_find(Collection, Quer) ->
not_connected
end
end.
-exec_insert(Collection, D) ->
- case gen_server:call(?MODULE, {getwrite}) of
+exec_insert(Pool,Collection, D) ->
+ case gen_server:call(?MODULE, {getwrite,Pool}) of
undefined ->
not_connected;
PID ->
PID ! {insert, Collection, D},
ok
end.
-exec_update(Collection, D) ->
- case gen_server:call(?MODULE, {getwrite}) of
+exec_update(Pool,Collection, D) ->
+ case gen_server:call(?MODULE, {getwrite,Pool}) of
undefined ->
not_connected;
PID ->
PID ! {update, Collection, D},
ok
end.
-exec_cmd(DB, Cmd) ->
+exec_cmd(Pool,DB, Cmd) ->
Quer = #search{ndocs = 1, nskip = 0, criteria = mongodb:encode(Cmd)},
- case exec_find(<<DB/binary, ".$cmd">>, Quer) of
+ case exec_find(Pool,<<DB/binary, ".$cmd">>, Quer) of
undefined ->
not_connected;
<<>> ->
@@ -234,6 +234,10 @@ startgfs(P) ->
% IMPLEMENTATION
%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Process dictionary:
+% {PoolName, #conn{}}
+% {ConnectionPID, PoolName}
+
% read = connection used for reading (find) from mongo server
% write = connection used for writing (insert,update) to mongo server
% single: same as replicaPairs (single server is always master and used for read and write)
@@ -241,15 +245,26 @@ startgfs(P) ->
% replicaPairs: read = write = master
% masterMaster: read = master1, write = master2
% timer is reconnect timer if some connection is missing
+-record(conn, {read, write, timer, conninfo, cb}).
% indexes is ensureIndex cache (an ets table).
--record(mngd, {read, write, conninfo, indexes, timer, hashed_hostn, oid_index = 1, conn_established_cb}).
+-record(mngd, {indexes, hashed_hostn, oid_index = 1}).
-define(R2P(Record), rec2prop(Record, record_info(fields, mngd))).
-define(P2R(Prop), prop2rec(Prop, mngd, #mngd{}, record_info(fields, mngd))).
-handle_call({getread}, _, P) ->
- {reply, P#mngd.read, P};
-handle_call({getwrite}, _, P) ->
- {reply, P#mngd.write, P};
+handle_call({getread, C}, _, P) ->
+ case get(C) of
+ undefined ->
+ undefined;
+ CC ->
+ {reply, CC#conn.read, P}
+ end;
+handle_call({getwrite, C}, _, P) ->
+ case get(C) of
+ undefined ->
+ undefined;
+ CC ->
+ {reply, CC#conn.write, P}
+ end;
handle_call({create_oid}, _, P) ->
WC = element(1,erlang:statistics(wall_clock)) rem 16#ffffffff,
% <<_:20/binary,PID:2/binary,_/binary>> = term_to_binary(self()),
@@ -267,18 +282,19 @@ handle_call(_, _, P) ->
deser_prop(P) ->
?P2R(P).
-startcon(undefined, Type, Addr, Port) when is_list(Port) ->
- startcon(undefined, Type, Addr, list_to_integer(Port));
-startcon(undefined, Type, Addr, Port) ->
+startcon(Name, undefined, Type, Addr, Port) when is_list(Port) ->
+ startcon(Name, undefined, Type, Addr, list_to_integer(Port));
+startcon(Name, undefined, Type, Addr, Port) ->
PID = spawn_link(fun() -> connection(true) end),
- PID ! {start, self(), Type, Addr, Port};
-startcon(PID, _, _, _) ->
+ put(PID,Name),
+ PID ! {start, Name, self(), Type, Addr, Port};
+startcon(_,PID, _, _, _) ->
PID.
-handle_cast({ensure_index, DB, Bin}, P) ->
+handle_cast({ensure_index,Pool, DB, Bin}, P) ->
case ets:lookup(P#mngd.indexes, {DB,Bin}) of
[] ->
- spawn(fun() -> exec_insert(<<DB/binary, ".system.indexes">>, #insert{documents = Bin}) end),
+ spawn(fun() -> exec_insert(Pool,<<DB/binary, ".system.indexes">>, #insert{documents = Bin}) end),
ets:insert(P#mngd.indexes, {{DB,Bin}});
_ ->
true
@@ -287,61 +303,113 @@ handle_cast({ensure_index, DB, Bin}, P) ->
handle_cast({clear_indexcache}, P) ->
ets:delete_all_objects(P#mngd.indexes),
{noreply, P};
-handle_cast({conninfo, Conn}, P) ->
- case P#mngd.read of
+handle_cast({conninfo, Pool, Info}, P) ->
+ case get(Pool) of
undefined ->
+ put(Pool,#conn{conninfo = Info});
+ #conn{read = undefined} = _PI ->
true;
- PID ->
- % reconnect
- PID ! {stop}
+ PI ->
+ PI#conn.read ! {stop}
end,
- {noreply, P#mngd{conninfo = Conn}};
-handle_cast({start_connection, SendBack}, P) ->
- handle_cast({start_connection}, P#mngd{conn_established_cb = SendBack});
-handle_cast({start_connection}, #mngd{conninfo = {masterMaster, {A1,P1},{A2,P2}}} = P) ->
+ {noreply, P};
+handle_cast({start_connection, Pool}, P) ->
+ handle_cast({start_connection,Pool,undefined}, P);
+handle_cast({start_connection, Pool, CB}, P) ->
+ case get(Pool) of
+ undefined ->
+ true;
+ PI ->
+ start_connection(Pool, PI#conn{cb = CB})
+ end,
+ {noreply, P};
+ % handle_cast({start_connection}, P#mngd{conn_established_cb = SendBack});
+% handle_cast({start_connection}, #mngd{conninfo = {masterMaster, {A1,P1},{A2,P2}}} = P) ->
+% case true of
+% _ when P#mngd.read /= P#mngd.write, P#mngd.read /= undefined, P#mngd.write /= undefined ->
+% Timer = ctimer(P#mngd.timer);
+% _ when P#mngd.read == P#mngd.write, P#mngd.read /= undefined ->
+% startcon(undefined, write, A2,P2),
+% Timer = P#mngd.timer;
+% _ ->
+% startcon(P#mngd.read, read, A1,P1),
+% startcon(P#mngd.write, write, A2,P2),
+% Timer = P#mngd.timer
+% % {noreply, P#mngd{read = startcon(P#mngd.read, A1,P1), write = startcon(P#mngd.write,A2,P2)}}
+% end,
+% {noreply, P#mngd{timer = Timer}};
+% handle_cast({start_connection}, #mngd{conninfo = {masterSlave, {A1,P1},{A2,P2}}} = P) ->
+% case true of
+% % All ok.
+% _ when P#mngd.read /= P#mngd.write, P#mngd.read /= undefined, P#mngd.write /= undefined ->
+% Timer = ctimer(P#mngd.timer);
+% % Read = write = master, try to connect to slave again
+% _ when P#mngd.read == P#mngd.write, P#mngd.read /= undefined ->
+% startcon(undefined, read, A2,P2),
+% Timer = P#mngd.timer;
+% % One or both of the connections is down
+% _ ->
+% startcon(P#mngd.read, read, A2,P2),
+% startcon(P#mngd.write, write, A1,P1),
+% Timer = P#mngd.timer
+% end,
+% {noreply, P#mngd{timer = Timer}};
+% handle_cast({start_connection}, #mngd{conninfo = {replicaPairs, {A1,P1},{A2,P2}}} = P) ->
+% case true of
+% _ when P#mngd.read /= undefined, P#mngd.write == P#mngd.read ->
+% {noreply, P#mngd{timer = ctimer(P#mngd.timer)}};
+% _ ->
+% startcon(undefined, ifmaster, A1,P1),
+% startcon(undefined, ifmaster, A2,P2),
+% {noreply, P}
+% end;
+handle_cast({print_info}, P) ->
+ io:format("~p~n~p~n", [get(),?R2P(P)]),
+ {noreply, P};
+handle_cast(_, P) ->
+ {noreply, P}.
+
+start_connection(Name, #conn{conninfo = {masterMaster, {A1,P1},{A2,P2}}} = P) ->
case true of
- _ when P#mngd.read /= P#mngd.write, P#mngd.read /= undefined, P#mngd.write /= undefined ->
- Timer = ctimer(P#mngd.timer);
- _ when P#mngd.read == P#mngd.write, P#mngd.read /= undefined ->
- startcon(undefined, write, A2,P2),
- Timer = P#mngd.timer;
+ _ when P#conn.read /= P#conn.write, P#conn.read /= undefined, P#conn.write /= undefined ->
+ Timer = ctimer(P#conn.timer);
+ _ when P#conn.read == P#conn.write, P#conn.read /= undefined ->
+ startcon(Name, undefined, write, A2,P2),
+ Timer = P#conn.timer;
_ ->
- startcon(P#mngd.read, read, A1,P1),
- startcon(P#mngd.write, write, A2,P2),
- Timer = P#mngd.timer
+ startcon(Name, P#conn.read, read, A1,P1),
+ startcon(Name, P#conn.write, write, A2,P2),
+ Timer = P#conn.timer
% {noreply, P#mngd{read = startcon(P#mngd.read, A1,P1), write = startcon(P#mngd.write,A2,P2)}}
end,
- {noreply, P#mngd{timer = Timer}};
-handle_cast({start_connection}, #mngd{conninfo = {masterSlave, {A1,P1},{A2,P2}}} = P) ->
+ put(Name,P#conn{timer = Timer});
+start_connection(Name, #conn{conninfo = {masterSlave, {A1,P1},{A2,P2}}} = P) ->
case true of
% All ok.
- _ when P#mngd.read /= P#mngd.write, P#mngd.read /= undefined, P#mngd.write /= undefined ->
- Timer = ctimer(P#mngd.timer);
+ _ when P#conn.read /= P#conn.write, P#conn.read /= undefined, P#conn.write /= undefined ->
+ Timer = ctimer(P#conn.timer);
% Read = write = master, try to connect to slave again
- _ when P#mngd.read == P#mngd.write, P#mngd.read /= undefined ->
- startcon(undefined, read, A2,P2),
- Timer = P#mngd.timer;
+ _ when P#conn.read == P#conn.write, P#conn.read /= undefined ->
+ startcon(Name, undefined, read, A2,P2),
+ Timer = P#conn.timer;
% One or both of the connections is down
_ ->
- startcon(P#mngd.read, read, A2,P2),
- startcon(P#mngd.write, write, A1,P1),
- Timer = P#mngd.timer
+ startcon(Name, P#conn.read, read, A2,P2),
+ startcon(Name, P#conn.write, write, A1,P1),
+ Timer = P#conn.timer
end,
- {noreply, P#mngd{timer = Timer}};
-handle_cast({start_connection}, #mngd{conninfo = {replicaPairs, {A1,P1},{A2,P2}}} = P) ->
+ put(Name,P#conn{timer = Timer});
+start_connection(Name, #conn{conninfo = {replicaPairs, {A1,P1},{A2,P2}}} = P) ->
case true of
- _ when P#mngd.read /= undefined, P#mngd.write == P#mngd.read ->
- {noreply, P#mngd{timer = ctimer(P#mngd.timer)}};
+ _ when P#conn.read /= undefined, P#conn.write == P#conn.read ->
+ put(Name,P#conn{timer = ctimer(P#conn.timer)});
+ % {noreply, P#conn{timer = ctimer(P#conn.timer)}};
_ ->
- startcon(undefined, ifmaster, A1,P1),
- startcon(undefined, ifmaster, A2,P2),
- {noreply, P}
+ startcon(Name, undefined, ifmaster, A1,P1),
+ startcon(Name, undefined, ifmaster, A2,P2)
end;
-handle_cast({print_info}, P) ->
- io:format("~p~n", [?R2P(P)]),
- {noreply, P};
-handle_cast(_, P) ->
- {noreply, P}.
+start_connection(_,_) ->
+ true.
ctimer(undefined) ->
undefined;
@@ -349,10 +417,10 @@ ctimer(T) ->
timer:cancel(T),
undefined.
-timer(undefined) ->
- {ok, Timer} = timer:send_interval(?RECONNECT_DELAY, {reconnect}),
+timer(undefined,Pool) ->
+ {ok, Timer} = timer:send_interval(?RECONNECT_DELAY, {reconnect,Pool}),
Timer;
-timer(T) ->
+timer(T,_) ->
T.
conn_callback(P) ->
@@ -368,48 +436,74 @@ conn_callback(P) ->
end
end.
-handle_info({conn_established, read, ConnProc}, P) ->
- conn_callback(P#mngd.conn_established_cb),
- {noreply, P#mngd{read = ConnProc}};
-handle_info({conn_established, write, ConnProc}, P) ->
- {noreply, P#mngd{write = ConnProc}};
-handle_info({reconnect}, P) ->
- handle_cast({start_connection}, P);
-handle_info({'EXIT', PID, _Reason}, #mngd{conninfo = {replicaPairs, _, _}} = P) ->
- case true of
- _ when P#mngd.read == PID; P#mngd.read == undefined ->
- {noreply, P#mngd{read = undefined, write = undefined, timer = timer(P#mngd.timer)}};
- _ ->
- {noreply, P}
- end;
-handle_info({'EXIT', PID, _Reason}, #mngd{conninfo = {masterSlave, _, _}} = P) ->
- case true of
- _ when P#mngd.read == PID, P#mngd.read /= P#mngd.write ->
- {noreply, P#mngd{read = P#mngd.write, timer = timer(P#mngd.timer)}};
- _ when P#mngd.read == PID ->
- {noreply, P#mngd{read = undefined, write = undefined, timer = timer(P#mngd.timer)}};
- _ when P#mngd.write == PID ->
- {noreply, P#mngd{write = undefined, timer = timer(P#mngd.timer)}};
- _ ->
- {noreply, P}
- end;
-handle_info({'EXIT', PID, _Reason}, #mngd{conninfo = {masterMaster, _, _}} = P) ->
- case true of
- _ when P#mngd.read == PID, P#mngd.write == PID ->
- {noreply, P#mngd{read = undefined, write = undefined, timer = timer(P#mngd.timer)}};
- _ when P#mngd.read == PID ->
- {noreply, P#mngd{read = P#mngd.write, timer = timer(P#mngd.timer)}};
- _ when P#mngd.write == PID ->
- {noreply, P#mngd{write = P#mngd.read, timer = timer(P#mngd.timer)}};
- _ ->
- {noreply, P}
- end;
+handle_info({conn_established, Pool, read, ConnProc}, P) ->
+ case get(Pool) of
+ undefined ->
+ true;
+ PI ->
+ put(ConnProc,Pool),
+ put(Pool,PI#conn{read = ConnProc}),
+ conn_callback(PI#conn.cb)
+ end,
+ {noreply, P};
+handle_info({conn_established, Pool, write, ConnProc}, P) ->
+ case get(Pool) of
+ undefined ->
+ true;
+ PI ->
+ put(ConnProc,Pool),
+ put(Pool,PI#conn{write = ConnProc}),
+ conn_callback(PI#conn.cb)
+ end,
+ {noreply, P};
+handle_info({reconnect, Pool}, P) ->
+ handle_cast({start_connection, Pool}, P);
+handle_info({'EXIT', PID,_}, P) ->
+ case get(PID) of
+ undefined ->
+ true;
+ Pool ->
+ erase(PID),
+ conndied(Pool,PID,get(Pool))
+ end,
+ {noreply, P};
+% handle_info({'EXIT', PID, _Reason}, #mngd{conninfo = {replicaPairs, _, _}} = P) ->
+% case true of
+% _ when P#mngd.read == PID; P#mngd.read == undefined ->
+% {noreply, P#mngd{read = undefined, write = undefined, timer = timer(P#mngd.timer)}};
+% _ ->
+% {noreply, P}
+% end;
+% handle_info({'EXIT', PID, _Reason}, #mngd{conninfo = {masterSlave, _, _}} = P) ->
+% case true of
+% _ when P#mngd.read == PID, P#mngd.read /= P#mngd.write ->
+% {noreply, P#mngd{read = P#mngd.write, timer = timer(P#mngd.timer)}};
+% _ when P#mngd.read == PID ->
+% {noreply, P#mngd{read = undefined, write = undefined, timer = timer(P#mngd.timer)}};
+% _ when P#mngd.write == PID ->
+% {noreply, P#mngd{write = undefined, timer = timer(P#mngd.timer)}};
+% _ ->
+% {noreply, P}
+% end;
+% handle_info({'EXIT', PID, _Reason}, #mngd{conninfo = {masterMaster, _, _}} = P) ->
+% case true of
+% _ when P#mngd.read == PID, P#mngd.write == PID ->
+% {noreply, P#mngd{read = undefined, write = undefined, timer = timer(P#mngd.timer)}};
+% _ when P#mngd.read == PID ->
+% {noreply, P#mngd{read = P#mngd.write, timer = timer(P#mngd.timer)}};
+% _ when P#mngd.write == PID ->
+% {noreply, P#mngd{write = P#mngd.read, timer = timer(P#mngd.timer)}};
+% _ ->
+% {noreply, P}
+% end;
handle_info({query_result, Src, <<_:32/binary, Res/binary>>}, P) ->
+ PI = get(get(Src)),
try mongodb:decode(Res) of
- [[{<<"ismaster">>, 1}|_]] when element(1,P#mngd.conninfo) == replicaPairs, P#mngd.read == undefined ->
+ [[{<<"ismaster">>, 1}|_]] when element(1,PI#conn.conninfo) == replicaPairs, PI#conn.read == undefined ->
link(Src),
- conn_callback(P#mngd.conn_established_cb),
- {noreply, P#mngd{read = Src, write = Src}};
+ conn_callback(PI#conn.cb),
+ put(get(Src),PI#conn{read = Src, write = Src}),
+ {noreply, P};
_X ->
% io:format("~p~n", [_X]),
Src ! {stop},
@@ -426,6 +520,38 @@ handle_info({query_result, Src, _}, P) ->
handle_info(_X, P) ->
io:format("~p~n", [_X]),
{noreply, P}.
+
+conndied(Name, PID, #conn{conninfo = {replicaPairs, _, _}} = P) ->
+ case true of
+ _ when P#conn.read == PID; P#conn.read == undefined ->
+ put(Name, P#conn{read = undefined, write = undefined, timer = timer(P#conn.timer, Name)});
+ _ ->
+ true
+ end;
+conndied(Name, PID, #conn{conninfo = {masterSlave, _, _}} = P) ->
+ case true of
+ _ when P#conn.read == PID, P#conn.read /= P#conn.write ->
+ put(Name, P#conn{read = P#conn.write, timer = timer(P#conn.timer, Name)});
+ _ when P#conn.read == PID ->
+ put(Name, P#conn{read = undefined, write = undefined, timer = timer(P#conn.timer, Name)});
+ _ when P#conn.write == PID ->
+ put(Name, P#conn{write = undefined, timer = timer(P#conn.timer, Name)});
+ _ ->
+ true
+ end;
+conndied(Name, PID, #conn{conninfo = {masterMaster, _, _}} = P) ->
+ case true of
+ _ when P#conn.read == PID, P#conn.write == PID ->
+ put(Name, P#conn{read = undefined, write = undefined, timer = timer(P#conn.timer, Name)});
+ _ when P#conn.read == PID ->
+ put(Name, P#conn{read = P#conn.write, timer = timer(P#conn.timer, Name)});
+ _ when P#conn.write == PID ->
+ put(Name, P#conn{write = P#conn.read, timer = timer(P#conn.timer, Name)});
+ _ ->
+ true
+ end;
+conndied(_,_,_) ->
+ true.
terminate(_, _) ->
ok.
@@ -463,13 +589,13 @@ gfs_proc(#gfs_state{mode = write} = P, Buf) ->
{start} ->
process_flag(trap_exit,true),
FileID = (P#gfs_state.file)#gfs_file.docid,
- exec_update(<<(P#gfs_state.collection)/binary, ".files">>, #update{selector = mongodb:encode([{<<"_id">>, {oid, FileID}}]),
+ exec_update(P#gfs_state.pool,<<(P#gfs_state.collection)/binary, ".files">>, #update{selector = mongodb:encode([{<<"_id">>, {oid, FileID}}]),
document = mongodb:encoderec(P#gfs_state.file)}),
Keys = [{<<"files_id">>, 1},{<<"n">>,1}],
Bin = mongodb:encode([{plaintext, <<"name">>, gen_prop_keyname(Keys, <<>>)},
{plaintext, <<"ns">>, <<(P#gfs_state.collection)/binary, ".chunks">>},
{<<"key">>, {bson, encode(Keys)}}]),
- ensureIndex(P#gfs_state.db, Bin),
+ ensureIndex(P#gfs_state.pool,P#gfs_state.db, Bin),
gfs_proc(P,<<>>)
% X ->
% io:format("Received unknown msg ~p~n", [X])
@@ -549,14 +675,14 @@ gfsflush(P, Bin, Out) ->
<<>>, <<Out/binary, (mongodb:encoderec(Chunk))/binary>>);
Rem when byte_size(Out) > 0 ->
File = P#gfs_state.file,
- exec_insert(<<(P#gfs_state.collection)/binary, ".chunks">>, #insert{documents = Out}),
+ exec_insert(P#gfs_state.pool,<<(P#gfs_state.collection)/binary, ".chunks">>, #insert{documents = Out}),
case P#gfs_state.closed of
true ->
- [{<<"md5">>, MD5}|_] = exec_cmd(P#gfs_state.db, [{<<"filemd5">>, {oid, FileID}},{<<"root">>, P#gfs_state.collection}]);
+ [{<<"md5">>, MD5}|_] = exec_cmd(P#gfs_state.pool,P#gfs_state.db, [{<<"filemd5">>, {oid, FileID}},{<<"root">>, P#gfs_state.collection}]);
false ->
MD5 = undefined
end,
- exec_update(<<(P#gfs_state.collection)/binary, ".files">>, #update{selector = mongodb:encode([{<<"_id">>, {oid, FileID}}]),
+ exec_update(P#gfs_state.pool,<<(P#gfs_state.collection)/binary, ".files">>, #update{selector = mongodb:encode([{<<"_id">>, {oid, FileID}}]),
document = mongodb:encoderec(File#gfs_file{length = P#gfs_state.length,
md5 = MD5})}),
gfsflush(P, Rem, <<>>);
@@ -625,7 +751,7 @@ connection(#con{state = free} = P, <<>>) ->
connection(P, <<>>);
{stop} ->
true;
- {start, Source, Type, IP, Port} ->
+ {start, Pool, Source, Type, IP, Port} ->
{A1,A2,A3} = now(),
random:seed(A1, A2, A3),
{ok, Sock} = gen_tcp:connect(IP, Port, [binary, {packet, 0}, {active, true}, {keepalive, true}]),
@@ -633,7 +759,7 @@ connection(#con{state = free} = P, <<>>) ->
ifmaster ->
self() ! {find, Source, <<"admin.$cmd">>, #search{nskip = 0, ndocs = 1, criteria = mongodb:encode([{<<"ismaster">>, 1}])}};
_ ->
- Source ! {conn_established, Type, self()}
+ Source ! {conn_established, Pool, Type, self()}
end,
connection(#con{sock = Sock}, <<>>);
{tcp_closed, _} ->
Please sign in to comment.
Something went wrong with that request. Please try again.