Skip to content

Commit

Permalink
Add Collection.NewIter method.
Browse files Browse the repository at this point in the history
This is a refactoring of the first batch + cursor logic that is used
in the Pipe and Repair methods, so it may be used both internally in
other places and also externally when people depend on functionality
not yet implemented in the driver.
  • Loading branch information
niemeyer committed Dec 21, 2014
1 parent 513c45d commit baa44ca
Showing 1 changed file with 75 additions and 57 deletions.
132 changes: 75 additions & 57 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1855,13 +1855,6 @@ func (c *Collection) Repair() *Iter {
cloned := session.Clone()
cloned.SetMode(Strong, false)
defer cloned.Close()
c = c.With(cloned)

iter := &Iter{
session: session,
timeout: -1,
}
iter.gotReply.L = &iter.m

var result struct {
Cursor struct {
Expand All @@ -1874,28 +1867,10 @@ func (c *Collection) Repair() *Iter {
RepairCursor: c.Name,
Cursor: &repairCmdCursor{batchSize},
}
iter.err = c.Database.Run(cmd, &result)
if iter.err != nil {
return iter
}
docs := result.Cursor.FirstBatch
for i := range docs {
iter.docData.Push(docs[i].Data)
}
if result.Cursor.Id != 0 {
socket, err := cloned.acquireSocket(true)
if err != nil {
// Cloned session is in strong mode, and the query
// above succeeded. Should have a reserved socket.
panic("internal error: " + err.Error())
}
iter.server = socket.Server()
socket.Release()
iter.op.cursorId = result.Cursor.Id
iter.op.collection = c.FullName
iter.op.replyFunc = iter.replyFunc()
}
return iter

clonedc := c.With(cloned)
err := clonedc.Database.Run(cmd, &result)
return clonedc.NewIter(session, result.Cursor.FirstBatch, result.Cursor.Id, err)
}

// FindId is a convenience helper equivalent to:
Expand Down Expand Up @@ -1957,7 +1932,6 @@ func (c *Collection) Pipe(pipeline interface{}) *Pipe {
// Iter executes the pipeline and returns an iterator capable of going
// over all the generated results.
func (p *Pipe) Iter() *Iter {

// Clone session and set it to strong mode so that the server
// used for the query may be safely obtained afterwards, if
// necessary for iteration when a cursor is received.
Expand All @@ -1966,12 +1940,6 @@ func (p *Pipe) Iter() *Iter {
defer cloned.Close()
c := p.collection.With(cloned)

iter := &Iter{
session: p.session,
timeout: -1,
}
iter.gotReply.L = &iter.m

var result struct {
// 2.4, no cursors.
Result []bson.Raw
Expand All @@ -1989,34 +1957,84 @@ func (p *Pipe) Iter() *Iter {
AllowDisk: p.allowDisk,
Cursor: &pipeCmdCursor{p.batchSize},
}
iter.err = c.Database.Run(cmd, &result)
if e, ok := iter.err.(*QueryError); ok && e.Message == `unrecognized field "cursor` {
err := c.Database.Run(cmd, &result)
if e, ok := err.(*QueryError); ok && e.Message == `unrecognized field "cursor` {
cmd.Cursor = nil
cmd.AllowDisk = false
iter.err = c.Database.Run(cmd, &result)
err = c.Database.Run(cmd, &result)
}
firstBatch := result.Result
if firstBatch == nil {
firstBatch = result.Cursor.FirstBatch
}
return c.NewIter(p.session, firstBatch, result.Cursor.Id, err)

This comment has been minimized.

Copy link
@jyemin

jyemin Dec 22, 2014

See DRIVERS-198: use the 'ns' value from result.Cursor for OP_GET_MORE, regardless of the command. While in current server versions the value returned by the aggregate command will be equal to c.FullName, the server does not guarantee that.

This comment has been minimized.

Copy link
@niemeyer

niemeyer Dec 22, 2014

Author Contributor

Thanks, I'll keep this in mind and will change once we have some leg room. For now, I'll preserve this logic as I know it was working with several MongoDB releases. We're already fiddling more than I'm comfortable with for being a holiday period pre-release.

}

// NewIter returns a newly created iterator with the provided parameters.
// Using this method is not recommended unless the desired functionality
// is not yet exposed via a more convenient interface (Find, Pipe, etc).
//
// The optional session parameter associates the lifetime of the returned
// iterator to an arbitrary session. If nil, the iterator will be bound to
// c's session.
//
// Documents in firstBatch will be individually provided by the returned
// iterator before documents from cursorId are made available. If cursorId
// is zero, only the documents in firstBatch are provided.
//
// If err is not nil, the iterator's Err method will report it after
// exhausting documents in firstBatch.
//
// NewIter must be called right after the cursor id is obtained, and must not
// be called on a collection in Eventual mode, because the cursor id is
// associated with the specific server that returned it. The session parameter
// may be in any mode or state, though.
//
func (c *Collection) NewIter(session *Session, firstBatch []bson.Raw, cursorId int64, err error) *Iter {
var server *mongoServer
csession := c.Database.Session
csession.m.RLock()
socket := csession.masterSocket
if socket == nil {
socket = csession.slaveSocket
}
if iter.err != nil {
return iter
if socket != nil {
server = socket.Server()
}
docs := result.Result
if docs == nil {
docs = result.Cursor.FirstBatch
csession.m.RUnlock()

if server == nil {
if csession.Mode() == Eventual {
panic("Collection.NewIter called in Eventual mode")
}
panic("Collection.NewIter called on a fresh session with no associated server")
}
for i := range docs {
iter.docData.Push(docs[i].Data)

if session == nil {
session = csession
}
if result.Cursor.Id != 0 {
socket, err := cloned.acquireSocket(true)
if err != nil {
// Cloned session is in strong mode, and the query
// above succeeded. Should have a reserved socket.
panic("internal error: " + err.Error())

iter := &Iter{
session: session,
server: server,
timeout: -1,
err: err,
}
iter.gotReply.L = &iter.m
for _, doc := range firstBatch {
iter.docData.Push(doc.Data)
}
if cursorId != 0 {
socket, err := c.Database.Session.acquireSocket(true)
if err == nil {
iter.server = socket.Server()

This comment has been minimized.

Copy link
@jyemin

jyemin Dec 22, 2014

Can you use the local variable "server" from line 1994?

This comment has been minimized.

Copy link
@niemeyer

niemeyer Dec 22, 2014

Author Contributor

The refactoring makes this line unnecessary, actually. Iter is created with server set above. Thanks.

This comment has been minimized.

Copy link
@jyemin

jyemin Dec 22, 2014

Yes, I see that now.

socket.Release()
iter.op.cursorId = cursorId
iter.op.collection = c.FullName
iter.op.replyFunc = iter.replyFunc()
} else if iter.err == nil {
iter.err = err
}
iter.server = socket.Server()
socket.Release()
iter.op.cursorId = result.Cursor.Id
iter.op.collection = c.FullName
iter.op.replyFunc = iter.replyFunc()
}
return iter
}
Expand Down

0 comments on commit baa44ca

Please sign in to comment.