Skip to content

Commit

Permalink
fix cursor pinning to server, clean DBCursor.close() JAVA-183 JAVA-178
Browse files Browse the repository at this point in the history
  • Loading branch information
erh committed Oct 27, 2010
1 parent ac897e5 commit f553a18
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 94 deletions.
186 changes: 112 additions & 74 deletions src/main/com/mongodb/DBApiLayer.java
Expand Up @@ -98,6 +98,90 @@ String _removeRoot( String ns ){
return ns.substring( _root.length() + 1 );
}

void _cleanCursors( boolean force )
throws MongoException {

List<DeadCursor> l = null;

// check without synchronisation ( double check pattern will avoid having two threads do the cleanup )
// maybe the whole cleanCursor logic should be moved to a background thread anyway
int sz = _deadCursorIds.size();

if ( sz == 0 )
return;

if ( ! force && sz < NUM_CURSORS_BEFORE_KILL )
return;

synchronized ( _deadCursorIdsLock ){
sz = _deadCursorIds.size();

if ( sz == 0 )
return;

if ( ! force && sz < NUM_CURSORS_BEFORE_KILL )
return;

l = _deadCursorIds;
_deadCursorIds = new LinkedList<DeadCursor>();
}

Bytes.LOGGER.info( "going to kill cursors : " + l.size() );

Map<ServerAddress,List<Long>> m = new HashMap<ServerAddress,List<Long>>();
for ( DeadCursor c : l ){
List<Long> x = m.get( c.host );
if ( x == null ){
x = new LinkedList<Long>();
m.put( c.host , x );
}
x.add( c.id );
}

for ( Map.Entry<ServerAddress,List<Long>> e : m.entrySet() ){
try {
killCursors( e.getKey() , e.getValue() );
}
catch ( Throwable t ){
Bytes.LOGGER.log( Level.WARNING , "can't clean cursors" , t );
synchronized ( _deadCursorIdsLock ){
for ( Long x : e.getValue() )
_deadCursorIds.add( new DeadCursor( x , e.getKey() ) );
}
}
}
}

void killCursors( ServerAddress addr , List<Long> all )
throws MongoException {
if ( all == null || all.size() == 0 )
return;

OutMessage om = new OutMessage( _mongo , 2007 );
om.writeInt( 0 ); // reserved

om.writeInt( Math.min( NUM_CURSORS_PER_BATCH , all.size() ) );

int soFar = 0;
int totalSoFar = 0;
for (Long l : all) {
om.writeLong(l);

totalSoFar++;
soFar++;

if ( soFar >= NUM_CURSORS_PER_BATCH ){
_connector.say( this , om ,com.mongodb.WriteConcern.NONE );
om = new OutMessage( _mongo , 2007 );
om.writeInt( 0 ); // reserved
om.writeInt( Math.min( NUM_CURSORS_PER_BATCH , all.size() - totalSoFar ) );
soFar = 0;
}
}

_connector.say( this , om ,com.mongodb.WriteConcern.NONE , addr );
}


