diff --git a/atlas-aggregator/src/main/resources/application.conf b/atlas-aggregator/src/main/resources/application.conf index f52d7434..b01788c8 100644 --- a/atlas-aggregator/src/main/resources/application.conf +++ b/atlas-aggregator/src/main/resources/application.conf @@ -22,6 +22,10 @@ atlas.aggregator { } ] + publisher { + queue-size = 10000 + } + // Determines whether or not to include the aggregator node as a tag on counters. // If false it will use atlas.dstype=sum instead. include-aggr-tag = false diff --git a/atlas-aggregator/src/main/scala/com/netflix/atlas/aggregator/AggrConfig.scala b/atlas-aggregator/src/main/scala/com/netflix/atlas/aggregator/AggrConfig.scala index 5e3faa62..81fcb397 100644 --- a/atlas-aggregator/src/main/scala/com/netflix/atlas/aggregator/AggrConfig.scala +++ b/atlas-aggregator/src/main/scala/com/netflix/atlas/aggregator/AggrConfig.scala @@ -28,7 +28,7 @@ import com.netflix.spectator.impl.Cache import com.typesafe.config.Config class AggrConfig( - config: Config, + val config: Config, registry: Registry, system: ActorSystem ) extends AtlasConfig diff --git a/atlas-aggregator/src/main/scala/com/netflix/atlas/aggregator/AkkaPublisher.scala b/atlas-aggregator/src/main/scala/com/netflix/atlas/aggregator/AkkaPublisher.scala index dbe055c8..32d24cc2 100644 --- a/atlas-aggregator/src/main/scala/com/netflix/atlas/aggregator/AkkaPublisher.scala +++ b/atlas-aggregator/src/main/scala/com/netflix/atlas/aggregator/AkkaPublisher.scala @@ -76,6 +76,9 @@ class AkkaPublisher(registry: Registry, config: AggrConfig, implicit val system: private val ec: ExecutionContext = ThreadPools.fixedSize(registry, "PublishEncoding", encodingParallelism) + private val pubConfig = config.config.getConfig("atlas.aggregator.publisher") + private val queueSize = pubConfig.getInt("queue-size") + private val atlasClient = createClient("publisher-atlas") private val lwcClient = createClient("publisher-lwc") @@ -96,7 +99,7 @@ class AkkaPublisher(registry: Registry, config: AggrConfig, implicit val system: flow } StreamOps - .blockingQueue[RequestTuple](config.debugRegistry(), name, 1000) + .blockingQueue[RequestTuple](config.debugRegistry(), name, queueSize) .via(restartFlow) .toMat(Sink.ignore)(Keep.left) .run() @@ -106,14 +109,18 @@ class AkkaPublisher(registry: Registry, config: AggrConfig, implicit val system: override def publish(payload: PublishPayload): CompletableFuture[Void] = { val t = new RequestTuple(atlasUri, "publisher-atlas", payload) - atlasClient.offer(t) - t.future + if (atlasClient.offer(t)) + CompletableFuture.completedFuture(VoidInstance) + else + CompletableFuture.failedFuture(new IllegalStateException("failed to enqueue atlas request")) } override def publish(payload: EvalPayload): CompletableFuture[Void] = { val t = new RequestTuple(evalUri, "publisher-lwc", payload) - lwcClient.offer(t) - t.future + if (lwcClient.offer(t)) + CompletableFuture.completedFuture(VoidInstance) + else + CompletableFuture.failedFuture(new IllegalStateException("failed to enqueue lwc request")) } override def close(): Unit = {