Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kubernetes integration for Nelson #49

Merged
merged 1 commit into from Jan 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions core/src/main/resources/nelson/defaults.cfg
Expand Up @@ -233,6 +233,7 @@ nelson {
#
# infrastructure {
# scheduler {
# scheduler = "nomad"
# nomad {
# endpoint = "http://nomad.service.texas.your.company.com:1234/"
# timeout = 1 second
Expand Down
92 changes: 81 additions & 11 deletions core/src/main/scala/Config.scala
Expand Up @@ -16,7 +16,11 @@
//: ----------------------------------------------------------------------------
package nelson

import java.io.FileInputStream
import javax.net.ssl.{SSLContext, X509TrustManager}
import java.nio.file.{Path, Paths}
import java.security.SecureRandom
import java.security.cert.{CertificateFactory, X509Certificate}
import java.util.concurrent.{ExecutorService, Executors, ScheduledExecutorService, ThreadFactory}

import journal.Logger
Expand All @@ -32,6 +36,7 @@ import notifications.{SlackHttp,SlackOp,EmailOp,EmailServer}

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.control.NonFatal
import scalaz.Scalaz._
import scalaz.concurrent.Strategy
import scalaz.~>
Expand Down Expand Up @@ -530,18 +535,70 @@ object Config {
)((a,b,c,d,e,g) => {
val splunk = readSplunk
val loggingSidecar = readLoggingImage
val uri = org.http4s.Uri.fromString(a).toOption.yolo(s"nomad.endpoint -- $a -- is an invalid Uri")
val uri = Uri.fromString(a).toOption.yolo(s"nomad.endpoint -- $a -- is an invalid Uri")
Infrastructure.Nomad(uri,b,c,d,e,loggingSidecar,g,splunk)
})
}

/*
* Datacenters currently only support one scheduler
*/
def readScheduler(kfg: KConfig, proxy: Option[Infrastructure.ProxyCredentials]): Option[SchedulerOp ~> Task] =
readNomadInfrastructure(kfg.subconfig("nomad"))
def readKubernetesInfrastructure(kfg: KConfig): Option[Infrastructure.Kubernetes] = {
(kfg.lookup[String]("endpoint") |@| kfg.lookup[Duration]("timeout")) { (endpoint, timeout) =>
val uri = Uri.fromString(endpoint).toOption.yolo(s"kubernetes.endpoint -- $endpoint -- is an invalid Uri")
Infrastructure.Kubernetes(uri, timeout)
}
}

def readNomadScheduler(kfg: KConfig): Option[SchedulerOp ~> Task] =
readNomadInfrastructure(kfg)
.map(n => new scheduler.NomadHttp(nomadcfg, n, http4sClient(n.timeout)))

// Create a X509TrustManager that trusts a whitelist of certificates, similar to `curl --cacert`
def cacert(certs: Array[X509Certificate]): X509TrustManager =
new X509TrustManager {
def getAcceptedIssuers(): Array[X509Certificate] = certs
def checkClientTrusted(certs: Array[X509Certificate], authType: String): Unit = ()
def checkServerTrusted(certs: Array[X509Certificate], authType: String): Unit = ()
}

def getKubernetesPodCert(): Option[SSLContext] = {
// Auto-mounted at this path for pods
// https://kubernetes.io/docs/tasks/access-application-cluster/access-cluster/#accessing-the-api-from-a-pod
val certBundle = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"

val is = new FileInputStream(certBundle)
val x509Cert: Option[X509Certificate] = try {
val cf = CertificateFactory.getInstance("X.509")
cf.generateCertificate(is) match {
case c: X509Certificate => Some(c)
case _ => None
}
} catch {
case NonFatal(_) => None
} finally {
is.close()
}

x509Cert.map { cert =>
val trustManager = cacert(Array(cert))
val sslContext = SSLContext.getInstance("TLS")
sslContext.init(null, Array(trustManager), new SecureRandom())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this make a new secure random for every request? It's not clear if this code block is called frequently or not.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Appears so yeah, but it should only be called once on startup I think since (currently) it's only being used to create the K8s scheduler interpreter. I could move it out but I'm not sure what the security or mutability ramifications of that are.

sslContext
}
}

def readKubernetesClient(kfg: KConfig): Option[KubernetesClient] =
(readKubernetesInfrastructure(kfg) |@| getKubernetesPodCert()) {
case (kubernetes, sslContext) =>
// Auto-mounted at this path for pods
// https://kubernetes.io/docs/tasks/access-application-cluster/access-cluster/#accessing-the-api-from-a-pod
val path = "/var/run/secrets/kubernetes.io/serviceaccount/token"
val serviceAccountToken = scala.io.Source.fromFile(path).getLines.toList.head
new KubernetesClient(
kubernetes.endpoint,
http4sClient(kubernetes.timeout, sslContext = Some(sslContext)),
serviceAccountToken
)
}

