Skip to content

Commit

Permalink
Merge f9ba0aa into 9b2805b
Browse files Browse the repository at this point in the history
  • Loading branch information
dvoet committed Jul 31, 2019
2 parents 9b2805b + f9ba0aa commit bccd51e
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 111 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ after_success:
- SBT_OPTS=-J-Xmx3g sbt clean coverage test coverageReport coverageAggregate coveralls -Denv.type=test
- bash scripts/publish.sh
- bash scripts/version_update.sh
dist: trusty
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ To depend on the `MockGoogle*` classes, additionally depend on:

Contains utility functions for talking to Google APIs via com.google.cloud client library (more recent) via gRPC.

Latest SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.5-3fa06b5"`
Latest SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.5-TRAVIS-REPLACE-ME"`

To start the Google PubSub emulator for unit testing:

Expand Down
4 changes: 3 additions & 1 deletion google2/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Added
- Add `setBucketLabels`
- Add `listBlobsWithPrefix`
- Add `isRecursive` parameter to `listBlobsWithPrefix` and `listObjectsWithPrefix`
- Add RetryPredicates

Changed
- Use linebacker for blocking execution context
Expand All @@ -21,8 +22,9 @@ Changed
- Bump `http4sVersion` to `0.20.3`
- Deprecate `storeObject`, and add `createObject` that returns `Blob`
- Support custom storage IAM roles
- GoogleStorageService retry config defined per function via parameters instead of per service instance

SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.5-3fa06b5"`
SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.5-TRAVIS-REPLACE-ME"`

## 0.4

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@ import java.nio.file.Path
import cats.data.NonEmptyList
import cats.implicits._
import cats.effect._
import com.google.cloud.Policy
import com.google.cloud.{Identity, Policy}
import com.google.auth.oauth2.GoogleCredentials
import com.google.cloud.Identity
import com.google.cloud.storage.{Acl, Blob, BlobId, StorageOptions}
import com.google.cloud.storage.BucketInfo.LifecycleRule
import fs2.Stream
import io.chrisdavenport.linebacker.Linebacker
import io.chrisdavenport.log4cats.Logger
import org.broadinstitute.dsde.workbench.RetryConfig
import org.broadinstitute.dsde.workbench.google2.GoogleStorageInterpreter.defaultRetryConfig
import org.broadinstitute.dsde.workbench.model.TraceId
import org.broadinstitute.dsde.workbench.model.google.{GcsBucketName, GcsObjectName, GoogleProject}

import org.broadinstitute.dsde.workbench.google2.util.RetryPredicates.standardRetryConfig

import scala.language.higherKinds

/**
Expand All @@ -29,23 +29,24 @@ trait GoogleStorageService[F[_]] {
/**
* @param traceId uuid for tracing a unique call flow in logging
*/
def listObjectsWithPrefix(bucketName: GcsBucketName, objectNamePrefix: String, isRecursive: Boolean = false, maxPageSize: Long = 1000, traceId: Option[TraceId] = None): Stream[F, GcsObjectName]
def listObjectsWithPrefix(bucketName: GcsBucketName, objectNamePrefix: String, isRecursive: Boolean = false, maxPageSize: Long = 1000, traceId: Option[TraceId] = None, retryConfig: RetryConfig = standardRetryConfig): Stream[F, GcsObjectName]

/**
* @param traceId uuid for tracing a unique call flow in logging
*/
def listBlobsWithPrefix(bucketName: GcsBucketName, objectNamePrefix: String, isRecursive: Boolean, maxPageSize: Long = 1000, traceId: Option[TraceId] = None): Stream[F, Blob]
def listBlobsWithPrefix(bucketName: GcsBucketName, objectNamePrefix: String, isRecursive: Boolean, maxPageSize: Long = 1000, traceId: Option[TraceId] = None, retryConfig: RetryConfig = standardRetryConfig): Stream[F, Blob]

/**
* not memory safe. Use listObjectsWithPrefix if you're worried about OOM
* @param traceId uuid for tracing a unique call flow in logging
*/
def unsafeListObjectsWithPrefix(bucketName: GcsBucketName, objectNamePrefix: String, maxPageSize: Long = 1000, traceId: Option[TraceId] = None)(implicit sf: Sync[F]): F[List[GcsObjectName]] = listObjectsWithPrefix(bucketName, objectNamePrefix).compile.toList
def unsafeListObjectsWithPrefix(bucketName: GcsBucketName, objectNamePrefix: String, maxPageSize: Long = 1000, traceId: Option[TraceId] = None, retryConfig: RetryConfig = standardRetryConfig)(implicit sf: Sync[F]): F[List[GcsObjectName]] =
listObjectsWithPrefix(bucketName, objectNamePrefix, maxPageSize = maxPageSize, traceId = traceId, retryConfig = retryConfig).compile.toList

