Skip to content

Commit

Permalink
Merge 1eac0e1 into 8578f44
Browse files Browse the repository at this point in the history
  • Loading branch information
kyuksel committed Jun 23, 2020
2 parents 8578f44 + 1eac0e1 commit 2c6d794
Show file tree
Hide file tree
Showing 7 changed files with 286 additions and 35 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ To depend on the `MockGoogle*` classes, additionally depend on:

Contains utility functions for talking to Google APIs via com.google.cloud client library (more recent) via gRPC.

Latest SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.10-956a642"`
Latest SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.10-TRAVIS-REPLACE-ME"`

To start the Google PubSub emulator for unit testing:

Expand Down
9 changes: 5 additions & 4 deletions google2/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,25 @@ This file documents changes to the `workbench-google2` library, including notes
## 0.10
Changed:
- Move `resizeDisk` from `GoogleComputeService` to `GoogleDiskService`
- Renamed KubernetesSerializableName extension classes
- Rename KubernetesSerializableName extension classes
- Add `getDisk`
- Make `genDiskName` non-empty

Added:
- Add `GoogleDiskService` and `GoogleDiskInterpreter`
- Add `{create,delete}Disk` and `listDisks` to `GoogleDiskService`
- refactor parameters for kubernetes service entity
- Refactor parameters for Kubernetes service entity
- Add `BigQuery`
- Add Generator for `DiskName`
- Add Kubernetes client APIs for creating service accounts, roles and role bindings

SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.10-956a642"`
SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.10-TRAVIS-REPLACE-ME"`

## 0.9
Changed:
- Fix a bug in `GoogleDataprocService` where region is not set properly
- A few minor dependency updates
- Upgade google pubsub library to latest, which deprecated ProjectTopicName in many APIs
- Upgrade Google PubSub library to latest, which deprecated ProjectTopicName in many APIs

SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.9-8051635"`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import com.google.auth.oauth2.{AccessToken, GoogleCredentials}
import com.google.common.cache.{CacheBuilder, CacheLoader}
import com.google.container.v1.Cluster
import io.kubernetes.client.{ApiClient, ApiException}
import io.kubernetes.client.apis.CoreV1Api
import io.kubernetes.client.apis.{CoreV1Api, RbacAuthorizationV1Api}
import io.kubernetes.client.util.Config
import org.broadinstitute.dsde.workbench.google2.GKEModels.KubernetesClusterId
import org.broadinstitute.dsde.workbench.google2.KubernetesModels._
Expand All @@ -38,7 +38,7 @@ class KubernetesInterpreter[F[_]: Async: StructuredLogger: Effect: Timer: Contex
val cache = CacheBuilder
.newBuilder()
// We expect calls to be batched, such as when a user's environment within a cluster is created/deleted/stopped.
// This may need configuration
// TODO: Unhardcode expiration time
.expireAfterWrite(2, TimeUnit.HOURS)
.build(
new CacheLoader[KubernetesClusterId, ApiClient] {
Expand Down Expand Up @@ -68,40 +68,95 @@ class KubernetesInterpreter[F[_]: Async: StructuredLogger: Effect: Timer: Contex

// https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.12/#podspec-v1-core
override def createPod(clusterId: KubernetesClusterId, pod: KubernetesPod, namespace: KubernetesNamespace): F[Unit] =
blockingClientProvider(clusterId, { kubernetesClient =>
Async[F].delay(
kubernetesClient.createNamespacedPod(namespace.name.value, pod.getJavaSerialization, null, null, null)
)
})
blockingClientProvider[CoreV1Api, Unit](
clusterId,
client => new CoreV1Api(client), { kubernetesClient =>
Async[F].delay(
kubernetesClient.createNamespacedPod(namespace.name.value, pod.getJavaSerialization, null, "true", null)
)
}
)

//why we use a service over a deployment https://matthewpalmer.net/kubernetes-app-developer/articles/service-kubernetes-example-tutorial.html
//services can be applied to pods/containers, while deployments are for pre-creating pods/containers
override def createService(clusterId: KubernetesClusterId,
service: KubernetesServiceKind,
namespace: KubernetesNamespace): F[Unit] =
blockingClientProvider(
clusterId, { kubernetesClient =>
blockingClientProvider[CoreV1Api, Unit](
clusterId,
client => new CoreV1Api(client), { kubernetesClient =>
Async[F].delay(
kubernetesClient.createNamespacedService(namespace.name.value, service.getJavaSerialization, null, null, null)
kubernetesClient.createNamespacedService(namespace.name.value,
service.getJavaSerialization,
null,
"true",
null)
)
}
)

override def createNamespace(clusterId: KubernetesClusterId, namespace: KubernetesNamespace): F[Unit] =
blockingClientProvider(clusterId, { kubernetesClient =>
Async[F].delay(kubernetesClient.createNamespace(namespace.getJavaSerialization, null, null, null))
})
blockingClientProvider[CoreV1Api, Unit](
clusterId,
client => new CoreV1Api(client), { kubernetesClient =>
Async[F].delay(kubernetesClient.createNamespace(namespace.getJavaSerialization, null, "true", null))
}
)

override def createServiceAccount(clusterId: KubernetesClusterId,
serviceAccount: KubernetesServiceAccount,
namespace: KubernetesNamespace): F[Unit] =
blockingClientProvider[CoreV1Api, Unit](
clusterId,
client => new CoreV1Api(client), { kubernetesClient =>
Async[F].delay(
kubernetesClient.createNamespacedServiceAccount(namespace.name.value,
serviceAccount.getJavaSerialization,
null,
"true",
null)
)
}
)

override def createRole(clusterId: KubernetesClusterId,
role: KubernetesRole,
namespace: KubernetesNamespace): F[Unit] =
blockingClientProvider[RbacAuthorizationV1Api, Unit](
clusterId,
client => new RbacAuthorizationV1Api(client), { kubernetesClient =>
Async[F].delay(
kubernetesClient.createNamespacedRole(namespace.name.value, role.getJavaSerialization, null, "true", null)
)
}
)

override def createRoleBinding(clusterId: KubernetesClusterId,
roleBinding: KubernetesRoleBinding,
namespace: KubernetesNamespace): F[Unit] =
blockingClientProvider[RbacAuthorizationV1Api, Unit](
clusterId,
client => new RbacAuthorizationV1Api(client), { kubernetesClient =>
Async[F].delay(
kubernetesClient.createNamespacedRoleBinding(namespace.name.value,
roleBinding.getJavaSerialization,
null,
"true",
null)
)
}
)

//DO NOT QUERY THE CACHE DIRECTLY
//There is a wrapper method that is necessary to ensure the token is refreshed
//we never make the entry stale, because we always need to refresh the token (see comment above getToken)
//if we did stale the entry we would have to unnecessarily re-do the google call
private def getClient(clusterId: KubernetesClusterId): F[CoreV1Api] =
private def getClient[A](clusterId: KubernetesClusterId, fa: ApiClient => A): F[A] =
for {
client <- Async[F].delay(cache.get(clusterId))
token <- getToken
token <- getToken()
_ <- Async[F].delay(client.setApiKey(token.getTokenValue))
} yield new CoreV1Api(client)
} yield fa(client)

//we always update the token, even for existing clients, so we don't have to maintain a reference to the last time each client was updated
//unfortunately, the kubernetes client does not implement a gcp authenticator, so we must do this ourselves.
Expand Down Expand Up @@ -133,15 +188,16 @@ class KubernetesInterpreter[F[_]: Async: StructuredLogger: Effect: Timer: Contex
} yield (apiClient)
}

//TODO: retry once we know what kubernetes codes are applicable
private def blockingClientProvider[A](clusterId: KubernetesClusterId, fa: CoreV1Api => F[A]): F[A] =
// TODO: retry once we know what Kubernetes error codes are applicable
private def blockingClientProvider[A, B](clusterId: KubernetesClusterId, fa: ApiClient => A, fb: A => F[B]): F[B] =
blockerBound.withPermit(
blocker
.blockOn(
for {
kubernetesClient <- getClient(clusterId)
clientCallResult <- fa(kubernetesClient)
.onError { //we aren't handling any errors here, they will be bubbled up, but we want to print a more helpful message that is otherwise obfuscated
kubernetesClient <- getClient(clusterId, fa)
clientCallResult <- fb(kubernetesClient)
.onError { // We aren't handling any errors here, they will be bubbled up, but we want to print a more helpful message that is otherwise obfuscated
// TODO: e.getResponseBody() doesn't seem to get printed out. Find out why and fix.
case e: ApiException => Async[F].delay(StructuredLogger[F].info(e.getResponseBody()))
}
} yield clientCallResult
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,28 @@ import org.broadinstitute.dsde.workbench.google2.util.RetryPredicates
import org.broadinstitute.dsde.workbench.model.TraceId

trait KubernetesService[F[_]] {
//namespaces group resources, and allow our list/get/update API calls to be segmented. This can be used on a per-user basis, for example
// namespaces group resources, and allow our list/get/update API calls to be segmented. This can be used on a per-user basis, for example
def createNamespace(clusterId: KubernetesClusterId, namespace: KubernetesNamespace): F[Unit]

//pods represent a set of containers
// A Kubernetes service account is an automatically enabled authenticator that uses signed bearer tokens to verify requests.
// NB: It is distinct from Google service accounts.
def createServiceAccount(clusterId: KubernetesClusterId,
serviceAccount: KubernetesServiceAccount,
namespaceName: KubernetesNamespace): F[Unit]

// pods represent a set of containers
def createPod(clusterId: KubernetesClusterId, pod: KubernetesPod, namespace: KubernetesNamespace): F[Unit]

//certain services allow us to expose various containers via a matching selector
// certain services allow us to expose various containers via a matching selector
def createService(clusterId: KubernetesClusterId,
service: KubernetesServiceKind,
namespace: KubernetesNamespace): F[Unit]

def createRole(clusterId: KubernetesClusterId, role: KubernetesRole, namespace: KubernetesNamespace): F[Unit]

def createRoleBinding(clusterId: KubernetesClusterId,
roleBinding: KubernetesRoleBinding,
namespace: KubernetesNamespace): F[Unit]
}

// This kubernetes service requires a GKEService because it needs to call getCluster
Expand Down
Loading

0 comments on commit 2c6d794

Please sign in to comment.