Skip to content

Commit

Permalink
Merge 344f6da into be6643d
Browse files Browse the repository at this point in the history
  • Loading branch information
Qi77Qi committed Dec 5, 2019
2 parents be6643d + 344f6da commit cf667cf
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 12 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ To depend on the `MockGoogle*` classes, additionally depend on:

Contains utility functions for talking to Google APIs via com.google.cloud client library (more recent) via gRPC.

Latest SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.6-4df42ca"`
Latest SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.6-TRAVIS-REPLACE-ME"`

To start the Google PubSub emulator for unit testing:

Expand Down
5 changes: 3 additions & 2 deletions google2/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ Changed
- Bump `fs2-io` to `2.0.1`
- Add optional `blockderBound` to `GoogleStorageService` constructor
- Remove `LineBacker` usage
- Add arguments to `insertBucket`
- Add arguments to `insertBucket`
- Fix `scala.MatchError` from `handleErrorWith`

Add
- Add `GoogleDataproc` and `GoogleDataprocInterpreter`

SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.6-4df42ca"`
SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.6-TRAVIS-REPLACE-ME"`

## 0.5

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import io.circe.syntax._
import org.broadinstitute.dsde.workbench.RetryConfig

private[google2] class GooglePublisherInterpreter[F[_]: Async: Timer: Logger](
publisher: Publisher,
retryConfig: RetryConfig
) extends GooglePublisher[F] {
publisher: Publisher,
retryConfig: RetryConfig
) extends GooglePublisher[F] {
def publish[MessageType: Encoder]: Pipe[F, MessageType, Unit] = in => {
in.flatMap {
message =>
Expand Down Expand Up @@ -49,9 +49,9 @@ private[google2] class GooglePublisherInterpreter[F[_]: Async: Timer: Logger](

object GooglePublisherInterpreter {
def apply[F[_]: Async: Timer: ContextShift: Logger](
publisher: Publisher,
retryConfig: RetryConfig
): GooglePublisherInterpreter[F] = new GooglePublisherInterpreter(publisher, retryConfig)
publisher: Publisher,
retryConfig: RetryConfig
): GooglePublisherInterpreter[F] = new GooglePublisherInterpreter(publisher, retryConfig)

def publisher[F[_]: Sync](config: PublisherConfig): Resource[F, Publisher] =
for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ private[google2] class GoogleStorageInterpreter[F[_]: ContextShift: Timer: Async

val dbForProject = db.getOptions.toBuilder.setProjectId(googleProject.value).build().getService

val createBucket = blockingF(Async[F].delay(dbForProject.create(bucketInfo))).void.handleErrorWith {
val createBucket = blockingF(Async[F].delay(dbForProject.create(bucketInfo))).void.onError {
case e: com.google.cloud.storage.StorageException if(e.getCode == 409) =>
Logger[F].info(s"$bucketName already exists")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class GoogleTopicAdminInterpreter[F[_]: Logger: Sync: Timer](topicAdminClient: T

def create(projectTopicName: ProjectTopicName, traceId: Option[TraceId] = None): Stream[F, Unit] = {
val loggingCtx = Map("topic" -> projectTopicName.asJson, "traceId" -> traceId.asJson)
val createTopic = Sync[F].delay(topicAdminClient.createTopic(projectTopicName)).void.handleErrorWith {
val createTopic = Sync[F].delay(topicAdminClient.createTopic(projectTopicName)).void.onError {
case _: com.google.api.gax.rpc.AlreadyExistsException =>
Logger[F].debug(s"$projectTopicName already exists")
}
Expand All @@ -33,7 +33,7 @@ class GoogleTopicAdminInterpreter[F[_]: Logger: Sync: Timer](topicAdminClient: T

def createWithPublisherMembers(projectTopicName: ProjectTopicName, members: List[Identity], traceId: Option[TraceId] = None): Stream[F, Unit] = {
val loggingCtx = Map("topic" -> projectTopicName.asJson, "traceId" -> traceId.asJson)
val createTopic = Sync[F].delay(topicAdminClient.createTopic(projectTopicName)).void.handleErrorWith {
val createTopic = Sync[F].delay(topicAdminClient.createTopic(projectTopicName)).void.onError {
case _: com.google.api.gax.rpc.AlreadyExistsException =>
Logger[F].debug(s"$projectTopicName topic already exists")
}
Expand Down

0 comments on commit cf667cf

Please sign in to comment.