Skip to content

Commit

Permalink
Merge 8db0ebd into 2e43a4f
Browse files Browse the repository at this point in the history
  • Loading branch information
Qi77Qi committed Nov 25, 2020
2 parents 2e43a4f + 8db0ebd commit 711ce1e
Show file tree
Hide file tree
Showing 19 changed files with 295 additions and 95 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ test-output/
*.log
/config/
.bloop

.bsp
*.pid
.DS_Store
.metals
84 changes: 84 additions & 0 deletions .scala-steward.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# Repository-specific configuration

# pullRequests.frequency allows to control how often or when Scala Steward
# is allowed to create pull requests.
#
# Possible values:
# @asap
# PRs are created without delay.
#
# <timespan>
# PRs are created only again after the given timespan since the last PR
# has passed. Example values are "36 hours", "1 day", or "14 days".

# <CRON expression>
# PRs are created roughly according to the given CRON expression.
#
# CRON expressions consist of five fields:
# minutes, hour of day, day of month, month, and day of week.
#
# See https://www.alonsodomin.me/cron4s/userguide/index.html#parsing for
# more information about the CRON expressions that are supported.
#
# Note that the date parts of the CRON expression are matched exactly
# while the the time parts are only used to abide to the frequency of
# the given expression.
#
# Default: @asap
#
#pullRequests.frequency = "0 0 ? * 3" # every thursday on midnight
pullRequests.frequency = "15 days"

# Only these dependencies which match the given patterns are updated.
#
# Each pattern must have `groupId`, and may have `artifactId` and `version`.
# Defaults to empty `[]` which mean Scala Steward will update all dependencies.
# updates.allow = [ { groupId = "com.example" } ]

# The dependencies which match the given version pattern are updated.
# Dependencies that are not listed will be updated.
#
# Each pattern must have `groupId`, `version` and optional `artifactId`.
# Defaults to empty `[]` which mean Scala Steward will update all dependencies.
# the following example will allow to update foo when version is 1.1.x
updates.pin = [ { groupId = "org.broadinstitute.dsde", artifactId="rawls-model", version = "0.1-2356e282" } ]

# The dependencies which match the given pattern are NOT updated.
#
# Each pattern must have `groupId`, and may have `artifactId` and `version`.
# Defaults to empty `[]` which mean Scala Steward will not ignore dependencies.
updates.ignore = []

# If set, Scala Steward will only attempt to create or update `n` PRs.
# Useful if running frequently and/or CI build are costly
# Default: None

# By default, Scala Steward does not update scala version since its tricky, error-prone
# and results in bad PRs and/or failed builds
# If set to true, Scala Steward will attempt to update the scala version
# Since this feature is experimental, the default is set to false
# Default: false
updates.includeScala = true

# The extensions of files that should be updated.
# Default: [".scala", ".sbt", ".sbt.shared", ".sc", ".yml", "pom.xml"]
updates.fileExtensions = [".scala", ".sbt", ".sbt.shared", ".sc", ".yml", ".md", ".markdown", ".txt"]

# If "on-conflicts", Scala Steward will update the PR it created to resolve conflicts as
# long as you don't change it yourself.
# If "always", Scala Steward will always update the PR it created as long as
# you don't change it yourself.
# If "never", Scala Steward will never update the PR
# Default: "on-conflicts"
updatePullRequests = "always" | "on-conflicts" | "never"

# If set, Scala Steward will use this message template for the commit messages and PR titles.
# Supported variables: ${artifactName}, ${currentVersion}, ${nextVersion} and ${default}
# Default: "${default}" which is equivalent to "Update ${artifactName} to ${nextVersion}"
commits.message = "Update ${artifactName} from ${currentVersion} to ${nextVersion}"

# If true and when upgrading version in .scalafmt.conf, Scala Steward will perform scalafmt
# and add a separate commit when format changed. So you don't need reformat manually and can merge PR.
# If false, Scala Steward will not perform scalafmt, so your CI may abort when reformat needed.
# Default: true
scalafmt.runAfterUpgrading = true
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ Workbench utility libraries, built for Scala 2.12 and 2.13. You can find the ful

Contains utility functions for talking to Google APIs via com.google.cloud client library (more recent) via gRPC.

Latest SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.16-42883ed"`
Latest SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.17-TRAVIS-REPLACE-ME"`

To start the Google PubSub emulator for unit testing:

Expand Down
56 changes: 55 additions & 1 deletion google2/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,66 @@

