Merge branch 'master' of https://github.com/twitter/finagle

commit 0ea4d76315c3ed0f94d8585f029130e9eddbb114 (2 parents: 9d0b0cd + feb5c35)
@bmatheny authored
Showing with 453 additions and 155 deletions.
  1. +9 −1 ChangeLog
  2. +1 −1  finagle-core/src/main/scala/com/twitter/finagle/Exceptions.scala
  3. +5 −0 finagle-core/src/main/scala/com/twitter/finagle/Filter.scala
  4. +20 −0 finagle-core/src/main/scala/com/twitter/finagle/Service.scala
  5. +79 −50 finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala
  6. +12 −1 finagle-core/src/main/scala/com/twitter/finagle/channel/BrokerChannelHandler.scala
  7. +24 −2 finagle-core/src/main/scala/com/twitter/finagle/channel/ServiceToChannelHandler.scala
  8. +7 −2 finagle-core/src/main/scala/com/twitter/finagle/service/ExpiringService.scala
  9. +33 −9 finagle-core/src/main/scala/com/twitter/finagle/service/FailureAccrualFactory.scala
  10. +2 −2 finagle-core/src/main/scala/com/twitter/finagle/service/TimeoutFilter.scala
  11. +16 −1 finagle-core/src/test/scala/com/twitter/finagle/channel/ServiceToChannelHandlerSpec.scala
  12. +1 −1  finagle-core/src/test/scala/com/twitter/finagle/service/ExpiringServiceSpec.scala
  13. +14 −11 finagle-core/src/test/scala/com/twitter/finagle/service/FailureAccrualFactorySpec.scala
  14. +1 −1  finagle-kestrel/src/test/scala/com/twitter/finagle/kestrel/unit/MultiReader.scala
  15. +0 −4 finagle-memcached/src/main/java/com/twitter/finagle/memcached/java/Client.java
  16. +144 −38 finagle-memcached/src/main/scala/com/twitter/finagle/memcached/Client.scala
  17. +69 −15 finagle-memcached/src/test/scala/com/twitter/finagle/memcached/unit/ClientSpec.scala
  18. +2 −2 project/build.properties
  19. +3 −3 project/build/Project.scala
  20. +3 −3 project/release.properties
  21. +8 −8 project/versions.properties
10 ChangeLog
@@ -1,5 +1,13 @@
-??
+1.10.0 2012/01/24
+
* http: Message.withX methods now have parameterized return types
+ * memcached: (Ketama) host ejection
+ * Noisier, more robust monitoring.
+ * zk cluster: avoid potential infinite loop
+ * http streaming: better detection, handling of dead channels
+ * http: always encode output as UTF-8
+ * stream: use offer/broker for duplex stream
+ * redis: imported finagle redis client from Tumblr. not yet published.
1.9.12 2012/01/05
2  finagle-core/src/main/scala/com/twitter/finagle/Exceptions.scala
@@ -4,7 +4,7 @@ import java.net.SocketAddress
import com.twitter.util.Duration
-trait SourcedException {
+trait SourcedException extends Exception {
var serviceName: String = "unspecified"
}
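
Making SourcedException extend Exception means the trait can be mixed into concrete exception classes and caught as an ordinary Throwable while still carrying the name of the service it came from. A minimal self-contained sketch (the TimeoutError class is hypothetical, defined here only for illustration):

trait SourcedException extends Exception {
  var serviceName: String = "unspecified"
}

// Hypothetical exception type mixing in the trait.
class TimeoutError extends SourcedException

val e = new TimeoutError
e.serviceName = "memcache-client"
try throw e catch {
  // Catchable as SourcedException (or plain Exception), with its source attached.
  case se: SourcedException => println("failure in service: " + se.serviceName)
}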
5 finagle-core/src/main/scala/com/twitter/finagle/Filter.scala
@@ -99,6 +99,11 @@ abstract class SimpleFilter[Req, Rep] extends Filter[Req, Rep, Req, Rep]
object Filter {
def identity[Req, Rep] = new SimpleFilter[Req, Rep] {
+ override def andThen[Req2, Rep2](next: Filter[Req, Rep, Req2, Rep2]) = next
+ override def andThen(service: Service[Req, Rep]) = service
+ override def andThen(factory: ServiceFactory[Req, Rep]) = factory
+
def apply(request: Req, service: Service[Req, Rep]) = service(request)
}
}
+
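
The new overrides make Filter.identity a genuine no-op under composition: chaining it with another filter, a service, or a factory now returns the other argument directly instead of allocating a wrapper that forwards every call. A self-contained sketch of the idea, using simplified stand-ins rather than the real finagle Service and Filter types:

trait Svc[Req, Rep] { def apply(req: Req): Rep }

trait Flt[Req, Rep] { self =>
  def apply(req: Req, svc: Svc[Req, Rep]): Rep
  // Default composition allocates a wrapper that routes requests through the filter.
  def andThen(svc: Svc[Req, Rep]): Svc[Req, Rep] = new Svc[Req, Rep] {
    def apply(req: Req) = self(req, svc)
  }
}

def identityFilter[Req, Rep] = new Flt[Req, Rep] {
  // The optimization above: composing with identity returns the argument unchanged.
  override def andThen(svc: Svc[Req, Rep]): Svc[Req, Rep] = svc
  def apply(req: Req, svc: Svc[Req, Rep]) = svc(req)
}

val echo = new Svc[String, String] { def apply(s: String) = s }
assert(identityFilter[String, String].andThen(echo) eq echo)  // same instance, no wrapper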
20 finagle-core/src/main/scala/com/twitter/finagle/Service.scala
@@ -79,6 +79,17 @@ trait ClientConnection {
def onClose: Future[Unit]
}
+object ClientConnection {
+ val nil: ClientConnection = new ClientConnection {
+ private[this] val unconnected =
+ new SocketAddress { override def toString = "unconnected" }
+ def remoteAddress = unconnected
+ def localAddress = unconnected
+ def close() {}
+ def onClose = new com.twitter.util.Promise[Unit]
+ }
+}
+
/**
* A simple proxy Service that forwards all calls to another Service.
* This is useful if you want to wrap-but-modify an existing service.
@@ -139,3 +150,12 @@ class FactoryToService[Req, Rep](factory: ServiceFactory[Req, Rep])
override def release() = factory.close()
override def isAvailable = factory.isAvailable
}
+
+
+/**
+ * A ServiceFactoryWrapper produces a ServiceFactory from another ServiceFactory,
+ * traditionally by constructing a composing ServiceFactory.
+ */
+trait ServiceFactoryWrapper {
+ def andThen[Req, Rep](factory: ServiceFactory[Req, Rep]): ServiceFactory[Req, Rep]
+}
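
ServiceFactoryWrapper captures "something that knows how to decorate a ServiceFactory" while staying polymorphic in the request and response types, which is what lets ClientBuilder carry one in its configuration before the protocol types are known (see the ClientBuilder changes below); FailureAccrualFactory.wrapper later in this commit is the first concrete implementation. A rough sketch of a custom wrapper, using a simplified stand-in factory type rather than the real finagle ServiceFactory:

// Simplified stand-in for ServiceFactory; only the shape matters here.
trait SvcFactory[Req, Rep] { def isAvailable: Boolean }

trait SvcFactoryWrapper {
  def andThen[Req, Rep](factory: SvcFactory[Req, Rep]): SvcFactory[Req, Rep]
}

// A hypothetical wrapper that logs availability checks; note that it is written
// without knowing Req and Rep, which are supplied only when it is applied.
val logged = new SvcFactoryWrapper {
  def andThen[Req, Rep](factory: SvcFactory[Req, Rep]) = new SvcFactory[Req, Rep] {
    def isAvailable = { println("availability check"); factory.isAvailable }
  }
}

val base = new SvcFactory[String, String] { def isAvailable = true }
val wrapped = logged andThen base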
129 finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala
@@ -162,7 +162,7 @@ final case class ClientConfig[Req, Rep, HasCluster, HasCodec, HasHostConnectionL
private val _logger : Option[Logger] = None,
private val _channelFactory : Option[ReferenceCountedChannelFactory] = None,
private val _tls : Option[(() => Engine, Option[String])] = None,
- private val _failureAccrualParams : Option[(Int, Duration)] = Some(5, 5.seconds),
+ private val _failureAccrual : Option[ServiceFactoryWrapper] = Some(FailureAccrualFactory.wrapper(5, 5.seconds)),
private val _maxOutstandingConnections: Option[Int] = None,
private val _tracerFactory : Tracer.Factory = () => NullTracer,
private val _hostConfig : ClientHostConfig = new ClientHostConfig)
@@ -199,7 +199,7 @@ final case class ClientConfig[Req, Rep, HasCluster, HasCodec, HasHostConnectionL
val logger = _logger
val channelFactory = _channelFactory
val tls = _tls
- val failureAccrualParams = _failureAccrualParams
+ val failureAccrual = _failureAccrual
val maxOutstandingConnections = _maxOutstandingConnections
val tracerFactory = _tracerFactory
@@ -228,7 +228,7 @@ final case class ClientConfig[Req, Rep, HasCluster, HasCodec, HasHostConnectionL
"logger" -> _logger,
"channelFactory" -> _channelFactory,
"tls" -> _tls,
- "failureAccrualParams" -> _failureAccrualParams,
+ "failureAccrual" -> _failureAccrual,
"maxOutstandingConnections" -> _maxOutstandingConnections,
"tracerFactory" -> Some(_tracerFactory)
)
@@ -254,7 +254,7 @@ final case class ClientConfig[Req, Rep, HasCluster, HasCodec, HasHostConnectionL
}
}
-class ClientBuilder[Req, Rep, HasCluster, HasCodec, HasHostConnectionLimit] private[builder](
+class ClientBuilder[Req, Rep, HasCluster, HasCodec, HasHostConnectionLimit] private[finagle](
config: ClientConfig[Req, Rep, HasCluster, HasCodec, HasHostConnectionLimit]
) {
import ClientConfig._
@@ -535,8 +535,15 @@ class ClientBuilder[Req, Rep, HasCluster, HasCodec, HasHostConnectionLimit] priv
* a host failed. The second parameter specifies how long the host
* is dead for, once marked.
*/
- def failureAccrualParams(params: (Int, Duration)): This =
- withConfig(_.copy(_failureAccrualParams = Some(params)))
+ def failureAccrualParams(params: (Int, Duration)): This = {
+ val filter = FailureAccrualFactory.wrapper(params._1, params._2)
+ failureAccrual(filter)
+ }
+
+ def failureAccrual(failureAccrual: ServiceFactoryWrapper): This = {
+ withConfig(_.copy(_failureAccrual = Some(failureAccrual)))
+ }
+
/**
* Toggle transmission policy to "Fast-Fail", then if a host becomes unavailable we will fail
@@ -647,7 +654,7 @@ class ClientBuilder[Req, Rep, HasCluster, HasCodec, HasHostConnectionLimit] priv
future
}
- private[this] lazy val statsReceiver = {
+ private[finagle] lazy val statsReceiver = {
val statsReceiver = config.statsReceiver getOrElse NullStatsReceiver
config.name match {
case Some(name) => statsReceiver.scope(name)
@@ -655,6 +662,8 @@ class ClientBuilder[Req, Rep, HasCluster, HasCodec, HasHostConnectionLimit] priv
}
}
+ private[this] lazy val tracer = config.tracerFactory()
+
/**
* Construct a ServiceFactory. This is useful for stateful protocols
* (e.g., those that support transactions or authentication).
@@ -702,35 +711,19 @@ class ClientBuilder[Req, Rep, HasCluster, HasCodec, HasHostConnectionLimit] priv
bs, prepareService(codec) _, hostStatsReceiver)
factory = buildPool(factory, hostStatsReceiver)
- if (config.requestTimeout < Duration.MaxValue) {
- val filter = new TimeoutFilter[Req, Rep](
- config.requestTimeout,
- new IndividualRequestTimeoutException(config.requestTimeout))
-
- factory = filter andThen factory
- }
-
- config.failureAccrualParams foreach { case (numFailures, markDeadFor) =>
- factory = new FailureAccrualFactory(factory, numFailures, markDeadFor)
- }
+ factory = requestTimeoutFilter andThen factory
- config.maxOutstandingConnections foreach { n =>
- factory = new FailFastFactory(factory, n)
- }
+ factory = failureAccrualFactory(factory)
+ factory = failFastFactory(factory)
val statsFilter = new StatsFilter[Req, Rep](hostStatsReceiver)
- val monitorFilter = new MonitorFilter[Req, Rep]({
- config.monitor map { mf =>
- mf(config.name.get)
- } getOrElse NullMonitor
- })
+ factory = statsFilter andThen factory
- factory = monitorFilter andThen statsFilter andThen factory
+ factory = monitorFilter andThen factory
factory
}
- val tracer = config.tracerFactory()
var factory: ServiceFactory[Req, Rep] = if (config.cluster.get.isInstanceOf[SocketAddressCluster]) {
new HeapBalancer(hostFactories, statsReceiver.scope("loadbalancer"))
{
@@ -761,11 +754,7 @@ class ClientBuilder[Req, Rep, HasCluster, HasCodec, HasHostConnectionLimit] priv
*/
factory = new RefcountedFactory(factory)
- if (config.connectTimeout < Duration.MaxValue)
- factory = new TimeoutFactory(
- factory,
- config.connectTimeout,
- new ServiceTimeoutException(config.connectTimeout))
+ factory = connectTimeoutFactory(factory)
// We maintain a separate log of factory failures here so that
// factory failures are captured in the service failure
@@ -773,7 +762,7 @@ class ClientBuilder[Req, Rep, HasCluster, HasCodec, HasHostConnectionLimit] priv
// requests are never dispatched to the underlying stack, they
// don't get recorded there.
factory = new StatsFactoryWrapper(factory, statsReceiver)
- factory = (new TracingFilter(tracer)) andThen factory
+ factory = tracingFilter andThen factory
factory = codec.prepareFactory(factory)
factory
@@ -788,21 +777,12 @@ class ClientBuilder[Req, Rep, HasCluster, HasCodec, HasHostConnectionLimit] priv
): Service[Req, Rep] = {
var service: Service[Req, Rep] = new FactoryToService[Req, Rep](internalBuildFactory())
- // We keep the retrying filter at the very bottom: this allows us
- // to retry across multiple hosts, etc.
- config.retryPolicy foreach { retryPolicy =>
- val filter = new RetryingFilter[Req, Rep](retryPolicy, Timer.default, statsReceiver)
- service = filter andThen service
- }
-
- if (config.timeout < Duration.MaxValue) {
- val filter = new TimeoutFilter[Req, Rep](
- config.timeout,
- new GlobalRequestTimeoutException(config.timeout))
- service = filter andThen service
- }
-
- exceptionSourceFilter andThen service
+ // We keep the retrying filter after the load balancer so we can
+ // retry across different hosts rather than the same one repeatedly.
+ service = retryFilter andThen service
+ service = globalTimeoutFilter andThen service
+ service = exceptionSourceFilter andThen service
+ service
}
/**
@@ -819,5 +799,54 @@ class ClientBuilder[Req, Rep, HasCluster, HasCodec, HasHostConnectionLimit] priv
def unsafeBuildFactory(): ServiceFactory[Req, Rep] =
withConfig(_.validated).buildFactory()
- private[this] def exceptionSourceFilter = new ExceptionSourceFilter[Req, Rep](config.name.get)
+
+ protected def failureAccrualFactory(factory: ServiceFactory[Req, Rep]) =
+ config.failureAccrual map { _ andThen factory } getOrElse(factory)
+
+ protected def failFastFactory(factory: ServiceFactory[Req, Rep]) =
+ config.maxOutstandingConnections map { n =>
+ new FailFastFactory(factory, n)
+ } getOrElse(factory)
+
+ protected def monitorFilter =
+ config.monitor map { monitorFactory =>
+ new MonitorFilter[Req, Rep](monitorFactory(config.name.get))
+ } getOrElse(identityFilter)
+
+ protected def connectTimeoutFactory(factory: ServiceFactory[Req, Rep]) =
+ if (config.connectTimeout < Duration.MaxValue) {
+ val exception = new ServiceTimeoutException(config.connectTimeout)
+ new TimeoutFactory(factory, config.connectTimeout, exception)
+ } else {
+ factory
+ }
+
+ protected def exceptionSourceFilter = new ExceptionSourceFilter[Req, Rep](config.name.get)
+
+ protected def retryFilter =
+ config.retryPolicy map { retryPolicy =>
+ new RetryingFilter[Req, Rep](retryPolicy, Timer.default, statsReceiver)
+ } getOrElse(identityFilter)
+
+
+ protected def requestTimeoutFilter =
+ if (config.requestTimeout < Duration.MaxValue) {
+ val exception = new IndividualRequestTimeoutException(config.requestTimeout)
+ new TimeoutFilter[Req, Rep](config.requestTimeout, exception)
+ } else {
+ identityFilter
+ }
+
+ protected def globalTimeoutFilter =
+ if (config.timeout < Duration.MaxValue) {
+ val exception = new GlobalRequestTimeoutException(config.timeout)
+ new TimeoutFilter[Req, Rep](config.timeout, exception)
+ } else {
+ identityFilter
+ }
+
+ protected def tracingFilter = new TracingFilter[Req, Rep](tracer)
+
+ protected val identityFilter = Filter.identity[Req, Rep]
}
+
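
From the caller's side the old tuple-based knob still works, and the new failureAccrual method accepts any ServiceFactoryWrapper. A hedged usage sketch (the host string and thresholds are placeholders, and the codec plus other required settings are omitted):

import com.twitter.conversions.time._
import com.twitter.finagle.builder.ClientBuilder
import com.twitter.finagle.service.FailureAccrualFactory

// Tuple form, as before; it is now translated into a ServiceFactoryWrapper internally.
val viaParams = ClientBuilder()
  .hosts("localhost:11211")
  .failureAccrualParams((5, 30.seconds))

// Wrapper form: pass FailureAccrualFactory.wrapper or any custom ServiceFactoryWrapper.
val viaWrapper = ClientBuilder()
  .hosts("localhost:11211")
  .failureAccrual(FailureAccrualFactory.wrapper(5, 30.seconds))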
13 finagle-core/src/main/scala/com/twitter/finagle/channel/BrokerChannelHandler.scala
@@ -115,8 +115,19 @@ class BrokerChannelHandler extends SimpleChannelHandler {
upstreamBroker ! Message(e, ctx)
}
+
override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
- upstreamBroker ! Exception(e, ctx)
+ // Exceptions are special: we always want to make sure we handle
+ // them, so we're stricter: the receiver must synchronize immediately,
+ // otherwise we proxy it upstream.
+ //
+ // This makes sure that exceptions always get propagated, even if
+ // the channel handler process has died (eg. it threw an unhandled
+ // exception).
+ val of = upstreamBroker.send(Exception(e, ctx)) orElse Offer.const {
+ super.exceptionCaught(ctx, e)
+ }
+ of.sync()
}
override def channelOpen(ctx: ChannelHandlerContext, e: ChannelStateEvent) {
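
The exception path above relies on Offer composition from com.twitter.concurrent: broker.send(m) is an Offer that commits only if some receiver is currently synchronizing, and orElse supplies the fallback branch. A small sketch of the same send-or-fallback pattern with a toy message type, assuming the Broker/Offer API as it is used in this diff:

import com.twitter.concurrent.{Broker, Offer}

val events = new Broker[String]

def report(msg: String) {
  // If a receiver is waiting on events.recv, hand the message off;
  // otherwise take the fallback branch immediately instead of blocking.
  val offer = events.send(msg) orElse Offer.const {
    println("no receiver ready, handled locally: " + msg)
  }
  offer.sync()
}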
26 finagle-core/src/main/scala/com/twitter/finagle/channel/ServiceToChannelHandler.scala
@@ -22,6 +22,22 @@ private[finagle] object ServiceToChannelHandler {
type State = Value
val Idle, Busy, Draining, Shutdown = Value
}
+
+ private[ServiceToChannelHandler] def severity(exc: Throwable) = exc match {
+ case
+ _: java.nio.channels.ClosedChannelException
+ | _: javax.net.ssl.SSLException
+ | _: ReadTimeoutException
+ | _: WriteTimedOutException
+ | _: javax.net.ssl.SSLException => Level.FINEST
+ case e: java.io.IOException if (
+ e.getMessage == "Connection reset by peer" ||
+ e.getMessage == "Broken pipe" ||
+ e.getMessage == "Connection timed out" ||
+ e.getMessage == "No route to host"
+ ) => Level.FINEST
+ case _ => Level.WARNING
+ }
}
private[finagle] class ServiceToChannelHandler[Req, Rep](
@@ -36,11 +52,17 @@ private[finagle] class ServiceToChannelHandler[Req, Rep](
import ServiceToChannelHandler._
import State._
+ @volatile private[this] var clientConnection: ClientConnection = ClientConnection.nil
private[this] val state = new AtomicReference[State](Idle)
private[this] val onShutdownPromise = new Promise[Unit]
private[this] val monitor =
parentMonitor andThen Monitor.mk {
- case _ =>
+ case e =>
+ val msg = "Unhandled exception in connection with " +
+ clientConnection.remoteAddress.toString +
+ " , shutting down connection"
+
+ log.log(severity(e), msg, e)
shutdown()
true
}
@@ -124,7 +146,7 @@ private[finagle] class ServiceToChannelHandler[Req, Rep](
protected def channelConnected(ctx: ChannelHandlerContext, _onClose: Future[Unit]) {
val channel = ctx.getChannel
- val clientConnection = new ClientConnection {
+ clientConnection = new ClientConnection {
def remoteAddress = channel.getRemoteAddress
def localAddress = channel.getLocalAddress
def close() { channel.disconnect() }
9 finagle-core/src/main/scala/com/twitter/finagle/service/ExpiringService.scala
@@ -8,6 +8,8 @@ import com.twitter.finagle.{
ChannelClosedException, Service, ServiceClosedException,
ServiceProxy, WriteException}
import com.twitter.finagle.stats.{Counter, StatsReceiver, NullStatsReceiver}
+import java.util.concurrent.atomic.AtomicBoolean
+
/**
* A service wrapper that expires the self service after a
* certain amount of idle time. By default, expiring calls
@@ -29,6 +31,7 @@ class ExpiringService[Req, Rep](
private[this] val lifeCounter = stats.counter("lifetime")
private[this] var idleTask = startTimer(maxIdleTime, idleCounter)
private[this] var lifeTask = startTimer(maxLifeTime, lifeCounter)
+ private[this] val wasReleased = new AtomicBoolean(false)
private[this] def startTimer(duration: Option[Duration], counter: Counter) =
duration map { t: Duration =>
@@ -56,7 +59,8 @@ class ExpiringService[Req, Rep](
}
def expired(): Unit = {
- super.release()
+ if (wasReleased.compareAndSet(false, true))
+ super.release()
}
override def apply(req: Req): Future[Rep] = {
@@ -86,6 +90,7 @@ class ExpiringService[Req, Rep](
override def release() {
deactivate()
- super.release()
+ if (wasReleased.compareAndSet(false, true))
+ super.release()
}
}
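
Both the expiry timer and an explicit release() now pass through a compareAndSet guard, so the underlying service is released at most once regardless of which path fires first (the ExpiringServiceSpec change below asserts exactly one release). The guard in isolation:

import java.util.concurrent.atomic.AtomicBoolean

// Stand-alone sketch of the release-once guard used above.
class ReleaseOnce(doRelease: () => Unit) {
  private[this] val wasReleased = new AtomicBoolean(false)
  def release() {
    // Only the first caller (timer expiry or user code) wins the CAS.
    if (wasReleased.compareAndSet(false, true))
      doRelease()
  }
}

val once = new ReleaseOnce(() => println("released underlying service"))
once.release()  // releases
once.release()  // no-op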
42 finagle-core/src/main/scala/com/twitter/finagle/service/FailureAccrualFactory.scala
@@ -1,7 +1,17 @@
package com.twitter.finagle.service
-import com.twitter.util.{Time, Duration, Throw, Return}
-import com.twitter.finagle.{Service, ServiceFactory}
+import com.twitter.util.{Time, Duration, Throw, Return, TimerTask, Timer}
+import com.twitter.finagle.{Service, ServiceFactory, ServiceFactoryWrapper}
+import com.twitter.finagle.util.{Timer => FinagleTimer}
+
+object FailureAccrualFactory {
+ def wrapper(numFailures: Int, markDeadFor: Duration): ServiceFactoryWrapper = {
+ new ServiceFactoryWrapper {
+ def andThen[Req, Rep](factory: ServiceFactory[Req, Rep]) =
+ new FailureAccrualFactory(factory, numFailures, markDeadFor)
+ }
+ }
+}
/**
* A factory that does failure accrual, marking it unavailable when
@@ -13,21 +23,36 @@ import com.twitter.finagle.{Service, ServiceFactory}
class FailureAccrualFactory[Req, Rep](
underlying: ServiceFactory[Req, Rep],
numFailures: Int,
- markDeadFor: Duration)
+ markDeadFor: Duration,
+ timer: Timer = FinagleTimer.default)
extends ServiceFactory[Req, Rep]
{
+
private[this] var failureCount = 0
- private[this] var failedAt = Time.epoch
+ @volatile private[this] var markedDead = false
+ private[this] var reviveTimerTask: Option[TimerTask] = None
private[this] def didFail() = synchronized {
failureCount += 1
- if (failureCount >= numFailures)
- failedAt = Time.now
+ if (failureCount >= numFailures) markDead()
}
private[this] def didSucceed() = synchronized {
failureCount = 0
- failedAt = Time.epoch
+ }
+
+ protected def markDead() = synchronized {
+ if (!markedDead) {
+ markedDead = true
+ val timerTask = timer.schedule(markDeadFor.fromNow) { revive() }
+ reviveTimerTask = Some(timerTask)
+ }
+ }
+
+ protected def revive() = synchronized {
+ markedDead = false
+ reviveTimerTask foreach { _.cancel() }
+ reviveTimerTask = None
}
def make() =
@@ -48,8 +73,7 @@ class FailureAccrualFactory[Req, Rep](
}
} onFailure { _ => didFail() }
- override def isAvailable =
- underlying.isAvailable && synchronized { failedAt.untilNow >= markDeadFor }
+ override def isAvailable = !markedDead && underlying.isAvailable
override def close() = underlying.close()
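
FailureAccrualFactory now keeps an explicit markedDead flag and schedules a revival task on a timer, instead of recomputing failedAt.untilNow on every isAvailable call. Note that revive() leaves the failure counter alone, so a freshly revived node gets only one request's grace before a new failure marks it dead again (see FailureAccrualFactorySpec below). A self-contained sketch of that state machine, using java.util.Timer in place of com.twitter.util.Timer:

import java.util.{Timer, TimerTask}

class DeadMarker(numFailures: Int, markDeadForMs: Long) {
  private[this] val timer = new Timer(true)  // daemon timer thread
  private[this] var failureCount = 0
  @volatile private[this] var markedDead = false

  def isAvailable = !markedDead

  def didSucceed() = synchronized { failureCount = 0 }

  def didFail() = synchronized {
    failureCount += 1
    if (failureCount >= numFailures && !markedDead) {
      markedDead = true
      // Schedule revival once, instead of checking elapsed time on every isAvailable.
      timer.schedule(new TimerTask { def run() { revive() } }, markDeadForMs)
    }
  }

  private[this] def revive() = synchronized {
    // failureCount is deliberately not reset: a single failure right after
    // revival marks the node dead again ("revived for one request").
    markedDead = false
  }
}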
4 finagle-core/src/main/scala/com/twitter/finagle/service/TimeoutFilter.scala
@@ -1,7 +1,7 @@
package com.twitter.finagle.service
import com.twitter.finagle.{
- Filter, Service, RequestTimeoutException, IndividualRequestTimeoutException}
+ SimpleFilter, Service, RequestTimeoutException, IndividualRequestTimeoutException}
import com.twitter.finagle.util.Timer
import com.twitter.finagle.tracing.Trace
import com.twitter.util
@@ -15,7 +15,7 @@ class TimeoutFilter[Req, Rep](
timeout: Duration,
exception: RequestTimeoutException,
timer: util.Timer = Timer.default)
- extends Filter[Req, Rep, Req, Rep] {
+ extends SimpleFilter[Req, Rep] {
def this(timeout: Duration) = this(timeout, new IndividualRequestTimeoutException(timeout))
def apply(request: Req, service: Service[Req, Rep]): Future[Rep] = {
17 finagle-core/src/test/scala/com/twitter/finagle/channel/ServiceToChannelHandlerSpec.scala
@@ -6,6 +6,7 @@ import org.specs.Specification
import org.specs.mock.Mockito
import org.mockito.{Matchers, ArgumentCaptor}
+import java.net.InetSocketAddress
import org.jboss.netty.channel.{
ChannelHandlerContext, MessageEvent, Channel,
ChannelPipeline, DownstreamMessageEvent,
@@ -34,6 +35,9 @@ object ServiceToChannelHandlerSpec extends Specification with Mockito {
channel.close returns closeFuture
channel.isOpen returns true
channel.getCloseFuture returns closeFuture
+ val address = mock[InetSocketAddress]
+ address.toString returns "ADDRESS"
+ channel.getRemoteAddress returns address
val ctx = mock[ChannelHandlerContext]
channel.getPipeline returns pipeline
ctx.getChannel returns channel
@@ -87,7 +91,18 @@ object ServiceToChannelHandlerSpec extends Specification with Mockito {
handler.exceptionCaught(mock[ChannelHandlerContext], e)
there was one(service).release()
there was one(channel).close()
- org.mockito.Mockito.verifyZeroInteractions(log)
+ there was one(log).log(Level.WARNING, "Unhandled exception in connection with ADDRESS , shutting down connection", exc)
+ }
+
+ "a close exception was caught by Netty" in {
+ val exc = new java.nio.channels.ClosedChannelException
+ val e = mock[ExceptionEvent]
+ e.getCause returns exc
+ handler.exceptionCaught(mock[ChannelHandlerContext], e)
+ there was one(service).release()
+ there was one(channel).close()
+ there was no(log).log(Level.WARNING, "Unhandled exception in connection with ADDRESS , shutting down connection", exc)
+ there was one(log).log(Level.FINEST, "Unhandled exception in connection with ADDRESS , shutting down connection", exc)
}
"when the service handler throws (encoded)" in {
2  finagle-core/src/test/scala/com/twitter/finagle/service/ExpiringServiceSpec.scala
@@ -46,7 +46,7 @@ object ExpiringServiceSpec extends Specification with Mockito {
// Now attempt to release it once more:
service.release()
- there were two(underlying).release()
+ there was one(underlying).release()
}
}
25 finagle-core/src/test/scala/com/twitter/finagle/service/FailureAccrualFactorySpec.scala
@@ -7,7 +7,7 @@ import org.mockito.{Matchers, ArgumentCaptor}
import com.twitter.util.{Time, Future}
import com.twitter.conversions.time._
-import com.twitter.finagle.{Service, ServiceFactory}
+import com.twitter.finagle.{Service, ServiceFactory, MockTimer}
object FailureAccrualFactorySpec extends Specification with Mockito {
"a failing service" should {
@@ -19,7 +19,8 @@ object FailureAccrualFactorySpec extends Specification with Mockito {
underlying.isAvailable returns true
underlying.make() returns Future.value(underlyingService)
- val factory = new FailureAccrualFactory[Int, Int](underlying, 3, 10.seconds)
+ val timer = new MockTimer
+ val factory = new FailureAccrualFactory[Int, Int](underlying, 3, 10.seconds, timer)
val service = factory.make()()
there was one(underlying).make()
@@ -39,7 +40,7 @@ object FailureAccrualFactorySpec extends Specification with Mockito {
there were three(underlyingService)(123)
}
}
-
+
"be revived (for one request) after the markDeadFor duration" in {
Time.withCurrentTimeFrozen { timeControl =>
service(123)() must throwA[Exception]
@@ -49,6 +50,7 @@ object FailureAccrualFactorySpec extends Specification with Mockito {
service.isAvailable must beFalse
timeControl.advance(10.seconds)
+ timer.tick()
// Healthy again!
factory.isAvailable must beTrue
@@ -61,7 +63,7 @@ object FailureAccrualFactorySpec extends Specification with Mockito {
service.isAvailable must beFalse
}
}
-
+
"reset failure counters after an individual success" in {
Time.withCurrentTimeFrozen { timeControl =>
service(123)() must throwA[Exception]
@@ -71,6 +73,7 @@ object FailureAccrualFactorySpec extends Specification with Mockito {
service.isAvailable must beFalse
timeControl.advance(10.seconds)
+ timer.tick()
// Healthy again!
factory.isAvailable must beTrue
@@ -86,16 +89,16 @@ object FailureAccrualFactorySpec extends Specification with Mockito {
// Counts are now reset.
underlyingService(123) returns Future.exception(new Exception)
- service(123)() must throwA[Exception]
+ service(123)() must throwA[Exception]
factory.isAvailable must beTrue
service.isAvailable must beTrue
- service(123)() must throwA[Exception]
+ service(123)() must throwA[Exception]
factory.isAvailable must beTrue
service.isAvailable must beTrue
- service(123)() must throwA[Exception]
+ service(123)() must throwA[Exception]
factory.isAvailable must beFalse
service.isAvailable must beFalse
- }
+ }
}
}
@@ -134,14 +137,14 @@ object FailureAccrualFactorySpec extends Specification with Mockito {
underlying.isAvailable returns true
underlying.make() returns Future.exception(new Exception("i broked :-("))
val factory = new FailureAccrualFactory[Int, Int](underlying, 3, 10.seconds)
-
+
"fail after the given number of tries" in {
Time.withCurrentTimeFrozen { timeControl =>
factory.isAvailable must beTrue
factory.make()() must throwA[Exception]
- factory.isAvailable must beTrue
+ factory.isAvailable must beTrue
factory.make()() must throwA[Exception]
- factory.isAvailable must beTrue
+ factory.isAvailable must beTrue
factory.make()() must throwA[Exception]
factory.isAvailable must beFalse
}
2  finagle-kestrel/src/test/scala/com/twitter/finagle/kestrel/unit/MultiReader.scala
@@ -56,8 +56,8 @@ object MultiReaderSpec extends Specification with Mockito {
val handle = MultiReader(handles)
val messages = new ArrayBuffer[ReadMessage]
- (handle.messages??) must be_==(ms(2))
(handle.messages??) must be_==(ms(0))
+ (handle.messages??) must be_==(ms(2))
(handle.messages??) must be_==(ms(1))
}
4 finagle-memcached/src/main/java/com/twitter/finagle/memcached/java/Client.java
@@ -29,10 +29,6 @@ public static Client newInstance(Service<Command, Response> finagleClient) {
return new com.twitter.finagle.memcached.java.ClientBase(schmemcachedClient);
}
- public static KetamaClient newInstance(java.util.Map<KetamaClientKey, com.twitter.finagle.memcached.Client> input) {
- return new com.twitter.finagle.memcached.KetamaClient(input);
- }
-
/**
* Get a key from the server.
*/
182 finagle-memcached/src/main/scala/com/twitter/finagle/memcached/Client.scala
@@ -3,16 +3,24 @@ package com.twitter.finagle.memcached
import scala.collection.JavaConversions._
import scala.collection.immutable
+import _root_.java.net.InetSocketAddress
import _root_.java.util.{Map => JMap}
+import _root_.java.util.concurrent.ConcurrentHashMap
-import com.twitter.finagle.{ChannelException, RequestException, ServiceException}
+import com.twitter.conversions.time._
+import com.twitter.finagle.{
+ ChannelException, RequestException, ServiceException,
+ ServiceProxy, ServiceFactory, ServiceFactoryWrapper}
import com.twitter.finagle.builder.{ClientBuilder, ClientConfig}
import com.twitter.finagle.memcached.protocol.text.Memcached
import com.twitter.finagle.memcached.protocol._
import com.twitter.finagle.memcached.util.ChannelBufferUtils._
-import com.twitter.finagle.Service
+import com.twitter.finagle.service.{FailureAccrualFactory, FailedService}
+import com.twitter.finagle.stats.{StatsReceiver, NullStatsReceiver}
+import com.twitter.finagle.{Service, ShardNotAvailableException}
import com.twitter.hashing._
-import com.twitter.util.{Time, Future, Bijection}
+import com.twitter.util.{Time, Future, Bijection, Duration}
+import com.twitter.concurrent.{Broker, Offer}
import org.jboss.netty.buffer.ChannelBuffer
import org.jboss.netty.buffer.ChannelBuffers
@@ -467,14 +475,12 @@ trait PartitionedClient extends Client {
def cas(key: String, flags: Int, expiry: Time, value: ChannelBuffer, casUnique: ChannelBuffer) =
clientOf(key).cas(key, flags, expiry, value, casUnique)
-
def delete(key: String) = clientOf(key).delete(key)
def incr(key: String, delta: Long) = clientOf(key).incr(key, delta)
def decr(key: String, delta: Long) = clientOf(key).decr(key, delta)
def stats(args: Option[String]): Future[Seq[String]] =
throw new UnsupportedOperationException("No logical way to perform stats without a key")
-
}
object PartitionedClient {
@@ -491,48 +497,118 @@ object PartitionedClient {
}
case class KetamaClientKey(host: String, port: Int, weight: Int) {
- def toTuple = (host, port, weight)
+ private[memcached] def identifier = if (port == 11211) host else host + ":" + port
}
-class KetamaClient(clients: Map[(String, Int, Int), Client], keyHasher: KeyHasher = KeyHasher.KETAMA)
- extends PartitionedClient
-{
- def this(clients: JMap[KetamaClientKey, Client]) =
- this((Map() ++ clients) map { case (k, v) => (k.toTuple, v) })
- require(!clients.isEmpty, "At least one client must be provided")
+sealed abstract trait NodeHealth
+case class NodeMarkedDead(key: KetamaClientKey) extends NodeHealth
+case class NodeRevived(key: KetamaClientKey) extends NodeHealth
+
- private val NUM_REPS = 160
+class KetamaFailureAccrualFactory[Req, Rep](
+ underlying: ServiceFactory[Req, Rep],
+ numFailures: Int,
+ markDeadFor: Duration,
+ key: KetamaClientKey,
+ healthBroker: Broker[NodeHealth]
+) extends FailureAccrualFactory[Req, Rep](underlying, numFailures, markDeadFor) {
- protected val distributor = {
- val nodes = clients.map { case ((ip, port, weight), client) =>
- val identifier = if (port == 11211) ip else ip + ":" + port
- KetamaNode(identifier, weight, client)
- }.toList
- new KetamaDistributor(nodes, NUM_REPS)
+ override def markDead() = {
+ super.markDead()
+ healthBroker ! NodeMarkedDead(key)
}
- protected[memcached] def clientOf(key: String) = {
- distributor.nodeForHash(keyHasher.hashKey(key))
+ override def revive() = {
+ super.revive()
+ healthBroker ! NodeRevived(key)
+ }
+}
+
+object KetamaClient {
+ val NumReps = 160
+ private val shardNotAvailableDistributor = {
+ val failedService = new FailedService[Command, Response](new ShardNotAvailableException)
+ new SingletonDistributor(Client(failedService))
+ }
+}
+
+class KetamaClient private[memcached](
+ services: Map[KetamaClientKey, Service[Command, Response]],
+ health: Offer[NodeHealth],
+ keyHasher: KeyHasher,
+ numReps: Int,
+ statsReceiver: StatsReceiver = NullStatsReceiver
+) extends PartitionedClient {
+ require(!services.isEmpty, "At least one service must be provided")
+
+ private[this] val nodes = services map { case (clientKey, service) =>
+ clientKey -> KetamaNode(clientKey.identifier, clientKey.weight, Client(service))
+ } toMap
+
+ private[this] val pristineDistributor = buildDistributor(nodes.values toSeq)
+ @volatile private[this] var currentDistributor: Distributor[Client] = pristineDistributor
+
+ private[this] val ejected = new ConcurrentHashMap[Client, KetamaClientKey]
+ private[this] def liveNodes = (nodes -- ejected.values).values toSeq
+
+ private[this] val liveNodeGauge = statsReceiver.addGauge("live_nodes") { nodes.size - ejected.size }
+ private[this] val deadNodeGauge = statsReceiver.addGauge("dead_nodes") { ejected.size }
+ private[this] val ejectionCount = statsReceiver.counter("ejections")
+ private[this] val revivalCount = statsReceiver.counter("revivals")
+
+ private[this] def buildDistributor(nodes: Seq[KetamaNode[Client]]) =
+ new KetamaDistributor(nodes, numReps)
+
+ health foreach {
+ case NodeMarkedDead(key) =>
+ ejectNode(key)
+ ejectionCount.incr()
+ case NodeRevived(key) =>
+ reviveNode(key)
+ revivalCount.incr()
+ }
+
+ override def clientOf(key: String): Client = {
+ val hash = keyHasher.hashKey(key)
+ currentDistributor.nodeForHash(hash)
+ }
+
+ private[this] def rebuildDistributor(): Unit = synchronized {
+ val nodes = liveNodes
+ currentDistributor = if (nodes.isEmpty) {
+ KetamaClient.shardNotAvailableDistributor
+ } else {
+ buildDistributor(nodes)
+ }
+ }
+
+ private[this] def ejectNode(key: KetamaClientKey) {
+ val node = nodes(key)
+ if (ejected.putIfAbsent(node.handle, key) == null) {
+ rebuildDistributor()
+ }
+ }
+
+ private[this] def reviveNode(key: KetamaClientKey) {
+ val node = nodes(key)
+ if (ejected.remove(node.handle) != null) {
+ rebuildDistributor()
+ }
}
def release() {
- clients.values foreach { _.release() }
+ services.values foreach { _.release() }
}
}
case class KetamaClientBuilder(
_nodes: Seq[(String, Int, Int)],
_hashName: Option[String],
- _clientBuilder: Option[ClientBuilder[_, _, _, _, ClientConfig.Yes]]) {
-
- @deprecated("Use `KetamaClientBuilder()` instead")
- def this() = this(
- Nil, // nodes
- Some("ketama"), // hashName
- None // clientBuilder
- )
-
+ _clientBuilder: Option[ClientBuilder[_, _, _, _, ClientConfig.Yes]],
+ _numFailures: Int = 5,
+ _markDeadFor: Duration = 30.seconds
+ ) {
def nodes(nodes: Seq[(String, Int, Int)]): KetamaClientBuilder =
copy(_nodes = nodes)
@@ -546,16 +622,46 @@ case class KetamaClientBuilder(
def clientBuilder(clientBuilder: ClientBuilder[_, _, _, _, ClientConfig.Yes]): KetamaClientBuilder =
copy(_clientBuilder = Some(clientBuilder))
- def build(): PartitionedClient = {
- val builder = _clientBuilder getOrElse ClientBuilder().hostConnectionLimit(1)
+ def failureAccrualParams(numFailures: Int, markDeadFor: Duration): KetamaClientBuilder =
+ copy(_numFailures = numFailures, _markDeadFor = markDeadFor)
- val clients = Map() ++ _nodes.map { case (hostname, port, weight) =>
- val b = builder.hosts(hostname + ":" + port).codec(new Memcached)
- val client = Client(b.build())
- ((hostname, port, weight) -> client)
+
+ def build(): Client = {
+ val builder =
+ (_clientBuilder getOrElse ClientBuilder().hostConnectionLimit(1)).codec(Memcached())
+
+ val keys = _nodes map {
+ case (hostname, port, weight) => KetamaClientKey(hostname, port, weight)
}
+
+ val (failureAccrurers, healths) = keys map { key =>
+ val (wrapper, broker) = failureAccrualWrapper(key)
+ ((key, wrapper), broker)
+ } unzip
+
+ val clients = failureAccrurers map { case (key, fa) =>
+ val hostBuilder = builder
+ .hosts(new InetSocketAddress(key.host, key.port))
+ .failureAccrual(fa)
+ (key -> hostBuilder.build())
+ } toMap
+
val keyHasher = KeyHasher.byName(_hashName.getOrElse("ketama"))
- new KetamaClient(clients, keyHasher)
+ val allHealth = Offer.choose(healths: _*)
+ val statsReceiver = builder.statsReceiver.scope("memcached_client")
+ new KetamaClient(clients, allHealth, keyHasher, KetamaClient.NumReps, statsReceiver)
+ }
+
+ private[this] def failureAccrualWrapper(key: KetamaClientKey) = {
+ val broker = new Broker[NodeHealth]
+ val filter = new ServiceFactoryWrapper {
+ def andThen[Req, Rep](factory: ServiceFactory[Req, Rep]) = {
+ new KetamaFailureAccrualFactory(
+ factory, _numFailures, _markDeadFor, key, broker
+ )
+ }
+ }
+ (filter, broker.recv)
}
}
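
Putting the pieces together, KetamaClientBuilder now builds one client per host with a KetamaFailureAccrualFactory wired to a shared health Broker, so dead hosts are ejected from the hash ring and re-admitted when they revive: requests are redistributed over the surviving nodes, and only when every host is ejected does the client fail with ShardNotAvailableException. A hedged usage sketch (host triples and thresholds are placeholders; the zero-argument KetamaClientBuilder() constructor is the one referred to by the deprecation note this commit removes):

import com.twitter.conversions.time._
import com.twitter.finagle.memcached.KetamaClientBuilder

// Nodes are (hostname, port, weight) triples; the addresses below are placeholders.
val client = KetamaClientBuilder()
  .nodes(Seq(("10.0.1.1", 11211, 600), ("10.0.1.2", 11211, 300)))
  .failureAccrualParams(5, 30.seconds)  // eject after 5 failures, re-admit after 30 seconds
  .build()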
84 finagle-memcached/src/test/scala/com/twitter/finagle/memcached/unit/ClientSpec.scala
@@ -1,7 +1,12 @@
package com.twitter.finagle.memcached.unit
+import com.twitter.finagle.{Service, ServiceException, ShardNotAvailableException}
import com.twitter.finagle.memcached._
+import com.twitter.finagle.memcached.protocol._
import com.twitter.hashing.KeyHasher
+import com.twitter.concurrent.Broker
+import com.twitter.util.Future
+import org.jboss.netty.buffer.ChannelBuffers
import org.specs.mock.Mockito
import org.specs.Specification
import scala.collection.mutable
@@ -27,34 +32,83 @@ object ClientSpec extends Specification with Mockito {
expected.size mustEqual 99
// Build Ketama client
- val client = mock[Client]
val clients = Map(
- ("10.0.1.1", 11211, 600) -> mock[Client],
- ("10.0.1.2", 11211, 300) -> mock[Client],
- ("10.0.1.3", 11211, 200) -> mock[Client],
- ("10.0.1.4", 11211, 350) -> mock[Client],
- ("10.0.1.5", 11211, 1000) -> mock[Client],
- ("10.0.1.6", 11211, 800) -> mock[Client],
- ("10.0.1.7", 11211, 950) -> mock[Client],
- ("10.0.1.8", 11211, 100) -> mock[Client]
- )
- val ketamaClient = new KetamaClient(clients, KeyHasher.KETAMA)
+ ("10.0.1.1", 11211, 600) -> mock[Service[Command, Response]],
+ ("10.0.1.2", 11211, 300) -> mock[Service[Command, Response]],
+ ("10.0.1.3", 11211, 200) -> mock[Service[Command, Response]],
+ ("10.0.1.4", 11211, 350) -> mock[Service[Command, Response]],
+ ("10.0.1.5", 11211, 1000) -> mock[Service[Command, Response]],
+ ("10.0.1.6", 11211, 800) -> mock[Service[Command, Response]],
+ ("10.0.1.7", 11211, 950) -> mock[Service[Command, Response]],
+ ("10.0.1.8", 11211, 100) -> mock[Service[Command, Response]]
+ ) map { case ((h,p,w), v) => KetamaClientKey(h,p,w) -> v }
+ val broker = new Broker[NodeHealth]
+ val ketamaClient = new KetamaClient(clients, broker.recv, KeyHasher.KETAMA, 160)
"pick the correct node" in {
- // Test that ketamaClient.clientOf(key) == expected IP
- val mockClientToIp = clients.map { case (k,v) => v -> k._1 }
+ val ipToService = clients map { case (key, service) => key.host -> service } toMap
+ val rng = new scala.util.Random
for (testcase <- expected) {
val mockClient = ketamaClient.clientOf(testcase(0))
- val resultIp = mockClientToIp(mockClient)
- resultIp must be_==(testcase(3))
+ val expectedService = ipToService(testcase(3))
+ val randomResponse = Number(rng.nextLong)
+
+ expectedService.apply(any[Incr]) returns Future.value(randomResponse)
+
+ mockClient.incr("foo")().get mustEqual randomResponse.value
}
}
+
"release" in {
ketamaClient.release()
clients.values foreach { client =>
there was one(client).release()
}
}
+
+ "ejects dead clients" in {
+ val serviceA = smartMock[Service[Command,Response]]
+ val serviceB = smartMock[Service[Command,Response]]
+ val keyA = ("10.0.1.1", 11211, 100)
+ val keyB = ("10.0.1.2", 11211, 100)
+ val nodeKeyA = KetamaClientKey(keyA._1, keyA._2, keyA._3)
+ val nodeKeyB = KetamaClientKey(keyB._1, keyB._2, keyB._3)
+ val services = Map(
+ keyA -> serviceA,
+ keyB -> serviceB
+ ) map { case ((h,p,w), v) => KetamaClientKey(h,p,w) -> v }
+
+ val key = ChannelBuffers.wrappedBuffer("foo".getBytes)
+ val value = smartMock[Value]
+ value.key returns key
+ serviceA(any) returns Future.value(Values(Seq(value)))
+
+ val broker = new Broker[NodeHealth]
+ val ketamaClient = new KetamaClient(services, broker.recv, KeyHasher.KETAMA, 160)
+
+ ketamaClient.get("foo")()
+ there was one(serviceA).apply(any)
+
+ broker !! NodeMarkedDead(nodeKeyA)
+
+ "goes to secondary if primary is down" in {
+ serviceB(Get(Seq(key))) returns Future.value(Values(Seq(value)))
+ ketamaClient.get("foo")()
+ there was one(serviceB).apply(any)
+ }
+
+ "throws ShardNotAvailableException when no nodes available" in {
+ broker !! NodeMarkedDead(nodeKeyB)
+ ketamaClient.get("foo")() must throwA[ShardNotAvailableException]
+ }
+
+ "brings back the dead node" in {
+ serviceA(any) returns Future.value(Values(Seq(value)))
+ broker !! NodeRevived(nodeKeyA)
+ ketamaClient.get("foo")()
+ there was two(serviceA).apply(any)
+ }
+ }
}
"RubyMemCacheClient" should {
4 project/build.properties
@@ -1,8 +1,8 @@
#Project properties
-#Thu Jan 05 10:50:23 PST 2012
+#Tue Jan 24 11:37:13 PST 2012
project.organization=com.twitter
project.name=finagle
sbt.version=0.7.5
-project.version=1.9.13-SNAPSHOT
+project.version=1.10.1-SNAPSHOT
build.scala.versions=2.8.1
project.initialize=false
6 project/build/Project.scala
@@ -78,9 +78,9 @@ class Project(info: ProjectInfo) extends StandardParentProject(info)
/**
* finagle-redis is a redis codec contributed by Tumblr.
*/
- val redisProject = project(
- "finagle-redis", "finagle-redis",
- new RedisProject(_), coreProject)
+// val redisProject = project(
+// "finagle-redis", "finagle-redis",
+// new RedisProject(_), coreProject)
/**
* finagle-http contains an http codec.
6 project/release.properties
@@ -1,4 +1,4 @@
#Automatically generated by ReleaseManagement
-#Thu Jan 05 10:50:23 PST 2012
-version=1.9.12
-sha1=e798c99d160d7e30aacb0b46f52250b6b9301955
+#Tue Jan 24 11:37:13 PST 2012
+version=1.10.0
+sha1=71e0e193858c9702662215bcef688ca8c1a4c75c
16 project/versions.properties
@@ -1,14 +1,14 @@
#Automatically generated by ProjectDependencies
-#Thu Jan 05 10:36:45 PST 2012
-com.twitter/util-core=1.12.9
+#Tue Jan 24 11:23:03 PST 2012
+com.twitter/util-core=1.12.12
util|util-logging=com.twitter/util-logging
-com.twitter/util-codec=1.12.9
-com.twitter/util-logging=1.12.9
-com.twitter/ostrich=4.10.2
+com.twitter/util-codec=1.12.12
+com.twitter/util-logging=1.12.12
+com.twitter/ostrich=4.10.4
ostrich|ostrich=com.twitter/ostrich
-com.twitter/util-collection=1.12.9
+com.twitter/util-collection=1.12.12
util|util-hashing=com.twitter/util-hashing
-com.twitter/util-hashing=1.12.9
-util|util-collection=com.twitter/util-collection
+com.twitter/util-hashing=1.12.12
util|util-core=com.twitter/util-core
+util|util-collection=com.twitter/util-collection
util|util-codec=com.twitter/util-codec