Skip to content

Commit

Permalink
Add InsertOption_KeepGoing to keep going after error on bulk insert. …
Browse files Browse the repository at this point in the history
…SERVER-509
  • Loading branch information
RedBeard0531 committed May 23, 2011
1 parent 0e28f89 commit b690e23
Show file tree
Hide file tree
Showing 11 changed files with 130 additions and 81 deletions.
10 changes: 4 additions & 6 deletions client/dbclient.cpp
Expand Up @@ -683,12 +683,11 @@ namespace mongo {
return n;
}

void DBClientBase::insert( const string & ns , BSONObj obj ) {
void DBClientBase::insert( const string & ns , BSONObj obj , int flags) {
Message toSend;

BufBuilder b;
int opts = 0;
b.appendNum( opts );
b.appendNum( flags );
b.appendStr( ns );
obj.appendSelfToBufBuilder( b );

Expand All @@ -697,12 +696,11 @@ namespace mongo {
say( toSend );
}

void DBClientBase::insert( const string & ns , const vector< BSONObj > &v ) {
void DBClientBase::insert( const string & ns , const vector< BSONObj > &v , int flags) {
Message toSend;

BufBuilder b;
int opts = 0;
b.appendNum( opts );
b.appendNum( flags );
b.appendStr( ns );
for( vector< BSONObj >::const_iterator i = v.begin(); i != v.end(); ++i )
i->appendSelfToBufBuilder( b );
Expand Down
14 changes: 10 additions & 4 deletions client/dbclient.h
Expand Up @@ -103,6 +103,12 @@ namespace mongo {
RemoveOption_Broadcast = 1 << 1
};


enum InsertOptions {
/** With muli-insert keep processing inserts if one fails */
InsertOption_KeepGoing = 1 << 0
};

class DBClientBase;

/**
Expand Down Expand Up @@ -354,9 +360,9 @@ namespace mongo {
/** don't use this - called automatically by DBClientCursor for you */
virtual auto_ptr<DBClientCursor> getMore( const string &ns, long long cursorId, int nToReturn = 0, int options = 0 ) = 0;

virtual void insert( const string &ns, BSONObj obj ) = 0;
virtual void insert( const string &ns, BSONObj obj , int flags=0) = 0;

virtual void insert( const string &ns, const vector< BSONObj >& v ) = 0;
virtual void insert( const string &ns, const vector< BSONObj >& v , int flags=0) = 0;

virtual void remove( const string &ns , Query query, bool justOne = 0 ) = 0;

Expand Down Expand Up @@ -757,12 +763,12 @@ namespace mongo {
/**
insert an object into the database
*/
virtual void insert( const string &ns , BSONObj obj );
virtual void insert( const string &ns , BSONObj obj , int flags=0);

/**
insert a vector of objects into the database
*/
virtual void insert( const string &ns, const vector< BSONObj >& v );
virtual void insert( const string &ns, const vector< BSONObj >& v , int flags=0);

/**
remove matching objects from the database
Expand Down
8 changes: 4 additions & 4 deletions client/dbclient_rs.cpp
Expand Up @@ -519,12 +519,12 @@ namespace mongo {

// ------------- simple functions -----------------

void DBClientReplicaSet::insert( const string &ns , BSONObj obj ) {
checkMaster()->insert(ns, obj);
void DBClientReplicaSet::insert( const string &ns , BSONObj obj , int flags) {
checkMaster()->insert(ns, obj, flags);
}

void DBClientReplicaSet::insert( const string &ns, const vector< BSONObj >& v ) {
checkMaster()->insert(ns, v);
void DBClientReplicaSet::insert( const string &ns, const vector< BSONObj >& v , int flags) {
checkMaster()->insert(ns, v, flags);
}

void DBClientReplicaSet::remove( const string &ns , Query obj , bool justOne ) {
Expand Down
4 changes: 2 additions & 2 deletions client/dbclient_rs.h
Expand Up @@ -191,11 +191,11 @@ namespace mongo {
/** throws userassertion "no master found" */
virtual BSONObj findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0);

virtual void insert( const string &ns , BSONObj obj );
virtual void insert( const string &ns , BSONObj obj , int flags=0);

/** insert multiple objects. Note that single object insert is asynchronous, so this version
is only nominally faster and not worth a special effort to try to use. */
virtual void insert( const string &ns, const vector< BSONObj >& v );
virtual void insert( const string &ns, const vector< BSONObj >& v , int flags=0);

virtual void remove( const string &ns , Query obj , bool justOne = 0 );

Expand Down
6 changes: 3 additions & 3 deletions client/syncclusterconnection.cpp
Expand Up @@ -240,7 +240,7 @@ namespace mongo {
return c;
}

void SyncClusterConnection::insert( const string &ns, BSONObj obj ) {
void SyncClusterConnection::insert( const string &ns, BSONObj obj , int flags) {

uassert( 13119 , (string)"SyncClusterConnection::insert obj has to have an _id: " + obj.jsonString() ,
ns.find( ".system.indexes" ) != string::npos || obj["_id"].type() );
Expand All @@ -250,13 +250,13 @@ namespace mongo {
throw UserException( 8003 , (string)"SyncClusterConnection::insert prepare failed: " + errmsg );

for ( size_t i=0; i<_conns.size(); i++ ) {
_conns[i]->insert( ns , obj );
_conns[i]->insert( ns , obj , flags);
}

_checkLast();
}

void SyncClusterConnection::insert( const string &ns, const vector< BSONObj >& v ) {
void SyncClusterConnection::insert( const string &ns, const vector< BSONObj >& v , int flags) {
uassert( 10023 , "SyncClusterConnection bulk insert not implemented" , 0);
}

Expand Down
4 changes: 2 additions & 2 deletions client/syncclusterconnection.h
Expand Up @@ -67,9 +67,9 @@ namespace mongo {

virtual auto_ptr<DBClientCursor> getMore( const string &ns, long long cursorId, int nToReturn, int options );

virtual void insert( const string &ns, BSONObj obj );
virtual void insert( const string &ns, BSONObj obj, int flags=0);

virtual void insert( const string &ns, const vector< BSONObj >& v );
virtual void insert( const string &ns, const vector< BSONObj >& v, int flags=0);

virtual void remove( const string &ns , Query query, bool justOne );

Expand Down
38 changes: 24 additions & 14 deletions db/instance.cpp
Expand Up @@ -572,6 +572,7 @@ namespace mongo {

void receivedInsert(Message& m, CurOp& op) {
DbMessage d(m);
const bool keepGoing = d.reservedField() & InsertOption_KeepGoing;
const char *ns = d.getns();
op.debug().ns = ns;

Expand All @@ -587,23 +588,32 @@ namespace mongo {
if( d.moreJSObjs() ) {
int n = 0;
while ( d.moreJSObjs() ) {
BSONObj js = d.nextJsObj();
uassert( 10059 , "object to insert too large", js.objsize() <= BSONObjMaxUserSize);

{
// check no $ modifiers
BSONObjIterator i( js );
while ( i.more() ) {
BSONElement e = i.next();
uassert( 13511 , "object to insert can't have $ modifiers" , e.fieldName()[0] != '$' );
try {
BSONObj js = d.nextJsObj();

uassert( 10059 , "object to insert too large", js.objsize() <= BSONObjMaxUserSize);

{
// check no $ modifiers
BSONObjIterator i( js );
while ( i.more() ) {
BSONElement e = i.next();
uassert( 13511 , "object to insert can't have $ modifiers" , e.fieldName()[0] != '$' );
}
}
}

theDataFileMgr.insertWithObjMod(ns, js, false);
logOp("i", ns, js);
++n;
theDataFileMgr.insertWithObjMod(ns, js, false);
logOp("i", ns, js);
++n;

getDur().commitIfNeeded();
getDur().commitIfNeeded();
} catch (const UserException&){
if (!keepGoing || !d.moreJSObjs()){
globalOpCounters.incInsertInWriteLock(n);
throw;
}
// otherwise ignore and keep going
}
}
globalOpCounters.incInsertInWriteLock(n);
}
Expand Down
23 changes: 23 additions & 0 deletions dbtests/directclienttests.cpp
Expand Up @@ -69,12 +69,35 @@ namespace DirectClientTests {
}
};

class InsertMany : ClientBase {
public:
virtual void run(){
vector<BSONObj> objs;
objs.push_back(BSON("_id" << 1));
objs.push_back(BSON("_id" << 1));
objs.push_back(BSON("_id" << 2));


client().dropCollection(ns);
client().insert(ns, objs);
ASSERT_EQUALS(client().getLastErrorDetailed()["code"].numberInt(), 11000);
ASSERT_EQUALS((int)client().count(ns), 1);

client().dropCollection(ns);
client().insert(ns, objs, InsertOption_KeepGoing);
ASSERT_EQUALS(client().getLastErrorDetailed()["code"].numberInt(), 11000);
ASSERT_EQUALS((int)client().count(ns), 2);
}

};

class All : public Suite {
public:
All() : Suite( "directclient" ) {
}
void setupTests() {
add< Capped >();
add< InsertMany >();
}
} myall;
}
4 changes: 2 additions & 2 deletions s/strategy.cpp
Expand Up @@ -67,13 +67,13 @@ namespace mongo {
dbcon.done();
}

void Strategy::insert( const Shard& shard , const char * ns , const BSONObj& obj ) {
void Strategy::insert( const Shard& shard , const char * ns , const BSONObj& obj , int flags ) {
ShardConnection dbcon( shard , ns );
if ( dbcon.setVersion() ) {
dbcon.done();
throw StaleConfigException( ns , "for insert" );
}
dbcon->insert( ns , obj );
dbcon->insert( ns , obj , flags);
dbcon.done();
}
}
2 changes: 1 addition & 1 deletion s/strategy.h
Expand Up @@ -36,7 +36,7 @@ namespace mongo {
void doWrite( int op , Request& r , const Shard& shard , bool checkVersion = true );
void doQuery( Request& r , const Shard& shard );

void insert( const Shard& shard , const char * ns , const BSONObj& obj );
void insert( const Shard& shard , const char * ns , const BSONObj& obj , int flags=0);

};

Expand Down
98 changes: 55 additions & 43 deletions s/strategy_shard.cpp
Expand Up @@ -126,62 +126,74 @@ namespace mongo {
}

void _insert( Request& r , DbMessage& d, ChunkManagerPtr manager ) {
const int flags = d.reservedField();
bool keepGoing = flags & InsertOption_KeepGoing; // modified before assertion if should abort

while ( d.moreJSObjs() ) {
BSONObj o = d.nextJsObj();
if ( ! manager->hasShardKey( o ) ) {
try {
BSONObj o = d.nextJsObj();
if ( ! manager->hasShardKey( o ) ) {

bool bad = true;
bool bad = true;

if ( manager->getShardKey().partOfShardKey( "_id" ) ) {
BSONObjBuilder b;
b.appendOID( "_id" , 0 , true );
b.appendElements( o );
o = b.obj();
bad = ! manager->hasShardKey( o );
}
if ( manager->getShardKey().partOfShardKey( "_id" ) ) {
BSONObjBuilder b;
b.appendOID( "_id" , 0 , true );
b.appendElements( o );
o = b.obj();
bad = ! manager->hasShardKey( o );
}

if ( bad ) {
log() << "tried to insert object without shard key: " << r.getns() << " " << o << endl;
throw UserException( 8011 , "tried to insert object without shard key" );
}
if ( bad ) {
log() << "tried to insert object without shard key: " << r.getns() << " " << o << endl;
uasserted( 8011 , "tried to insert object without shard key" );
}

}
}

// Many operations benefit from having the shard key early in the object
o = manager->getShardKey().moveToFront(o);
// Many operations benefit from having the shard key early in the object
o = manager->getShardKey().moveToFront(o);

const int maxTries = 10;
const int maxTries = 10;

bool gotThrough = false;
for ( int i=0; i<maxTries; i++ ) {
try {
ChunkPtr c = manager->findChunk( o );
log(4) << " server:" << c->getShard().toString() << " " << o << endl;
insert( c->getShard() , r.getns() , o );
bool gotThrough = false;
for ( int i=0; i<maxTries; i++ ) {
try {
ChunkPtr c = manager->findChunk( o );
log(4) << " server:" << c->getShard().toString() << " " << o << endl;
insert( c->getShard() , r.getns() , o , flags);

r.gotInsert();
if ( r.getClientInfo()->autoSplitOk() )
c->splitIfShould( o.objsize() );
gotThrough = true;
break;
r.gotInsert();
if ( r.getClientInfo()->autoSplitOk() )
c->splitIfShould( o.objsize() );
gotThrough = true;
break;
}
catch ( StaleConfigException& e ) {
int logLevel = i < ( maxTries / 2 );
LOG( logLevel ) << "retrying insert because of StaleConfigException: " << e << " object: " << o << endl;
r.reset();

unsigned long long old = manager->getSequenceNumber();
manager = r.getChunkManager();

LOG( logLevel ) << " sequenece number - old: " << old << " new: " << manager->getSequenceNumber() << endl;

if (!manager) {
keepGoing = false;
uasserted(14804, "collection no longer sharded");
}
}
sleepmillis( i * 200 );
}
catch ( StaleConfigException& e ) {
int logLevel = i < ( maxTries / 2 );
LOG( logLevel ) << "retrying insert because of StaleConfigException: " << e << " object: " << o << endl;
r.reset();

unsigned long long old = manager->getSequenceNumber();
manager = r.getChunkManager();

LOG( logLevel ) << " sequenece number - old: " << old << " new: " << manager->getSequenceNumber() << endl;

uassert(14804, "collection no longer sharded", manager);

assert( inShutdown() || gotThrough ); // not caught below
} catch (const UserException&){
if (!keepGoing || !d.moreJSObjs()){
throw;
}
sleepmillis( i * 200 );
// otherwise ignore and keep going
}

assert( inShutdown() || gotThrough );
}
}

Expand Down

0 comments on commit b690e23

Please sign in to comment.