Skip to content
Browse files

Laying out Writes, support added for batch insert

  • Loading branch information...
1 parent a6f1127 commit 03ffdb486bf06fdc5be3239e716bdc5a7f8d65f1 @bwmcadams bwmcadams committed May 1, 2011
View
8 bson-driver/src/main/scala/BSONSerializer.scala
@@ -289,6 +289,14 @@ trait BSONSerializer extends BSONEncoder with Logging {
// TODO - Implement me! (Really on Mongo side of the wall) (DBrefs and the like...)
def putSpecial(name: String, o: SerializableBSONObject): Boolean = false
def handleSpecialObjects(name: String, o: SerializableBSONObject): Boolean = false
+
+ /** Size of the Buffer */
+ def size = _buf.size()
+
+ def seek(bytes: Int) = {
+ val pos = _buf.getPosition + bytes
+ log.debug("Seeking to %d", pos)
+ }
}
class DefaultBSONSerializer extends BSONSerializer
View
9 mongo-driver/src/main/scala/DB.scala
@@ -118,6 +118,7 @@ class DB(val name: String)(implicit val connection: MongoConnection) extends Log
callback(b.result())
}))
}
+
/**
* Creates a collection with a given name and options.
* If the collection does not exist, a new collection is created.
@@ -135,7 +136,7 @@ class DB(val name: String)(implicit val connection: MongoConnection) extends Log
* The callback will be invoked, when the collection is created, with an instance of the new collection.
*/
def createCollection(name: String, options: BSONDocument)(callback: Collection => Unit) = {
-
+ // TODO - Implement me
}
/**
@@ -181,6 +182,12 @@ class DB(val name: String)(implicit val connection: MongoConnection) extends Log
connection.findOne(name)(collection)(query, fields)(callback)
}
+ def findOneByID[A <: AnyRef](collection: String)(id: A)(callback: SingleDocQueryRequestFuture) {
+ connection.findOneByID(name)(collection)(id)(callback)
+ }
+
+
+
/**
* invokes the 'dbStats' command
*/
View
21 mongo-driver/src/main/scala/MongoConnection.scala
@@ -170,6 +170,27 @@ abstract class MongoConnection extends Logging {
findOne(db)(collection)(Document("_id" -> id))(callback)
+ // TODO - Support disabling add ID?
+ // TODO - Generate ID + Capture generated ID for callback
+ def insert[A >: Nothing <: Any](db: String)(collection: String)(docs: A*)
+ (callback: WriteRequestFuture)(implicit concern: WriteConcern = _writeConcern, evidence: (A) => BSONDocument) {
+ log.debug("Inserting: %s to %s.%s with WriteConcern: %s", docs, db, collection, concern)
+ // TODO - Check for invalid keys
+ // TODO - ID Gen ... DBApiLayer:221
+ send(InsertMessage(db + "." + collection, docs.map(evidence): _*), callback)
+ }
+
+ def update[A >: Nothing <: Any, B >: Nothing <: Any](db: String)(collection: String)(query: A, update: B, upsert: Boolean = false, multi: Boolean = false)
+ (callback: WriteRequestFuture)(implicit concern: WriteConcern = _writeConcern, evidenceA: (A) => BSONDocument, evidenceB: (B) => BSONDocument) {
+ // TODO - Check for invalid keys
+ send(UpdateMessage(db + "." + collection, query, update, upsert, multi), callback)
+ }
+
+ def remove[A >: Nothing <: Any](db: String)(collection: String)(obj: A, removeSingle: Boolean = false)
+ (callback: WriteRequestFuture)(implicit concern: WriteConcern = _writeConcern, evidence: (A) => BSONDocument) {
+ send(DeleteMessage(db + "." + collection, obj, removeSingle), callback)
+ }
+
val handler: MongoConnectionHandler
def connected_? = _connected.get
View
2 mongo-driver/src/main/scala/wire/DeleteMessage.scala
@@ -47,7 +47,7 @@ trait DeleteMessage extends MongoClientWriteMessage {
val query: BSONDocument // Query object for what to delete
- protected def writeMessage(enc: BSONSerializer) {
+ protected def writeMessage(enc: BSONSerializer)(implicit maxBSON: Int) {
enc.writeInt(ZERO)
enc.writeCString(namespace)
enc.writeInt(flags)
View
2 mongo-driver/src/main/scala/wire/GetMoreMessage.scala
@@ -37,7 +37,7 @@ trait GetMoreMessage extends MongoClientMessage {
val numberToReturn: Int // number of docs to return in first OP_REPLY batch
val cursorID: Long // CursorID from the OP_REPLY (DB Genned value)
- protected def writeMessage(enc: BSONSerializer) {
+ protected def writeMessage(enc: BSONSerializer)(implicit maxBSON: Int) {
enc.writeInt(ZERO)
enc.writeCString(namespace)
enc.writeInt(numberToReturn)
View
24 mongo-driver/src/main/scala/wire/InsertMessage.scala
@@ -20,6 +20,7 @@ package wire
import org.bson.BSONSerializer
import org.bson.util.Logging
import org.bson.collection.{Document , BSONDocument}
+import scala.collection.mutable.Queue
/**
* OP_INSERT Message
@@ -39,17 +40,32 @@ trait InsertMessage extends MongoClientWriteMessage {
val namespace: String // Full collection name (dbname.collectionname)
val documents: Seq[BSONDocument] // One or more documents to insert into the collection
- protected def writeMessage(enc: BSONSerializer) {
+ protected def writeMessage(enc: BSONSerializer)(implicit maxBSON: Int) {
enc.writeInt(ZERO)
enc.writeCString(namespace)
- // TODO - Limit batch insert size which should be 4 * MaxBSON
- for (doc <- documents) enc.putObject(doc)
+ /**
+ * The limit for batch insert is 4 x MaxBSON
+ */
+ log.debug("Docs Length: %s", documents.length)
+ val q = Queue(documents: _*)
+ for (doc <- q) {
+ val total = enc.size
+ val n = enc.putObject(doc)
+ log.debug("Total: %d, Last Doc Size: %d", total, n)
+ // If we went over the size, backtrack and start a new message
+ if (total >= (4 * maxBSON)) {
+ log.info("Exceeded MaxBSON, kicking in a new batch.")
+ enc.seek(-n)
+ /* TODO - This recursion may be bad and wonky... */
+ InsertMessage(namespace, (doc +: q): _*).build(enc)
+ }
+ }
}
}
object InsertMessage extends Logging {
- def apply(ns: String, docs: Seq[Document]) = new InsertMessage {
+ def apply(ns: String, docs: BSONDocument*) = new InsertMessage {
val namespace = ns
val documents = docs
}
View
2 mongo-driver/src/main/scala/wire/KillCursorsMessage.scala
@@ -37,7 +37,7 @@ trait KillCursorsMessage extends MongoClientMessage {
val numCursors: Int // The number of cursorIDs in the message
val cursorIDs: Seq[Long] // Sequence of cursorIDs to close
- protected def writeMessage(enc: BSONSerializer) {
+ protected def writeMessage(enc: BSONSerializer)(implicit maxBSON: Int) {
enc.writeInt(ZERO)
enc.writeInt(numCursors)
for (_id <- cursorIDs) enc.writeLong(_id)
View
2 mongo-driver/src/main/scala/wire/QueryMessage.scala
@@ -79,7 +79,7 @@ trait QueryMessage extends MongoClientWriteMessage {
val query: BSONDocument // BSON Document representing the query
val returnFields: Option[BSONDocument] = None // Optional BSON Document for fields to return
- protected def writeMessage(enc: BSONSerializer) {
+ protected def writeMessage(enc: BSONSerializer)(implicit maxBSON: Int) {
enc.writeInt(flags)
enc.writeCString(namespace)
enc.writeInt(numberToSkip)
View
2 mongo-driver/src/main/scala/wire/ReplyMessage.scala
@@ -44,7 +44,7 @@ trait ReplyMessage extends MongoServerMessage {
val numReturned: Int // Number of documents in the reply.
val documents: Seq[BSONDocument] // Sequence of documents
- protected def writeMessage(enc: BSONSerializer) =
+ protected def writeMessage(enc: BSONSerializer)(implicit maxBSON: Int) =
throw new UnsupportedOperationException("This message is not capable of being written. "
+ "Replies come only from the server.")
View
2 mongo-driver/src/main/scala/wire/UpdateMessage.scala
@@ -51,7 +51,7 @@ trait UpdateMessage extends MongoClientWriteMessage {
val query: BSONDocument // The query document to select from mongo
val update: BSONDocument // The document specifying the update to perform
- protected def writeMessage(enc: BSONSerializer) {
+ protected def writeMessage(enc: BSONSerializer)(implicit maxBSON: Int) {
enc.writeInt(ZERO)
enc.writeCString(namespace)
enc.writeInt(flags)
View
19 mongo-driver/src/main/scala/wire/WireProtocol.scala
@@ -26,9 +26,9 @@ package wire
import org.bson.util.Logging
import java.util.concurrent.atomic.AtomicInteger
-import org.bson.io.PoolOutputBuffer
import org.bson._
import java.io._
+import org.bson.io.{OutputBuffer , PoolOutputBuffer}
/**
* Request OpCodes for communicating with MongoDB Servers
@@ -182,15 +182,19 @@ abstract class MongoMessage extends Logging {
// def apply(channel: Channel) = write
- def write(out: OutputStream) = {
+ def write(out: OutputStream)(implicit maxBSON: Int) = {
// TODO - Reuse / pool Serializers for performance via reset()
- val enc = new DefaultBSONSerializer
-
val buf = new PoolOutputBuffer()
+ val enc = new DefaultBSONSerializer
enc.set(buf)
-
val sizePos = buf.getPosition
+ build(enc)
+ log.trace("Finishing writing core message, final length of '%s'", buf.size)
+ buf.writeInt(sizePos, buf.getPosition - sizePos)
+ buf.pipe(out)
+ }
+ protected def build(enc: BSONSerializer)(implicit maxBSON: Int) = {
enc.writeInt(0) // Length, will set later; for now, placehold
enc.writeInt(requestID)
@@ -199,9 +203,6 @@ abstract class MongoMessage extends Logging {
log.trace("OpCode (%s) Int Type: %s", opCode, opCode.id)
writeMessage(enc)
- log.trace("Finishing writing core message, final length of '%s'", buf.size)
- buf.writeInt(sizePos, buf.getPosition - sizePos)
- buf.pipe(out)
}
/**
@@ -210,7 +211,7 @@ abstract class MongoMessage extends Logging {
* write() puts in the header, writeMessage does a message
* specific writeout
*/
- protected def writeMessage(enc: BSONSerializer)
+ protected def writeMessage(enc: BSONSerializer)(implicit maxBSON: Int)
}
/**

0 comments on commit 03ffdb4

Please sign in to comment.
Something went wrong with that request. Please try again.