diff --git a/src/main/com/mongodb/DBApiLayer.java b/src/main/com/mongodb/DBApiLayer.java index 29dbf7475e4..58e87e704a9 100644 --- a/src/main/com/mongodb/DBApiLayer.java +++ b/src/main/com/mongodb/DBApiLayer.java @@ -98,6 +98,90 @@ String _removeRoot( String ns ){ return ns.substring( _root.length() + 1 ); } + void _cleanCursors( boolean force ) + throws MongoException { + + List l = null; + + // check without synchronisation ( double check pattern will avoid having two threads do the cleanup ) + // maybe the whole cleanCursor logic should be moved to a background thread anyway + int sz = _deadCursorIds.size(); + + if ( sz == 0 ) + return; + + if ( ! force && sz < NUM_CURSORS_BEFORE_KILL ) + return; + + synchronized ( _deadCursorIdsLock ){ + sz = _deadCursorIds.size(); + + if ( sz == 0 ) + return; + + if ( ! force && sz < NUM_CURSORS_BEFORE_KILL ) + return; + + l = _deadCursorIds; + _deadCursorIds = new LinkedList(); + } + + Bytes.LOGGER.info( "going to kill cursors : " + l.size() ); + + Map> m = new HashMap>(); + for ( DeadCursor c : l ){ + List x = m.get( c.host ); + if ( x == null ){ + x = new LinkedList(); + m.put( c.host , x ); + } + x.add( c.id ); + } + + for ( Map.Entry> e : m.entrySet() ){ + try { + killCursors( e.getKey() , e.getValue() ); + } + catch ( Throwable t ){ + Bytes.LOGGER.log( Level.WARNING , "can't clean cursors" , t ); + synchronized ( _deadCursorIdsLock ){ + for ( Long x : e.getValue() ) + _deadCursorIds.add( new DeadCursor( x , e.getKey() ) ); + } + } + } + } + + void killCursors( ServerAddress addr , List all ) + throws MongoException { + if ( all == null || all.size() == 0 ) + return; + + OutMessage om = new OutMessage( _mongo , 2007 ); + om.writeInt( 0 ); // reserved + + om.writeInt( Math.min( NUM_CURSORS_PER_BATCH , all.size() ) ); + + int soFar = 0; + int totalSoFar = 0; + for (Long l : all) { + om.writeLong(l); + + totalSoFar++; + soFar++; + + if ( soFar >= NUM_CURSORS_PER_BATCH ){ + _connector.say( this , om ,com.mongodb.WriteConcern.NONE ); + om = new OutMessage( _mongo , 2007 ); + om.writeInt( 0 ); // reserved + om.writeInt( Math.min( NUM_CURSORS_PER_BATCH , all.size() - totalSoFar ) ); + soFar = 0; + } + } + + _connector.say( this , om ,com.mongodb.WriteConcern.NONE , addr ); + } + class MyCollection extends DBCollection { MyCollection( String name ){ @@ -184,77 +268,6 @@ public WriteResult remove( DBObject o , com.mongodb.WriteConcern concern ) return _connector.say( _db , om , concern ); } - void _cleanCursors() - throws MongoException { - - List l = null; - - // check without synchronisation ( double check pattern will avoid having two threads do the cleanup ) - // maybe the whole cleanCursor logic should be moved to a background thread anyway - int sz = _deadCursorIds.size(); - - if ( sz == 0 ) - return; - - if ( sz % 20 != 0 && sz < NUM_CURSORS_BEFORE_KILL ) - return; - - synchronized ( _deadCursorIdsLock ){ - sz = _deadCursorIds.size(); - - if ( sz == 0 ) - return; - - if ( sz % 20 != 0 && sz < NUM_CURSORS_BEFORE_KILL ) - return; - - l = _deadCursorIds; - _deadCursorIds = new LinkedList(); - } - - Bytes.LOGGER.info( "going to kill cursors : " + l.size() ); - - try { - killCursors( l ); - } - catch ( Throwable t ){ - Bytes.LOGGER.log( Level.WARNING , "can't clean cursors" , t ); - synchronized ( _deadCursorIdsLock ){ - _deadCursorIds.addAll( l ); - } - } - } - - void killCursors( List all ) - throws MongoException { - if ( all == null || all.size() == 0 ) - return; - - OutMessage om = new OutMessage( _mongo , 2007 ); - om.writeInt( 0 ); // reserved - - om.writeInt( Math.min( NUM_CURSORS_PER_BATCH , all.size() ) ); - - int soFar = 0; - int totalSoFar = 0; - for (Long l : all) { - om.writeLong(l); - - totalSoFar++; - soFar++; - - if ( soFar >= NUM_CURSORS_PER_BATCH ){ - _connector.say( _db , om ,com.mongodb.WriteConcern.NONE ); - om = new OutMessage( _mongo , 2007 ); - om.writeInt( 0 ); // reserved - om.writeInt( Math.min( NUM_CURSORS_PER_BATCH , all.size() - totalSoFar ) ); - soFar = 0; - } - } - - _connector.say( _db , om ,com.mongodb.WriteConcern.NONE ); - } - @Override Iterator __find( DBObject ref , DBObject fields , int numToSkip , int batchSize , int options ) throws MongoException { @@ -264,7 +277,7 @@ Iterator __find( DBObject ref , DBObject fields , int numToSkip , int if ( willTrace() ) trace( "find: " + _fullNameSpace + " " + JSON.serialize( ref ) ); - _cleanCursors(); + _cleanCursors( false ); OutMessage query = OutMessage.query( _mongo , options , _fullNameSpace , numToSkip , batchSize , ref , fields ); @@ -403,7 +416,7 @@ public String toString(){ protected void finalize() throws Throwable { if ( _curResult != null && _curResult.cursor() != 0 ){ synchronized ( _deadCursorIdsLock ){ - _deadCursorIds.add( _curResult.cursor() ); + _deadCursorIds.add( new DeadCursor( _curResult.cursor() , _host ) ); } } super.finalize(); @@ -413,6 +426,12 @@ public long totalBytes(){ return _totalBytes; } + public long getCursorId(){ + if ( _curResult == null ) + return 0; + return _curResult._cursor; + } + int numGetMores(){ return _numGetMores; } @@ -421,6 +440,14 @@ List getSizes(){ return Collections.unmodifiableList( _sizes ); } + void close(){ + synchronized ( _deadCursorIdsLock ){ + _deadCursorIds.add( new DeadCursor( _curResult.cursor() , _host ) ); + } + _cleanCursors( true ); + _curResult = null; + _cur = null; + } Response _curResult; @@ -434,6 +461,17 @@ List getSizes(){ private int _numGetMores = 0; private List _sizes = new ArrayList(); } // class Result + + class DeadCursor { + + DeadCursor( long a , ServerAddress b ){ + id = a; + host = b; + } + + final long id; + final ServerAddress host; + } final String _root; final String _rootPlusDot; @@ -441,7 +479,7 @@ List getSizes(){ final Map _collections = Collections.synchronizedMap( new HashMap() ); final String _deadCursorIdsLock = "DBApiLayer-_deadCursorIdsLock-" + Math.random(); - List _deadCursorIds = new LinkedList(); + List _deadCursorIds = new LinkedList(); static final List EMPTY = Collections.unmodifiableList( new LinkedList() ); } diff --git a/src/main/com/mongodb/DBConnector.java b/src/main/com/mongodb/DBConnector.java index 15b15c7e0d2..e08a53383f4 100644 --- a/src/main/com/mongodb/DBConnector.java +++ b/src/main/com/mongodb/DBConnector.java @@ -26,6 +26,7 @@ public interface DBConnector { public void requestEnsureConnection(); public WriteResult say( DB db , OutMessage m , WriteConcern concern ) throws MongoException; + public WriteResult say( DB db , OutMessage m , WriteConcern concern , ServerAddress hostNeeded ) throws MongoException; public Response call( DB db , DBCollection coll , OutMessage m ) throws MongoException; public Response call( DB db , DBCollection coll , OutMessage m , ServerAddress hostNeeded ) throws MongoException; diff --git a/src/main/com/mongodb/DBCursor.java b/src/main/com/mongodb/DBCursor.java index d458245dd6a..e8f6ef2adc5 100644 --- a/src/main/com/mongodb/DBCursor.java +++ b/src/main/com/mongodb/DBCursor.java @@ -235,30 +235,16 @@ public DBCursor skip( int n ){ /** The cursor (id) on the server; 0 = no cursor */ public long getCursorId() { - if (_it instanceof Result) { - Response curRes = ((Result)_it)._curResult; - if ( curRes != null ) - return curRes._cursor; - } + if ( _it instanceof Result ) + return ((Result)_it).getCursorId(); return 0; } /** kill the current cursor on the server. */ - public boolean kill() { - long cursorId = getCursorId(); - if ( cursorId > 0 ) { - if ( _it instanceof Result ) { - Result res = (Result)_it; - ((MyCollection) res._collection).killCursors( Arrays.asList( cursorId ) ); - - //null the current results so it doesn't get killed again. - res._curResult = null; - } - } - - //TODO: how can we tell if it was successful? getLastError? - return true; + public void close() { + if ( _it instanceof Result ) + ((Result)_it).close(); } /** diff --git a/src/main/com/mongodb/DBTCPConnector.java b/src/main/com/mongodb/DBTCPConnector.java index 64a0a39ef1f..f09187d46b7 100644 --- a/src/main/com/mongodb/DBTCPConnector.java +++ b/src/main/com/mongodb/DBTCPConnector.java @@ -138,12 +138,17 @@ WriteResult _checkWriteError( DB db , MyPort mp , DBPort port , WriteConcern con public WriteResult say( DB db , OutMessage m , WriteConcern concern ) throws MongoException { + return say( db , m , concern , null ); + } + + public WriteResult say( DB db , OutMessage m , WriteConcern concern , ServerAddress hostNeeded ) + throws MongoException { _checkClosed(); checkMaster( false , true ); MyPort mp = _myPort.get(); - DBPort port = mp.get( true , false , null ); + DBPort port = mp.get( true , false , hostNeeded ); port.checkAuth( db ); try {