Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

[split] finagle: daemonize worker & timer threads; warn of unclosed c…

…lients
  • Loading branch information...
commit cd1c3e718f6762adef8f12badf0b39e6be119616 1 parent 7f119f4
@mariusae mariusae authored
View
19 finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala
@@ -108,8 +108,8 @@ object ClientBuilder {
new ReferenceCountedChannelFactory(
new LazyRevivableChannelFactory(() =>
new NioClientSocketChannelFactory(
- Executors.newCachedThreadPool(new NamedPoolThreadFactory("FinagleClientBoss")),
- Executors.newCachedThreadPool(new NamedPoolThreadFactory("FinagleClientIO"))
+ Executors.newCachedThreadPool(new NamedPoolThreadFactory("FinagleClientBoss", true)),
+ Executors.newCachedThreadPool(new NamedPoolThreadFactory("FinagleClientIO", true))
)
)
)
@@ -503,20 +503,20 @@ class ClientBuilder[Req, Rep, HasCluster, HasCodec, HasHostConnectionLimit] priv
/**
* Encrypt the connection with SSL. The Engine to use can be passed into the client.
- * This allows the user to use client certificates
+ * This allows the user to use client certificates
* No SSL Hostname Validation is performed
*/
def tls(sslContext : SSLContext): This =
- withConfig(_.copy(_tls = Some({ () => Ssl.client(sslContext) }, None)))
-
+ withConfig(_.copy(_tls = Some({ () => Ssl.client(sslContext) }, None)))
+
/**
* Encrypt the connection with SSL. The Engine to use can be passed into the client.
- * This allows the user to use client certificates
+ * This allows the user to use client certificates
* SSL Hostname Validation is performed, on the passed in hostname
*/
def tls(sslContext : SSLContext, hostname : Option[String]): This =
- withConfig(_.copy(_tls = Some({ () => Ssl.client(sslContext) }, hostname)))
-
+ withConfig(_.copy(_tls = Some({ () => Ssl.client(sslContext) }, hostname)))
+
/**
* Do not perform TLS validation. Probably dangerous.
*/
@@ -761,6 +761,9 @@ class ClientBuilder[Req, Rep, HasCluster, HasCodec, HasHostConnectionLimit] priv
factory = tracingFilter(tracer) andThen factory
factory = codec.prepareServiceFactory(factory)
+ Registry.add(factory, config.toString)
+ closeNotifier onClose Registry.del(factory)
+
factory
}
View
23 finagle-core/src/main/scala/com/twitter/finagle/builder/Registry.scala
@@ -0,0 +1,23 @@
+package com.twitter.finagle.builder
+
+import java.util.{Collections, IdentityHashMap}
+import scala.collection.JavaConverters._
+
+object Registry {
+ private[this] val active = Collections.synchronizedMap(new IdentityHashMap[Object, String])
+
+ Runtime.getRuntime().addShutdownHook(new Thread {
+ override def run() {
+ for (desc <- active.asScala.values)
+ System.err.printf("[Finagle/not closed] %s\n", desc)
+ }
+ })
+
+ def add(x: Object, desc: String) = {
+ active.put(x, desc)
+ }
+
+ def del(x: Object) = {
+ active.remove(x)
+ }
+}
View
100 finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala
@@ -65,8 +65,8 @@ object ServerBuilder {
new ReferenceCountedChannelFactory(
new LazyRevivableChannelFactory(() =>
new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(new NamedPoolThreadFactory("FinagleServerBoss")),
- Executors.newCachedThreadPool(new NamedPoolThreadFactory("FinagleServerIO"))
+ Executors.newCachedThreadPool(new NamedPoolThreadFactory("FinagleServerBoss", true)),
+ Executors.newCachedThreadPool(new NamedPoolThreadFactory("FinagleServerIO", true))
)
)
)
@@ -623,56 +623,56 @@ private[builder] class MkServer[Req, Rep](
}
})
- def apply(): Server = {
- val serverChannel = bootstrap.bind(config.bindTo)
-
+ def apply(): Server = new Server {
+ private[this] val serverChannel = bootstrap.bind(config.bindTo)
Timer.register(closeNotifier)
- new Server {
- def close(timeout: Duration = Duration.MaxValue) = {
- // According to NETTY-256, the following sequence of operations
- // has no race conditions.
- //
- // - close the server socket (awaitUninterruptibly)
- // - close all open channels (awaitUninterruptibly)
- // - releaseExternalResources
- //
- // We modify this a little bit, to allow for graceful draining,
- // closing open channels only after the grace period.
- //
- // The next step here is to do a half-closed socket: we want to
- // suspend reading, but not writing to a socket. This may be
- // important for protocols that do any pipelining, and may
- // queue in their codecs.
-
- // On cursory inspection of the relevant Netty code, this
- // should never block (it is little more than a close() syscall
- // on the FD).
- serverChannel.close().awaitUninterruptibly()
-
- // At this point, no new channels may be created.
- for (h <- activeHandlers.toArray)
- h.drain()
-
- // Wait for all channels to shut down.
- Future.join(activeHandlers map(_.onShutdown) toSeq).get(timeout)
-
- // Force close any remaining connections. Don't wait for
- // success. Buffer channels into an array to avoid
- // deadlocking.
-
- for (h <- activeHandlers.toArray)
- h.close()
-
- bootstrap.releaseExternalResources()
-
- // Notify all registered resources that service is closing so they can
- // perform their own cleanup.
- closer.close()
- }
+ Registry.add(this, config.toString)
+ closeNotifier onClose Registry.del(this)
+
+ def close(timeout: Duration = Duration.MaxValue) = {
+ // According to NETTY-256, the following sequence of operations
+ // has no race conditions.
+ //
+ // - close the server socket (awaitUninterruptibly)
+ // - close all open channels (awaitUninterruptibly)
+ // - releaseExternalResources
+ //
+ // We modify this a little bit, to allow for graceful draining,
+ // closing open channels only after the grace period.
+ //
+ // The next step here is to do a half-closed socket: we want to
+ // suspend reading, but not writing to a socket. This may be
+ // important for protocols that do any pipelining, and may
+ // queue in their codecs.
+
+ // On cursory inspection of the relevant Netty code, this
+ // should never block (it is little more than a close() syscall
+ // on the FD).
+ serverChannel.close().awaitUninterruptibly()
+
+ // At this point, no new channels may be created.
+ for (h <- activeHandlers.toArray)
+ h.drain()
+
+ // Wait for all channels to shut down.
+ Future.join(activeHandlers map(_.onShutdown) toSeq).get(timeout)
+
+ // Force close any remaining connections. Don't wait for
+ // success. Buffer channels into an array to avoid
+ // deadlocking.
+
+ for (h <- activeHandlers.toArray)
+ h.close()
+
+ bootstrap.releaseExternalResources()
+
+ // Notify all registered resources that service is closing so they can
+ // perform their own cleanup.
+ closer.close()
+ }
- def localAddress: SocketAddress = serverChannel.getLocalAddress()
+ def localAddress: SocketAddress = serverChannel.getLocalAddress()
- override def toString = "Server(%s)".format(config.toString)
- }
+ override def toString = "Server(%s)".format(config.toString)
}
}
View
3  finagle-core/src/main/scala/com/twitter/finagle/util/Timer.scala
@@ -38,7 +38,8 @@ private[finagle] object Timer {
// threads themselves.)
implicit val default: ReferenceCountedTimer = {
def factory() = {
- val underlying = new Timer(new HashedWheelTimer(10, TimeUnit.MILLISECONDS))
+ val underlying = new Timer(new HashedWheelTimer(
+ new NamedPoolThreadFactory("FinagleTimer", true), 10, TimeUnit.MILLISECONDS))
new ThreadStoppingTimer(underlying, timerStoppingExecutor)
}
Please sign in to comment.
Something went wrong with that request. Please try again.