Skip to content

Commit

Permalink
Code review comments, and use GCS parsing code from workbench-utils
Browse files Browse the repository at this point in the history
  • Loading branch information
rtitle committed Sep 28, 2017
1 parent 8116db2 commit a825524
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 58 deletions.
14 changes: 8 additions & 6 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ object Dependencies {
val scalaTestV = "3.0.1"
val slickV = "3.2.1"

val workbenchUtilV = "0.2-d34dcf2"
val workbenchModelV = "0.1-d34dcf2"
val workbenchGoogleV = "0.1-d34dcf2"
val workbenchUtilV = "0.2-544dd49-SNAP"
val workbenchModelV = "0.2-544dd49-SNAP"
val workbenchGoogleV = "0.3-544dd49-SNAP"
val workbenchMetricsV = "0.3-544dd49-SNAP"

val excludeAkkaActor = ExclusionRule(organization = "com.typesafe.akka", name = "akka-actor_2.12")
val excludeGuavaJDK5 = ExclusionRule(organization = "com.google.guava", name = "guava-jdk5")
Expand Down Expand Up @@ -46,9 +47,10 @@ object Dependencies {

// All of workbench-libs pull in Akka; exclude it since we provide our own Akka dependency.
// workbench-google pulls in workbench-{util, model, metrics}; exclude them so we can control the library versions individually.
val workbenchUtil: ModuleID = "org.broadinstitute.dsde.workbench" %% "workbench-util" % workbenchUtilV
val workbenchModel: ModuleID = "org.broadinstitute.dsde.workbench" %% "workbench-model" % workbenchModelV
val workbenchGoogle: ModuleID = "org.broadinstitute.dsde.workbench" %% "workbench-google" % workbenchGoogleV excludeAll(excludeWorkbenchUtil, excludeWorkbenchModel, excludeWorkbenchMetrics)
val workbenchUtil: ModuleID = "org.broadinstitute.dsde.workbench" %% "workbench-util" % workbenchUtilV
val workbenchModel: ModuleID = "org.broadinstitute.dsde.workbench" %% "workbench-model" % workbenchModelV
val workbenchGoogle: ModuleID = "org.broadinstitute.dsde.workbench" %% "workbench-google" % workbenchGoogleV // excludeAll(excludeWorkbenchUtil, excludeWorkbenchModel, excludeWorkbenchMetrics)
val workbenchMetrics: ModuleID = "org.broadinstitute.dsde.workbench" %% "workbench-metrics" % workbenchMetricsV

val slick: ModuleID = "com.typesafe.slick" %% "slick" % slickV
val hikariCP: ModuleID = "com.typesafe.slick" %% "slick-hikaricp" % slickV
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.broadinstitute.dsde.workbench.leonardo.dao

import java.io.File

import org.broadinstitute.dsde.workbench.google.gcs.GcsBucketName
import org.broadinstitute.dsde.workbench.leonardo.model.ClusterStatus.{ClusterStatus => LeoClusterStatus}
import org.broadinstitute.dsde.workbench.leonardo.model.ModelTypes.GoogleProject
import org.broadinstitute.dsde.workbench.leonardo.model.{ClusterErrorDetails, ClusterRequest, ClusterResponse}
Expand Down Expand Up @@ -30,5 +32,5 @@ trait DataprocDAO {

def deleteClusterInitBucket(googleProject: GoogleProject, clusterName: String)(implicit executionContext: ExecutionContext): Future[Option[String]]

def deleteBucket(googleProject: GoogleProject, bucketName: String)(implicit executionContext: ExecutionContext): Future[Unit]
// Deletes the given GCS bucket and all objects it contains in the given google project.
def deleteBucket(googleProject: GoogleProject, gcsBucketName: GcsBucketName)(implicit executionContext: ExecutionContext): Future[Unit]
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.broadinstitute.dsde.workbench.leonardo.dao

import java.io.{ByteArrayInputStream, File}
import java.net.URI
import java.nio.charset.StandardCharsets

import akka.actor.ActorSystem
Expand All @@ -27,24 +26,29 @@ import com.google.api.services.storage.model.Bucket.Lifecycle.Rule.{Action, Cond
import com.google.api.services.storage.model.{Bucket, StorageObject}
import com.google.api.services.storage.{Storage, StorageScopes}
import org.broadinstitute.dsde.workbench.google.GoogleUtilities
import org.broadinstitute.dsde.workbench.google.gcs.{GcsBucketName, GcsPath}
import org.broadinstitute.dsde.workbench.leonardo.config.{DataprocConfig, ProxyConfig}
import org.broadinstitute.dsde.workbench.leonardo.model.ClusterStatus.{ClusterStatus => LeoClusterStatus}
import org.broadinstitute.dsde.workbench.leonardo.model.ModelTypes.GoogleProject
import org.broadinstitute.dsde.workbench.leonardo.model.{ClusterErrorDetails, ClusterRequest, ClusterResponse, GoogleBucketUri, LeoException, StorageObjectResponse, Cluster => LeoCluster, ClusterStatus => LeoClusterStatus}
import org.broadinstitute.dsde.workbench.metrics.GoogleInstrumentedService

import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future, blocking}
import scala.util.Try

case class CallToGoogleApiFailedException(googleProject: GoogleProject, clusterName:String, exceptionStatusCode: Int, errorMessage: String)
extends LeoException(s"Call to Google API failed for $googleProject/$clusterName. Message: $errorMessage",exceptionStatusCode)
// Raised when a Google API call fails; objectName identifies the resource (cluster, bucket, or object) the request acted on.
case class CallToGoogleApiFailedException(googleProject: GoogleProject, objectName: String, exceptionStatusCode: Int, errorMessage: String)
extends LeoException(s"Call to Google API failed for $googleProject/$objectName. Message: $errorMessage", exceptionStatusCode)

// Raised when the expected firewall rule does not exist in the google project; maps to HTTP 404.
case class FirewallRuleNotFoundException(googleProject: GoogleProject, firewallRuleName: String)
extends LeoException(s"Firewall rule $firewallRuleName not found in project $googleProject", StatusCodes.NotFound)

class GoogleDataprocDAO(protected val dataprocConfig: DataprocConfig, protected val proxyConfig: ProxyConfig)(implicit val system: ActorSystem, val executionContext: ExecutionContext)
extends DataprocDAO with GoogleUtilities {

// TODO pass as constructor arg when we add metrics
override protected val workbenchMetricBaseName: String = ""
implicit val service = GoogleInstrumentedService.Groups

private val httpTransport = GoogleNetHttpTransport.newTrustedTransport
private val jsonFactory = JacksonFactory.getDefaultInstance
private lazy val cloudPlatformScopes = List(ComputeScopes.CLOUD_PLATFORM)
Expand Down Expand Up @@ -178,22 +182,22 @@ class GoogleDataprocDAO(protected val dataprocConfig: DataprocConfig, protected
cluster <- OptionT.liftF(getCluster(googleProject, clusterName))
bucketName <- OptionT.fromOption(getInitBucketName(cluster))
_ <- OptionT.liftF(deleteBucket(googleProject, bucketName))
} yield bucketName
} yield bucketName.name

result.value
}

/**
 * Deletes a GCS bucket in the given google project, first deleting every object it
 * contains (GCS requires a bucket to be empty before the bucket itself can be deleted).
 * Fails the returned Future with CallToGoogleApiFailedException if any Google call fails.
 */
override def deleteBucket(googleProject: GoogleProject, bucketName: GcsBucketName)(implicit executionContext: ExecutionContext): Future[Unit] = {
  // Delete all objects in the bucket, then delete the bucket itself
  val listObjectsRequest = storage.objects().list(bucketName.name)
  executeGoogleRequestAsync(googleProject, bucketName.name, listObjectsRequest).flatMap { objects =>
    // getItems returns null (not an empty list) when the bucket has no objects -- guard to avoid an NPE
    val items = Option(objects.getItems).map(_.asScala).getOrElse(Nil)
    // NOTE(review): this does not page through list results; buckets with more than one
    // page of objects would only be partially emptied -- confirm init buckets stay small.
    Future.traverse(items) { item =>
      val deleteObjectRequest = storage.objects().delete(bucketName.name, item.getName)
      executeGoogleRequestAsync(googleProject, bucketName.name, deleteObjectRequest)
    }
  }.flatMap { _ =>
    val deleteBucketRequest = storage.buckets().delete(bucketName.name)
    executeGoogleRequestAsync(googleProject, bucketName.name, deleteBucketRequest).void
  }
}

Expand Down Expand Up @@ -349,26 +353,22 @@ class GoogleDataprocDAO(protected val dataprocConfig: DataprocConfig, protected
} yield accessConfig.getNatIP
}

private def getInitBucketName(cluster: GoogleCluster): Option[String] = {
// Pulls the bucket name out of a GCS path such as "gs://bucket/object" by reading the
// host component of the parsed URI. Yields None when the path is not a syntactically
// valid URI, or when the URI carries no host component.
def parseBucketName(bucketPath: String): Option[String] = {
  val parsed = Try(new URI(bucketPath))
  parsed.toOption.flatMap(uri => Option(uri.getHost))
}

/**
 * Extracts the init bucket name from a Google Dataproc cluster by parsing the GCS path
 * of the cluster's first initialization action.
 * Returns None when the cluster has no config, no initialization actions, no executable
 * file, or the executable file path cannot be parsed as a GCS path.
 */
private def getInitBucketName(cluster: GoogleCluster): Option[GcsBucketName] = {
  for {
    config <- Option(cluster.getConfig)
    // Google API model getters return null (not empty collections) for absent fields -- guard before asScala
    initActions <- Option(config.getInitializationActions)
    initAction <- initActions.asScala.headOption
    bucketPath <- Option(initAction.getExecutableFile)
    parsedBucketPath <- GcsPath.parse(bucketPath).toOption
  } yield parsedBucketPath.bucketName
}

/**
 * Executes a blocking Google client request on a background thread, wrapping it in a Future.
 * A GoogleJsonResponseException is logged and converted to CallToGoogleApiFailedException;
 * any other failure propagates unchanged through the Future.
 *
 * @param objectName name of the resource (cluster, bucket, or object) the request acts on; used in error messages
 */
private def executeGoogleRequestAsync[A](googleProject: GoogleProject, objectName: String, request: AbstractGoogleClientRequest[A])(implicit executionContext: ExecutionContext): Future[A] = {
  Future {
    blocking(executeGoogleRequest(request))
  } recover {
    case e: GoogleJsonResponseException =>
      logger.error(s"Error occurred executing Google request for $googleProject/$objectName", e)
      // getDetails may be null when the error response body is not parseable JSON -- fall back to the exception message
      val message = Option(e.getDetails).map(_.getMessage).getOrElse(e.getMessage)
      throw CallToGoogleApiFailedException(googleProject, objectName, e.getStatusCode, message)
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import org.broadinstitute.dsde.workbench.leonardo.model.ClusterStatus._
import org.broadinstitute.dsde.workbench.leonardo.model.{Cluster, ClusterErrorDetails, ClusterStatus}
import org.broadinstitute.dsde.workbench.leonardo.monitor.ClusterMonitorActor._
import org.broadinstitute.dsde.workbench.leonardo.monitor.ClusterMonitorSupervisor.ClusterDeleted
import org.broadinstitute.dsde.workbench.leonardo.service.LeonardoService
import org.broadinstitute.dsde.workbench.util.addJitter

import scala.concurrent.Future
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package org.broadinstitute.dsde.workbench.leonardo.service

import java.io.File
import java.net.URI
import java.util.UUID

import akka.actor.ActorRef
import akka.http.scaladsl.model.StatusCodes
Expand All @@ -16,6 +15,7 @@ import org.broadinstitute.dsde.workbench.leonardo.model.LeonardoJsonSupport._
import org.broadinstitute.dsde.workbench.leonardo.model.ModelTypes.GoogleProject
import org.broadinstitute.dsde.workbench.leonardo.model._
import org.broadinstitute.dsde.workbench.leonardo.monitor.ClusterMonitorSupervisor.{ClusterCreated, ClusterDeleted, RegisterLeoService}
import org.broadinstitute.dsde.workbench.google.gcs._
import slick.dbio.DBIO
import spray.json._

Expand Down Expand Up @@ -109,16 +109,16 @@ class LeonardoService(protected val dataprocConfig: DataprocConfig, gdDAO: Datap
- Create the cluster in the google project
Currently, the bucketPath of the clusterRequest is not used - it will be used later as a place to store notebook results */
private[service] def createGoogleCluster(googleProject: GoogleProject, clusterName: String, clusterRequest: ClusterRequest)(implicit executionContext: ExecutionContext): Future[ClusterResponse] = {
val bucketName = generateBucketName(clusterName)
val bucketName = generateUniqueBucketName(clusterName)
for {
// Validate that the Jupyter extension URI is a valid URI and references a real GCS object
_ <- validateJupyterExtensionUri(googleProject, clusterRequest.jupyterExtensionUri)
// Create the firewall rule in the google project if it doesn't already exist, so we can access the cluster
_ <- gdDAO.updateFirewallRule(googleProject)
// Create the bucket in leo's google bucket and populate with initialization files
_ <- initializeBucket(dataprocConfig.leoGoogleBucket, clusterName, bucketName, clusterRequest)
_ <- initializeBucket(dataprocConfig.leoGoogleBucket, clusterName, bucketName.name, clusterRequest)
// Once the bucket is ready, build the cluster
clusterResponse <- gdDAO.createCluster(googleProject, clusterName, clusterRequest, bucketName).andThen { case Failure(e) =>
clusterResponse <- gdDAO.createCluster(googleProject, clusterName, clusterRequest, bucketName.name).andThen { case Failure(e) =>
// If cluster creation fails, delete the init bucket asynchronously
gdDAO.deleteBucket(googleProject, bucketName)
}
Expand Down Expand Up @@ -208,26 +208,4 @@ class LeonardoService(protected val dataprocConfig: DataprocConfig, gdDAO: Datap
case None => params
}
}

// TODO: remove once workbench-libs #26 is merged
// TODO: remove once workbench-libs #26 is merged
/**
 * Generates a unique, valid GCS bucket name from the given prefix:
 *  - lowercases the prefix and strips characters other than letters, digits, '_', '-', '.'
 *  - forces the first character to be a letter or digit (GCS requirement)
 *  - appends a random UUID, truncating so the total length is at most 63 characters
 *  - rewrites "goog" to "g00g", since names may not start with "goog" or contain "google"
 */
def generateBucketName(prefix: String): String = {
  // may only contain lowercase letters, numbers, underscores, dashes, or dots
  val lowerCaseName = prefix.toLowerCase.filter { c =>
    Character.isLetterOrDigit(c) || c == '_' || c == '-' || c == '.'
  }

  // must start with a letter or number; guard against an empty result when the prefix
  // contains no allowed characters at all (sb.head would throw on an empty builder)
  val sb = new StringBuilder(if (lowerCaseName.isEmpty) "0" else lowerCaseName)
  if (!Character.isLetterOrDigit(sb.head)) sb.setCharAt(0, '0')

  // max length of 63 chars, including the uuid and the '-' separator
  val uuid = UUID.randomUUID.toString
  val maxNameLength = 63 - uuid.length - 1
  if (sb.length > maxNameLength) sb.setLength(maxNameLength)

  // must not start with "goog" or contain the string "google"
  val processedName = sb.toString.replaceAllLiterally("goog", "g00g")

  s"$processedName-$uuid"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package org.broadinstitute.dsde.workbench.leonardo.dao

import java.io.File
import java.util.UUID

import org.broadinstitute.dsde.workbench.google.gcs.GcsBucketName
import org.broadinstitute.dsde.workbench.leonardo.config.DataprocConfig
import org.broadinstitute.dsde.workbench.leonardo.model.ModelTypes.GoogleProject
import org.broadinstitute.dsde.workbench.leonardo.model._
import org.broadinstitute.dsde.workbench.leonardo.model.ClusterStatus._

import scala.collection.concurrent.TrieMap
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
Expand Down Expand Up @@ -54,8 +57,8 @@ class MockGoogleDataprocDAO(protected val dataprocConfig: DataprocConfig) extend
Future.successful(None)
}

override def deleteBucket(googleProject: GoogleProject, bucketName: String)(implicit executionContext: ExecutionContext): Future[Unit] = {
buckets -= bucketName
// Mock implementation: removes the bucket's name from the in-memory `buckets` collection
// and always completes successfully, whether or not the bucket was ever created
// (mirrors idempotent delete semantics -- no error on a missing bucket).
override def deleteBucket(googleProject: GoogleProject, bucketName: GcsBucketName)(implicit executionContext: ExecutionContext): Future[Unit] = {
buckets -= bucketName.name
Future.successful(())
}

Expand Down

0 comments on commit a825524

Please sign in to comment.