@SuppressWarnings(Array("org.brianmckenna.wartremover.warts.NoNeedForMonad"))
def readDatacenter(id: String, kfg: KConfig): Datacenter = {
val proxyCreds =
Expand Down Expand Up @@ -583,8 +640,19 @@ object Config {
InstrumentedVaultClient(endpoint, rawClient)
}).yolo("We really really need vault. Seriously vault must be configured")

val sched = readScheduler(kfg.subconfig("infrastructure.scheduler"), proxyCreds)
.yolo("At least one scheduler must be defined per datacenter")
val schedConfig = kfg.subconfig("infrastructure.scheduler")

val (sched, healthChecker) = (schedConfig.lookup[String]("scheduler") match {
case Some("nomad") =>
val s = readNomadScheduler(schedConfig.subconfig("nomad"))
val h = health.Http4sConsulHealthClient(consul)
s.map((_, h))
case Some("kubernetes") =>
readKubernetesClient(schedConfig.subconfig("kubernetes")).map { client =>
(new scheduler.KubernetesHttp(client), health.KubernetesHealthClient(client))
}
case _ => None
}).yolo("At least one scheduler must be defined per datacenter")

val interpreters = Infrastructure.Interpreters(
scheduler = sched,
Expand All @@ -594,7 +662,7 @@ object Config {
logger = logger,
docker = dockerClient,
control = WorkflowControlOp.trans,
health = health.Http4sConsulHealthClient(consul)
health = healthChecker
)

val trafficShift = readTrafficShift(kfg.subconfig("traffic-shift"))
Expand Down Expand Up @@ -775,9 +843,11 @@ object Config {
pkiPath = cfg.lookup[String]("pki-path")
)

private def http4sClient(timeout: Duration, maxTotalConnections: Int = 10): Client = {
private def http4sClient(timeout: Duration, maxTotalConnections: Int = 10, sslContext: Option[SSLContext] = None): Client = {
val config = BlazeClientConfig.defaultConfig.copy(
requestTimeout = timeout)
requestTimeout = timeout,
sslContext = sslContext
)
PooledHttp1Client(maxTotalConnections = maxTotalConnections, config = config)
}
}
34 changes: 20 additions & 14 deletions core/src/main/scala/Datacenter.scala
Expand Up @@ -16,28 +16,29 @@
//: ----------------------------------------------------------------------------
package nelson

import scalaz.{Order, ValidationNel, ~>}
import scalaz.std.string._
import scalaz.syntax.monoid._
import scalaz.std.set._
import scalaz.syntax.std.option._
import scalaz.syntax.foldable._
import scalaz.concurrent.Task
import java.net.URI
import java.time.Instant
import scala.concurrent.duration.FiniteDuration

import com.amazonaws.regions.Region
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems errant? I don't think this type is used anywhere

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This happened since I was moving around/sorting the imports, my PR specifically does not use it but it's being used somewhere else in the file.

import concurrent.duration._
import helm.ConsulOp
import health.HealthCheckOp
import storage.StoreOp
import scheduler.SchedulerOp
import Workflow.WorkflowOp
import docker.DockerOp
import helm.ConsulOp
import loadbalancers.LoadbalancerOp
import logging.LoggingOp
import org.http4s.Uri
import scalaz.concurrent.Task
import scalaz.std.set._
import scalaz.std.string._
import scalaz.syntax.foldable._
import scalaz.syntax.monoid._
import scalaz.syntax.std.option._
import scalaz.{Order, ValidationNel, ~>}
import scheduler.SchedulerOp
import storage.StoreOp
import vault.Vault
import loadbalancers.LoadbalancerOp
import com.amazonaws.regions.Region
import Workflow.WorkflowOp


object Infrastructure {
Expand All @@ -61,7 +62,7 @@ object Infrastructure {
)

final case class Nomad(
endpoint: org.http4s.Uri,
endpoint: Uri,
timeout: Duration,
dockerRepoUser: String,
dockerRepoPassword: String,
Expand All @@ -71,6 +72,11 @@ object Infrastructure {
splunk: Option[SplunkConfig]
)

final case class Kubernetes(
endpoint: Uri,
timeout: Duration
)

final case class SplunkConfig(
splunkUrl: String,
splunkToken: String
Expand Down