first commit

0 parents commit b446d3a26b0b358f440870aa828d63949b13df8d @SergejJurecko committed
Showing with 1,335 additions and 0 deletions.
  1. +203 −0 LICENSE.txt
  2. +31 −0 README.rdoc
  3. +8 −0 erlmongo.app
  4. +46 −0 erlmongo.hrl
  5. +11 −0 erlmongo_app.erl
  6. +242 −0 mongoapi.erl
  7. +775 −0 mongodb.erl
  8. +19 −0 mongodb_supervisor.erl
203 LICENSE.txt
@@ -0,0 +1,203 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+
31 README.rdoc
@@ -0,0 +1,31 @@
+= Info
+
+Erlmongo is a fairly complete Erlang driver for MongoDB. GridFS is not supported yet, but it will be eventually.
+
+It supports records and proplists as datatypes. Strings can be lists or binaries, but strings received from MongoDB (as a result of find) will be binaries.
+The only built-in limitation concerns record field names: they need to start with [a-z] (because of records) and be in latin1 (for performance reasons). Record values can of course be anything. It's a stupid idea to use non-ASCII characters as field names anyway.
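+For example, the sample record shipped in erlmongo.hrl satisfies these constraints (recindex and docid are the two mandatory leading fields):
+  -record(mydoc, {recindex = 1, docid, name, i}).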
+
+I haven't used erlmongo in production yet, so not all the bugs may be ironed out, and there are a few inconsistencies in the API (I'll fix them in the near future).
+
+= Examples
+ rr("erlmongo.hrl").
+ % Set mongodb server info. singleServer() is the same as singleServer("localhost:27017")
+ mongodb:singleServer().
+ mongodb:connect().
+ % Create an interface for the test database (the name has to be a binary)
+ Mong = mongoapi:new(<<"test">>).
+
+ % Save a new document
+ Mong:save(#mydoc{name = "MyDocument", i = 10}).
+ % Return the document, but only the "i" field (+ _id which always gets returned)
+ Mong:findOne(#mydoc{i = 10}, [#mydoc.i]).
+
+
+ % Create an index. The first parameter is there so that the driver knows which
+ % collection we mean; if you have an already constructed record lying around, use that.
+ % No need to construct a new record just so the driver can read the name.
+ % The second parameter is the index we wish to create. 1 = ascending, -1 = descending.
+ Mong:ensureIndex(#mydoc{}, [{#mydoc.i, 1}, {#mydoc.name, -1}]).
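+
+ % Proplists work as well; address the collection by name instead of by record.
+ % (A sketch; "mydocs" is a hypothetical collection name.)
+ Mong:save("mydocs", [{<<"name">>, <<"MyDocument">>}, {<<"i">>, 10}]).
+ Mong:findOne("mydocs", [{<<"i">>, 10}]).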
+
+
+
8 erlmongo.app
@@ -0,0 +1,8 @@
+{application, erlmongo, [{description, "Erlang driver for mongodb"},
+ {vsn, "0.1"},
+ {modules, [erlmongo_app, mongodb, mongoapi, mongodb_supervisor]},
+ {registered, [mongodb, mongodb_supervisor]},
+ {applications, [kernel, stdlib]},
+ {mod, {erlmongo_app, []}},
+ {start_phases, []}
+ ]}.
46 erlmongo.hrl
@@ -0,0 +1,46 @@
+% recindex is the index of the record in the RECTABLE define; it has to be the first record field
+% docid is _id in mongodb; it has to be named docid and it has to be the second field in the record
+-record(mydoc, {recindex = 1, docid, name, i}).
+% A table of records used with mongodb.
+-define(RECTABLE, {record_info(fields, mydoc)}).
+
+
+-export([rec2prop/2, prop2rec/4]).
+
+% Convert record to prop list
+rec2prop(Rec, RecordFields) ->
+ loop_rec(RecordFields, 1, Rec, []).
+
+loop_rec([H|T], N, Rec, L) ->
+ loop_rec(T, N+1, Rec, [{H, element(N+1, Rec)}|L]);
+loop_rec([], _, _, L) ->
+ L.
+
+% convert prop list to record
+prop2rec(Prop, RecName, DefRec, RecordFields) ->
+ loop_fields(erlang:make_tuple(tuple_size(DefRec), RecName), RecordFields, DefRec, Prop, 2).
+
+loop_fields(Tuple, [Field|T], DefRec, Props, N) ->
+ case lists:keysearch(Field, 1, Props) of
+ {value, {_, Val}} ->
+ loop_fields(setelement(N, Tuple, Val), T, DefRec, Props, N+1);
+ false ->
+ loop_fields(setelement(N, Tuple, element(N, DefRec)), T, DefRec, Props, N+1)
+ end;
+loop_fields(Tuple, [], _, _, _) ->
+ Tuple.
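+
+% Usage sketch for the sample mydoc record (hypothetical values):
+% rec2prop(#mydoc{name = "MyDoc"}, record_info(fields, mydoc))
+% -> [{i, undefined}, {name, "MyDoc"}, {docid, undefined}, {recindex, 1}]
+% prop2rec([{name, "MyDoc"}], mydoc, #mydoc{}, record_info(fields, mydoc))
+% -> #mydoc{recindex = 1, docid = undefined, name = "MyDoc", i = undefined}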
+
+
+% mongo
+-define(QUER_OPT_NONE, 0).
+-define(QUER_OPT_CURSOR, 2).
+-define(QUER_OPT_SLAVEOK, 4).
+-define(QUER_OPT_NOTIMEOUT, 16).
+
+-record(cursor, {id, pid, limit = 0}).
+-record(update, {upsert = 1, selector = <<>>, document = <<>>}).
+-record(insert, {documents = []}).
+-record(quer, {ndocs = 1, nskip = 0, quer = <<>>, field_selector = <<>>, opts = ?QUER_OPT_NONE}).
+-record(delete, {selector = <<>>}).
+-record(killc, {cur_ids = <<>>}).
+
11 erlmongo_app.erl
@@ -0,0 +1,11 @@
+-module(erlmongo_app).
+-behavior(application).
+-export([start/2, stop/1]).
+
+
+start(_Type, _Args) ->
+ mongodb_supervisor:start_link().
+
+
+stop(_State) ->
+ ok.
242 mongoapi.erl
@@ -0,0 +1,242 @@
+-module(mongoapi, [DB]).
+% -export([save/1,findOne/2,findOne/1,find/1,find/2,find/3,find/4, update/2, insert/1]).
+-compile(export_all).
+-include_lib("erlmongo.hrl").
+
+name([_|_] = Collection) ->
+ name(list_to_binary(Collection));
+name(<<_/binary>> = Collection) ->
+ <<DB/binary, ".", Collection/binary>>;
+name(Collection) when is_atom(Collection) ->
+ name(atom_to_binary(Collection, latin1)).
+
+remove(Rec, Selector) when is_tuple(Rec) ->
+ mongodb:exec_delete(name(element(1,Rec)), #delete{selector = mongodb:encoderec_selector(Rec, Selector)});
+remove(Col, Selector) ->
+ mongodb:exec_delete(name(Col), #delete{selector = mongodb:encode(Selector)}).
+
+save(Collection, [_|_] = L) ->
+ % io:format("Save on ~p~n", [L]),
+ case lists:keysearch(<<"_id">>, 1, L) of
+ false ->
+ mongodb:exec_insert(name(Collection), #insert{documents = mongodb:encode(L)});
+ {value, {_, ID}} ->
+ mongodb:exec_update(name(Collection), #update{selector = mongodb:encode([{"_id", ID}]), document = mongodb:encode(L)})
+ end.
+save(Rec) ->
+ case element(3, Rec) of
+ undefined ->
+ mongodb:exec_insert(name(element(1,Rec)), #insert{documents = mongodb:encoderec(Rec)});
+ ID ->
+ mongodb:exec_update(name(element(1,Rec)), #update{selector = mongodb:encode([{"_id", ID}]), document = mongodb:encoderec(Rec)})
+ end.
+
+
+update(Collection, [_|_] = Selector, [_|_] = Doc) ->
+ mongodb:exec_update(name(Collection), #update{selector = mongodb:encode(Selector), document = mongodb:encode(Doc)}).
+% update([{#mydoc.name, "docname"}], #mydoc{})
+update(Selector, Rec) ->
+ mongodb:exec_update(name(element(1,Rec)), #update{selector = mongodb:encoderec_selector(Rec, Selector), document = mongodb:encoderec(Rec)}).
+
+insert(Col, [_|_] = L) ->
+ mongodb:exec_insert(name(Col), #insert{documents = mongodb:encode(L)}).
+insert(Rec) ->
+ mongodb:exec_insert(name(element(1,Rec)), #insert{documents = mongodb:encoderec(Rec)}).
+
+batchInsert(Col, [[_|_]|_] = LRecs) ->
+ DocBin = lists:foldl(fun(L, Bin) -> <<Bin/binary, (mongodb:encode(L))/binary>> end, <<>>, LRecs),
+ mongodb:exec_insert(name(Col), #insert{documents = DocBin}).
+batchInsert(LRecs) ->
+ [FRec|_] = LRecs,
+ DocBin = lists:foldl(fun(Rec, Bin) -> <<Bin/binary, (mongodb:encoderec(Rec))/binary>> end, <<>>, LRecs),
+ mongodb:exec_insert(name(element(1,FRec)), #insert{documents = DocBin}).
+
+findOne(Col, []) ->
+ find(Col, [], undefined, 0, 1);
+findOne(Col, [_|_] = Query) when not is_tuple(Col) ->
+	find(Col, Query, undefined, 0, 1);
+findOne(Query, Selector) when is_tuple(Query) ->
+ [Res] = find(Query, Selector, 0, 1),
+ Res.
+
+findOne(Query) when is_tuple(Query) ->
+ [Res] = find(Query, undefined, 0, 1),
+ Res.
+findOne(Col, [_|_] = Query, [_|_] = Selector) ->
+ [Res] = find(Col, Query, Selector, 0, 1),
+ Res.
+
+% opts: [reverse, {sort, SortBy}, explain, {hint, Hint}, snapshot]
+% SortBy: {key, Val} or a list of keyval tuples -> {i,1} (1 = ascending, -1 = descending)
+% Hint: key
+findOpt(Col, Query, Selector, Opts, From, Limit) ->
+ find(Col, translateopts(undefined, Opts,[]) ++ [{<<"query">>, Query}], Selector, From, Limit).
+% SortBy examples: {#mydoc.name, 1}, [{#mydoc.i, 1},{#mydoc.name,-1}]
+% Hint example: #mydoc.name
+findOpt(Query, Selector, Opts, From, Limit) ->
+ Quer = #quer{ndocs = Limit, nskip = From, field_selector = mongodb:encoderec_selector(Query, Selector),
+ quer = mongodb:encode(translateopts(Query, Opts,[]) ++ [{<<"query">>, {bson, mongodb:encoderec(Query)}}])},
+ case mongodb:exec_find(name(element(1,Query)), Quer) of
+ false ->
+ false;
+ Result ->
+ mongodb:decoderec(Query, Result)
+ end.
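+
+% Full call sketch: skip 0 documents, return at most 10, sorted by name descending:
+% Mong:findOpt(#mydoc{i = 10}, undefined, [{sort, {#mydoc.name, -1}}], 0, 10)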
+
+cursor(Query, Selector, Opts, From, Limit) ->
+ Quer = #quer{ndocs = Limit, nskip = From, field_selector = mongodb:encoderec_selector(Query, Selector),
+ quer = mongodb:encode(translateopts(Query, Opts,[]) ++ [{<<"query">>, {bson, mongodb:encoderec(Query)}}]),
+ opts = ?QUER_OPT_CURSOR},
+ case mongodb:exec_cursor(name(element(1,Query)), Quer) of
+ false ->
+ false;
+ {done, Result} ->
+ {done, mongodb:decoderec(Query, Result)};
+ {Cursor, Result} ->
+ {ok, Cursor, mongodb:decoderec(Query, Result)}
+ end.
+getMore(Rec, Cursor) ->
+	case mongodb:exec_getmore(name(element(1,Rec)), Cursor) of
+ false ->
+ false;
+ {done, Result} ->
+ {done, mongodb:decoderec(Rec, Result)};
+ {ok, Result} ->
+ {ok, mongodb:decoderec(Rec, Result)}
+ end.
+closeCursor(Cur) ->
+ Cur#cursor.pid ! {cleanup},
+ ok.
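+
+% Cursor usage sketch: fetch the first batch, then page with getMore until it
+% reports done; call closeCursor if you stop early.
+% {ok, Cursor, FirstBatch} = Mong:cursor(#mydoc{}, undefined, [], 0, 100),
+% {done, LastBatch} = Mong:getMore(#mydoc{}, Cursor)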
+
+translateopts(undefined, [{sort, [_|_] = SortBy}|T], L) ->
+ translateopts(undefined, T, [{<<"orderby">>, SortBy}|L]);
+translateopts(undefined, [{sort, {Key,Val}}|T], L) ->
+ translateopts(undefined, T, [{<<"orderby">>, [{Key,Val}]}|L]);
+translateopts(Rec, [{sort, [_|_] = SortBy}|T], L) ->
+ translateopts(Rec, T, [{<<"orderby">>, {bson, mongodb:encoderec_selector(Rec, SortBy)}}|L]);
+translateopts(Rec, [{sort, {Key,Val}}|T], L) ->
+ translateopts(Rec, T, [{<<"orderby">>, {bson, mongodb:encoderec_selector(Rec, [{Key,Val}])}}|L]);
+translateopts(Rec, [reverse|T], L) ->
+ translateopts(Rec, T, [{<<"orderby">>, [{<<"$natural">>, -1}]}|L]);
+translateopts(Rec, [explain|T], L) ->
+ translateopts(Rec, T, [{<<"$explain">>, true}|L]);
+translateopts(Rec, [snapshot|T], L) ->
+ translateopts(Rec, T, [{<<"$snapshot">>, true}|L]);
+translateopts(undefined, [{hint, Hint}|T], L) ->
+	translateopts(undefined, T, [{<<"$hint">>, [{Hint, 1}]}|L]);
+translateopts(Rec, [{hint, Hint}|T], L) ->
+	translateopts(Rec, T, [{<<"$hint">>, {bson, mongodb:encoderec_selector(Rec, [Hint])}}|L]);
+translateopts(_, [], L) ->
+ L.
+
+find(Col, Query, Selector, From, Limit) ->
+ Quer = #quer{ndocs = Limit, nskip = From, quer = mongodb:encode(Query), field_selector = mongodb:encode(Selector)},
+ case mongodb:exec_find(name(Col), Quer) of
+ false ->
+ false;
+ Res ->
+ mongodb:decode(Res)
+ end.
+find(Query, Selector, From, Limit) ->
+ Quer = #quer{ndocs = Limit, nskip = From, quer = mongodb:encoderec(Query), field_selector = mongodb:encoderec_selector(Query, Selector)},
+ case mongodb:exec_find(name(element(1,Query)), Quer) of
+ false ->
+ false;
+ Result ->
+ mongodb:decoderec(Query, Result)
+ end.
+
+ensureIndex(Rec, Keys) ->
+ Bin = mongodb:encode([{plaintext, <<"name">>, mongodb:gen_keyname(Rec, Keys)},
+ {plaintext, <<"ns">>, name(element(1,Rec))},
+ {<<"key">>, {bson, mongodb:encoderec_selector(Rec, Keys)}}]),
+ mongodb:exec_insert(<<DB/binary, ".system.indexes">>, #insert{documents = Bin}).
+
+deleteIndexes([_|_] = Collection) ->
+ deleteIndexes(list_to_binary(Collection));
+deleteIndexes(<<_/binary>> = Collection) ->
+ mongodb:exec_cmd(DB, [{plaintext, <<"deleteIndexes">>, Collection}, {plaintext, <<"index">>, <<"*">>}]).
+
+deleteIndex(Rec, Key) ->
+ mongodb:exec_cmd(DB,[{plaintext, <<"deleteIndexes">>, atom_to_binary(element(1,Rec), latin1)},
+ {plaintext, <<"index">>, mongodb:gen_keyname(Rec,Key)}]).
+
+count([_|_] = Col) ->
+ count(list_to_binary(Col));
+count(<<_/binary>> = Col) ->
+ case mongodb:exec_cmd(DB, [{plaintext, <<"count">>, Col}, {plaintext, <<"ns">>, DB}]) of
+ [{<<"n">>, Val}|_] ->
+ round(Val);
+ _ ->
+ false
+ end;
+count(Col) when is_tuple(Col) ->
+	count(atom_to_binary(element(1,Col), latin1)).
+
+
+addUser(U, P) when is_binary(U) ->
+ addUser(binary_to_list(U),P);
+addUser(U, P) when is_binary(P) ->
+ addUser(U,binary_to_list(P));
+addUser(Username, Password) ->
+ save(<<"system.users">>, [{<<"user">>, Username},
+ {<<"pwd">>, bin_to_hexstr(erlang:md5(Username ++ ":mongo:" ++ Password))}]).
+
+bin_to_hexstr(Bin) ->
+ lists:flatten([io_lib:format("~2.16.0B", [X]) || X <- binary_to_list(Bin)]).
+
+% Runs $cmd. The parameter can also be a plain string; it will be converted into [{String, 1}].
+runCmd({_,_} = T) ->
+ runCmd([T]);
+runCmd([{_,_}|_] = L) ->
+ mongodb:exec_cmd(DB, L);
+runCmd([_|_] = L) ->
+ runCmd([{L,1}]);
+runCmd(<<_/binary>> = L) ->
+ runCmd(binary_to_list(L)).
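+
+% e.g. Mong:runCmd("ismaster") is shorthand for Mong:runCmd([{"ismaster", 1}]).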
+
+repairDatabase() ->
+ runCmd([{"repairDatabase", 1}]).
+dropDatabase() ->
+ runCmd([{"dropDatabase", 1}]).
+cloneDatabase(From) when is_list(From); is_binary(From) ->
+ runCmd([{"clone", From}]).
+
+dropCollection(C) when is_tuple(C) ->
+ dropCollection(atom_to_binary(element(1,C),latin1));
+dropCollection(Collection) ->
+ runCmd([{"drop", Collection}]).
+
+createCollection(Name) ->
+ createCollection(Name, []).
+% Options: idindex, noidindex, capped, {size, MaxSizeBytes}, {max, MaxElements}
+createCollection(Name, L) when is_tuple(Name) ->
+ createCollection(atom_to_binary(element(1,Name), latin1), L);
+createCollection(Name, L) ->
+ runCmd([{<<"create">>, Name}] ++ translatecolopts(L, [])).
+
+translatecolopts([idindex|T], O) ->
+ translatecolopts(T, [{<<"autoIndexId">>, true}|O]);
+translatecolopts([noidindex|T], O) ->
+ translatecolopts(T, [{<<"autoIndexId">>, false}|O]);
+translatecolopts([capped|T], O) ->
+ translatecolopts(T, [{<<"capped">>, true}|O]);
+translatecolopts([{size, MaxSize}|T], O) ->
+ translatecolopts(T, [{<<"size">>, MaxSize}|O]);
+translatecolopts([{max, Max}|T], O) ->
+ translatecolopts(T, [{<<"max">>, Max}|O]);
+translatecolopts([], O) ->
+ O.
+
+setProfilingLevel(L) when is_integer(L), L >= 0, L =< 2 ->
+	case L > 0 of
+		true ->
+			% Profiling writes to the capped system.profile collection; make sure it exists.
+			createCollection(<<"system.profile">>, [capped, {size, 131072}]);
+		false ->
+			true
+	end,
+	runCmd([{"profile", L}]).
+getProfilingLevel() ->
+ runCmd([{"profile", -1}]).
+
+
775 mongodb.erl
@@ -0,0 +1,775 @@
+-module(mongodb).
+-export([deser_prop/1,reload/0, print_info/0, start/0, stop/0, init/1, handle_call/3,
+ handle_cast/2, handle_info/2, terminate/2, code_change/3]).
+-export([connect/0, exec_cursor/2, exec_delete/2, exec_cmd/2, exec_insert/2, exec_find/2, exec_update/2, exec_getmore/2,
+ encoderec/1, encoderec_selector/2, gen_keyname/2, decoderec/2, encode/1, decode/1,
+ singleServer/1, singleServer/0, masterSlave/2,masterMaster/2, replicaPairs/2]).
+-include_lib("erlmongo.hrl").
+% -define(RIN, record_info(fields, enctask)).
+
+
+% -compile(export_all).
+-define(MONGO_PORT, 27017).
+-define(RECONNECT_DELAY, 1000).
+
+-define(OP_REPLY, 1).
+-define(OP_MSG, 1000).
+-define(OP_UPDATE, 2001).
+-define(OP_INSERT, 2002).
+-define(OP_QUERY, 2004).
+-define(OP_GET_MORE, 2005).
+-define(OP_DELETE, 2006).
+-define(OP_KILL_CURSORS, 2007).
+
+
+reload() ->
+ gen_server:call(?MODULE, {reload_module}).
+ % code:purge(?MODULE),
+ % code:load_file(?MODULE),
+ % spawn(fun() -> register() end).
+
+start() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+stop() ->
+ gen_server:call(?MODULE, stop).
+
+% register() ->
+% supervisor:start_child(supervisor, {?MODULE, {?MODULE, start, []}, permanent, 1000, worker, [?MODULE]}).
+
+print_info() ->
+ gen_server:cast(?MODULE, {print_info}).
+
+
+% SPEED TEST
+% loop(N) ->
+% io:format("~p~n", [now()]),
+% t(N, true),
+% io:format("~p~n", [now()]).
+
+% t(0, _) ->
+% true;
+% t(N, R) ->
+% % encoderec(#mydoc{name = <<"IZ_RECORDA">>, i = 12}),
+% % decoderec(#mydoc{}, R),
+% ensureIndex(#mydoc{}, [{#mydoc.name, -1},{#mydoc.i, 1}]),
+% t(N-1, R).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% API
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+connect() ->
+ gen_server:cast(?MODULE, {start_connection}).
+singleServer() ->
+ gen_server:cast(?MODULE, {conninfo, {replicaPairs, {"localhost",?MONGO_PORT}, {"localhost",?MONGO_PORT}}}).
+singleServer(Addr) ->
+	[IP, Port] = string:tokens(Addr, ":"),
+	% gen_server:cast(?MODULE, {conninfo, {single, {IP, Port}}}).
+	gen_server:cast(?MODULE, {conninfo, {replicaPairs, {IP, list_to_integer(Port)}, {IP, list_to_integer(Port)}}}).
+masterSlave(Addr1, Addr2) ->
+	[IP1, Port1] = string:tokens(Addr1, ":"),
+	[IP2, Port2] = string:tokens(Addr2, ":"),
+	gen_server:cast(?MODULE, {conninfo, {masterSlave, {IP1, list_to_integer(Port1)}, {IP2, list_to_integer(Port2)}}}).
+masterMaster(Addr1, Addr2) ->
+	[IP1, Port1] = string:tokens(Addr1, ":"),
+	[IP2, Port2] = string:tokens(Addr2, ":"),
+	gen_server:cast(?MODULE, {conninfo, {masterMaster, {IP1, list_to_integer(Port1)}, {IP2, list_to_integer(Port2)}}}).
+replicaPairs(Addr1, Addr2) ->
+	[IP1, Port1] = string:tokens(Addr1, ":"),
+	[IP2, Port2] = string:tokens(Addr2, ":"),
+	gen_server:cast(?MODULE, {conninfo, {replicaPairs, {IP1, list_to_integer(Port1)}, {IP2, list_to_integer(Port2)}}}).
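+
+% Setup sketch (hypothetical host names): set the connection info, then connect:
+% mongodb:replicaPairs("left.example.com:27017", "right.example.com:27017"),
+% mongodb:connect().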
+
+exec_cursor(Col, Quer) ->
+ case gen_server:call(?MODULE, {getread}) of
+ false ->
+ false;
+ PID ->
+ PID ! {find, self(), Col, Quer},
+ receive
+ {query_result, _Src, <<_ReqID:32/little, _RespTo:32/little, 1:32/little, 0:32,
+ CursorID:64/little, _From:32/little, _NDocs:32/little, Result/binary>>} ->
+ % io:format("cursor ~p from ~p ndocs ~p, ressize ~p ~n", [_CursorID, _From, _NDocs, byte_size(Result)]),
+ % io:format("~p~n", [Result]),
+ case CursorID of
+ 0 ->
+ {done, Result};
+ _ ->
+ CursorPID = spawn_link(fun() -> cursorcleanup(true) end),
+ CursorPID ! {start, CursorID},
+ {#cursor{id = CursorID, limit = Quer#quer.ndocs, pid = CursorPID}, Result}
+ end
+ after 1000 ->
+ false
+ end
+ end.
+exec_getmore(Col, C) ->
+ case gen_server:call(?MODULE, {getread}) of
+ false ->
+ false;
+ PID ->
+ PID ! {getmore, self(), Col, C},
+ receive
+ {query_result, _Src, <<_ReqID:32/little, _RespTo:32/little, 1:32/little, 0:32,
+ CursorID:64/little, _From:32/little, _NDocs:32/little, Result/binary>>} ->
+ % io:format("cursor ~p from ~p ndocs ~p, ressize ~p ~n", [_CursorID, _From, _NDocs, byte_size(Result)]),
+ % io:format("~p~n", [Result]),
+ case CursorID of
+ 0 ->
+ C#cursor.pid ! {stop},
+ {done, Result};
+ _ ->
+ {ok, Result}
+ end
+ after 1000 ->
+ false
+ end
+ end.
+exec_delete(Collection, D) ->
+ case gen_server:call(?MODULE, {getwrite}) of
+ false ->
+ false;
+ PID ->
+ PID ! {delete, Collection, D}
+ end,
+ ok.
+exec_find(Collection, Quer) ->
+ case gen_server:call(?MODULE, {getread}) of
+ false ->
+ false;
+ PID ->
+ PID ! {find, self(), Collection, Quer},
+ receive
+ {query_result, _Src, <<_ReqID:32/little, _RespTo:32/little, 1:32/little, 0:32,
+ _CursorID:64/little, _From:32/little, _NDocs:32/little, Result/binary>>} ->
+ % io:format("cursor ~p from ~p ndocs ~p, ressize ~p ~n", [_CursorID, _From, _NDocs, byte_size(Result)]),
+ % io:format("~p~n", [Result]),
+ Result
+ after 1000 ->
+ false
+ end
+ end.
+exec_insert(Collection, D) ->
+ case gen_server:call(?MODULE, {getwrite}) of
+ false ->
+ false;
+ PID ->
+ PID ! {insert, Collection, D}
+ end,
+ ok.
+exec_update(Collection, D) ->
+ case gen_server:call(?MODULE, {getwrite}) of
+ false ->
+ false;
+ PID ->
+ PID ! {update, Collection, D}
+ end,
+ ok.
+exec_cmd(DB, Cmd) ->
+ Quer = #quer{ndocs = 1, nskip = 0, quer = mongodb:encode(Cmd)},
+ case exec_find(<<DB/binary, ".$cmd">>, Quer) of
+ false ->
+ false;
+ Result ->
+ mongodb:decode(Result)
+ end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% IMPLEMENTATION
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% read = connection used for reading (find) from mongo server
+% write = connection used for writing (insert,update) to mongo server
+% single: same as replicaPairs (single server is always master and used for read and write)
+% masterSlave: read = slave, write = master
+% replicaPairs: read = write = master
+% masterMaster: read = master1, write = master2
+% timer is the reconnect timer, set while any connection is down
+% indexes is the ensureIndex cache (an ets table).
+-record(mngd, {read, write, conninfo, indexes, timer}).
+-define(R2P(Record), rec2prop(Record, record_info(fields, mngd))).
+-define(P2R(Prop), prop2rec(Prop, mngd, #mngd{}, record_info(fields, mngd))).
+
+handle_call({getread}, _, P) ->
+ {reply, P#mngd.read, P};
+handle_call({getwrite}, _, P) ->
+ {reply, P#mngd.write, P};
+handle_call(stop, _, P) ->
+ {stop, shutdown, stopped, P};
+handle_call({reload_module}, _, P) ->
+ code:purge(?MODULE),
+ code:load_file(?MODULE),
+ {reply, ok, ?MODULE:deser_prop(?R2P(P))};
+handle_call(_, _, P) ->
+ {reply, ok, P}.
+
+deser_prop(P) ->
+ ?P2R(P).
+
+startcon(undefined, Type, Addr, Port) ->
+	PID = spawn_link(fun() -> connection(true) end),
+	PID ! {start, self(), Type, Addr, Port},
+	PID;
+startcon(PID, _, _, _) ->
+	PID.
+
+handle_cast({ensure_index, Bin}, P) ->
+ case ets:lookup(P#mngd.indexes, Bin) of
+ [] ->
+ spawn(fun() -> exec_insert(<<"system.indexes">>, #insert{documents = Bin}) end),
+ ets:insert(P#mngd.indexes, {Bin});
+ _ ->
+ true
+ end,
+ {noreply, P};
+handle_cast({clear_indexcache}, P) ->
+ ets:delete_all_objects(P#mngd.indexes),
+ {noreply, P};
+handle_cast({conninfo, Conn}, P) ->
+ {noreply, P#mngd{conninfo = Conn}};
+handle_cast({start_connection}, #mngd{conninfo = {masterMaster, {A1,P1},{A2,P2}}} = P) ->
+ case true of
+ _ when P#mngd.read /= P#mngd.write, P#mngd.read /= undefined, P#mngd.write /= undefined ->
+ Timer = ctimer(P#mngd.timer);
+ _ when P#mngd.read == P#mngd.write, P#mngd.read /= undefined ->
+ startcon(undefined, write, A2,P2),
+ Timer = P#mngd.timer;
+ _ ->
+ startcon(P#mngd.read, read, A1,P1),
+ startcon(P#mngd.write, write, A2,P2),
+ Timer = P#mngd.timer
+ % {noreply, P#mngd{read = startcon(P#mngd.read, A1,P1), write = startcon(P#mngd.write,A2,P2)}}
+ end,
+ {noreply, P#mngd{timer = Timer}};
+handle_cast({start_connection}, #mngd{conninfo = {masterSlave, {A1,P1},{A2,P2}}} = P) ->
+ case true of
+ % All ok.
+ _ when P#mngd.read /= P#mngd.write, P#mngd.read /= undefined, P#mngd.write /= undefined ->
+ Timer = ctimer(P#mngd.timer);
+ % Read = write = master, try to connect to slave again
+ _ when P#mngd.read == P#mngd.write, P#mngd.read /= undefined ->
+ startcon(undefined, read, A2,P2),
+ Timer = P#mngd.timer;
+ % One or both of the connections is down
+ _ ->
+ startcon(P#mngd.read, read, A2,P2),
+ startcon(P#mngd.write, write, A1,P1),
+ Timer = P#mngd.timer
+ end,
+ {noreply, P#mngd{timer = Timer}};
+handle_cast({start_connection}, #mngd{conninfo = {replicaPairs, {A1,P1},{A2,P2}}} = P) ->
+ case true of
+ _ when P#mngd.read /= undefined, P#mngd.write == P#mngd.read ->
+ {noreply, P#mngd{timer = ctimer(P#mngd.timer)}};
+ _ ->
+ startcon(undefined, ifmaster, A1,P1),
+ startcon(undefined, ifmaster, A2,P2),
+ {noreply, P}
+ end;
+handle_cast({print_info}, P) ->
+ io:format("~p~n", [?R2P(P)]),
+ {noreply, P};
+handle_cast(_, P) ->
+ {noreply, P}.
+
+ctimer(undefined) ->
+ undefined;
+ctimer(T) ->
+ timer:cancel(T),
+ undefined.
+
+timer(undefined) ->
+ {ok, Timer} = timer:send_interval(?RECONNECT_DELAY, {reconnect}),
+ Timer;
+timer(T) ->
+ T.
+
+handle_info({conn_established, read, ConnProc}, P) ->
+ {noreply, P#mngd{read = ConnProc}};
+handle_info({conn_established, write, ConnProc}, P) ->
+ {noreply, P#mngd{write = ConnProc}};
+handle_info({reconnect}, P) ->
+ handle_cast({start_connection}, P);
+handle_info({'EXIT', PID, _Reason}, #mngd{conninfo = {replicaPairs, _, _}} = P) ->
+ case true of
+ _ when P#mngd.read == PID ->
+ {noreply, P#mngd{read = undefined, write = undefined, timer = timer(P#mngd.timer)}};
+ _ ->
+ {noreply, P}
+ end;
+handle_info({'EXIT', PID, _Reason}, #mngd{conninfo = {masterSlave, _, _}} = P) ->
+ case true of
+ _ when P#mngd.read == PID, P#mngd.read /= P#mngd.write ->
+ {noreply, P#mngd{read = P#mngd.write, timer = timer(P#mngd.timer)}};
+ _ when P#mngd.read == PID ->
+ {noreply, P#mngd{read = undefined, write = undefined, timer = timer(P#mngd.timer)}};
+ _ when P#mngd.write == PID ->
+ {noreply, P#mngd{write = undefined, timer = timer(P#mngd.timer)}};
+ _ ->
+ {noreply, P}
+ end;
+handle_info({'EXIT', PID, _Reason}, #mngd{conninfo = {masterMaster, _, _}} = P) ->
+ case true of
+ _ when P#mngd.read == PID, P#mngd.write == PID ->
+ {noreply, P#mngd{read = undefined, write = undefined, timer = timer(P#mngd.timer)}};
+ _ when P#mngd.read == PID ->
+ {noreply, P#mngd{read = P#mngd.write, timer = timer(P#mngd.timer)}};
+ _ when P#mngd.write == PID ->
+ {noreply, P#mngd{write = P#mngd.read, timer = timer(P#mngd.timer)}};
+ _ ->
+ {noreply, P}
+ end;
+handle_info({query_result, Src, <<_:32/binary, Res/binary>>}, P) ->
+ try mongodb:decode(Res) of
+ [{<<"ismaster">>, 1}|_] when element(1,P#mngd.conninfo) == replicaPairs, P#mngd.read == undefined ->
+ link(Src),
+ {noreply, P#mngd{read = Src, write = Src}};
+ _ ->
+ Src ! {stop},
+ {noreply, P}
+ catch
+ error:_ ->
+ Src ! {stop},
+ {noreply, P}
+ end;
+handle_info({query_result, Src, _}, P) ->
+ Src ! {stop},
+ {noreply, P};
+handle_info(_X, P) ->
+ io:format("~p~n", [_X]),
+ {noreply, P}.
+
+terminate(_, _) ->
+ ok.
+code_change(_, P, _) ->
+ {ok, P}.
+init([]) ->
+ % timer:send_interval(1000, {timeout}),
+ process_flag(trap_exit, true),
+ {ok, #mngd{indexes = ets:new(mongoIndexes, [set, private])}}.
+
+% find_master([{A,P}|T]) ->
+% Q = #quer{ndocs = 1, nskip = 0, quer = mongodb:encode([{<<"ismaster">>, 1}])},
+%
+
+
+
+-record(ccd, {cursor = 0}).
+% Just for cleanup
+cursorcleanup(P) ->
+ receive
+ {stop} ->
+ true;
+ {cleanup} ->
+ case gen_server:call(?MODULE, {getread}) of
+ false ->
+ false;
+ PID ->
+ PID ! {killcursor, #killc{cur_ids = <<(P#ccd.cursor):64/little>>}},
+ true
+ end;
+ {'EXIT', _PID, _Why} ->
+ self() ! {cleanup},
+ cursorcleanup(P);
+ {start, Cursor} ->
+ process_flag(trap_exit, true),
+ cursorcleanup(#ccd{cursor = Cursor})
+ end.
+
+
+-record(con, {sock, source, buffer = <<>>, state = free}).
+% Waiting for request
+connection(true) ->
+ connection(#con{});
+connection(#con{state = free} = P) ->
+ receive
+ {find, Source, Collection, Query} ->
+ QBin = constr_query(Query, Collection),
+ ok = gen_tcp:send(P#con.sock, QBin),
+ connection(P#con{state = waiting, source = Source});
+ {insert, Collection, Doc} ->
+ Bin = constr_insert(Doc, Collection),
+ ok = gen_tcp:send(P#con.sock, Bin),
+ connection(P);
+ {update, Collection, Doc} ->
+ Bin = constr_update(Doc, Collection),
+ ok = gen_tcp:send(P#con.sock, Bin),
+ connection(P);
+ {delete, Col, D} ->
+ Bin = constr_delete(D, Col),
+ ok = gen_tcp:send(P#con.sock, Bin),
+ connection(P);
+ {getmore, Source, Col, C} ->
+ Bin = constr_getmore(C, Col),
+ ok = gen_tcp:send(P#con.sock, Bin),
+ connection(P#con{state = waiting, source = Source});
+ {killcursor, C} ->
+ Bin = constr_killcursors(C),
+ ok = gen_tcp:send(P#con.sock, Bin),
+ connection(P);
+ {tcp, _, _Bin} ->
+ connection(P);
+ {stop} ->
+ true;
+ {start, Source, Type, IP, Port} ->
+ {A1,A2,A3} = now(),
+ random:seed(A1, A2, A3),
+ {ok, Sock} = gen_tcp:connect(IP, Port, [binary, {packet, 0}, {active, true}, {keepalive, true}]),
+ case Type of
+ ifmaster ->
+ self() ! {find, Source, <<"admin.$cmd">>, #quer{nskip = 0, ndocs = 1, quer = mongodb:encode([{<<"ismaster">>, 1}])}};
+ _ ->
+ Source ! {conn_established, Type, self()}
+ end,
+ connection(#con{sock = Sock});
+ {tcp_closed, _} ->
+ exit(stop)
+ end;
+% waiting for response
+connection(P) ->
+ receive
+ {tcp, _, Bin} ->
+ <<Size:32/little, Packet/binary>> = <<(P#con.buffer)/binary, Bin/binary>>,
+ % io:format("Received size ~p~n", [Size]),
+ case Size of
+ _ when Size == byte_size(Packet) + 4 ->
+ P#con.source ! {query_result, self(), Packet},
+ connection(P#con{state = free, buffer = <<>>});
+ _ ->
+ connection(P#con{buffer = <<(P#con.buffer)/binary, Bin/binary>>})
+ end;
+ {stop} ->
+ true;
+ {tcp_closed, _} ->
+ exit(stop)
+ after 2000 ->
+ exit(stop)
+ end.
+
+constr_header(Len, ID, RespTo, OP) ->
+ <<(Len+16):32/little, ID:32/little, RespTo:32/little, OP:32/little>>.
+
+constr_update(U, Name) ->
+ Update = <<0:32, Name/binary, 0:8,
+ (U#update.upsert):32/little, (U#update.selector)/binary, (U#update.document)/binary>>,
+ Header = constr_header(byte_size(Update), random:uniform(4000000000), 0, ?OP_UPDATE),
+ <<Header/binary, Update/binary>>.
+
+constr_insert(U, Name) ->
+ Insert = <<0:32, Name/binary, 0:8, (U#insert.documents)/binary>>,
+ Header = constr_header(byte_size(Insert), random:uniform(4000000000), 0, ?OP_INSERT),
+ <<Header/binary, Insert/binary>>.
+
+constr_query(U, Name) ->
+ Query = <<(U#quer.opts):32/little, Name/binary, 0:8, (U#quer.nskip):32/little, (U#quer.ndocs):32/little,
+ (U#quer.quer)/binary, (U#quer.field_selector)/binary>>,
+ Header = constr_header(byte_size(Query), random:uniform(4000000000), 0, ?OP_QUERY),
+ <<Header/binary,Query/binary>>.
+
+constr_getmore(U, Name) ->
+ GetMore = <<0:32, Name/binary, 0:8, (U#cursor.limit):32/little, (U#cursor.id):64/little>>,
+ Header = constr_header(byte_size(GetMore), random:uniform(4000000000), 0, ?OP_GET_MORE),
+ <<Header/binary, GetMore/binary>>.
+
+constr_delete(U, Name) ->
+ Delete = <<0:32, Name/binary, 0:8, 0:32, (U#delete.selector)/binary>>,
+ Header = constr_header(byte_size(Delete), random:uniform(4000000000), 0, ?OP_DELETE),
+ <<Header/binary, Delete/binary>>.
+
+constr_killcursors(U) ->
+ Kill = <<0:32, (byte_size(U#killc.cur_ids) div 8):32/little, (U#killc.cur_ids)/binary>>,
+ Header = constr_header(byte_size(Kill), random:uniform(4000000000), 0, ?OP_KILL_CURSORS),
+ <<Header/binary, Kill/binary>>.
+
+
+
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% BSON encoding/decoding
+% most of it taken and modified from the mongo-erlang-driver project by Elias Torres
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+encoderec(Rec) ->
+ [_|Fields] = element(element(2, Rec), ?RECTABLE),
+ encoderec(Rec, Fields, 3, <<>>).
+encoderec(Rec, [FieldName|T], N, Bin) ->
+ case element(N, Rec) of
+ undefined ->
+ encoderec(Rec, T, N+1, Bin);
+ Val ->
+ case FieldName of
+ docid ->
+ encoderec(Rec, T, N+1, <<Bin/binary, (encode_element({<<"_id">>, Val}))/binary>>);
+ _ ->
+ encoderec(Rec, T, N+1, <<Bin/binary, (encode_element({atom_to_binary(FieldName, latin1), Val}))/binary>>)
+ end
+ end;
+encoderec(_, [], _, Bin) ->
+ <<(byte_size(Bin)+5):32/little, Bin/binary, 0:8>>.
+
+encoderec_selector(_, undefined) ->
+ <<>>;
+encoderec_selector(Rec, SelectorList) ->
+ [_|Fields] = element(element(2, Rec), ?RECTABLE),
+ encoderec_selector(SelectorList, Fields, 3, <<>>).
+
+% SelectorList is either a list of indexes in the record tuple, or a list of {TupleIndex, TupleVal}. Use the index to get the name
+% from the list of names.
+encoderec_selector([{FieldIndex, Val}|Fields], [FieldName|FieldNames], FieldIndex, Bin) ->
+ case FieldName of
+ docid ->
+ encoderec_selector(Fields, FieldNames, FieldIndex+1, <<Bin/binary, (encode_element({<<"_id">>, Val}))/binary>>);
+ _ ->
+ encoderec_selector(Fields, FieldNames, FieldIndex+1, <<Bin/binary, (encode_element({atom_to_binary(FieldName,latin1), Val}))/binary>>)
+ end;
+encoderec_selector([FieldIndex|Fields], [FieldName|FieldNames], FieldIndex, Bin) ->
+ case FieldName of
+ docid ->
+ encoderec_selector(Fields, FieldNames, FieldIndex+1, <<Bin/binary, (encode_element({<<"_id">>, 1}))/binary>>);
+ _ ->
+ encoderec_selector(Fields, FieldNames, FieldIndex+1, <<Bin/binary, (encode_element({atom_to_binary(FieldName,latin1), 1}))/binary>>)
+ end;
+encoderec_selector(Indexes, [_|Names], Index, Bin) ->
+ encoderec_selector(Indexes, Names, Index+1, Bin);
+encoderec_selector([], _, _, Bin) ->
+ <<(byte_size(Bin)+5):32/little, Bin/binary, 0:8>>.
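+
+% e.g. encoderec_selector(#mydoc{}, [{#mydoc.i, 10}]) encodes the selector {i: 10},
+% while encoderec_selector(#mydoc{}, [#mydoc.i]) encodes {i: 1} (a field selector).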
+
+gen_keyname(Rec, Keys) ->
+ [_|Fields] = element(element(2, Rec), ?RECTABLE),
+ gen_keyname(Keys, Fields, 3, <<>>).
+
+gen_keyname([{KeyIndex, KeyVal}|Keys], [FieldName|Fields], KeyIndex, Name) ->
+ case is_integer(KeyVal) of
+ true ->
+ Add = <<(list_to_binary(integer_to_list(KeyVal)))/binary>>;
+ false ->
+ Add = <<>>
+ end,
+ gen_keyname(Keys, Fields, KeyIndex+1, <<Name/binary, "_", (atom_to_binary(FieldName, latin1))/binary, "_", Add/binary>>);
+gen_keyname([], _, _, <<"_", Name/binary>>) ->
+ Name;
+gen_keyname(Keys, [_|Fields], KeyIndex, Name) ->
+ gen_keyname(Keys, Fields, KeyIndex+1, Name).
+
+
+decoderec(Rec, <<>>) ->
+ % Rec;
+ erlang:make_tuple(tuple_size(Rec), undefined, [{1, element(1,Rec)}, {2, element(2,Rec)}]);
+decoderec(Rec, Bin) ->
+ [_|Fields] = element(element(2, Rec), ?RECTABLE),
+ decode_records([], Bin, tuple_size(Rec), element(1,Rec), element(2, Rec), Fields).
+
+decode_records(RecList, <<_ObjSize:32/little, Bin/binary>>, TupleSize, Name, TabIndex, Fields) ->
+ {FieldList, Remain} = rec_field_list([{1, Name},{2, TabIndex}], 3, Fields, Bin),
+ % io:format("~p~n", [FieldList]),
+ NewRec = erlang:make_tuple(TupleSize, undefined, FieldList),
+ decode_records([NewRec|RecList], Remain, TupleSize, Name, TabIndex, Fields);
+decode_records(R, <<>>, _, _, _, _) ->
+ lists:reverse(R).
+
+rec_field_list(RecVals, _, _, <<0:8, Rem/binary>>) ->
+ {RecVals, Rem};
+rec_field_list(RecVals, N, [Field|Fields], <<Type:8, Bin/binary>>) ->
+ {Name, ValRem} = decode_cstring(Bin, <<>>),
+ % io:format("~p ~p~n", [Name, Field]),
+ {Value, Remain} = decode_value(Type, ValRem),
+ case Field of
+ docid ->
+ FieldBinName = <<"_id">>;
+ _ ->
+ FieldBinName = atom_to_binary(Field, latin1)
+ end,
+ case FieldBinName of
+ Name ->
+ rec_field_list([{N, Value}|RecVals], N+1, Fields, Remain);
+ _ ->
+ rec_field_list(RecVals, N+1, Fields, <<Type:8, Bin/binary>>)
+ end.
+
+
+
+% bin_to_hexstr(Bin) ->
+% lists:flatten([io_lib:format("~2.16.0B", [X]) || X <- binary_to_list(Bin)]).
+%
+% hexstr_to_bin(S) ->
+% hexstr_to_bin(S, []).
+% hexstr_to_bin([], Acc) ->
+% list_to_binary(lists:reverse(Acc));
+% hexstr_to_bin([X,Y|T], Acc) ->
+% {ok, [V], []} = io_lib:fread("~16u", [X,Y]),
+% hexstr_to_bin(T, [V | Acc]).
+
+encode(undefined) ->
+ <<>>;
+encode(Items) ->
+ Bin = lists:foldl(fun(Item, B) -> <<B/binary, (encode_element(Item))/binary>> end, <<>>, Items),
+ <<(byte_size(Bin)+5):32/little-signed, Bin/binary, 0:8>>.
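+
+% Example (a sketch): encode([{<<"i">>, 10}]) yields the 16-byte document
+% <<16,0,0,0, 18,$i,0, 10:64/little-signed, 0>> (int32 length, one int64 element, terminating 0).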
+
+encode_element({[_|_] = Name, Val}) ->
+ encode_element({list_to_binary(Name),Val});
+encode_element({Name, [{_,_}|_] = Items}) ->
+ Binary = encode(Items),
+ <<3, Name/binary, 0, Binary/binary>>;
+encode_element({Name, [_|_] = Value}) ->
+ ValueEncoded = encode_cstring(Value),
+ <<2, Name/binary, 0, (byte_size(ValueEncoded)):32/little-signed, ValueEncoded/binary>>;
+encode_element({Name, <<_/binary>> = Value}) ->
+ ValueEncoded = encode_cstring(Value),
+ <<2, Name/binary, 0, (byte_size(ValueEncoded)):32/little-signed, ValueEncoded/binary>>;
+encode_element({plaintext, Name, Val}) -> % exists for performance reasons.
+ <<2, Name/binary, 0, (byte_size(Val)+1):32/little-signed, Val/binary, 0>>;
+encode_element({Name, true}) ->
+ <<8, Name/binary, 0, 1:8>>;
+encode_element({Name, false}) ->
+ <<8, Name/binary, 0, 0:8>>;
+encode_element({Name, {list, []}}) ->
+ <<4, Name/binary, 0, 5:32/little-signed, 0:8>>;
+encode_element({Name, {list, Items}}) ->
+ ItemNames = [integer_to_list(Index) || Index <- lists:seq(0, length(Items)-1)],
+ ItemList = lists:zip(ItemNames, Items),
+ Binary = encode(ItemList),
+ <<4, Name/binary, 0, Binary/binary>>;
+encode_element({Name, {binary, 2, Data}}) ->
+ <<5, Name/binary, 0, (size(Data)+4):32/little-signed, 2:8, (size(Data)):32/little-signed, Data/binary>>;
+encode_element({Name, {binary, SubType, Data}}) ->
+ StringEncoded = encode_cstring(Name),
+ <<5, StringEncoded/binary, (size(Data)):32/little-signed, SubType:8, Data/binary>>;
+encode_element({Name, {oid, <<First:8/little-binary-unit:8, Second:4/little-binary-unit:8>>}}) ->
+ FirstReversed = lists:reverse(binary_to_list(First)),
+ SecondReversed = lists:reverse(binary_to_list(Second)),
+ OID = list_to_binary(lists:append(FirstReversed, SecondReversed)),
+ <<7, Name/binary, 0, OID/binary>>;
+encode_element({Name, Value}) when is_integer(Value) ->
+ <<18, Name/binary, 0, Value:64/little-signed>>;
+encode_element({Name, Value}) when is_float(Value) ->
+ <<1, (Name)/binary, 0, Value:64/little-signed-float>>;
+encode_element({Name, {bson, Bin}}) ->
+ <<3, Name/binary, 0, Bin/binary>>;
+encode_element({Name, {obj, []}}) ->
+ <<3, Name/binary, 0, (encode([]))/binary>>;
+encode_element({Name, {MegaSecs, Secs, MicroSecs}}) when is_integer(MegaSecs),is_integer(Secs),is_integer(MicroSecs) ->
+ Unix = MegaSecs * 1000000 + Secs,
+ Millis = Unix * 1000 + trunc(MicroSecs / 1000),
+ <<9, Name/binary, 0, Millis:64/little-signed>>;
+encode_element({Name, null}) ->
+	<<10, Name/binary, 0>>;
+encode_element({Name, {regex, Expression, Flags}}) ->
+ ExpressionEncoded = encode_cstring(Expression),
+ FlagsEncoded = encode_cstring(Flags),
+ <<11, Name/binary, 0, ExpressionEncoded/binary, FlagsEncoded/binary>>;
+encode_element({Name, {ref, Collection, <<First:8/little-binary-unit:8, Second:4/little-binary-unit:8>>}}) ->
+ CollectionEncoded = encode_cstring(Collection),
+ FirstReversed = lists:reverse(binary_to_list(First)),
+ SecondReversed = lists:reverse(binary_to_list(Second)),
+ OID = list_to_binary(lists:append(FirstReversed, SecondReversed)),
+ <<12, Name/binary, 0, (byte_size(CollectionEncoded)):32/little-signed, CollectionEncoded/binary, OID/binary>>;
+encode_element({Name, {code, Code}}) ->
+ CodeEncoded = encode_cstring(Code),
+ <<13, Name/binary, 0, (byte_size(CodeEncoded)):32/little-signed, CodeEncoded/binary>>.
+
+encode_cstring(String) ->
+ <<(unicode:characters_to_binary(String))/binary, 0:8>>.
+
+%% Size has to be greater than 4
+decode(<<Size:32/little-signed, Rest/binary>> = Binary) when byte_size(Binary) >= Size, Size > 4 ->
+ decode(Rest, Size-4);
+
+decode(_BadLength) ->
+ throw({invalid_length}).
+
+decode(Binary, _Size) ->
+ case decode_next(Binary, []) of
+ {BSON, <<>>} ->
+ BSON;
+ {BSON, Rest} ->
+ [BSON | decode(Rest)]
+ end.
+
+decode_next(<<>>, Accum) ->
+ {lists:reverse(Accum), <<>>};
+decode_next(<<0:8, Rest/binary>>, Accum) ->
+ {lists:reverse(Accum), Rest};
+decode_next(<<Type:8/little, Rest/binary>>, Accum) ->
+ {Name, EncodedValue} = decode_cstring(Rest, <<>>),
+% io:format("Decoding ~p~n", [Type]),
+ {Value, Next} = decode_value(Type, EncodedValue),
+ decode_next(Next, [{Name, Value}|Accum]).
+
+decode_cstring(<<>> = _Binary, _Accum) ->
+ throw({invalid_cstring});
+decode_cstring(<<0:8, Rest/binary>>, Acc) ->
+ {Acc, Rest};
+decode_cstring(<<C:8, Rest/binary>>, Acc) ->
+ decode_cstring(Rest, <<Acc/binary, C:8>>).
+% decode_cstring(<<0:8,Rest/binary>>, Acc) ->
+% {lists:reverse(Acc),Rest};
+% decode_cstring(<<C/utf8,Rest/binary>>, Acc) ->
+% decode_cstring(Rest, [C|Acc]).
+
+decode_value(_Type = 1, <<Double:64/little-signed-float, Rest/binary>>) ->
+ {Double, Rest};
+decode_value(_Type = 2, <<Size:32/little-signed, Rest/binary>>) ->
+ StringSize = Size-1,
+ <<String:StringSize/binary, 0:8, Remain/binary>> = Rest,
+ {String, Remain};
+ % {String, RestNext} = decode_cstring(Rest, <<>>),
+ % ActualSize = byte_size(Rest) - byte_size(RestNext),
+ % case ActualSize =:= Size of
+ % false ->
+ % % ?debugFmt("* ~p =:= ~p -> false", [ActualSize, Size]),
+ % throw({invalid_length, expected, Size, ActualSize});
+ % true ->
+ % {String, RestNext}
+ % end;
+decode_value(_Type = 3, <<Size:32/little-signed, Rest/binary>> = Binary) when byte_size(Binary) >= Size ->
+ decode_next(Rest, []);
+decode_value(_Type = 4, <<Size:32/little-signed, Data/binary>> = Binary) when byte_size(Binary) >= Size ->
+ {Array, Rest} = decode_next(Data, []),
+ {{list,[Value || {_Key, Value} <- Array]}, Rest};
+decode_value(_Type = 5, <<_Size:32/little-signed, 2:8/little, BinSize:32/little-signed, BinData:BinSize/binary-little-unit:8, Rest/binary>>) ->
+ {{binary, 2, BinData}, Rest};
+decode_value(_Type = 5, <<Size:32/little-signed, SubType:8/little, BinData:Size/binary-little-unit:8, Rest/binary>>) ->
+ {{binary, SubType, BinData}, Rest};
+decode_value(_Type = 6, _Binary) ->
+ throw(encountered_undefined);
+decode_value(_Type = 7, <<First:8/little-binary-unit:8, Second:4/little-binary-unit:8, Rest/binary>>) ->
+ FirstReversed = lists:reverse(binary_to_list(First)),
+ SecondReversed = lists:reverse(binary_to_list(Second)),
+ OID = list_to_binary(lists:append(FirstReversed, SecondReversed)),
+ {{oid, OID}, Rest};
+decode_value(_Type = 8, <<0:8, Rest/binary>>) ->
+ {false, Rest};
+decode_value(_Type = 8, <<1:8, Rest/binary>>) ->
+ {true, Rest};
+decode_value(_Type = 9, <<Millis:64/little-signed, Rest/binary>>) ->
+ UnixTime = trunc(Millis / 1000),
+ MegaSecs = trunc(UnixTime / 1000000),
+ Secs = UnixTime - (MegaSecs * 1000000),
+ MicroSecs = (Millis - (UnixTime * 1000)) * 1000,
+ {{MegaSecs, Secs, MicroSecs}, Rest};
+decode_value(_Type = 10, Binary) ->
+ {null, Binary};
+decode_value(_Type = 11, Binary) ->
+ {Expression, RestWithFlags} = decode_cstring(Binary, <<>>),
+ {Flags, Rest} = decode_cstring(RestWithFlags, <<>>),
+ {{regex, Expression, Flags}, Rest};
+decode_value(_Type = 12, <<Size:32/little-signed, Data/binary>> = Binary) when size(Binary) >= Size ->
+ {NS, RestWithOID} = decode_cstring(Data, <<>>),
+ {{oid, OID}, Rest} = decode_value(7, RestWithOID),
+ {{ref, NS, OID}, Rest};
+decode_value(_Type = 13, <<_Size:32/little-signed, Data/binary>>) ->
+ {Code, Rest} = decode_cstring(Data, <<>>),
+ {{code, Code}, Rest};
+decode_value(_Type = 14, _Binary) ->
+	throw(encountered_omitted);
+decode_value(_Type = 15, _Binary) ->
+	throw(encountered_omitted);
+decode_value(_Type = 16, <<Integer:32/little-signed, Rest/binary>>) ->
+ {Integer, Rest};
+decode_value(_Type = 18, <<Integer:64/little-signed, Rest/binary>>) ->
+ {Integer, Rest};
+decode_value(_Type = 18, <<Integer:32/little-signed, Rest/binary>>) ->
+ {Integer, Rest}.
19 mongodb_supervisor.erl
@@ -0,0 +1,19 @@
+-module(mongodb_supervisor).
+-behavior(supervisor).
+-export([start_link/0, init/1]).
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+init([]) ->
+ {ok, {{one_for_one, 10, 1},
+ [
+ {mongodb,
+ {mongodb, start, []},
+ permanent,
+ 100,
+ worker,
+ [mongodb]}
+ ]
+ }}.