/**
* @param traceId uuid for tracing a unique call flow in logging
*/
def createBlob(bucketName: GcsBucketName, objectName: GcsBlobName, objectContents: Array[Byte], objectType: String = "text/plain", metadata: Map[String, String] = Map.empty, generation: Option[Long] = None, traceId: Option[TraceId] = None): Stream[F, Blob]
def createBlob(bucketName: GcsBucketName, objectName: GcsBlobName, objectContents: Array[Byte], objectType: String = "text/plain", metadata: Map[String, String] = Map.empty, generation: Option[Long] = None, traceId: Option[TraceId] = None, retryConfig: RetryConfig = standardRetryConfig): Stream[F, Blob]

/**
* @param traceId uuid for tracing a unique call flow in logging
Expand All @@ -56,93 +57,94 @@ trait GoogleStorageService[F[_]] {
/**
* @param traceId uuid for tracing a unique call flow in logging
*/
def setBucketLifecycle(bucketName: GcsBucketName, lifecycleRules: List[LifecycleRule], traceId: Option[TraceId] = None): Stream[F, Unit]
def setBucketLifecycle(bucketName: GcsBucketName, lifecycleRules: List[LifecycleRule], traceId: Option[TraceId] = None, retryConfig: RetryConfig = standardRetryConfig): Stream[F, Unit]

/**
* not memory safe. Use getObject if you're worried about OOM
* @param traceId uuid for tracing a unique call flow in logging
*/
@deprecated("Use unsafeGetObjectBody instead", "0.5")
def unsafeGetObject(bucketName: GcsBucketName, blobName: GcsBlobName, traceId: Option[TraceId] = None): F[Option[String]] = unsafeGetBlobBody(bucketName, blobName, traceId)
def unsafeGetObject(bucketName: GcsBucketName, blobName: GcsBlobName, traceId: Option[TraceId] = None, retryConfig: RetryConfig = standardRetryConfig): F[Option[String]] = unsafeGetBlobBody(bucketName, blobName, traceId, retryConfig)

/**
* not memory safe. Use getObject if you're worried about OOM
* @param traceId uuid for tracing a unique call flow in logging
*/
def unsafeGetBlobBody(bucketName: GcsBucketName, blobName: GcsBlobName, traceId: Option[TraceId] = None): F[Option[String]]
def unsafeGetBlobBody(bucketName: GcsBucketName, blobName: GcsBlobName, traceId: Option[TraceId] = None, retryConfig: RetryConfig = standardRetryConfig): F[Option[String]]

/**
* @param traceId uuid for tracing a unique call flow in logging
*/
@deprecated("Use getObject instead", "0.5")
def getObject(bucketName: GcsBucketName, blobName: GcsBlobName, traceId: Option[TraceId] = None): Stream[F, Byte] = getBlobBody(bucketName, blobName, traceId)
def getObject(bucketName: GcsBucketName, blobName: GcsBlobName, traceId: Option[TraceId] = None, retryConfig: RetryConfig = standardRetryConfig): Stream[F, Byte] = getBlobBody(bucketName, blobName, traceId, retryConfig)

/**
* @param traceId uuid for tracing a unique call flow in logging
*/
def getBlobBody(bucketName: GcsBucketName, blobName: GcsBlobName, traceId: Option[TraceId] = None): Stream[F, Byte]
def getBlobBody(bucketName: GcsBucketName, blobName: GcsBlobName, traceId: Option[TraceId] = None, retryConfig: RetryConfig = standardRetryConfig): Stream[F, Byte]

/**
* return com.google.cloud.storage.Blob, which gives you metadata and user defined metadata etc
* @param traceId uuid for tracing a unique call flow in logging
*/
def getBlob(bucketName: GcsBucketName, blobName: GcsBlobName, traceId: Option[TraceId] = None): Stream[F, Blob]
def getBlob(bucketName: GcsBucketName, blobName: GcsBlobName, traceId: Option[TraceId] = None, retryConfig: RetryConfig = standardRetryConfig): Stream[F, Blob]

