diff --git a/.sbtopts b/.sbtopts
new file mode 100644
index 0000000..6a552ca
--- /dev/null
+++ b/.sbtopts
@@ -0,0 +1,4 @@
+-J-Xmx4G
+-J-XX:MaxMetaspaceSize=1G
+-J-XX:MaxPermSize=1G
+-J-XX:+CMSClassUnloadingEnabled
\ No newline at end of file
diff --git a/src/main/scala/com/iheart/workpipeline/akka/helpers/MessageScheduler.scala b/src/main/scala/com/iheart/workpipeline/akka/helpers/MessageScheduler.scala
index 6d9a1d4..fccec7d 100644
--- a/src/main/scala/com/iheart/workpipeline/akka/helpers/MessageScheduler.scala
+++ b/src/main/scala/com/iheart/workpipeline/akka/helpers/MessageScheduler.scala
@@ -12,4 +12,12 @@ trait MessageScheduler {
     import context.dispatcher
     context.system.scheduler.scheduleOnce(delay, receiver, msg)
   }
+  def maybeDelayedMsg(delayO: Option[FiniteDuration], msg: Any, receiver: ActorRef = self): Option[Cancellable] = {
+    delayO.map(delayedMsg(_, msg, receiver)).orElse {
+      receiver ! msg
+      None
+    }
+  }
+
+
 }
diff --git a/src/main/scala/com/iheart/workpipeline/akka/patterns/queue/QueueProcessor.scala b/src/main/scala/com/iheart/workpipeline/akka/patterns/queue/QueueProcessor.scala
index 62ca6a0..12ce136 100644
--- a/src/main/scala/com/iheart/workpipeline/akka/patterns/queue/QueueProcessor.scala
+++ b/src/main/scala/com/iheart/workpipeline/akka/patterns/queue/QueueProcessor.scala
@@ -4,9 +4,13 @@ import java.time.{ZoneOffset, LocalDateTime}
 
 import akka.actor.SupervisorStrategy.Restart
 import akka.actor._
+import com.iheart.workpipeline.akka.patterns.queue.CommonProtocol.{WorkFailed, WorkTimedOut}
+import com.iheart.workpipeline.akka.patterns.queue.Worker.Hold
 import com.iheart.workpipeline.metrics.{Metric, MetricsCollector, NoOpMetricsCollector}
 import com.iheart.workpipeline.akka.helpers.MessageScheduler
 import com.iheart.workpipeline.akka.patterns.CommonProtocol.{ShutdownSuccessfully, QueryStatus}
+import com.iheart.workpipeline.collection.FiniteCollection._
+
 import QueueProcessor._
 import Queue.{Retire}
 import scala.concurrent.duration._
