Skip to content

Commit

Permalink
Support the discovery and traffic subsystems in pulsar
Browse files Browse the repository at this point in the history
  • Loading branch information
timperrett committed Jan 10, 2019
1 parent 5859886 commit 2aeeb3c
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 17 deletions.
21 changes: 21 additions & 0 deletions core/src/main/scala/Workflow.scala
Expand Up @@ -168,6 +168,16 @@ object Workflow {
_ <- Discovery.writeDiscoveryInfoToConsul(ns, sn, dc.domain.name, dt).inject[WorkflowOp]
} yield ()

/**
* Provides a way to see if creating a traffic shift is actually relevant. In the case
* of periodic units, shifting traffic would make no sense, so those are NoOps.
*/
def getTrafficShift(unit: UnitDef, plan: Plan, d: Datacenter): Option[Manifest.TrafficShift] =
if (!Manifest.isPeriodic(unit, plan))
Option(plan.environment.trafficShift
.getOrElse(Manifest.TrafficShift(d.defaultTrafficShift.policy, d.defaultTrafficShift.duration)))
else None

def createTrafficShift(id: ID, nsRef: NamespaceName, dc: Datacenter, p: TrafficShiftPolicy, dur: FiniteDuration): WorkflowF[Unit] = {
val prog = for {
ns <- OptionT(StoreOp.getNamespace(dc.name, nsRef))
Expand All @@ -179,6 +189,17 @@ object Workflow {
prog.value.inject[WorkflowOp].map(_ => ())
}

/*
* When the workflow is completed, we typically want to set the deployment to "Warming", so that once
* consul indicates the deployment to be passing the health check, we can promote to "Ready" (via the
* DeploymentMonitor background process). However, units without ports are not registered in consul, and
* thus we should immediately advance mark the deployment as "Ready". Once Reconciliation is also used as
* a gating factor for promoting deployments to "Ready", we can potentially set all units to "Warming" here.
*/
def getStatus(unit: UnitDef, plan: Plan): DeploymentStatus =
if (Manifest.isPeriodic(unit,plan)) DeploymentStatus.Ready
else unit.ports.fold[DeploymentStatus](DeploymentStatus.Ready)(_ => DeploymentStatus.Warming)

def dockerOps(id: ID, unit: UnitDef, registry: RegistryURI): WorkflowF[Image] = {
import Docker.Pull.{Error => PullError}
import Docker.Push.{Error => PushError}
Expand Down
19 changes: 2 additions & 17 deletions core/src/main/scala/workflows/Magnetar.scala
Expand Up @@ -16,7 +16,7 @@
//: ----------------------------------------------------------------------------
package nelson

import Manifest.{UnitDef,Versioned,Plan,TrafficShift}
import Manifest.{UnitDef,Versioned,Plan}
import Datacenter.{Namespace,Deployment}
import Workflow.WorkflowF
import cats.implicits._
Expand All @@ -32,21 +32,6 @@ object Magnetar extends Workflow[Unit] {
val sn = Datacenter.StackName(unit.name, vunit.version, hash)
val rs = unit.dependencies.keys.toSet ++ unit.resources.map(_.name)

// When the workflow is completed, we typically want to set the deployment to "Warming", so that once
// consul indicates the deployment to be passing the health check, we can promote to "Ready" (via the
// DeploymentMonitor background process). However, units without ports are not registered in consul, and
// thus we should immediately advance mark the deployment as "Ready". Once Reconciliation is also used as
// a gating factor for promoting deployments to "Ready", we can potentially set all units to "Warming" here.
def getStatus(unit: UnitDef, plan: Plan): DeploymentStatus =
if (Manifest.isPeriodic(unit,plan)) Ready
else unit.ports.fold[DeploymentStatus](Ready)(_ => Warming)

def getTrafficShift: Option[TrafficShift] =
if (!Manifest.isPeriodic(unit, p))
Option(p.environment.trafficShift
.getOrElse(TrafficShift(dc.defaultTrafficShift.policy, dc.defaultTrafficShift.duration)))
else None

for {
_ <- status(id, Pending, "workflow about to start")
i <- dockerOps(id, unit, dc.docker.registry)
Expand All @@ -56,7 +41,7 @@ object Magnetar extends Workflow[Unit] {
_ <- writePolicyToVault(cfg = dc.policy, sn = sn, ns = ns.name, rs = rs)
_ <- logToFile(id, s"writing discovery tables to ${routing.Discovery.consulDiscoveryKey(sn)}")
_ <- writeDiscoveryToConsul(id, sn, ns.name, dc)
_ <- getTrafficShift.fold(pure(()))(ts => createTrafficShift(id, ns.name, dc, ts.policy, ts.duration) *> logToFile(id, s"Creating traffic shift: ${ts.policy.ref}"))
_ <- getTrafficShift(unit, p, dc).fold(pure(()))(ts => createTrafficShift(id, ns.name, dc, ts.policy, ts.duration) *> logToFile(id, s"Creating traffic shift: ${ts.policy.ref}"))
_ <- logToFile(id, s"instructing ${dc.name}'s scheduler to handle service container")
l <- launch(i, dc, ns.name, vunit, p, hash)
_ <- debug(s"response from scheduler $l")
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/scala/workflows/Pulsar.scala
Expand Up @@ -54,6 +54,11 @@ object Pulsar extends Workflow[Unit] {
//// create a Vault kubernetes auth role
_ <- logToFile(id, s"Writing Kubernetes auth role '${sn.toString}' to Vault...")
_ <- writeKubernetesRoleToVault(dc = dc, sn = sn, ns = ns.name)
//// write the needful to consul
_ <- logToFile(id, s"writing discovery tables to ${routing.Discovery.consulDiscoveryKey(sn)}")
_ <- writeDiscoveryToConsul(id, sn, ns.name, dc)
_ <- getTrafficShift(unit, p, dc).fold(pure(()))(ts => createTrafficShift(id, ns.name, dc, ts.policy, ts.duration) *> logToFile(id, s"Creating traffic shift: ${ts.policy.ref}"))
//// show kubernetes some love
_ <- logToFile(id, s"Instructing ${dc.name}'s scheduler to handle service container")
l <- launch(i, dc, ns.name, vunit, p, hash)
_ <- debug(s"Scheduler responded with: ${l}")
Expand Down
12 changes: 12 additions & 0 deletions docs/src/hugo/content/getting-started/blueprints.md
Expand Up @@ -81,6 +81,18 @@ It is these kinds of actions and steps, when viewed together, that makeup a work
<td align="center">✓</td>
<td align="center">✓</td>
</tr>
<tr>
<td><i>Discovery</i></td>
<td></td>
<td align="center">✓</td>
<td align="center">✓</td>
</tr>
<tr>
<td><i>Traffic Shifting</i></td>
<td></td>
<td align="center">✓</td>
<td align="center">✓</td>
</tr>
<tr>
<td><i>Prometheus</i></td>
<td></td>
Expand Down

0 comments on commit 2aeeb3c

Please sign in to comment.