class MyCollection extends DBCollection {
MyCollection( String name ){
Expand Down Expand Up @@ -184,77 +268,6 @@ public WriteResult remove( DBObject o , com.mongodb.WriteConcern concern )
return _connector.say( _db , om , concern );
}

void _cleanCursors()
throws MongoException {

List<Long> l = null;

// check without synchronisation ( double check pattern will avoid having two threads do the cleanup )
// maybe the whole cleanCursor logic should be moved to a background thread anyway
int sz = _deadCursorIds.size();

if ( sz == 0 )
return;

if ( sz % 20 != 0 && sz < NUM_CURSORS_BEFORE_KILL )
return;

synchronized ( _deadCursorIdsLock ){
sz = _deadCursorIds.size();

if ( sz == 0 )
return;

if ( sz % 20 != 0 && sz < NUM_CURSORS_BEFORE_KILL )
return;

l = _deadCursorIds;
_deadCursorIds = new LinkedList<Long>();
}

Bytes.LOGGER.info( "going to kill cursors : " + l.size() );

try {
killCursors( l );
}
catch ( Throwable t ){
Bytes.LOGGER.log( Level.WARNING , "can't clean cursors" , t );
synchronized ( _deadCursorIdsLock ){
_deadCursorIds.addAll( l );
}
}
}

void killCursors( List<Long> all )
throws MongoException {
if ( all == null || all.size() == 0 )
return;

OutMessage om = new OutMessage( _mongo , 2007 );
om.writeInt( 0 ); // reserved

om.writeInt( Math.min( NUM_CURSORS_PER_BATCH , all.size() ) );

int soFar = 0;
int totalSoFar = 0;
for (Long l : all) {
om.writeLong(l);

totalSoFar++;
soFar++;

if ( soFar >= NUM_CURSORS_PER_BATCH ){
_connector.say( _db , om ,com.mongodb.WriteConcern.NONE );
om = new OutMessage( _mongo , 2007 );
om.writeInt( 0 ); // reserved
om.writeInt( Math.min( NUM_CURSORS_PER_BATCH , all.size() - totalSoFar ) );
soFar = 0;
}
}

_connector.say( _db , om ,com.mongodb.WriteConcern.NONE );
}

@Override
Iterator<DBObject> __find( DBObject ref , DBObject fields , int numToSkip , int batchSize , int options )
throws MongoException {
Expand All @@ -264,7 +277,7 @@ Iterator<DBObject> __find( DBObject ref , DBObject fields , int numToSkip , int

if ( willTrace() ) trace( "find: " + _fullNameSpace + " " + JSON.serialize( ref ) );

_cleanCursors();
_cleanCursors( false );

OutMessage query = OutMessage.query( _mongo , options , _fullNameSpace , numToSkip , batchSize , ref , fields );

Expand Down Expand Up @@ -403,7 +416,7 @@ public String toString(){
protected void finalize() throws Throwable {
if ( _curResult != null && _curResult.cursor() != 0 ){
synchronized ( _deadCursorIdsLock ){
_deadCursorIds.add( _curResult.cursor() );
_deadCursorIds.add( new DeadCursor( _curResult.cursor() , _host ) );
}
}
super.finalize();
Expand All @@ -413,6 +426,12 @@ public long totalBytes(){
return _totalBytes;
}

public long getCursorId(){
if ( _curResult == null )
return 0;
return _curResult._cursor;
}

int numGetMores(){
return _numGetMores;
}
Expand All @@ -421,6 +440,14 @@ List<Integer> getSizes(){
return Collections.unmodifiableList( _sizes );
}

void close(){
synchronized ( _deadCursorIdsLock ){
_deadCursorIds.add( new DeadCursor( _curResult.cursor() , _host ) );
}
_cleanCursors( true );
_curResult = null;
_cur = null;
}


Response _curResult;
Expand All @@ -434,14 +461,25 @@ List<Integer> getSizes(){
private int _numGetMores = 0;
private List<Integer> _sizes = new ArrayList<Integer>();
} // class Result

class DeadCursor {

DeadCursor( long a , ServerAddress b ){
id = a;
host = b;
}

final long id;
final ServerAddress host;
}

final String _root;
final String _rootPlusDot;
final DBConnector _connector;
final Map<String,MyCollection> _collections = Collections.synchronizedMap( new HashMap<String,MyCollection>() );

final String _deadCursorIdsLock = "DBApiLayer-_deadCursorIdsLock-" + Math.random();
List<Long> _deadCursorIds = new LinkedList<Long>();
List<DeadCursor> _deadCursorIds = new LinkedList<DeadCursor>();

static final List<DBObject> EMPTY = Collections.unmodifiableList( new LinkedList<DBObject>() );
}
1 change: 1 addition & 0 deletions src/main/com/mongodb/DBConnector.java
Expand Up @@ -26,6 +26,7 @@ public interface DBConnector {
public void requestEnsureConnection();

public WriteResult say( DB db , OutMessage m , WriteConcern concern ) throws MongoException;
public WriteResult say( DB db , OutMessage m , WriteConcern concern , ServerAddress hostNeeded ) throws MongoException;

public Response call( DB db , DBCollection coll , OutMessage m ) throws MongoException;
public Response call( DB db , DBCollection coll , OutMessage m , ServerAddress hostNeeded ) throws MongoException;
Expand Down
24 changes: 5 additions & 19 deletions src/main/com/mongodb/DBCursor.java
Expand Up @@ -235,30 +235,16 @@ public DBCursor skip( int n ){

/** The cursor (id) on the server; 0 = no cursor */
public long getCursorId() {
if (_it instanceof Result) {
Response curRes = ((Result)_it)._curResult;
if ( curRes != null )
return curRes._cursor;
}
if ( _it instanceof Result )
return ((Result)_it).getCursorId();

return 0;
}

/** kill the current cursor on the server. */
public boolean kill() {
long cursorId = getCursorId();
if ( cursorId > 0 ) {
if ( _it instanceof Result ) {
Result res = (Result)_it;
((MyCollection) res._collection).killCursors( Arrays.asList( cursorId ) );

//null the current results so it doesn't get killed again.
res._curResult = null;
}
}

//TODO: how can we tell if it was successful? getLastError?
return true;
public void close() {
if ( _it instanceof Result )
((Result)_it).close();
}

/**
Expand Down
7 changes: 6 additions & 1 deletion src/main/com/mongodb/DBTCPConnector.java
Expand Up @@ -138,12 +138,17 @@ WriteResult _checkWriteError( DB db , MyPort mp , DBPort port , WriteConcern con

public WriteResult say( DB db , OutMessage m , WriteConcern concern )
throws MongoException {
return say( db , m , concern , null );
}

public WriteResult say( DB db , OutMessage m , WriteConcern concern , ServerAddress hostNeeded )
throws MongoException {

_checkClosed();
checkMaster( false , true );

MyPort mp = _myPort.get();
DBPort port = mp.get( true , false , null );
DBPort port = mp.get( true , false , hostNeeded );
port.checkAuth( db );

try {
Expand Down

0 comments on commit f553a18

Please sign in to comment.