
Use a plain ThreadPoolExecutor instead of the limited-resources netty ones

Limiting memory or the number of threads can create deadlocks.
Unfortunately, removing those limits can lead to high resource
usage, but probably only in cases that would otherwise have
deadlocked anyway, so using some extra memory beats locking up.
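The deadlock in question can be sketched in isolation. This is a hypothetical illustration, not Hammersmith code: a one-thread pool stands in for "all CorePoolSize threads are busy", and a task that re-enters its own pool and waits stands in for an app callback that calls back into Hammersmith. The inner task can only sit behind the outer one in the queue, so the wait can never finish (a timeout is used here so the demo terminates):

```scala
import java.util.concurrent.{ Executors, TimeUnit, TimeoutException }

object BoundedPoolDeadlock {
  def demo(): Boolean = {
    // One worker thread = a saturated bounded pool.
    val pool = Executors.newFixedThreadPool(1)
    var timedOut = false
    val outer = pool.submit(new Runnable {
      def run(): Unit = {
        // The "app callback" re-enters the same pool; the inner task
        // queues behind the task submitting it, so waiting on it
        // would block forever. We time out instead of hanging.
        val inner = pool.submit(new Runnable { def run(): Unit = () })
        try inner.get(200, TimeUnit.MILLISECONDS)
        catch { case _: TimeoutException => timedOut = true }
      }
    })
    outer.get() // completion of outer makes the write to timedOut visible
    pool.shutdownNow()
    timedOut
  }

  def main(args: Array[String]): Unit =
    println(s"would deadlock: ${demo()}")
}
```

The same wait-behind-yourself cycle is what a bounded queue or a memory cap triggers once the pool stops accepting or running new work.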
1 parent 48c6d4b commit 16fb3103851af00d6aa7780c182169e68c41aa45 @havocp committed Jul 20, 2011
Showing with 27 additions and 35 deletions.
  1. +27 −35 mongo-driver/src/main/scala/MongoConnection.scala
@@ -19,7 +19,6 @@ package com.mongodb.async
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import java.net.InetSocketAddress
-
import org.bson._
import org.bson.collection._
import org.jboss.netty.bootstrap.ClientBootstrap
@@ -31,9 +30,8 @@ import com.mongodb.async.wire._
import scala.collection.JavaConversions._
import com.mongodb.async.futures._
import org.jboss.netty.buffer._
-
import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.{ ConcurrentHashMap, Executors, TimeUnit }
+import java.util.concurrent.{ ConcurrentHashMap, Executors, SynchronousQueue, ThreadPoolExecutor, TimeUnit }
import scala.collection.mutable.{ ConcurrentMap, WeakHashMap }
import com.mongodb.async.util.{ ConcurrentQueue, CursorCleaningTimer }
import org.bson.types.ObjectId
@@ -73,50 +71,44 @@ abstract class MongoConnection extends Logging {
protected implicit val bootstrap = new ClientBootstrap(channelFactory)
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
- // objectSizeEstimator tells netty how to compute size of ReplyMessage
- // sitting in the pipeline
- private val objectSizeEstimator = new DefaultObjectSizeEstimator {
- override def estimateSize(o: AnyRef): Int = {
- o match {
- case r: ReplyMessage =>
- r.header.messageLength
- case _ =>
- super.estimateSize(o)
- }
- }
- }
-
/* The executor ensures that we use more than one thread, so apps
* that call back into hammersmith from a callback don't deadlock,
* and so apps can use CPU (e.g. decoding) without slowing down IO.
*
- * Choosing OrderedMemoryAwareThreadPoolExecutor would keep
- * all ReplyMessage, plus for example a channel closed message,
- * in order. (The ordering is per-channel, not global.)
- * However, by going unordered, we can decode replies and
+ * This executor does nothing to preserve order of message
+ * processing.
+ * By going unordered, we can decode replies and
* run app callbacks in parallel. That seems like a pretty
* big win; otherwise, we can only use one CPU to decode and
* process replies.
- * Replies from a connection pool would be in
+ * (Replies from a connection pool would be in
* undefined order anyhow from the app's perspective,
* since the app doesn't know which socket the request
- * went to.
+ * went to.)
+ *
+ * Netty comes with a MemoryAwareThreadPoolExecutor and
+ * OrderedMemoryAwareThreadPoolExecutor. These have
+ * two problems. First, they use a queue,
+ * which means they never go above CorePoolSize
+ * (the queue has a fixed upper limit). This would create
+ * a deadlock if CorePoolSize threads are busy and the app
+ * calls back to Hammersmith. Moreover, the memory limit
+ * can create a deadlock if it stops accepting more messages
+ * and the app calls back to Hammersmith. Basically we can never
+ * stop processing messages or there's a deadlock.
+ * So we don't use the executors from Netty, instead using a plain
+ * ThreadPoolExecutor.
+ *
+ * Actors could be better than threads, in the future. Unlike
+ * invoking a callback, sending a message to an actor should not
+ * tie up the pipeline and risk deadlock.
*/
private val appCallbackExecutor =
- new MemoryAwareThreadPoolExecutor(Runtime.getRuntime.availableProcessors * 2, /* core and max threads */
- 1024 * 1024 * 64, /* max bytes to buffer per channel (should probably be several times max bson size) */
- 1024 * 1024 * 1024, /* max bytes to buffer per pool */
+ new ThreadPoolExecutor(Runtime.getRuntime.availableProcessors * 2, /* core pool size */
+ Int.MaxValue, /* max pool size (must be infinite to avoid deadlocks) */
20, TimeUnit.SECONDS, /* time to keep idle threads alive */
- objectSizeEstimator,
- ThreadFactories("Hammersmith Reply Handler")) {
- /* we don't want a max pool size because it means apps can deadlock by
- * using up all threads and then calling back in to hammersmith.
- * See ThreadPoolExecutor docs for core vs. max size.
- * http://download.oracle.com/javase/6/docs/api/java/util/concurrent/ThreadPoolExecutor.html
- * The constructor arg sets both core and max so we fix up max here.
- */
- setMaximumPoolSize(Int.MaxValue)
- }
+ new SynchronousQueue[Runnable], /* queue that doesn't queue; we must make a thread, or we could deadlock */
+ ThreadFactories("Hammersmith Reply Handler"))
private val appCallbackExecutionHandler =
new ExecutionHandler(appCallbackExecutor)
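The executor constructed in the diff can be sketched standalone. `ThreadFactories` is Hammersmith's own helper, so the JDK default thread factory is used here instead; everything else mirrors the `+` lines above. With a `SynchronousQueue` nothing is ever parked behind a busy worker, and an effectively unbounded max pool size means a re-entrant, blocking callback gets a fresh (or idle) thread rather than deadlocking:

```scala
import java.util.concurrent.{ SynchronousQueue, ThreadPoolExecutor, TimeUnit }

object UnboundedCallbackPool {
  def demo(): Boolean = {
    val pool = new ThreadPoolExecutor(
      Runtime.getRuntime.availableProcessors * 2, // core pool size
      Int.MaxValue, // max pool size: a new thread can always be created
      20, TimeUnit.SECONDS, // idle threads are reaped after 20s
      new SynchronousQueue[Runnable]) // direct handoff; nothing is queued

    // A "callback" that re-enters the pool and blocks on the result:
    // the direct handoff spawns (or reuses) another thread, so both
    // tasks complete instead of waiting on each other.
    val outer = pool.submit(new Runnable {
      def run(): Unit =
        pool.submit(new Runnable { def run(): Unit = () }).get()
    })
    outer.get() // returns, rather than deadlocking
    pool.shutdownNow()
    true
  }

  def main(args: Array[String]): Unit =
    println(s"completed without deadlock: ${demo()}")
}
```

The trade-off is the one the commit message names: under load the pool can grow arbitrarily large, but only in situations where a bounded pool would have been wedged.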