/**
* @param traceId uuid for tracing a unique call flow in logging
*/
def downloadObject(blobId: BlobId, path: Path, traceId: Option[TraceId] = None): Stream[F, Unit]
def downloadObject(blobId: BlobId, path: Path, traceId: Option[TraceId] = None, retryConfig: RetryConfig = standardRetryConfig): Stream[F, Unit]

/**
* @param traceId uuid for tracing a unique call flow in logging
*/
def getObjectMetadata(bucketName: GcsBucketName, blobName: GcsBlobName, traceId: Option[TraceId]): Stream[F, GetMetadataResponse]
def getObjectMetadata(bucketName: GcsBucketName, blobName: GcsBlobName, traceId: Option[TraceId], retryConfig: RetryConfig = standardRetryConfig): Stream[F, GetMetadataResponse]

/**
* @param traceId uuid for tracing a unique call flow in logging
*/
def setObjectMetadata(bucketName: GcsBucketName, blobName: GcsBlobName, metadata: Map[String, String], traceId: Option[TraceId]): Stream[F, Unit]
def setObjectMetadata(bucketName: GcsBucketName, blobName: GcsBlobName, metadata: Map[String, String], traceId: Option[TraceId], retryConfig: RetryConfig = standardRetryConfig): Stream[F, Unit]

/**
* @return true if deleted; false if not found
*/
def removeObject(bucketName: GcsBucketName, blobName: GcsBlobName, generation: Option[Long] = None, traceId: Option[TraceId] = None): Stream[F, RemoveObjectResult]
def removeObject(bucketName: GcsBucketName, blobName: GcsBlobName, generation: Option[Long] = None, traceId: Option[TraceId] = None, retryConfig: RetryConfig = standardRetryConfig): Stream[F, RemoveObjectResult]

/**
* @param traceId uuid for tracing a unique call flow in logging
* Acl is deprecated. Use setIamPolicy if possible
*/
@deprecated("Deprecated in favor of insertBucket", "0.5")
def createBucket(billingProject: GoogleProject, bucketName: GcsBucketName, acl: Option[NonEmptyList[Acl]] = None, traceId: Option[TraceId] = None): Stream[F, Unit] = insertBucket(billingProject, bucketName, acl, Map.empty, traceId)
def createBucket(billingProject: GoogleProject, bucketName: GcsBucketName, acl: Option[NonEmptyList[Acl]] = None, traceId: Option[TraceId] = None, retryConfig: RetryConfig = standardRetryConfig): Stream[F, Unit] = insertBucket(billingProject, bucketName, acl, Map.empty, traceId)

/**
* @param googleProject The name of the Google project to create the bucket in
* @param traceId uuid for tracing a unique call flow in logging
* Supports adding bucket labels during creation
* Acl is deprecated. Use setIamPolicy if possible
*/
def insertBucket(googleProject: GoogleProject, bucketName: GcsBucketName, acl: Option[NonEmptyList[Acl]] = None, labels: Map[String, String] = Map.empty, traceId: Option[TraceId] = None): Stream[F, Unit]
def insertBucket(googleProject: GoogleProject, bucketName: GcsBucketName, acl: Option[NonEmptyList[Acl]] = None, labels: Map[String, String] = Map.empty, traceId: Option[TraceId] = None, retryConfig: RetryConfig = standardRetryConfig): Stream[F, Unit]

/**
* @param traceId uuid for tracing a unique call flow in logging
*/
def setBucketPolicyOnly(bucketName: GcsBucketName, bucketPolicyOnlyEnabled: Boolean, traceId: Option[TraceId] = None): Stream[F, Unit]
def setBucketPolicyOnly(bucketName: GcsBucketName, bucketPolicyOnlyEnabled: Boolean, traceId: Option[TraceId] = None, retryConfig: RetryConfig = standardRetryConfig): Stream[F, Unit]

def setBucketLabels(bucketName: GcsBucketName, labels: Map[String, String], traceId: Option[TraceId] = None): Stream[F, Unit]
def setBucketLabels(bucketName: GcsBucketName, labels: Map[String, String], traceId: Option[TraceId] = None, retryConfig: RetryConfig = standardRetryConfig): Stream[F, Unit]

/**
* @param traceId uuid for tracing a unique call flow in logging
*/
def setIamPolicy(bucketName: GcsBucketName, roles: Map[StorageRole, NonEmptyList[Identity]], traceId: Option[TraceId] = None): Stream[F, Unit]
def setIamPolicy(bucketName: GcsBucketName, roles: Map[StorageRole, NonEmptyList[Identity]], traceId: Option[TraceId] = None, retryConfig: RetryConfig = standardRetryConfig): Stream[F, Unit]

