Skip to content

Commit

Permalink
Fix bugs in pipeline and cleanup
Browse files Browse the repository at this point in the history
- Buggy migration to FS2's Queue.bounded. In scalaz-stream Queue.bounded
  gave Queue[F, A] but in FS2 it gives F[Queue[F, A]] since allocating
  mutable state is an effect (this is the correct behavior). However the
  migration naively migrated f(queue.dequeue) to queue.flatMap(q => f(q.dequeue))
  and did not thread the allocated queue properly so each operation on
  the queue was done on a freshly allocated queue. Basically the place
  where the pipeline queue was being enqueued to was disconnected from
  where the pipeline queue was being pulled.
- Typo in migration in CleanupCron where head was called in the wrong
  scope
  • Loading branch information
Adelbert Chang authored and Adelbert Chang committed May 1, 2018
1 parent 31b7c5f commit 63d6592
Show file tree
Hide file tree
Showing 11 changed files with 62 additions and 79 deletions.
28 changes: 16 additions & 12 deletions core/src/main/scala/Config.scala
Expand Up @@ -340,7 +340,9 @@ final case class NelsonConfig(
proxyPortWhitelist: Option[ProxyPortWhitelist],
defaultNamespace: NamespaceName,
expirationPolicy: ExpirationPolicyConfig,
discoveryDelay: FiniteDuration
discoveryDelay: FiniteDuration,
queue: Queue[IO, Manifest.Action],
auditQueue: Queue[IO, AuditEvent[_]]
){

val log = Logger[NelsonConfig.type]
Expand All @@ -353,13 +355,7 @@ final case class NelsonConfig(

lazy val email = interpreters.email

lazy val queue: IO[Queue[IO, Manifest.Action]] =
boundedQueue(pipeline.bufferLimit)(Effect[IO], pools.defaultExecutor)

lazy val auditQueue: IO[Queue[IO, AuditEvent[_]]] =
boundedQueue(audit.bufferLimit)(Effect[IO], pools.defaultExecutor)

lazy val auditor = auditQueue.map(new Auditor(_, git.systemUsername))
lazy val auditor = new Auditor(auditQueue, git.systemUsername)

// i've currently assigned these pretty arbitrary values
// but this should protect nelson from really hammering
Expand Down Expand Up @@ -443,6 +439,11 @@ object Config {
stg = storage,
logger = wflogger
)
pipeline = readPipeline(cfg.subconfig("nelson.pipeline"))
queue <- boundedQueue[IO, Manifest.Action](pipeline.bufferLimit)(Effect[IO], pools.defaultExecutor)

audit = readAudit(cfg.subconfig("nelson.audit"))
auditQueue <- boundedQueue[IO, AuditEvent[_]](audit.bufferLimit)(Effect[IO], pools.defaultExecutor)
} yield {
NelsonConfig(
git = gitcfg,
Expand All @@ -456,8 +457,8 @@ object Config {
cleanup = cleanup,
deploymentMonitor = DeploymentMonitorConfig(deploymentMonitor),
datacenters = dcs,
pipeline = readPipeline(cfg.subconfig("nelson.pipeline")),
audit = readAudit(cfg.subconfig("nelson.audit")),
pipeline = pipeline,
audit = audit,
template = readTemplate(cfg),
http = http,
pools = pools,
Expand All @@ -468,7 +469,9 @@ object Config {
proxyPortWhitelist = whitelist,
defaultNamespace = defaultNS,
expirationPolicy = expirationPolicy,
discoveryDelay = discoveryDelay
discoveryDelay = discoveryDelay,
queue = queue,
auditQueue = auditQueue
)
}
}
Expand Down Expand Up @@ -603,7 +606,8 @@ object Config {
infra match {
case Some((kubernetes, sslContext)) =>
http4sClient(kubernetes.timeout, sslContext = Some(sslContext)).map { httpClient =>
new KubernetesClient(kubernetes.version, kubernetes.endpoint, httpClient, kubernetes.token)
val tokenFileContents = scala.io.Source.fromFile(kubernetes.token).getLines.mkString("")
new KubernetesClient(kubernetes.version, kubernetes.endpoint, httpClient, tokenFileContents)
}
case None => IO.raiseError(new IllegalArgumentException("At least one scheduler must be defined per datacenter"))
}
Expand Down
19 changes: 7 additions & 12 deletions core/src/main/scala/Nelson.scala
Expand Up @@ -158,17 +158,15 @@ object Nelson {

rhk = hook(cfg)

auditor <- cfg.auditor

out <- (auditor.write(rhk, CreateAction, login = session.user.login) *>
out <- (cfg.auditor.write(rhk, CreateAction, login = session.user.login) *>
getOrCreate(slug, rhk)(cfg))

r2 <- (log(s"result of createRepoWebhook: out = $out") *>
r1.tfold(RepoNotFound(slug))(_.copy(hook = Some(Hook(out.id, out.active)))))

foo <- (log(s"result of repo copy: r2 = $r2") *>
storage.run(cfg.storage, storage.StoreOp.insertOrUpdateRepositories(r2 :: Nil)) <*
auditor.write(r2, CreateAction, login = session.user.login))
cfg.auditor.write(r2, CreateAction, login = session.user.login))
_ <- log(s"result of insertOrUpdateRepositories: foo = $foo")
} yield r2
}
Expand All @@ -184,8 +182,7 @@ object Nelson {
rep <- storage.run(cfg.storage, storage.StoreOp.findRepository(session.user, slug))
rrr <- rep.tfold(RepoNotFound(slug))(identity)
id <- rrr.hook.tfold(UnexpectedMissingHook(slug))(_.id)
au <- cfg.auditor
_ <- au.write(rrr, DeleteAction, login = session.user.login)
_ <- cfg.auditor.write(rrr, DeleteAction, login = session.user.login)
_ <- Github.Request.deleteRepoWebhook(slug, id)(session.github).runWith(cfg.github)
.or(IO.unit)
upd = Repo(rrr.id, rrr.slug, rrr.access, None)
Expand Down Expand Up @@ -266,7 +263,7 @@ object Nelson {
}

def deploy(actions: List[Manifest.Action]): NelsonK[Unit] =
Kleisli(cfg => actions.traverse_(a => cfg.queue.flatMap(_.enqueue1(a))))
Kleisli(cfg => actions.traverse_(a => cfg.queue.enqueue1(a)))

/**
* Invoked when the inbound webhook from Github arrives, notifying Nelson
Expand All @@ -281,7 +278,7 @@ object Nelson {
val unitFilter: (Datacenter,Namespace,Plan,UnitDef) => Boolean =
(_,namespace,_,_) => namespace.name == ns

Manifest.unitActions(m, dcs, unitFilter)
Manifest.unitActions(m, dcs, unitFilter)
}

Kleisli { cfg =>
Expand All @@ -291,11 +288,9 @@ object Nelson {
fetchRepoManifestAndValidateDeployable(e.slug, r.tagName).run(cfg))
m <- v.fold(e => IO.raiseError(MultipleErrors(e)), m => IO.pure(m))

au <- cfg.auditor

hm <- (log(s"received manifest from github: $m")
*> storage.run(cfg.storage, storage.StoreOp.createRelease(e.repositoryId, r))
*> au.write(r, CreateAction, Option(r.id))
*> cfg.auditor.write(r, CreateAction, Option(r.id))
*> log(s"created release in response to release ${r.id}")
*> Manifest.saturateManifest(m)(r))

Expand Down Expand Up @@ -759,7 +754,7 @@ object Nelson {
m.hash,
m.description,
m.port,
exp)) <* cfg.auditor.flatMap(_.write(m, CreateAction, login = s.user.login))
exp)) <* cfg.auditor.write(m, CreateAction, login = s.user.login)
} yield guid
}

Expand Down
13 changes: 4 additions & 9 deletions core/src/main/scala/Pipeline.scala
Expand Up @@ -51,21 +51,16 @@ object Pipeline {
* effect generating sinks and observations.
*/
def task(config: NelsonConfig)(effects: Sink[IO, Action]): IO[Unit] = {
def par[A](ps: Stream[IO, Stream[IO, A]]) = {
def par[A](ps: Stream[IO, Stream[IO, A]]): Stream[IO, A] = {
implicit val ec = config.pools.defaultExecutor

val withErrors = ps.join(config.pipeline.concurrencyLimit).attempt

config.auditor.map(auditor => withErrors.observeW(auditor.errorSink).stripW)
withErrors.observeW(config.auditor.errorSink).stripW
}

val p: IO[Stream[IO, Stream[IO, Unit]]] =
config.queue.map(_.dequeue.map(a => Stream.emit(a).covary[IO].to(effects)))
val p: Stream[IO, Stream[IO, Unit]] = config.queue.dequeue.map(a => Stream.emit(a).covary[IO].to(effects))

for {
stream <- p
values <- par(stream)
_ <- values.compile.drain
} yield ()
par(p).compile.drain
}
}
7 changes: 3 additions & 4 deletions core/src/main/scala/cleanup/CleanupCron.scala
Expand Up @@ -20,6 +20,7 @@ package cleanup
import nelson.Datacenter.Deployment
import nelson.routing.RoutingTable

import cats.syntax.apply._
import cats.effect.{Effect, IO}
import nelson.CatsHelpers._

Expand Down Expand Up @@ -85,7 +86,7 @@ object CleanupCron {

def refresh(cfg: NelsonConfig): Stream[IO,Duration] =
Stream.repeatEval(IO(cfg.cleanup.cleanupDelay)).flatMap(d =>
Scheduler.fromScheduledExecutorService(cfg.pools.schedulingPool).awakeEvery[IO](d)(Effect[IO], cfg.pools.schedulingExecutor)).head
Scheduler.fromScheduledExecutorService(cfg.pools.schedulingPool).awakeEvery[IO](d)(Effect[IO], cfg.pools.schedulingExecutor).head)

/*
* This is the entry point for the cleanup pipeline. The pipeline is run at
Expand All @@ -94,7 +95,5 @@ object CleanupCron {
* to decommission deployments.
*/
def pipeline(cfg: NelsonConfig)(implicit ec: ExecutionContext): Stream[IO, Unit] =
Stream.force {
cfg.auditor.map(auditor => refresh(cfg).flatMap(_ => process(cfg).attempt).observeW(auditor.errorSink).stripW.to(Reaper.reap(cfg)))
}
(refresh(cfg) *> process(cfg).attempt).observeW(cfg.auditor.errorSink).stripW.to(Reaper.reap(cfg))
}
10 changes: 4 additions & 6 deletions core/src/main/scala/cleanup/GarbageCollector.scala
Expand Up @@ -20,7 +20,7 @@ package cleanup
import cats.effect.IO
import nelson.CatsHelpers._

import fs2.{Pipe, Stream}
import fs2.Pipe

import java.time.Instant
import journal.Logger
Expand Down Expand Up @@ -56,11 +56,9 @@ object GarbageCollector {
def mark(cfg: NelsonConfig): Pipe[IO, CleanupRow, CleanupRow] = {
import Json._
import audit.AuditableInstances._
_.flatMap { case (dc, ns, d, gr) =>
Stream.eval {
runs(cfg.storage, markAsGarbage(d.deployment).map(_ => (dc, ns, d, gr))) <*
cfg.auditor.flatMap(_.write(d.deployment, audit.GarbageAction))
}
_.evalMap { case (dc, ns, d, gr) =>
runs(cfg.storage, markAsGarbage(d.deployment).map(_ => (dc, ns, d, gr))) <*
cfg.auditor.write(d.deployment, audit.GarbageAction)
}
}
}
2 changes: 1 addition & 1 deletion core/src/main/scala/cleanup/Reaper.scala
Expand Up @@ -61,7 +61,7 @@ object Reaper {
import Json._
import audit.AuditableInstances._
Workflow.run(resolve(d).destroy(d,dc,ns))(t) <*
cfg.auditor.flatMap(_.write(d, audit.DeleteAction)) <*
cfg.auditor.write(d, audit.DeleteAction) <*
IO(log.debug((s"finished cleaning up $d in datacenter $dc"))) <*
Notify.sendDecommissionedNotifications(dc,ns,d)(cfg)
}
Expand Down
18 changes: 8 additions & 10 deletions core/src/main/scala/cleanup/Sweeper.scala
Expand Up @@ -77,16 +77,14 @@ object Sweeper {
}

def process(cfg: NelsonConfig)(implicit unclaimedResourceTracker: Kleisli[IO, (Datacenter, Int), Unit]): Stream[IO, Unit] =
Stream.force(cfg.auditor.map { auditor =>
Stream.repeatEval(IO(cfg.cleanup.sweeperDelay)).flatMap { d =>
Scheduler.fromScheduledExecutorService(cfg.pools.schedulingPool).awakeEvery(d)(Effect[IO], cfg.pools.schedulingExecutor).head
}.flatMap(_ =>
Stream.eval(timer(cleanupLeakedConsulDiscoveryKeys(cfg)))
.attempt.observeW(auditor.errorSink)(Effect[IO], cfg.pools.defaultExecutor)
.stripW
)
.flatMap(os => Stream.emits(os).covary[IO]) to sweeperSink
})
Stream.repeatEval(IO(cfg.cleanup.sweeperDelay)).flatMap { d =>
Scheduler.fromScheduledExecutorService(cfg.pools.schedulingPool).awakeEvery(d)(Effect[IO], cfg.pools.schedulingExecutor).head
}.flatMap(_ =>
Stream.eval(timer(cleanupLeakedConsulDiscoveryKeys(cfg)))
.attempt.observeW(cfg.auditor.errorSink)(Effect[IO], cfg.pools.defaultExecutor)
.stripW
)
.flatMap(os => Stream.emits(os).covary[IO]) to sweeperSink

def sweeperSink(implicit unclaimedResourceTracker: Kleisli[IO, (Datacenter, Int), Unit]): Sink[IO, SweeperHelmOp] =
Sink {
Expand Down
18 changes: 7 additions & 11 deletions core/src/main/scala/monitoring/DeploymentMonitor.scala
Expand Up @@ -75,14 +75,12 @@ object DeploymentMonitor {
* Drain all actions from the writer (using an auditor error sink, observing it and routing to a final sink.
*/
def drain[A](cfg: NelsonConfig)(h: Stream[IO, Duration], w: NelsonFK[Stream, Seq[A]], s: Sink[IO, A], k: NelsonFK[Sink, A]): Stream[IO, Unit] =
Stream.force(cfg.auditor.map { auditor =>
h >> w.run(cfg).flatMap(as => Stream.emits(as).covary[IO])
.observe(s)(Effect[IO], cfg.pools.defaultExecutor)
.attempt
.observeW(auditor.errorSink)(Effect[IO], cfg.pools.defaultExecutor)
.stripW
.to(k.run(cfg))
})
h >> w.run(cfg).flatMap(as => Stream.emits(as).covary[IO])
.observe(s)(Effect[IO], cfg.pools.defaultExecutor)
.attempt
.observeW(cfg.auditor.errorSink)(Effect[IO], cfg.pools.defaultExecutor)
.stripW
.to(k.run(cfg))

/*
* Build a list of MonitorActionItems based on the health of deployments that are presently in the Warming state.
Expand Down Expand Up @@ -168,9 +166,7 @@ object DeploymentMonitor {
}

def promotionSink(cfg: NelsonConfig): Sink[IO, MonitorActionItem] =
stream => Stream.force(cfg.auditor.map { auditor =>
stream.evalMap(item => promote(item)(auditor))
})
Sink(item => promote(item)(cfg.auditor))

def promote(item: MonitorActionItem)(auditor: audit.Auditor): IO[Unit] = {
val task = item match {
Expand Down
10 changes: 4 additions & 6 deletions core/src/main/scala/routing/cron.scala
Expand Up @@ -57,10 +57,8 @@ object cron {
}

def consulRefresh(cfg: NelsonConfig): Stream[IO,(Datacenter,ConsulOp.ConsulOpF[Unit])] =
Stream.force(cfg.auditor.map { auditor =>
Stream.repeatEval(IO(cfg.discoveryDelay)).
flatMap(d => Scheduler.fromScheduledExecutorService(cfg.pools.schedulingPool).awakeEvery(d)(Effect[IO], cfg.pools.defaultExecutor).head).
flatMap(_ => Stream.eval(refresh(cfg)).attempt.observeW(auditor.errorSink)(Effect[IO], cfg.pools.defaultExecutor).stripW).
flatMap(xs => Stream.emits(xs))
})
Stream.repeatEval(IO(cfg.discoveryDelay)).
flatMap(d => Scheduler.fromScheduledExecutorService(cfg.pools.schedulingPool).awakeEvery(d)(Effect[IO], cfg.pools.defaultExecutor).head).
flatMap(_ => Stream.eval(refresh(cfg)).attempt.observeW(cfg.auditor.errorSink)(Effect[IO], cfg.pools.defaultExecutor).stripW).
flatMap(xs => Stream.emits(xs))
}
14 changes: 7 additions & 7 deletions core/src/test/scala/AuditSpec.scala
Expand Up @@ -58,7 +58,7 @@ class AuditSpec extends NelsonSuite with BeforeAndAfterEach {
override def beforeEach: Unit = setup.unsafeRunSync()

it should "enqueue all events in the stream" in {
val audit = new Auditor(config.auditQueue.unsafeRunSync(),defaultSystemLogin)
val audit = new Auditor(config.auditQueue,defaultSystemLogin)
val p: Stream[IO, Foo] = Stream(Foo(1),Foo(2),Foo(3),Foo(10))

p.observe(audit.auditSink(LoggingAction))(Effect[IO], config.pools.defaultExecutor).compile.drain.unsafeRunSync()
Expand All @@ -69,7 +69,7 @@ class AuditSpec extends NelsonSuite with BeforeAndAfterEach {
}

it should "store auditable events in storage" in {
val audit = new Auditor(config.auditQueue.unsafeRunSync(),defaultSystemLogin)
val audit = new Auditor(config.auditQueue,defaultSystemLogin)
val events = Vector(Foo(1),Foo(2),Foo(3),Foo(10))
val p: Stream[IO, Foo] = Stream(events: _*)

Expand All @@ -83,7 +83,7 @@ class AuditSpec extends NelsonSuite with BeforeAndAfterEach {
}

it should "be able to write to audit log directly" in {
val audit = new Auditor(config.auditQueue.unsafeRunSync(), defaultSystemLogin)
val audit = new Auditor(config.auditQueue, defaultSystemLogin)
val foo = Foo(1)

audit.write(foo, CreateAction).unsafeRunSync()
Expand All @@ -97,7 +97,7 @@ class AuditSpec extends NelsonSuite with BeforeAndAfterEach {
}

it should "accept an optional release id parameter" in {
val audit = new Auditor(config.auditQueue.unsafeRunSync(),defaultSystemLogin)
val audit = new Auditor(config.auditQueue,defaultSystemLogin)
val foo = Foo(1)

val releaseId = Option(10L)
Expand All @@ -114,7 +114,7 @@ class AuditSpec extends NelsonSuite with BeforeAndAfterEach {

it should "accept an optional user parameter" in {
val login = "scalatest"
val audit = new Auditor(config.auditQueue.unsafeRunSync(),defaultSystemLogin)
val audit = new Auditor(config.auditQueue,defaultSystemLogin)
val foo = Foo(1)

audit.write(foo, CreateAction, login = login).unsafeRunSync()
Expand All @@ -130,7 +130,7 @@ class AuditSpec extends NelsonSuite with BeforeAndAfterEach {
behavior of "listing audit logs"

it should "accept an optional action parameter for filtering" in {
val audit = new Auditor(config.auditQueue.unsafeRunSync(), defaultSystemLogin)
val audit = new Auditor(config.auditQueue, defaultSystemLogin)
val foo = Foo(1)

audit.write(foo, CreateAction).unsafeRunSync()
Expand All @@ -144,7 +144,7 @@ class AuditSpec extends NelsonSuite with BeforeAndAfterEach {
}

it should "accept an optional category parameter for filtering" in {
val audit = new Auditor(config.auditQueue.unsafeRunSync(), defaultSystemLogin)
val audit = new Auditor(config.auditQueue, defaultSystemLogin)
val foo = Foo(1)
val bar = Bar(2)

Expand Down
2 changes: 1 addition & 1 deletion http/src/main/scala/Main.scala
Expand Up @@ -76,7 +76,7 @@ object Main {
import cleanup.SweeperDefaults._

log.info("booting the background processes nelson needs to operate...")
runBackgroundJob("auditor", Stream.force(cfg.auditor.map(auditor => auditor.process(cfg.storage)(cfg.pools.defaultExecutor))))
runBackgroundJob("auditor", cfg.auditor.process(cfg.storage)(cfg.pools.defaultExecutor))
runBackgroundJob("pipeline_processor", Stream.eval(Pipeline.task(cfg)(Pipeline.sinks.runAction(cfg))))
runBackgroundJob("workflow_logger", cfg.workflowLogger.process)
runBackgroundJob("routing_cron", routing.cron.consulRefresh(cfg) to Http4sConsul.consulSink)
Expand Down

0 comments on commit 63d6592

Please sign in to comment.