From 48c6d4b486357d9af17bf4ee5f8042a4944c41bf Mon Sep 17 00:00:00 2001 From: Havoc Pennington Date: Tue, 19 Jul 2011 20:23:58 -0400 Subject: [PATCH] Add an execution handler to run app callbacks in their own thread --- .../src/main/scala/MongoConnection.scala | 56 ++++++++++++++++++- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/mongo-driver/src/main/scala/MongoConnection.scala b/mongo-driver/src/main/scala/MongoConnection.scala index f4d83f8..b2fdf78 100644 --- a/mongo-driver/src/main/scala/MongoConnection.scala +++ b/mongo-driver/src/main/scala/MongoConnection.scala @@ -24,6 +24,8 @@ import org.bson._ import org.bson.collection._ import org.jboss.netty.bootstrap.ClientBootstrap import org.jboss.netty.channel._ +import org.jboss.netty.handler.execution._ +import org.jboss.netty.util._ import java.nio.ByteOrder import com.mongodb.async.wire._ import scala.collection.JavaConversions._ @@ -31,7 +33,7 @@ import com.mongodb.async.futures._ import org.jboss.netty.buffer._ import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.{ ConcurrentHashMap, Executors } +import java.util.concurrent.{ ConcurrentHashMap, Executors, TimeUnit } import scala.collection.mutable.{ ConcurrentMap, WeakHashMap } import com.mongodb.async.util.{ ConcurrentQueue, CursorCleaningTimer } import org.bson.types.ObjectId @@ -71,8 +73,58 @@ abstract class MongoConnection extends Logging { protected implicit val bootstrap = new ClientBootstrap(channelFactory) bootstrap.setPipelineFactory(new ChannelPipelineFactory() { + // objectSizeEstimator tells netty how to compute size of ReplyMessage + // sitting in the pipeline + private val objectSizeEstimator = new DefaultObjectSizeEstimator { + override def estimateSize(o: AnyRef): Int = { + o match { + case r: ReplyMessage => + r.header.messageLength + case _ => + super.estimateSize(o) + } + } + } + + /* The executor ensures that we use more than one thread, so apps + * that call back into hammersmith from a callback don't deadlock, + * and so apps can use CPU (e.g. decoding) without slowing down IO. + * + * Choosing OrderedMemoryAwareThreadPoolExecutor would keep + * all ReplyMessage, plus for example a channel closed message, + * in order. (The ordering is per-channel, not global.) + * However, by going unordered, we can decode replies and + * run app callbacks in parallel. That seems like a pretty + * big win; otherwise, we can only use one CPU to decode and + * process replies. + * Replies from a connection pool would be in + * undefined order anyhow from the app's perspective, + * since the app doesn't know which socket the request + * went to. + */ + private val appCallbackExecutor = + new MemoryAwareThreadPoolExecutor(Runtime.getRuntime.availableProcessors * 2, /* core and max threads */ + 1024 * 1024 * 64, /* max bytes to buffer per channel (should probably be several times max bson size) */ + 1024 * 1024 * 1024, /* max bytes to buffer per pool */ + 20, TimeUnit.SECONDS, /* time to keep idle threads alive */ + objectSizeEstimator, + ThreadFactories("Hammersmith Reply Handler")) { + /* we don't want a max pool size because it means apps can deadlock by + * using up all threads and then calling back in to hammersmith. + * See ThreadPoolExecutor docs for core vs. max size. + * http://download.oracle.com/javase/6/docs/api/java/util/concurrent/ThreadPoolExecutor.html + * The constructor arg sets both core and max so we fix up max here. + */ + setMaximumPoolSize(Int.MaxValue) + } + + private val appCallbackExecutionHandler = + new ExecutionHandler(appCallbackExecutor) + def getPipeline = { - val p = Channels.pipeline(new ReplyMessageDecoder(), handler) + val p = Channels.pipeline(new ReplyMessageDecoder(), + appCallbackExecutionHandler, + handler) p } })