Skip to content

Commit

Permalink
rewrite MongoCursor to a more extensible approach
Browse files Browse the repository at this point in the history
fix vibe-d#2036, fix vibe-d#1718

cursors now only need the DocType as template argument, the find specific arguments are now in the constructor. Aggregate now uses Cursors and find was altered to the new API. There is a deprecated compatibility alias which will work in most cases (except if only Q was given and differs from what R should be, which shouldn't even be done anyway)

The new public API for aggregate is a lot more like the shell now, but still supports the old way
  • Loading branch information
WebFreak001 committed Jan 14, 2018
1 parent d8eadf2 commit 8d9ab53
Show file tree
Hide file tree
Showing 2 changed files with 233 additions and 68 deletions.
164 changes: 147 additions & 17 deletions mongodb/vibe/db/mongo/collection.d
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import std.array;
import std.conv;
import std.exception;
import std.string;
import std.typecons : Tuple, tuple;
import std.typecons : Tuple, tuple, Nullable;


/**
Expand Down Expand Up @@ -113,10 +113,10 @@ struct MongoCollection {
See_Also: $(LINK http://www.mongodb.org/display/DOCS/Querying)
*/
MongoCursor!(T, R, U) find(R = Bson, T, U)(T query, U returnFieldSelector, QueryFlags flags = QueryFlags.None, int num_skip = 0, int num_docs_per_chunk = 0)
MongoCursor!R find(R = Bson, T, U)(T query, U returnFieldSelector, QueryFlags flags = QueryFlags.None, int num_skip = 0, int num_docs_per_chunk = 0)
{
assert(m_client !is null, "Querying uninitialized MongoCollection.");
return MongoCursor!(T, R, U)(m_client, m_fullPath, flags, num_skip, num_docs_per_chunk, query, returnFieldSelector);
return MongoCursor!R(m_client, m_fullPath, flags, num_skip, num_docs_per_chunk, query, returnFieldSelector);
}

/// ditto
Expand Down Expand Up @@ -303,25 +303,43 @@ struct MongoCollection {
*/
Bson aggregate(ARGS...)(ARGS pipeline)
{
import std.traits;
import std.traits : isArray;

static if (ARGS.length == 1 && isArray!(ARGS[0]))
alias Pipeline = ARGS[0];
else static struct Pipeline { ARGS args; }
auto convPipeline = pipeline;
else {
static struct Pipeline { @asArray ARGS pipeline; }

static struct CMD {
string aggregate;
@asArray Pipeline pipeline;
Bson[] convPipeline = serializeToBson(Pipeline(pipeline))["pipeline"].get!(Bson[]);
}

CMD cmd;
cmd.aggregate = m_name;
static if (ARGS.length == 1 && isArray!(ARGS[0]))
cmd.pipeline = pipeline[0];
else cmd.pipeline.args = pipeline;
return aggregate(convPipeline, AggregateOptions.init).array.serializeToBson;
}

/// ditto
MongoCursor!R aggregate(R = Bson, S = Bson)(S[] pipeline, AggregateOptions options)
{
assert(m_client !is null, "Querying uninitialized MongoCollection.");

Bson cmd = Bson.emptyObject;
foreach (member; __traits(allMembers, AggregateOptions)) {
alias T = typeof(mixin("options." ~ member));
static if (is(T : Nullable!U, U)) {
if (!mixin("options." ~ member).isNull)
cmd[member] = mixin("options." ~ member).serializeToBson;
}
}
cmd["aggregate"] = Bson(m_name);
cmd["pipeline"] = serializeToBson(pipeline);
cmd["cursor"] = serializeToBson(options.cursor);
auto ret = database.runCommand(cmd);
enforce(ret["ok"].get!double == 1, "Aggregate command failed: "~ret["errmsg"].opt!string);
return ret["result"];
R[] existing;
static if (is(R == Bson))
existing = ret["cursor"]["firstBatch"].get!(Bson[]);
else
existing = ret["cursor"]["firstBatch"].deserializeBson!(R[]);
return MongoCursor!R(m_client, ret["cursor"]["ns"].get!string, ret["cursor"]["id"].get!long, existing);
}

/// Example taken from the MongoDB documentation
Expand All @@ -338,7 +356,7 @@ struct MongoCollection {
}
}

/// The same example, but using an array of arguments
/// The same example, but using an array of arguments with custom options
unittest {
import vibe.db.mongo.mongo;

Expand All @@ -351,7 +369,9 @@ struct MongoCollection {
"total": Bson(["$sum": Bson("$amount")])]]);
args ~= serializeToBson(["$sort": ["total": -1]]);

auto results = db["coll"].aggregate(args);
AggregateOptions options;
options.cursor.batchSize = 10; // prefetch the first 10 results
auto results = db["coll"].aggregate(args, options);
}
}

