
Merge branch 'master' of git@github.com:mongodb/mongo

Commit 98d5685751fd8e95922fe0da6271f5e7d8ac88ba (2 parents: 14adf2b + dd12623), committed by dwight on Nov 2, 2011
Showing with 110 additions and 5 deletions.
  1. +82 −0 jstests/slowNightly/mr_shard_version.js
  2. +6 −3 s/writeback_listener.cpp
  3. +22 −2 scripting/bench.cpp
jstests/slowNightly/mr_shard_version.js
@@ -0,0 +1,82 @@
+// Test for SERVER-4158 (version changes during mapreduce)
+
+var st = new ShardingTest({ shards : 2, mongos : 1, verbose : 2 })
+
+// Stop the balancer, since it'll just get in the way of the migrations below
+st.stopBalancer()
+
+var coll = st.s.getCollection( jsTest.name() + ".coll" )
+
+var numDocs = 1000000
+var numKeys = 1000
+var numTests = 3
+
+for( var i = 0; i < numDocs; i++ ){
+ coll.insert({ _id : i, key : "" + ( i % numKeys ), value : i % numKeys })
+}
+
+var halfId = coll.find().itcount() / 2
+
+// Shard collection in half
+st.shardColl( coll, { _id : 1 }, { _id : halfId } )
+
+st.printShardingStatus()
+
+jsTest.log( "Collection now initialized with keys and values..." )
+
+jsTest.log( "Starting migrations..." )
+
+var migrateOp = { op : "command", ns : "admin", command : { moveChunk : "" + coll } }
+
+var checkMigrate = function(){ print( "Result of migrate : " ); printjson( this ) }
+
+var ops = {}
+for( var i = 0; i < st._shardServers.length; i++ ){
+ for( var j = 0; j < 2; j++ ){
+ ops[ "" + (i * 2 + j) ] = { op : "command", ns : "admin",
+ command : { moveChunk : "" + coll,
+ find : { _id : ( j == 0 ? 0 : halfId ) },
+ to : st._shardServers[i].shardName } } // , check : checkMigrate }
+ // TODO: Deadlock due to global V8Lock between scopes if we use a js check here
+ }
+}
+
+var bid = benchStart({ ops : ops,
+ host : st.s.host,
+ parallel : 1,
+ handleErrors : false })
+
+jsTest.log( "Starting m/r..." )
+
+var map = function(){ emit( this.key, this.value ) }
+var reduce = function(k, values){
+ var total = 0
+ for( var i = 0; i < values.length; i++ ) total += values[i]
+ return total
+}
+
+var outputColl = st.s.getCollection( jsTest.name() + ".mrOutput" )
+
+jsTest.log( "Output coll : " + outputColl )
+
+for( var t = 0; t < numTests; t++ ){
+
+ var results = coll.mapReduce( map, reduce, { out : { replace : outputColl.getName() } })
+
+ // Assert that the results are actually correct: each key should reduce to key x (numDocs / numKeys)
+ var output = outputColl.find().sort({ _id : 1 }).toArray()
+
+ // printjson( output )
+
+ assert.eq( output.length, numKeys )
+
+ for( var i = 0; i < output.length; i++ )
+ assert.eq( parseInt( output[i]._id ) * ( numDocs / numKeys ), output[i].value )
+
+}
+
+jsTest.log( "Finishing parallel migrations..." )
+
+printjson( benchFinish( bid ) )
+
+st.stop()
s/writeback_listener.cpp
@@ -165,13 +165,16 @@ namespace mongo {
DBConfigPtr db = grid.getDBConfig( ns );
ShardChunkVersion needVersion( data["version"] );
+ // TODO: The logic here could be refactored, but keeping to the original codepath for safety for now
+ ChunkManagerPtr manager = db->getChunkManagerIfExists( ns );
+
LOG(1) << "connectionId: " << cid << " writebackId: " << wid << " needVersion : " << needVersion.toString()
- << " mine : " << db->getChunkManager( ns )->getVersion().toString()
+ << " mine : " << ( manager ? manager->getVersion().toString() : "(unknown)" )
<< endl;
LOG(1) << m.toString() << endl;
- if ( needVersion.isSet() && needVersion <= db->getChunkManager( ns )->getVersion() ) {
+ if ( needVersion.isSet() && manager && needVersion <= manager->getVersion() ) {
// this means when the write went originally, the version was old
// if we're here, it means we've already updated the config, so don't need to do again
//db->getChunkManager( ns , true ); // SERVER-1349
@@ -180,7 +183,7 @@ namespace mongo {
// we received a writeback object that was sent to a previous version of a shard
// the actual shard may not have the object the writeback operation is for
// we need to reload the chunk manager and get the new shard versions
- db->getChunkManager( ns , true );
+ manager = db->getChunkManager( ns , true );
}
// do request and then call getLastError
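The shape of the change above: replace a lookup that can throw or force a load with getChunkManagerIfExists(), then branch on whether a manager came back. A condensed sketch of that pattern (same names as the hunk, with the surrounding writeback handling elided):

    // Non-throwing lookup: an empty ChunkManagerPtr means no manager is cached.
    ChunkManagerPtr manager = db->getChunkManagerIfExists( ns );

    if ( needVersion.isSet() && manager && needVersion <= manager->getVersion() ) {
        // Our cached version is already >= the writeback's version,
        // so the config has been updated since the write went out.
    }
    else {
        // Stale or missing manager: force a reload to pick up new shard versions.
        manager = db->getChunkManager( ns , true );
    }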
scripting/bench.cpp
@@ -135,6 +135,7 @@ namespace mongo {
}
+ // TODO: Make this fix nested query objects recursively
static void fixQuery( BSONObjBuilder& b , const BSONObj& obj ) {
BSONObjIterator i( obj );
while ( i.more() ) {
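The new TODO asks for fixQuery to recurse into nested objects. A hypothetical shape for that, assuming the per-element rewrite stays in the leaf branch and using the usual BSONObjBuilder sub-object idiom (fixQueryRecursive is an illustrative name, not part of the commit):

    // Hypothetical recursive variant: descend into sub-objects so values are
    // rewritten at any depth, not just at the top level. Arrays would need the
    // analogous subarrayStart() treatment.
    static void fixQueryRecursive( BSONObjBuilder& b , const BSONObj& obj ) {
        BSONObjIterator i( obj );
        while ( i.more() ) {
            BSONElement e = i.next();
            if ( e.type() == Object ) {
                BSONObjBuilder sub( b.subobjStart( e.fieldName() ) );
                fixQueryRecursive( sub , e.Obj() );
                sub.done();
            }
            else {
                b.append( e ); // leaf element: the existing per-element fix goes here
            }
        }
    }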
@@ -221,6 +222,23 @@ namespace mongo {
if( ! config->hideResults || e["showResult"].trueValue() ) log() << "Result from benchRun thread [findOne] : " << result << endl;
}
+ if ( op == "command" ) {
+
+ BSONObj result;
+ // TODO
+ /* bool ok = */ conn->runCommand( ns , /* fixQuery( */ e["command"].Obj() /* ) */, result, e["options"].numberInt() );
+
+ if( check ){
+ int err = scope->invoke( scopeFunc , 0 , &result, 1000 * 60 , false );
+ if( err ){
+ log() << "Error checking in benchRun thread [command]" << causedBy( scope->getError() ) << endl;
+ return;
+ }
+ }
+
+ if( ! config->hideResults || e["showResult"].trueValue() ) log() << "Result from benchRun thread [command] : " << result << endl;
+
+ }
else if( op == "find" || op == "query" ) {
int limit = e["limit"].eoo() ? 0 : e["limit"].Int();
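For comparison with the JS ops object in the new test, the same "command" op can be expressed as a BSON document built in C++; a hypothetical sketch with the BSON() stream macro, where "test.coll", the _id value, and "shard0000" are placeholders rather than values from the commit:

    // One benchRun op equivalent to an entry of the JS "ops" object above.
    BSONObj op = BSON( "op" << "command"
                    << "ns" << "admin"
                    << "command" << BSON( "moveChunk" << "test.coll"
                                       << "find" << BSON( "_id" << 0 )
                                       << "to" << "shard0000" ) );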
@@ -485,8 +503,10 @@ namespace mongo {
log() << "Ending! (waiting for " << threads.size() << " threads)" << endl;
- scoped_lock lock( config._mutex );
- config.active = false;
+ {
+ scoped_lock lock( config._mutex );
+ config.active = false;
+ }
for ( unsigned i = 0; i < threads.size(); i++ ) threads[i]->join();
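The added braces are the whole point of that last hunk: they end the scoped_lock before the join loop. Assuming the worker threads also take config._mutex when they poll config.active (which is what the original code suggests), joining while still holding the lock would deadlock. A self-contained sketch of the same pattern using std::mutex/std::thread rather than the boost types used here:

    #include <mutex>
    #include <thread>
    #include <vector>

    std::mutex m;
    bool active = true;                         // guarded by m, polled by workers

    void worker() {
        for (;;) {
            std::lock_guard<std::mutex> lk(m);  // workers take the same mutex
            if (!active) return;
        }
    }

    int main() {
        std::vector<std::thread> threads;
        for (int i = 0; i < 4; ++i) threads.emplace_back(worker);

        {   // flip the flag, then release the lock before joining;
            // joining while still holding m would block the workers forever
            std::lock_guard<std::mutex> lk(m);
            active = false;
        }
        for (auto& t : threads) t.join();
        return 0;
    }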
