diff --git a/core/src/main/scala/Config.scala b/core/src/main/scala/Config.scala index e537778b..bd00db5a 100644 --- a/core/src/main/scala/Config.scala +++ b/core/src/main/scala/Config.scala @@ -340,7 +340,9 @@ final case class NelsonConfig( proxyPortWhitelist: Option[ProxyPortWhitelist], defaultNamespace: NamespaceName, expirationPolicy: ExpirationPolicyConfig, - discoveryDelay: FiniteDuration + discoveryDelay: FiniteDuration, + queue: Queue[IO, Manifest.Action], + auditQueue: Queue[IO, AuditEvent[_]] ){ val log = Logger[NelsonConfig.type] @@ -353,13 +355,7 @@ final case class NelsonConfig( lazy val email = interpreters.email - lazy val queue: IO[Queue[IO, Manifest.Action]] = - boundedQueue(pipeline.bufferLimit)(Effect[IO], pools.defaultExecutor) - - lazy val auditQueue: IO[Queue[IO, AuditEvent[_]]] = - boundedQueue(audit.bufferLimit)(Effect[IO], pools.defaultExecutor) - - lazy val auditor = auditQueue.map(new Auditor(_, git.systemUsername)) + lazy val auditor = new Auditor(auditQueue, git.systemUsername) // i've currently assigned these pretty arbitrary values // but this should protect nelson from really hammering @@ -443,6 +439,11 @@ object Config { stg = storage, logger = wflogger ) + pipeline = readPipeline(cfg.subconfig("nelson.pipeline")) + queue <- boundedQueue[IO, Manifest.Action](pipeline.bufferLimit)(Effect[IO], pools.defaultExecutor) + + audit = readAudit(cfg.subconfig("nelson.audit")) + auditQueue <- boundedQueue[IO, AuditEvent[_]](audit.bufferLimit)(Effect[IO], pools.defaultExecutor) } yield { NelsonConfig( git = gitcfg, @@ -456,8 +457,8 @@ object Config { cleanup = cleanup, deploymentMonitor = DeploymentMonitorConfig(deploymentMonitor), datacenters = dcs, - pipeline = readPipeline(cfg.subconfig("nelson.pipeline")), - audit = readAudit(cfg.subconfig("nelson.audit")), + pipeline = pipeline, + audit = audit, template = readTemplate(cfg), http = http, pools = pools, @@ -468,7 +469,9 @@ object Config { proxyPortWhitelist = whitelist, defaultNamespace = defaultNS, expirationPolicy = expirationPolicy, - discoveryDelay = discoveryDelay + discoveryDelay = discoveryDelay, + queue = queue, + auditQueue = auditQueue ) } } @@ -603,7 +606,8 @@ object Config { infra match { case Some((kubernetes, sslContext)) => http4sClient(kubernetes.timeout, sslContext = Some(sslContext)).map { httpClient => - new KubernetesClient(kubernetes.version, kubernetes.endpoint, httpClient, kubernetes.token) + val tokenFileContents = scala.io.Source.fromFile(kubernetes.token).getLines.mkString("") + new KubernetesClient(kubernetes.version, kubernetes.endpoint, httpClient, tokenFileContents) } case None => IO.raiseError(new IllegalArgumentException("At least one scheduler must be defined per datacenter")) } diff --git a/core/src/main/scala/Nelson.scala b/core/src/main/scala/Nelson.scala index 1ae0a0f0..724f5f82 100644 --- a/core/src/main/scala/Nelson.scala +++ b/core/src/main/scala/Nelson.scala @@ -158,9 +158,7 @@ object Nelson { rhk = hook(cfg) - auditor <- cfg.auditor - - out <- (auditor.write(rhk, CreateAction, login = session.user.login) *> + out <- (cfg.auditor.write(rhk, CreateAction, login = session.user.login) *> getOrCreate(slug, rhk)(cfg)) r2 <- (log(s"result of createRepoWebhook: out = $out") *> @@ -168,7 +166,7 @@ object Nelson { foo <- (log(s"result of repo copy: r2 = $r2") *> storage.run(cfg.storage, storage.StoreOp.insertOrUpdateRepositories(r2 :: Nil)) <* - auditor.write(r2, CreateAction, login = session.user.login)) + cfg.auditor.write(r2, CreateAction, login = session.user.login)) _ <- log(s"result of insertOrUpdateRepositories: foo = $foo") } yield r2 } @@ -184,8 +182,7 @@ object Nelson { rep <- storage.run(cfg.storage, storage.StoreOp.findRepository(session.user, slug)) rrr <- rep.tfold(RepoNotFound(slug))(identity) id <- rrr.hook.tfold(UnexpectedMissingHook(slug))(_.id) - au <- cfg.auditor - _ <- au.write(rrr, DeleteAction, login = session.user.login) + _ <- cfg.auditor.write(rrr, DeleteAction, login = session.user.login) _ <- Github.Request.deleteRepoWebhook(slug, id)(session.github).runWith(cfg.github) .or(IO.unit) upd = Repo(rrr.id, rrr.slug, rrr.access, None) @@ -266,7 +263,7 @@ object Nelson { } def deploy(actions: List[Manifest.Action]): NelsonK[Unit] = - Kleisli(cfg => actions.traverse_(a => cfg.queue.flatMap(_.enqueue1(a)))) + Kleisli(cfg => actions.traverse_(a => cfg.queue.enqueue1(a))) /** * Invoked when the inbound webhook from Github arrives, notifying Nelson @@ -281,7 +278,7 @@ object Nelson { val unitFilter: (Datacenter,Namespace,Plan,UnitDef) => Boolean = (_,namespace,_,_) => namespace.name == ns - Manifest.unitActions(m, dcs, unitFilter) + Manifest.unitActions(m, dcs, unitFilter) } Kleisli { cfg => @@ -291,11 +288,9 @@ object Nelson { fetchRepoManifestAndValidateDeployable(e.slug, r.tagName).run(cfg)) m <- v.fold(e => IO.raiseError(MultipleErrors(e)), m => IO.pure(m)) - au <- cfg.auditor - hm <- (log(s"received manifest from github: $m") *> storage.run(cfg.storage, storage.StoreOp.createRelease(e.repositoryId, r)) - *> au.write(r, CreateAction, Option(r.id)) + *> cfg.auditor.write(r, CreateAction, Option(r.id)) *> log(s"created release in response to release ${r.id}") *> Manifest.saturateManifest(m)(r)) @@ -759,7 +754,7 @@ object Nelson { m.hash, m.description, m.port, - exp)) <* cfg.auditor.flatMap(_.write(m, CreateAction, login = s.user.login)) + exp)) <* cfg.auditor.write(m, CreateAction, login = s.user.login) } yield guid } diff --git a/core/src/main/scala/Pipeline.scala b/core/src/main/scala/Pipeline.scala index 7931ba24..815acee5 100644 --- a/core/src/main/scala/Pipeline.scala +++ b/core/src/main/scala/Pipeline.scala @@ -51,21 +51,16 @@ object Pipeline { * effect generating sinks and observations. */ def task(config: NelsonConfig)(effects: Sink[IO, Action]): IO[Unit] = { - def par[A](ps: Stream[IO, Stream[IO, A]]) = { + def par[A](ps: Stream[IO, Stream[IO, A]]): Stream[IO, A] = { implicit val ec = config.pools.defaultExecutor val withErrors = ps.join(config.pipeline.concurrencyLimit).attempt - config.auditor.map(auditor => withErrors.observeW(auditor.errorSink).stripW) + withErrors.observeW(config.auditor.errorSink).stripW } - val p: IO[Stream[IO, Stream[IO, Unit]]] = - config.queue.map(_.dequeue.map(a => Stream.emit(a).covary[IO].to(effects))) + val p: Stream[IO, Stream[IO, Unit]] = config.queue.dequeue.map(a => Stream.emit(a).covary[IO].to(effects)) - for { - stream <- p - values <- par(stream) - _ <- values.compile.drain - } yield () + par(p).compile.drain } } diff --git a/core/src/main/scala/cleanup/CleanupCron.scala b/core/src/main/scala/cleanup/CleanupCron.scala index 833d4c13..e0921ab9 100644 --- a/core/src/main/scala/cleanup/CleanupCron.scala +++ b/core/src/main/scala/cleanup/CleanupCron.scala @@ -20,6 +20,7 @@ package cleanup import nelson.Datacenter.Deployment import nelson.routing.RoutingTable +import cats.syntax.apply._ import cats.effect.{Effect, IO} import nelson.CatsHelpers._ @@ -85,7 +86,7 @@ object CleanupCron { def refresh(cfg: NelsonConfig): Stream[IO,Duration] = Stream.repeatEval(IO(cfg.cleanup.cleanupDelay)).flatMap(d => - Scheduler.fromScheduledExecutorService(cfg.pools.schedulingPool).awakeEvery[IO](d)(Effect[IO], cfg.pools.schedulingExecutor)).head + Scheduler.fromScheduledExecutorService(cfg.pools.schedulingPool).awakeEvery[IO](d)(Effect[IO], cfg.pools.schedulingExecutor).head) /* * This is the entry point for the cleanup pipeline. The pipeline is run at @@ -94,7 +95,5 @@ object CleanupCron { * to decommission deployments. */ def pipeline(cfg: NelsonConfig)(implicit ec: ExecutionContext): Stream[IO, Unit] = - Stream.force { - cfg.auditor.map(auditor => refresh(cfg).flatMap(_ => process(cfg).attempt).observeW(auditor.errorSink).stripW.to(Reaper.reap(cfg))) - } + (refresh(cfg) *> process(cfg).attempt).observeW(cfg.auditor.errorSink).stripW.to(Reaper.reap(cfg)) } diff --git a/core/src/main/scala/cleanup/GarbageCollector.scala b/core/src/main/scala/cleanup/GarbageCollector.scala index 70beacd8..191672f7 100644 --- a/core/src/main/scala/cleanup/GarbageCollector.scala +++ b/core/src/main/scala/cleanup/GarbageCollector.scala @@ -20,7 +20,7 @@ package cleanup import cats.effect.IO import nelson.CatsHelpers._ -import fs2.{Pipe, Stream} +import fs2.Pipe import java.time.Instant import journal.Logger @@ -56,11 +56,9 @@ object GarbageCollector { def mark(cfg: NelsonConfig): Pipe[IO, CleanupRow, CleanupRow] = { import Json._ import audit.AuditableInstances._ - _.flatMap { case (dc, ns, d, gr) => - Stream.eval { - runs(cfg.storage, markAsGarbage(d.deployment).map(_ => (dc, ns, d, gr))) <* - cfg.auditor.flatMap(_.write(d.deployment, audit.GarbageAction)) - } + _.evalMap { case (dc, ns, d, gr) => + runs(cfg.storage, markAsGarbage(d.deployment).map(_ => (dc, ns, d, gr))) <* + cfg.auditor.write(d.deployment, audit.GarbageAction) } } } diff --git a/core/src/main/scala/cleanup/Reaper.scala b/core/src/main/scala/cleanup/Reaper.scala index 9ae011c6..d4d090b2 100644 --- a/core/src/main/scala/cleanup/Reaper.scala +++ b/core/src/main/scala/cleanup/Reaper.scala @@ -61,7 +61,7 @@ object Reaper { import Json._ import audit.AuditableInstances._ Workflow.run(resolve(d).destroy(d,dc,ns))(t) <* - cfg.auditor.flatMap(_.write(d, audit.DeleteAction)) <* + cfg.auditor.write(d, audit.DeleteAction) <* IO(log.debug((s"finished cleaning up $d in datacenter $dc"))) <* Notify.sendDecommissionedNotifications(dc,ns,d)(cfg) } diff --git a/core/src/main/scala/cleanup/Sweeper.scala b/core/src/main/scala/cleanup/Sweeper.scala index 590798b8..3182922a 100644 --- a/core/src/main/scala/cleanup/Sweeper.scala +++ b/core/src/main/scala/cleanup/Sweeper.scala @@ -77,16 +77,14 @@ object Sweeper { } def process(cfg: NelsonConfig)(implicit unclaimedResourceTracker: Kleisli[IO, (Datacenter, Int), Unit]): Stream[IO, Unit] = - Stream.force(cfg.auditor.map { auditor => - Stream.repeatEval(IO(cfg.cleanup.sweeperDelay)).flatMap { d => - Scheduler.fromScheduledExecutorService(cfg.pools.schedulingPool).awakeEvery(d)(Effect[IO], cfg.pools.schedulingExecutor).head - }.flatMap(_ => - Stream.eval(timer(cleanupLeakedConsulDiscoveryKeys(cfg))) - .attempt.observeW(auditor.errorSink)(Effect[IO], cfg.pools.defaultExecutor) - .stripW - ) - .flatMap(os => Stream.emits(os).covary[IO]) to sweeperSink - }) + Stream.repeatEval(IO(cfg.cleanup.sweeperDelay)).flatMap { d => + Scheduler.fromScheduledExecutorService(cfg.pools.schedulingPool).awakeEvery(d)(Effect[IO], cfg.pools.schedulingExecutor).head + }.flatMap(_ => + Stream.eval(timer(cleanupLeakedConsulDiscoveryKeys(cfg))) + .attempt.observeW(cfg.auditor.errorSink)(Effect[IO], cfg.pools.defaultExecutor) + .stripW + ) + .flatMap(os => Stream.emits(os).covary[IO]) to sweeperSink def sweeperSink(implicit unclaimedResourceTracker: Kleisli[IO, (Datacenter, Int), Unit]): Sink[IO, SweeperHelmOp] = Sink { diff --git a/core/src/main/scala/monitoring/DeploymentMonitor.scala b/core/src/main/scala/monitoring/DeploymentMonitor.scala index 2ef32981..468df353 100644 --- a/core/src/main/scala/monitoring/DeploymentMonitor.scala +++ b/core/src/main/scala/monitoring/DeploymentMonitor.scala @@ -75,14 +75,12 @@ object DeploymentMonitor { * Drain all actions from the writer (using an auditor error sink, observing it and routing to a final sink. */ def drain[A](cfg: NelsonConfig)(h: Stream[IO, Duration], w: NelsonFK[Stream, Seq[A]], s: Sink[IO, A], k: NelsonFK[Sink, A]): Stream[IO, Unit] = - Stream.force(cfg.auditor.map { auditor => - h >> w.run(cfg).flatMap(as => Stream.emits(as).covary[IO]) - .observe(s)(Effect[IO], cfg.pools.defaultExecutor) - .attempt - .observeW(auditor.errorSink)(Effect[IO], cfg.pools.defaultExecutor) - .stripW - .to(k.run(cfg)) - }) + h >> w.run(cfg).flatMap(as => Stream.emits(as).covary[IO]) + .observe(s)(Effect[IO], cfg.pools.defaultExecutor) + .attempt + .observeW(cfg.auditor.errorSink)(Effect[IO], cfg.pools.defaultExecutor) + .stripW + .to(k.run(cfg)) /* * Build a list of MonitorActionItems based on the health of deployments that are presently in the Warming state. @@ -168,9 +166,7 @@ object DeploymentMonitor { } def promotionSink(cfg: NelsonConfig): Sink[IO, MonitorActionItem] = - stream => Stream.force(cfg.auditor.map { auditor => - stream.evalMap(item => promote(item)(auditor)) - }) + Sink(item => promote(item)(cfg.auditor)) def promote(item: MonitorActionItem)(auditor: audit.Auditor): IO[Unit] = { val task = item match { diff --git a/core/src/main/scala/routing/cron.scala b/core/src/main/scala/routing/cron.scala index 6734f046..5d47750c 100644 --- a/core/src/main/scala/routing/cron.scala +++ b/core/src/main/scala/routing/cron.scala @@ -57,10 +57,8 @@ object cron { } def consulRefresh(cfg: NelsonConfig): Stream[IO,(Datacenter,ConsulOp.ConsulOpF[Unit])] = - Stream.force(cfg.auditor.map { auditor => - Stream.repeatEval(IO(cfg.discoveryDelay)). - flatMap(d => Scheduler.fromScheduledExecutorService(cfg.pools.schedulingPool).awakeEvery(d)(Effect[IO], cfg.pools.defaultExecutor).head). - flatMap(_ => Stream.eval(refresh(cfg)).attempt.observeW(auditor.errorSink)(Effect[IO], cfg.pools.defaultExecutor).stripW). - flatMap(xs => Stream.emits(xs)) - }) + Stream.repeatEval(IO(cfg.discoveryDelay)). + flatMap(d => Scheduler.fromScheduledExecutorService(cfg.pools.schedulingPool).awakeEvery(d)(Effect[IO], cfg.pools.defaultExecutor).head). + flatMap(_ => Stream.eval(refresh(cfg)).attempt.observeW(cfg.auditor.errorSink)(Effect[IO], cfg.pools.defaultExecutor).stripW). + flatMap(xs => Stream.emits(xs)) } diff --git a/core/src/test/scala/AuditSpec.scala b/core/src/test/scala/AuditSpec.scala index 30731de8..f5ab70da 100644 --- a/core/src/test/scala/AuditSpec.scala +++ b/core/src/test/scala/AuditSpec.scala @@ -58,7 +58,7 @@ class AuditSpec extends NelsonSuite with BeforeAndAfterEach { override def beforeEach: Unit = setup.unsafeRunSync() it should "enqueue all events in the stream" in { - val audit = new Auditor(config.auditQueue.unsafeRunSync(),defaultSystemLogin) + val audit = new Auditor(config.auditQueue,defaultSystemLogin) val p: Stream[IO, Foo] = Stream(Foo(1),Foo(2),Foo(3),Foo(10)) p.observe(audit.auditSink(LoggingAction))(Effect[IO], config.pools.defaultExecutor).compile.drain.unsafeRunSync() @@ -69,7 +69,7 @@ class AuditSpec extends NelsonSuite with BeforeAndAfterEach { } it should "store auditable events in storage" in { - val audit = new Auditor(config.auditQueue.unsafeRunSync(),defaultSystemLogin) + val audit = new Auditor(config.auditQueue,defaultSystemLogin) val events = Vector(Foo(1),Foo(2),Foo(3),Foo(10)) val p: Stream[IO, Foo] = Stream(events: _*) @@ -83,7 +83,7 @@ class AuditSpec extends NelsonSuite with BeforeAndAfterEach { } it should "be able to write to audit log directly" in { - val audit = new Auditor(config.auditQueue.unsafeRunSync(), defaultSystemLogin) + val audit = new Auditor(config.auditQueue, defaultSystemLogin) val foo = Foo(1) audit.write(foo, CreateAction).unsafeRunSync() @@ -97,7 +97,7 @@ class AuditSpec extends NelsonSuite with BeforeAndAfterEach { } it should "accept an optional release id parameter" in { - val audit = new Auditor(config.auditQueue.unsafeRunSync(),defaultSystemLogin) + val audit = new Auditor(config.auditQueue,defaultSystemLogin) val foo = Foo(1) val releaseId = Option(10L) @@ -114,7 +114,7 @@ class AuditSpec extends NelsonSuite with BeforeAndAfterEach { it should "accept an optional user parameter" in { val login = "scalatest" - val audit = new Auditor(config.auditQueue.unsafeRunSync(),defaultSystemLogin) + val audit = new Auditor(config.auditQueue,defaultSystemLogin) val foo = Foo(1) audit.write(foo, CreateAction, login = login).unsafeRunSync() @@ -130,7 +130,7 @@ class AuditSpec extends NelsonSuite with BeforeAndAfterEach { behavior of "listing audit logs" it should "accept an optional action parameter for filtering" in { - val audit = new Auditor(config.auditQueue.unsafeRunSync(), defaultSystemLogin) + val audit = new Auditor(config.auditQueue, defaultSystemLogin) val foo = Foo(1) audit.write(foo, CreateAction).unsafeRunSync() @@ -144,7 +144,7 @@ class AuditSpec extends NelsonSuite with BeforeAndAfterEach { } it should "accept an optional category parameter for filtering" in { - val audit = new Auditor(config.auditQueue.unsafeRunSync(), defaultSystemLogin) + val audit = new Auditor(config.auditQueue, defaultSystemLogin) val foo = Foo(1) val bar = Bar(2) diff --git a/http/src/main/scala/Main.scala b/http/src/main/scala/Main.scala index 8f837200..26c71fc1 100644 --- a/http/src/main/scala/Main.scala +++ b/http/src/main/scala/Main.scala @@ -76,7 +76,7 @@ object Main { import cleanup.SweeperDefaults._ log.info("booting the background processes nelson needs to operate...") - runBackgroundJob("auditor", Stream.force(cfg.auditor.map(auditor => auditor.process(cfg.storage)(cfg.pools.defaultExecutor)))) + runBackgroundJob("auditor", cfg.auditor.process(cfg.storage)(cfg.pools.defaultExecutor)) runBackgroundJob("pipeline_processor", Stream.eval(Pipeline.task(cfg)(Pipeline.sinks.runAction(cfg)))) runBackgroundJob("workflow_logger", cfg.workflowLogger.process) runBackgroundJob("routing_cron", routing.cron.consulRefresh(cfg) to Http4sConsul.consulSink)