Expand Down Expand Up @@ -556,3 +576,113 @@ unittest {
logInfo("User: %s", qusr.loginName);
}
}

/**
Specifies a level of isolation for read operations. For example, you can use read concern to only read data that has propagated to a majority of nodes in a replica set.
See_Also: $(LINK https://docs.mongodb.com/manual/reference/read-concern/)
*/
struct ReadConcern {
///
enum Level : string {
/// This is the default read concern level.
local = "local",
/// This is the default for reads against secondaries when afterClusterTime and "level" are unspecified. The query returns the the instance’s most recent data.
available = "available",
/// Available for replica sets that use WiredTiger storage engine.
majority = "majority",
/// Available for read operations on the primary only.
linearizable = "linearizable"
}

///
Level level;
}

/**
Collation allows users to specify language-specific rules for string comparison, such as rules for lettercase and accent marks.
See_Also: $(LINK https://docs.mongodb.com/manual/reference/collation/)
*/
struct Collation {
///
enum Alternate : string {
/// Whitespace and punctuation are considered base characters
nonIgnorable = "non-ignorable",
/// Whitespace and punctuation are not considered base characters and are only distinguished at strength levels greater than 3
shifted = "shifted",
}

///
enum MaxVariable : string {
/// Both whitespaces and punctuation are “ignorable”, i.e. not considered base characters.
punct = "punct",
/// Whitespace are “ignorable”, i.e. not considered base characters.
space = "space"
}

/**
The ICU locale
See_Also: See_Also: $(LINK https://docs.mongodb.com/manual/reference/collation-locales-defaults/#collation-languages-locales) for a list of supported locales.
To specify simple binary comparison, specify locale value of "simple".
*/
string locale;
/// The level of comparison to perform. Corresponds to ICU Comparison Levels.
int strength;
/// Flag that determines whether to include case comparison at strength level 1 or 2.
bool caseLevel;
/// A flag that determines sort order of case differences during tertiary level comparisons.
string caseFirst;
/// Flag that determines whether to compare numeric strings as numbers or as strings.
bool numericOrdering;
/// Field that determines whether collation should consider whitespace and punctuation as base characters for purposes of comparison.
Alternate alternate;
/// Field that determines up to which characters are considered ignorable when `alternate: "shifted"`. Has no effect if `alternate: "non-ignorable"`
MaxVariable maxVariable;
/**
Flag that determines whether strings with diacritics sort from back of the string, such as with some French dictionary ordering.
If `true` compare from back to front, otherwise front to back.
*/
bool backwards;
/// Flag that determines whether to check if text require normalization and to perform normalization. Generally, majority of text does not require this normalization processing.
bool normalization;
}

///
struct CursorInitArguments {
/// Specifies the initial batch size for the cursor.
int batchSize;
}

/**
Represents available options for an aggregate call
See_Also: $(LINK https://docs.mongodb.com/manual/reference/method/db.collection.aggregate/)
*/
struct AggregateOptions {
/// Specifies the initial batch size for the cursor.
CursorInitArguments cursor;
/// Specifies to return the information on the processing of the pipeline.
Nullable!bool explain;
/// Enables writing to temporary files. When set to true, aggregation operations can write data to the _tmp subdirectory in the dbPath directory.
Nullable!bool allowDiskUse;
/// Specifies a time limit in milliseconds for processing operations on a cursor. If you do not specify a value for maxTimeMS, operations will not time out.
Nullable!uint maxTimeMS;
/// Available only if you specify the $out aggregation operator.
Nullable!bool bypassDocumentValidation;
/// Specifies the read concern.
Nullable!ReadConcern readConcern;
///
Nullable!Collation collation;
/**
The index to use for the aggregation. The index is on the initial collection/view against which the aggregation is run.
Specify the index either by the index name or by the index specification document.
*/
Nullable!Bson hint;
/// Users can specify an arbitrary string to help trace the operation through the database profiler, currentOp, and logs.
Nullable!string comment;
}
Loading

0 comments on commit 8d9ab53

Please sign in to comment.