Skip to content

Commit

Permalink
Add an execution handler to run app callbacks in their own thread
Browse files Browse the repository at this point in the history
  • Loading branch information
havocp committed Jul 20, 2011
1 parent 5922fa8 commit 48c6d4b
Showing 1 changed file with 54 additions and 2 deletions.
56 changes: 54 additions & 2 deletions mongo-driver/src/main/scala/MongoConnection.scala
Expand Up @@ -24,14 +24,16 @@ import org.bson._
import org.bson.collection._
import org.jboss.netty.bootstrap.ClientBootstrap
import org.jboss.netty.channel._
import org.jboss.netty.handler.execution._
import org.jboss.netty.util._
import java.nio.ByteOrder
import com.mongodb.async.wire._
import scala.collection.JavaConversions._
import com.mongodb.async.futures._
import org.jboss.netty.buffer._

import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{ ConcurrentHashMap, Executors }
import java.util.concurrent.{ ConcurrentHashMap, Executors, TimeUnit }
import scala.collection.mutable.{ ConcurrentMap, WeakHashMap }
import com.mongodb.async.util.{ ConcurrentQueue, CursorCleaningTimer }
import org.bson.types.ObjectId
Expand Down Expand Up @@ -71,8 +73,58 @@ abstract class MongoConnection extends Logging {
protected implicit val bootstrap = new ClientBootstrap(channelFactory)

bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
// objectSizeEstimator tells netty how to compute size of ReplyMessage
// sitting in the pipeline
private val objectSizeEstimator = new DefaultObjectSizeEstimator {
override def estimateSize(o: AnyRef): Int = {
o match {
case r: ReplyMessage =>
r.header.messageLength
case _ =>
super.estimateSize(o)
}
}
}

/* The executor ensures that we use more than one thread, so apps
* that call back into hammersmith from a callback don't deadlock,
* and so apps can use CPU (e.g. decoding) without slowing down IO.
*
* Choosing OrderedMemoryAwareThreadPoolExecutor would keep
* all ReplyMessage, plus for example a channel closed message,
* in order. (The ordering is per-channel, not global.)
* However, by going unordered, we can decode replies and
* run app callbacks in parallel. That seems like a pretty
* big win; otherwise, we can only use one CPU to decode and
* process replies.
* Replies from a connection pool would be in
* undefined order anyhow from the app's perspective,
* since the app doesn't know which socket the request
* went to.
*/
private val appCallbackExecutor =
new MemoryAwareThreadPoolExecutor(Runtime.getRuntime.availableProcessors * 2, /* core and max threads */
1024 * 1024 * 64, /* max bytes to buffer per channel (should probably be several times max bson size) */
1024 * 1024 * 1024, /* max bytes to buffer per pool */
20, TimeUnit.SECONDS, /* time to keep idle threads alive */
objectSizeEstimator,
ThreadFactories("Hammersmith Reply Handler")) {
/* we don't want a max pool size because it means apps can deadlock by
* using up all threads and then calling back in to hammersmith.
* See ThreadPoolExecutor docs for core vs. max size.
* http://download.oracle.com/javase/6/docs/api/java/util/concurrent/ThreadPoolExecutor.html
* The constructor arg sets both core and max so we fix up max here.
*/
setMaximumPoolSize(Int.MaxValue)
}

private val appCallbackExecutionHandler =
new ExecutionHandler(appCallbackExecutor)

def getPipeline = {
val p = Channels.pipeline(new ReplyMessageDecoder(), handler)
val p = Channels.pipeline(new ReplyMessageDecoder(),
appCallbackExecutionHandler,
handler)
p
}
})
Expand Down

0 comments on commit 48c6d4b

Please sign in to comment.