def getIamPolicy(bucketName: GcsBucketName, traceId: Option[TraceId] = None): Stream[F, Policy]
def getIamPolicy(bucketName: GcsBucketName, traceId: Option[TraceId] = None, retryConfig: RetryConfig = standardRetryConfig): Stream[F, Policy]
}

object GoogleStorageService {
def resource[F[_]: ContextShift: Timer: Async: Logger: Linebacker](pathToCredentialJson: String, project: Option[GoogleProject] = None, retryConfig: RetryConfig = defaultRetryConfig): Resource[F, GoogleStorageService[F]] = for {
def resource[F[_]: ContextShift: Timer: Async: Logger: Linebacker](pathToCredentialJson: String, project: Option[GoogleProject] = None): Resource[F, GoogleStorageService[F]] = for {
db <- GoogleStorageInterpreter.storage[F](pathToCredentialJson, Linebacker[F].blockingContext, project)
} yield GoogleStorageInterpreter[F](db, retryConfig)
} yield GoogleStorageInterpreter[F](db)

def fromApplicationDefault[F[_]: ContextShift: Timer: Async: Logger: Linebacker](retryConfig: RetryConfig = defaultRetryConfig): Resource[F, GoogleStorageService[F]] = for {
def fromApplicationDefault[F[_]: ContextShift: Timer: Async: Logger: Linebacker](): Resource[F, GoogleStorageService[F]] = for {
db <- Resource.liftF(
Sync[F].delay(
StorageOptions
Expand All @@ -152,7 +154,7 @@ object GoogleStorageService {
.getService
)
)
} yield GoogleStorageInterpreter[F](db, retryConfig)
} yield GoogleStorageInterpreter[F](db)
}

final case class GcsBlobName(value: String) extends AnyVal
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.broadinstitute.dsde.workbench.google2.util

import java.io.IOException

import com.google.cloud.BaseServiceException
import org.broadinstitute.dsde.workbench.RetryConfig
import scala.concurrent.duration._

object RetryPredicates {
val standardRetryConfig = RetryConfig(
org.broadinstitute.dsde.workbench.util.addJitter(1 seconds, 1 seconds),
x => x * 2,
5,
standardRetryPredicate
)

def retryConfigWithPredicates(predicates: (Throwable => Boolean)*): RetryConfig = {
standardRetryConfig.copy(retryable = combine(predicates))
}

/**
* Retries anything google thinks is ok to retry plus any IOException
* @return
*/
def standardRetryPredicate: Throwable => Boolean = {
case e: BaseServiceException => e.isRetryable
case _: IOException => true
case _ => false
}

def whenStatusCode(code: Int): Throwable => Boolean = {
case e: BaseServiceException => e.getCode == code
case _ => false
}

def combine(predicates: Seq[Throwable => Boolean]): Throwable => Boolean = { throwable =>
predicates.map( _(throwable) ).foldLeft(false)(_ || _)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import fs2.Stream
import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper
import io.chrisdavenport.linebacker.Linebacker
import org.broadinstitute.dsde.workbench.google2.Generators._
import org.broadinstitute.dsde.workbench.google2.GoogleStorageInterpreter._
import org.broadinstitute.dsde.workbench.google2.GoogleStorageInterpreterSpec._
import org.broadinstitute.dsde.workbench.util.WorkbenchTest
import org.scalacheck.Gen
Expand Down Expand Up @@ -110,7 +109,8 @@ class GoogleStorageInterpreterSpec extends AsyncFlatSpec with Matchers with Work
}
}

it should "retrieve multiple pages" in ioAssertion {
// disabled because the LocalStorageHelper does not seem to support pagination of results
ignore should "retrieve multiple pages" in ioAssertion {
val bucketName = genGcsBucketName.sample.get
val prefix = Gen.uuid.sample.get.toString
val blobNameWithPrefix = Gen.listOfN(4, genGcsBlobName).sample.get.map(x => GcsBlobName(s"$prefix${x.value}"))
Expand Down Expand Up @@ -146,6 +146,6 @@ object GoogleStorageInterpreterSpec {
implicit val lineBacker = Linebacker.fromExecutionContext[IO](ExecutionContext.global)

val db = LocalStorageHelper.getOptions().getService()
val localStorage = GoogleStorageInterpreter[IO](db, defaultRetryConfig)
val localStorage = GoogleStorageInterpreter[IO](db)
val objectType = "text/plain"
}
Loading

0 comments on commit bccd51e

Please sign in to comment.