This file documents changes to the `workbench-google2` library, including notes on how to upgrade to new versions.

## 0.17
Add
- Add `list` to `GoogleTopicAdmin`
- Add `GoogleSubscriptionAdmin`

Changed
- Remove `retryConfig` from `PublisherConfig`
- Update `io.kubernetes client-java` to `5.0.0` to `10.0.0` (This has some breaking changes)
- Scala Steward (5):
```
Update google-cloud-pubsub to 1.109.0 (#409)
Update fs2-io to 2.4.6 (#411)
Update google-cloud-bigquery to 1.125.0 (#381)
Update google-cloud-firestore to 1.35.2 (#385)
Update google-cloud-kms to 1.40.2 (#386)
Update google-cloud-firestore to 2.1.0 (#412)
Update grpc-core to 1.33.1 (#395)
Update metrics4-scala to 4.1.14 (#413)
Update http4s-blaze-client, http4s-circe, ... to 0.21.12 (#415)
Update mockito-core to 3.6.28 (#414)
Update guava to 30.0-jre (#390)
```
SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.17-TRAVIS-REPLACE-ME"`

## 0.16

Changed:
Add:
- Add `subscriptionName: Option[ProjectSubscriptionName]`, `deadLetterPolicy: Option[SubscriberDeadLetterPolicy]` and `filter: Option[String]` to `SubscriberConfig`
- Add a `GoogleBigQueryService.resource()` method that accepts the Google project to be billed

Changed:
- Scala Steward:
```
Update mockito-3-4 to 3.2.3.0 (#404)
Update commons-codec to 1.15 (#393)
Update akka-http, akka-http-spray-json, ... to 10.2.1 (#392)
Update mockito-core to 2.28.2 (#399)
Update sbt to 1.4.4 (#400)
Update google-cloud-container to 1.2.0 (#382)
Update google-cloud-errorreporting to 0.120.8-beta (#384)
Update google-api-services-container to v1-rev20201007-1.30.10 (#380)
Update google-cloud-nio to 0.122.1 (#387) (Note: upgrade to this version if your project explicitly specifies version)
Update akka-actor, akka-stream, ... to 2.6.10 (#391)
Update mockito-core to 3.6.0 (#407)
Update opencensus-api, ... to 0.28.2 (#397)
Update log4cats-slf4j to 1.1.1 (#394)
Update google-cloud-storage to 1.113.4 (#389)
Update google-cloud-pubsub to 1.105.1 (#388)
Update google-cloud-dataproc to 0.122.3 (#383)
Update sbt-scalafix to 0.9.23 (#378)
Update scalacheck to 1.15.1 (#401)
Update commons-codec to 20041127.091804 (#406)
Update scalafmt-core to 2.7.5 (#402)
Update http4s-blaze-client, http4s-circe, ... to 0.21.11 (#398)
Update google-cloud-dataproc to 1.1.7 (#408)
Update scalatest to 3.2.3 (#403)
Update fs2-io to 2.4.5 (#379)
```

SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.16-42883ed"`

## 0.15
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,5 @@ object GooglePublisher {
): Resource[F, GooglePublisher[F]] =
for {
publisher <- GooglePublisherInterpreter.publisher(config)
} yield GooglePublisherInterpreter(publisher, config.retryConfig)
} yield GooglePublisherInterpreter(publisher)
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,22 @@ import fs2.{Pipe, Stream}
import io.chrisdavenport.log4cats.StructuredLogger
import io.circe.Encoder
import io.circe.syntax._
import org.broadinstitute.dsde.workbench.RetryConfig
import org.broadinstitute.dsde.workbench.model.TraceId