@@ -16,7 +20,11 @@ trait QueueProcessor extends Actor with ActorLogging with MessageScheduler {
   val queue: QueueRef
   def delegateeProps: Props
   def settings: ProcessingWorkerPoolSettings
+  def resultChecker: ResultChecker
   val metricsCollector: MetricsCollector
+  type ResultHistory = Vector[Boolean]
+  val resultHistoryLength: Int
+  protected def onWorkError(resultHistory: ResultHistory, pool: WorkerPool)
 
   metricsCollector.send(Metric.PoolSize(settings.startingPoolSize))
 
@@ -25,57 +33,77 @@ trait QueueProcessor extends Actor with ActorLogging with MessageScheduler {
     case _: Exception => Restart
   }
 
-  def workerProp(queueRef: QueueRef, delegateeProps: Props): Props
+  def workerProp(queueRef: QueueRef, delegateeProps: Props): Props =
+    Worker.default(queue, delegateeProps)(resultChecker)
 
   def receive: Receive = {
     val workers = (1 to settings.startingPoolSize).map(createWorker).toSet
     settings.maxProcessingTime.foreach(delayedMsg(_, QueueMaxProcessTimeReached(queue)))
     context watch queue
-    monitoring(workers)
+    monitoring()(workers)
   }
 
-  def monitoring(pool: WorkerPool): Receive = {
+  def monitoring(resultHistory: ResultHistory = Vector.empty)(pool: WorkerPool): Receive = {
+    def workError(): Unit = {
+      val newHistory = resultHistory.enqueueFinite(false, resultHistoryLength)
+      context become monitoring(newHistory)(pool)
+      onWorkError(newHistory, pool)
+    }
 
-    case ScaleTo(newPoolSize, reason) =>
-      log.info(s"Command to scale to $newPoolSize, currently at ${pool.size} due to ${reason.getOrElse("no reason given")}")
-      metricsCollector.send(Metric.PoolSize(newPoolSize))
+    {
+      case ScaleTo(newPoolSize, reason) =>
+        log.info(s"Command to scale to $newPoolSize, currently at ${pool.size} due to ${reason.getOrElse("no reason given")}")
+        metricsCollector.send(Metric.PoolSize(newPoolSize))
 
-      val diff = newPoolSize - pool.size
-      if (diff > 0)
-        context become monitoring(pool ++ (1 to diff).map(createWorker))
-      else if (diff < 0 && newPoolSize >= settings.minPoolSize)
-        pool.take(-diff).foreach(_ ! Worker.Retire)
+        val diff = newPoolSize - pool.size
+        if (diff > 0)
+          context become monitoring(resultHistory)(pool ++ (1 to diff).map(createWorker))
+        else if (diff < 0 && newPoolSize >= settings.minPoolSize)
+          pool.take(-diff).foreach(_ ! Worker.Retire)
 
-    case MissionAccomplished(worker) =>
-      removeWorker(pool, worker, monitoring, "successfully after all work is done")
+      case MissionAccomplished(worker) =>
+        removeWorker(pool, worker, monitoring(resultHistory), "successfully after all work is done")
 
-    case Terminated(worker) if pool.contains(worker) =>
-      removeWorker(pool, worker, monitoring, "unexpected termination when all workers retired")
+      case WorkCompleted(worker) =>
+        context become monitoring(resultHistory.enqueueFinite(true, resultHistoryLength))(pool)
+        metricsCollector.send(Metric.WorkCompleted)
 
-    case Terminated(`queue`) =>
-      log.info(s"Queue ${queue.path} is terminated")
-      self ! Shutdown(retireQueue = false)
+      case WorkFailed(_) =>
+        workError()
+        metricsCollector.send(Metric.WorkFailed)
 
-    case QueueMaxProcessTimeReached(queue) =>
-      log.warning(s"Queue ${queue.path} is still processing after max process time. Shutting Down")
-      self ! Shutdown(retireQueue = true)
+      case WorkTimedOut(_) =>
+        workError()
+        metricsCollector.send(Metric.WorkTimedOut)
 
-    case qs: QueryStatus => qs reply RunningStatus(pool)
+      case Terminated(worker) if pool.contains(worker) =>
+        removeWorker(pool, worker, monitoring(resultHistory), "unexpected termination when all workers retired")
 
-    case Shutdown(reportTo, timeout, retireQueue) =>
-      log.info("Commanded to shutdown. Shutting down")
-      if(retireQueue)
-        queue ! Retire(timeout)
-      else //retire from the workers' side
-        pool.foreach(_ ! Worker.Retire)
+      case Terminated(`queue`) =>
+        log.info(s"Queue ${queue.path} is terminated")
+        self ! Shutdown(retireQueue = false)
 
-      delayedMsg(timeout, ShutdownTimeout)
-      context become shuttingDown(pool, reportTo)
+      case QueueMaxProcessTimeReached(queue) =>
+        log.warning(s"Queue ${queue.path} is still processing after max process time. Shutting Down")
+        self ! Shutdown(retireQueue = true)
+      case qs: QueryStatus => qs reply RunningStatus(pool)
 
+      case Shutdown(reportTo, timeout, retireQueue) =>
+        log.info("Commanded to shutdown. Shutting down")
+        if(retireQueue)
+          queue ! Retire(timeout)
+        else //retire from the workers' side
+          pool.foreach(_ ! Worker.Retire)
+
+        delayedMsg(timeout, ShutdownTimeout)
+        context become shuttingDown(pool, reportTo)
+    }: Receive
   }
 
+
   def shuttingDown(pool: WorkerPool, reportTo: Option[ActorRef]): Receive = {
     case MissionAccomplished(worker) =>
       removeWorker(pool, worker, shuttingDown(_, reportTo), "successfully after command", reportTo)
@@ -139,8 +167,10 @@ case class DefaultQueueProcessor(queue: QueueRef,
                                  settings: ProcessingWorkerPoolSettings,
                                  metricsCollector: MetricsCollector = NoOpMetricsCollector,
                                  resultChecker: ResultChecker) extends QueueProcessor {
-  def workerProp(queue: QueueRef, delegateeProps: Props): Props =
-    Worker.default(queue, delegateeProps, metricsCollector)(resultChecker)
+
+  override val resultHistoryLength: Int = 0
+
+  override protected def onWorkError(resultHistory: ResultHistory, pool: WorkerPool): Unit = () //do nothing
 }
 
 case class QueueProcessorWithCircuitBreaker(queue: QueueRef,
@@ -149,8 +179,13 @@ case class QueueProcessorWithCircuitBreaker(queue: QueueRef,
                                             circuitBreakerSettings: CircuitBreakerSettings,
                                             metricsCollector: MetricsCollector = NoOpMetricsCollector,
                                             resultChecker: ResultChecker) extends QueueProcessor {
-  def workerProp(queue: QueueRef, delegateeProps: Props): Props =
-    Worker.withCircuitBreaker(queue, delegateeProps, circuitBreakerSettings, metricsCollector)(resultChecker)
+
+  override val resultHistoryLength: Int = circuitBreakerSettings.historyLength
+
+  override protected def onWorkError(resultHistory: ResultHistory, pool: WorkerPool): Unit = {
+    if( (resultHistory.count(r => !r).toDouble / resultHistoryLength) >= circuitBreakerSettings.errorRateThreshold )
+      pool.foreach(_ ! Hold(circuitBreakerSettings.closeDuration))
+  }
 }
 
@@ -164,7 +199,10 @@ object QueueProcessor {
   }
 
+
   case class MissionAccomplished(worker: WorkerRef)
+  case class WorkCompleted(worker: WorkerRef)
+
   case class QueueMaxProcessTimeReached(queue: QueueRef)
   case class RunningStatus(pool: WorkerPool)
   case object ShuttingDown
diff --git a/src/main/scala/com/iheart/workpipeline/akka/patterns/queue/Worker.scala b/src/main/scala/com/iheart/workpipeline/akka/patterns/queue/Worker.scala
index 0aa2f09..5424f36 100644
--- a/src/main/scala/com/iheart/workpipeline/akka/patterns/queue/Worker.scala
+++ b/src/main/scala/com/iheart/workpipeline/akka/patterns/queue/Worker.scala
@@ -3,36 +3,26 @@ package com.iheart.workpipeline.akka.patterns.queue
 import akka.actor._
 import com.iheart.workpipeline.akka.helpers.MessageScheduler
 import com.iheart.workpipeline.akka.patterns
-import com.iheart.workpipeline.collection.FiniteCollection
-import com.iheart.workpipeline.metrics.{MetricsCollector, NoOpMetricsCollector, Metric}
 import patterns.CommonProtocol.QueryStatus
 import CommonProtocol.{WorkTimedOut, WorkFailed}
-import QueueProcessor.MissionAccomplished
+import com.iheart.workpipeline.akka.patterns.queue.QueueProcessor.{WorkCompleted, MissionAccomplished}
 import Queue.{Unregistered, Unregister, NoWorkLeft, RequestWork}
 import Worker._
 import scala.concurrent.duration._
-import FiniteCollection._
 
 trait Worker extends Actor with ActorLogging with MessageScheduler {
-  type ResultHistory = Vector[Boolean]
 
   protected def delegateeProps: Props //actor who really does the work
   protected val queue: ActorRef
   protected def monitor: ActorRef = context.parent
-  protected val metricsCollector: MetricsCollector
 
   def receive = idle()
 
-  def resultHistoryLength: Int
-
-  //hate to have a var here, but this field avoid having to pass this history all over the places.
-  protected var resultHistory: ResultHistory = Vector.empty
-
   context watch queue
 
   override def preStart(): Unit = {
-    askMoreWork()
+    askMoreWork(None)
   }
 
   lazy val delegatee = {
@@ -41,8 +31,11 @@ trait Worker extends Actor with ActorLogging with MessageScheduler {
     ref
   }
 
-  def idle(): Receive = {
-    case work : Work => sendWorkToDelegatee(work, 0)
+  def idle(delayBeforeNextWork: Option[FiniteDuration] = None): Receive = {
+
+    case Hold(period) ⇒ context become idle(Some(period))
+
+    case work : Work => sendWorkToDelegatee(work, 0, delayBeforeNextWork)
 
     case NoWorkLeft =>
       monitor ! MissionAccomplished(self) //todo: maybe a simple stop is good enough?
@@ -59,16 +52,18 @@ trait Worker extends Actor with ActorLogging with MessageScheduler {
 
   def finish(): Unit = context stop self
 
-  def working(outstanding: Outstanding): Receive = ({
-    case Terminated(`queue`) => context become retiring(Some(outstanding))
+  def working(outstanding: Outstanding, delayBeforeNextWork: Option[FiniteDuration] = None): Receive = ({
+    case Hold(period) ⇒ context become working(outstanding, Some(period))
+
+    case Terminated(`queue`) => context become retiring(Some(outstanding))
 
-    case qs: QueryStatus => qs reply Working
+    case qs: QueryStatus => qs reply Working
 
-    case Worker.Retire => context become retiring(Some(outstanding))
+    case Worker.Retire => context become retiring(Some(outstanding))
 
   }: Receive).orElse(
-    waitingResult(outstanding, false))
+    waitingResult(outstanding, false, delayBeforeNextWork))
     .orElse {
       case msg => log.error(s"unrecognized interrupting msg during working $msg" )
@@ -81,7 +76,7 @@ trait Worker extends Actor with ActorLogging with MessageScheduler {
     case Retire => //already retiring
   }: Receive) orElse (
     if(outstanding.isDefined)
-      waitingResult(outstanding.get, true)
+      waitingResult(outstanding.get, true, None)
     else {
       case w: Work => sender ! Rejected(w, "Retiring")
     }
@@ -89,16 +84,16 @@ trait Worker extends Actor with ActorLogging with MessageScheduler {
   )
 
-
-  def waitingResult(outstanding: Outstanding, isRetiring: Boolean): Receive = ({
+  def waitingResult(outstanding: Outstanding,
+                    isRetiring: Boolean,
+                    delayBeforeNextWork: Option[FiniteDuration]): Receive = ({
     case DelegateeTimeout =>
       log.error(s"${delegatee.path} timed out after ${outstanding.work.settings.timeout} work ${outstanding.work.messageToDelegatee} abandoned")
       outstanding.timeout()
      if(isRetiring) finish()
      else {
-        appendResultHistory(false)
-        askMoreWork()
+        askMoreWork(delayBeforeNextWork)
      }
    case w: Work => sender ! Rejected(w, "busy") //just in case
@@ -106,67 +101,58 @@
    case Right(result) =>
      outstanding.success(result)
      if(isRetiring) finish()
      else {
-        appendResultHistory(true)
-        askMoreWork()
+        askMoreWork(delayBeforeNextWork)
      }
    case Left(e) =>
      log.error(s"error $e returned by delegatee in regards to running work $outstanding")
-      appendResultHistory(false)
-      retryOrAbandon(outstanding, isRetiring, e)
+      retryOrAbandon(outstanding, isRetiring, e, delayBeforeNextWork)
  }
 
-  private def retryOrAbandon(outstanding: Outstanding, isRetiring: Boolean, error: Any): Unit = {
+  private def retryOrAbandon(outstanding: Outstanding,
+                             isRetiring: Boolean,
+                             error: Any,
+                             delayBeforeNextWork: Option[FiniteDuration]): Unit = {
    outstanding.cancel()
-    if (outstanding.retried < outstanding.work.settings.retry ) {
+    if (outstanding.retried < outstanding.work.settings.retry && delayBeforeNextWork.isEmpty) {
      log.info(s"Retry work $outstanding")
-      sendWorkToDelegatee(outstanding.work, outstanding.retried + 1)
+      sendWorkToDelegatee(outstanding.work, outstanding.retried + 1, None)
    } else {
      val message = s"Work failed after ${outstanding.retried} try(s)"
      log.error(s"$message, work $outstanding abandoned")
      outstanding.fail(WorkFailed(message + s" due to $error"))
      if(isRetiring) finish()
      else
-        askMoreWork()
+        askMoreWork(delayBeforeNextWork)
    }
  }
 
-  private def sendWorkToDelegatee(work: Work, retried: Int): Unit = {
-    val timeoutHandle: Cancellable = delayedMsg(work.settings.timeout, DelegateeTimeout)
-    delegatee ! work.messageToDelegatee
+  private def sendWorkToDelegatee(work: Work, retried: Int, delay: Option[FiniteDuration]): Unit = {
+    val timeoutHandle: Cancellable = delayedMsg(delay.fold(work.settings.timeout)(_ + work.settings.timeout), DelegateeTimeout)
+    maybeDelayedMsg(delay, work.messageToDelegatee, delegatee)
    context become working(Outstanding(work, timeoutHandle, retried))
  }
 
-  private def askMoreWork(): Unit = {
-    val delay = holdOnGettingMoreWork
-    if(delay.isDefined)
-      delayedMsg(delay.get, RequestWork(self), queue)
-    else
-      queue ! RequestWork(self)
-    context become idle
+  private def askMoreWork(delay: Option[FiniteDuration]): Unit = {
+    maybeDelayedMsg(delay, RequestWork(self), queue)
+    context become idle()
  }
 
-  private def appendResultHistory(result: Boolean): Unit =
-    resultHistory = resultHistory.enqueueFinite(result, resultHistoryLength)
-
-
  protected def resultChecker: ResultChecker
-  protected def holdOnGettingMoreWork: Option[FiniteDuration]
-
  protected case class Outstanding(work: Work, timeoutHandle: Cancellable, retried: Int = 0) {
    def success(result: Any): Unit = {
-      metricsCollector.send(Metric.WorkCompleted)
+      monitor ! WorkCompleted(self)
      done(result)
    }
 
    def fail(result: Any): Unit = {
-      metricsCollector.send(Metric.WorkFailed)
+      monitor ! WorkFailed(result.toString)
      done(result)
    }
 
    def timeout(): Unit = {
-      metricsCollector.send(Metric.WorkTimedOut)
+      monitor ! WorkTimedOut("unknown")
      done(WorkTimedOut(s"Delegatee didn't respond within ${work.settings.timeout}"))
    }
 
@@ -196,46 +182,22 @@ object Worker {
  case object Idle extends WorkerStatus
  case object Working extends WorkerStatus
 
+  case class Hold(period: FiniteDuration)
+
  class DefaultWorker(protected val queue: QueueRef,
                      protected val delegateeProps: Props,
-                      protected val resultChecker: ResultChecker,
-                      protected val metricsCollector: MetricsCollector = NoOpMetricsCollector) extends Worker {
+                      protected val resultChecker: ResultChecker) extends Worker {
    val resultHistoryLength = 0
-    protected def holdOnGettingMoreWork: Option[FiniteDuration] = None
  }
 
-  class WorkerWithCircuitBreaker( protected val queue: QueueRef,
-                                  protected val delegateeProps: Props,
-                                  protected val resultChecker: ResultChecker,
-                                  circuitBreakerSettings: CircuitBreakerSettings,
-                                  protected val metricsCollector: MetricsCollector = NoOpMetricsCollector) extends Worker {
-
-    protected def holdOnGettingMoreWork: Option[FiniteDuration] = {
-      if( (resultHistory.count(r => !r).toDouble / resultHistoryLength) >= circuitBreakerSettings.errorRateThreshold )
-        Some(circuitBreakerSettings.closeDuration)
-      else
-        None
-    }
-    val resultHistoryLength = circuitBreakerSettings.historyLength
-  }
-
-
-
  def default(queue: QueueRef,
-              delegateeProps: Props,
-              metricsCollector: MetricsCollector = NoOpMetricsCollector)(resultChecker: ResultChecker): Props = {
-    Props(new DefaultWorker(queue, delegateeProps, resultChecker, metricsCollector))
+              delegateeProps: Props)(resultChecker: ResultChecker): Props = {
+    Props(new DefaultWorker(queue, delegateeProps, resultChecker))
  }
 
-  def withCircuitBreaker(queue: QueueRef,
-                         delegateeProps: Props,
-                         circuitBreakerSettings: CircuitBreakerSettings,
-                         metricsCollector: MetricsCollector = NoOpMetricsCollector)(resultChecker: ResultChecker): Props = {
-    Props(new WorkerWithCircuitBreaker(queue, delegateeProps, resultChecker, circuitBreakerSettings, metricsCollector))
-  }
 }
diff --git a/src/test/scala/com/iheart/workpipeline/akka/patterns/queue/QueueSpec.scala b/src/test/scala/com/iheart/workpipeline/akka/patterns/queue/QueueSpec.scala
index c9fc012..0484198 100644
--- a/src/test/scala/com/iheart/workpipeline/akka/patterns/queue/QueueSpec.scala
+++ b/src/test/scala/com/iheart/workpipeline/akka/patterns/queue/QueueSpec.scala
@@ -2,6 +2,7 @@ package com.iheart.workpipeline.akka.patterns.queue
 
 import akka.actor._
+import akka.testkit.{TestActorRef, TestProbe}
 import com.iheart.workpipeline.akka.{SpecWithActorSystem, patterns}
 import com.iheart.workpipeline.akka.patterns.CommonProtocol.{ShutdownSuccessfully, QueryStatus}
 import com.iheart.workpipeline.metrics.{Metric, MetricsCollector, NoOpMetricsCollector}
@@ -126,9 +127,9 @@ class ScalingWhenWorkingSpec extends SpecWithActorSystem with Mockito {
  }
 
  "retiring a worker when it already started working" in new QueueScope {
-    val queueProcessor = initQueue( iteratorQueue(List("a", "b", "c").iterator,
-                                                  WorkSettings(sendResultTo = Some(self))),
-                                    numberOfWorkers = 2)
+    val queueProcessor = initQueue(iteratorQueue(List("a", "b", "c").iterator,
+      WorkSettings(sendResultTo = Some(self))),
+      numberOfWorkers = 2)
    delegatee.expectMsgType[DelegateeMessage]
 
    expectNoMsg(20.millisecond) //wait for both workers get occupied
@@ -154,11 +155,15 @@ class CircuitBreakerSpec extends SpecWithActorSystem {
  "Circuit Breaker" >> {
 
    "worker cools down after consecutive errors" in new QueueScope {
-
-      system.actorOf(queueProcessorWithCBProps(iteratorQueue(List("a", "b", "c", "d", "e").iterator),
-        CircuitBreakerSettings(historyLength = 3, closeDuration = 200.milliseconds)
+      val queue = defaultQueue()
+      system.actorOf(queueProcessorWithCBProps(queue,
+        CircuitBreakerSettings(historyLength = 3, closeDuration = 300.milliseconds)
      ), "queuewithCB")
 
+      queue ! Enqueue(DelegateeMessage("a"))
+      queue ! Enqueue(DelegateeMessage("b"))
+      queue ! Enqueue(DelegateeMessage("c"))
+      queue ! Enqueue(DelegateeMessage("d"))
      delegatee.expectMsg(DelegateeMessage("a"))
      delegatee.reply(MessageProcessed("a"))
      delegatee.expectMsg(DelegateeMessage("b"))
@@ -167,8 +172,10 @@ class CircuitBreakerSpec extends SpecWithActorSystem {
      delegatee.reply(MessageFailed)
      delegatee.expectMsg(DelegateeMessage("d"))
      delegatee.reply(MessageFailed)
+      delegatee.expectNoMsg(30.milliseconds) //give some time for the circuit breaker to kick in
 
-      delegatee.expectNoMsg(190.milliseconds)
+      queue ! Enqueue(DelegateeMessage("e"))
+      delegatee.expectNoMsg(150.milliseconds)
 
      delegatee.expectMsg(DelegateeMessage("e"))
 
@@ -288,7 +295,44 @@ class QueueMetricsSpec extends SpecWithActorSystem with Mockito {
      one(mc).send(Metric.WorkQueueMaxLength(1)) andThen
        two(mc).send(Metric.EnqueueRejected)
  }
+}
+
+class QueueWorkMetricsSpec extends SpecWithActorSystem with Mockito {
+
+  "send WorkCompleted, WorkFailed, and WorkTimedOut metrics" in new QueueScope {
+
+    @volatile
+    var receivedMetrics: List[Metric] = Nil
+    val mc = new MetricsCollector {
+      def send(metric: Metric): Unit = receivedMetrics = metric :: receivedMetrics
+    }
+
+    val workerProps: Props = Worker.default(
+      TestProbe().ref,
+      Props.empty)(resultChecker)
+
+    val queue: QueueRef = defaultQueue(WorkSettings(timeout = 23.milliseconds))
+    val processor: ActorRef = TestActorRef(defaultProcessorProps(queue, metricsCollector = mc))
+
+    watch(processor)
+    queue ! Enqueue("a")
+    queue ! Enqueue("b")
+    queue ! Enqueue("c")
+
+
+    delegatee.expectMsg("a")
+    delegatee.reply(MessageProcessed("a"))
+    delegatee.expectMsg("b")
+    delegatee.reply(MessageFailed)
+    delegatee.expectMsg("c") //timeout this one
+
+    queue ! Enqueue("d")
+    delegatee.expectMsg(100.milliseconds, "d")
+
+    receivedMetrics must contain(allOf[Metric](Metric.WorkCompleted, Metric.WorkFailed, Metric.WorkTimedOut))
+
+  }
 }
 
@@ -302,33 +346,33 @@ class QueueScope(implicit system: ActorSystem) extends ScopeWithQueue {
  }
 
-  def initQueue(queue: ActorRef, numberOfWorkers: Int = 1, minPoolSize: Int = 1) : QueueProcessorRef = {
+  def initQueue(queue: ActorRef, numberOfWorkers: Int = 1, minPoolSize: Int = 1): QueueProcessorRef = {
    val processorProps: Props = defaultProcessorProps(queue, ProcessingWorkerPoolSettings(startingPoolSize = numberOfWorkers, minPoolSize = minPoolSize), metricsCollector)
    system.actorOf(processorProps)
  }
 
  def waitForWorkerRegistration(queue: QueueRef, numberOfWorkers: Int): Unit = {
    queue ! QueryStatus()
-    fishForMessage(500.millisecond, "wait for workers to register"){
-      case qs : QueueStatus =>
+    fishForMessage(500.millisecond, "wait for workers to register") {
+      case qs: QueueStatus =>
        val registered = qs.queuedWorkers.size == numberOfWorkers
-        if(!registered) queue ! QueryStatus()
+        if (!registered) queue ! QueryStatus()
        registered
    }
  }
 
  def iteratorQueue(iterator: Iterator[String], workSetting: WorkSettings = WorkSettings()): QueueRef =
    system.actorOf(iteratorQueueProps(iterator, workSetting, metricsCollector),
-                   "iterator-queue-" + Random.nextInt(100000))
+      "iterator-queue-" + Random.nextInt(100000))
 
  def defaultQueue(workSetting: WorkSettings = WorkSettings()): QueueRef =
    system.actorOf(Queue.default(workSetting, metricsCollector),
-                   "default-queue-" + Random.nextInt(100000))
+      "default-queue-" + Random.nextInt(100000))
 
  def withBackPressure(backPressureSetting: BackPressureSettings = BackPressureSettings(),
                       defaultWorkSetting: WorkSettings = WorkSettings()) =
    system.actorOf(Queue.withBackPressure(backPressureSetting, defaultWorkSetting, metricsCollector),
-                   "with-back-pressure-queue" + Random.nextInt(500000))
+      "with-back-pressure-queue" + Random.nextInt(500000))
 }
diff --git a/src/test/scala/com/iheart/workpipeline/akka/patterns/queue/WorkerSpec.scala b/src/test/scala/com/iheart/workpipeline/akka/patterns/queue/WorkerSpec.scala
deleted file mode 100644
index 0d50dd3..0000000
--- a/src/test/scala/com/iheart/workpipeline/akka/patterns/queue/WorkerSpec.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-package com.iheart.workpipeline.akka.patterns.queue
-
-import akka.actor.{Actor, ActorRef, ActorSystem, Props}
-import akka.testkit._
-import com.iheart.workpipeline.akka.{SpecWithActorSystem, patterns}
-import com.iheart.workpipeline.metrics.{Metric, MetricsCollector, NoOpMetricsCollector}
-import org.specs2.specification.Scope
-import org.specs2.mock.Mockito
-import scala.concurrent.duration._
-
-import TestUtils._
-
-class WorkerMetricsSpec extends SpecWithActorSystem with Mockito {
-  "send WorkCompleted, WorkFailed, and WorkTimedOut metrics" in new WorkerScope {
-    val mc = mock[MetricsCollector]
-    val workerProps: Props = Worker.default(
-      TestProbe().ref,
-      Props.empty,
-      mc)(resultChecker)
-    val worker: ActorRef = TestActorRef(workerProps)
-
-    worker ! Work("some work")
-    worker ! MessageProcessed("I did it")
-
-    worker ! Work("really hard work")
-    worker ! MessageFailed
-
-    worker ! Work("work with an impossible time limit", WorkSettings(timeout = 0.seconds))
-
-    there was after(100.milliseconds).
-      one(mc).send(Metric.WorkCompleted) andThen
-      one(mc).send(Metric.WorkFailed) andThen
-      one(mc).send(Metric.WorkTimedOut)
-  }
-}
-
-class WorkerScope(implicit system: ActorSystem)
-  extends TestKit(system) with ImplicitSender with Scope
-
-