private[google2] class GooglePublisherInterpreter[F[_]: Async: Timer: StructuredLogger](
publisher: Publisher,
retryConfig: RetryConfig
publisher: Publisher
) extends GooglePublisher[F] {
def publish[MessageType: Encoder]: Pipe[F, MessageType, Unit] = in => {
in.flatMap { message =>
publishMessage(message.asJson.noSpaces, None) //This will turn message case class into raw json string
Stream
.eval(publishMessage(message.asJson.noSpaces, None)) //This will turn message case class into raw json string
}
}

def publishNative: Pipe[F, PubsubMessage, Unit] = in => {
in.flatMap { message =>
for {
_ <- retryGoogleF(retryConfig)(
Stream.eval(
withLogging(
Async[F]
.async[String] { callback =>
ApiFutures.addCallback(
Expand All @@ -44,22 +43,22 @@ private[google2] class GooglePublisherInterpreter[F[_]: Async: Timer: Structured
Option(message.getAttributesMap.get("traceId")).map(s => TraceId(s)),
s"Publishing ${message}"
)
} yield ()
)
}
}

def publishString: Pipe[F, String, Unit] = in => {
in.flatMap(s => publishMessage(s, None))
in.flatMap(s => Stream.eval(publishMessage(s, None)))
}

override def publishOne[MessageType: Encoder](message: MessageType)(implicit ev: Ask[F, TraceId]): F[Unit] = {
val byteString = ByteString.copyFromUtf8(message.asJson.noSpaces)
tracedLogging(asyncPublishMessage(byteString), s"com.google.cloud.pubsub.v1.Publisher.publish($byteString)")
}

private def publishMessage(message: String, traceId: Option[TraceId]): Stream[F, Unit] = {
private def publishMessage(message: String, traceId: Option[TraceId]): F[Unit] = {
val byteString = ByteString.copyFromUtf8(message)
retryGoogleF(retryConfig)(asyncPublishMessage(byteString), traceId, s"Publishing $message")
withLogging(asyncPublishMessage(byteString), traceId, s"Publishing $message")
}

private def asyncPublishMessage(byteString: ByteString): F[Unit] =
Expand All @@ -80,9 +79,8 @@ private[google2] class GooglePublisherInterpreter[F[_]: Async: Timer: Structured

object GooglePublisherInterpreter {
def apply[F[_]: Async: Timer: ContextShift: StructuredLogger](
publisher: Publisher,
retryConfig: RetryConfig
): GooglePublisherInterpreter[F] = new GooglePublisherInterpreter(publisher, retryConfig)
publisher: Publisher
): GooglePublisherInterpreter[F] = new GooglePublisherInterpreter(publisher)

def publisher[F[_]: Sync: StructuredLogger](config: PublisherConfig): Resource[F, Publisher] =
for {
Expand Down Expand Up @@ -117,7 +115,4 @@ object GooglePublisherInterpreter {
)(p => Sync[F].delay(p.shutdown()))
}

final case class PublisherConfig(pathToCredentialJson: String,
projectTopicName: ProjectTopicName,
retryConfig: RetryConfig
)
final case class PublisherConfig(pathToCredentialJson: String, projectTopicName: ProjectTopicName)
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.broadinstitute.dsde.workbench.google2

import java.nio.file.Path

import cats.effect.{Resource, Sync, Timer}
import cats.mtl.Ask
import com.google.api.gax.core.FixedCredentialsProvider
import com.google.auth.oauth2.ServiceAccountCredentials
import com.google.cloud.pubsub.v1.{SubscriptionAdminClient, SubscriptionAdminSettings}
import com.google.pubsub.v1.{ProjectSubscriptionName, Subscription}
import fs2.Stream
import io.chrisdavenport.log4cats.StructuredLogger
import org.broadinstitute.dsde.workbench.model.TraceId
import org.broadinstitute.dsde.workbench.model.google.GoogleProject

trait GoogleSubscriptionAdmin[F[_]] {
def list(project: GoogleProject)(implicit ev: Ask[F, TraceId]): Stream[F, Subscription]
def delete(projectSubscriptionName: ProjectSubscriptionName)(implicit ev: Ask[F, TraceId]): F[Unit]
}

object GoogleSubscriptionAdmin {
def fromCredentialPath[F[_]: StructuredLogger: Sync: Timer](
pathToCredential: Path
): Resource[F, GoogleSubscriptionAdmin[F]] =
for {
credential <- credentialResource(pathToCredential.toString)
topicAdmin <- fromServiceAccountCredential(credential)
} yield topicAdmin

def fromServiceAccountCredential[F[_]: StructuredLogger: Sync: Timer](
serviceAccountCredentials: ServiceAccountCredentials
): Resource[F, GoogleSubscriptionAdmin[F]] =
for {
client <- Resource.make(
Sync[F].delay(
SubscriptionAdminClient.create(
SubscriptionAdminSettings
.newBuilder()
.setCredentialsProvider(FixedCredentialsProvider.create(serviceAccountCredentials))
.build()
)
)
)(client => Sync[F].delay(client.shutdown()))
} yield new GoogleSubscriptionAdminInterpreter[F](client)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.broadinstitute.dsde.workbench.google2

import cats.effect.{Sync, Timer}
import cats.mtl.Ask
import com.google.cloud.pubsub.v1.SubscriptionAdminClient
import com.google.pubsub.v1.{ProjectName, ProjectSubscriptionName, Subscription}
import fs2.Stream
import io.chrisdavenport.log4cats.StructuredLogger
import org.broadinstitute.dsde.workbench.model.TraceId
import org.broadinstitute.dsde.workbench.model.google.GoogleProject

import scala.collection.JavaConverters._

private[google2] class GoogleSubscriptionAdminInterpreter[F[_]: Timer](client: SubscriptionAdminClient)(implicit
F: Sync[F],
logger: StructuredLogger[F]
) extends GoogleSubscriptionAdmin[F] {
def list(project: GoogleProject)(implicit ev: Ask[F, TraceId]): Stream[F, Subscription] = {
val fa = F.delay(client.listSubscriptions(ProjectName.of(project.value)))
for {
resp <- Stream.eval(
tracedLogging(fa, s"com.google.cloud.pubsub.v1.SubscriptionAdminClient.listSubscriptions(${project.value})")
)
pagedResponse <- Stream.fromIterator(resp.iteratePages().iterator().asScala)
subscription <- Stream.fromIterator(pagedResponse.getValues.iterator().asScala)
} yield subscription
}

def delete(projectSubscriptionName: ProjectSubscriptionName)(implicit ev: Ask[F, TraceId]): F[Unit] = {
val fa = F.delay(client.deleteSubscription(projectSubscriptionName))
tracedLogging(fa,
s"com.google.cloud.pubsub.v1.SubscriptionAdminClient.deleteSubscription(${projectSubscriptionName})"
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,26 @@ package org.broadinstitute.dsde.workbench
package google2

import cats.effect.{Resource, Sync, Timer}
import cats.mtl.Ask
import com.google.auth.oauth2.ServiceAccountCredentials
import com.google.cloud.Identity
import com.google.pubsub.v1.TopicName
import com.google.pubsub.v1.{Topic, TopicName}
import fs2.Stream
import io.chrisdavenport.log4cats.StructuredLogger
import org.broadinstitute.dsde.workbench.google2.GoogleTopicAdminInterpreter._
import org.broadinstitute.dsde.workbench.model.TraceId
import org.broadinstitute.dsde.workbench.model.google.GoogleProject

trait GoogleTopicAdmin[F[_]] {

/**
* @param traceId uuid for tracing a unique call flow in logging
*/
def create(projectTopicName: TopicName, traceId: Option[TraceId] = None): Stream[F, Unit]
def create(projectTopicName: TopicName, traceId: Option[TraceId] = None): F[Unit]

def delete(projectTopicName: TopicName, traceId: Option[TraceId] = None): Stream[F, Unit]
def delete(projectTopicName: TopicName, traceId: Option[TraceId] = None): F[Unit]

def list(project: GoogleProject)(implicit ev: Ask[F, TraceId]): Stream[F, Topic]

/**
* @param projectTopicName
Expand All @@ -39,24 +43,22 @@ trait GoogleTopicAdmin[F[_]] {
def createWithPublisherMembers(projectTopicName: TopicName,
members: List[Identity],
traceId: Option[TraceId] = None
): Stream[F, Unit]
): F[Unit]
}

object GoogleTopicAdmin {
def fromCredentialPath[F[_]: StructuredLogger: Sync: Timer](
pathToCredential: String,
retryConfig: RetryConfig = GoogleTopicAdminInterpreter.defaultRetryConfig
pathToCredential: String
): Resource[F, GoogleTopicAdmin[F]] =
for {
credential <- credentialResource(pathToCredential)
topicAdmin <- fromServiceAccountCrendential(credential, retryConfig)
topicAdmin <- fromServiceAccountCrendential(credential)
} yield topicAdmin

def fromServiceAccountCrendential[F[_]: StructuredLogger: Sync: Timer](
serviceAccountCredentials: ServiceAccountCredentials,
retryConfig: RetryConfig
serviceAccountCredentials: ServiceAccountCredentials
): Resource[F, GoogleTopicAdmin[F]] =
for {
topicAdminClient <- topicAdminClientResource(serviceAccountCredentials)
} yield new GoogleTopicAdminInterpreter[F](topicAdminClient, retryConfig)
} yield new GoogleTopicAdminInterpreter[F](topicAdminClient)
}
Loading

0 comments on commit 711ce1e

Please sign in to comment.