Skip to content

Commit

Permalink
adding delete interface to admin (#278)
Browse files Browse the repository at this point in the history
* adding delete interface to topicAdmin
  • Loading branch information
jdcanas committed Jan 23, 2020
1 parent 255941e commit c91d96b
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 4 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ To depend on the `MockGoogle*` classes, additionally depend on:

Contains utility functions for talking to Google APIs via com.google.cloud client library (more recent) via gRPC.

Latest SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.6-ca8509e"`
Latest SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.6-TRAVIS-REPLACE-ME"`

To start the Google PubSub emulator for unit testing:

Expand Down
4 changes: 3 additions & 1 deletion google2/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ Changed
- Remove `LineBacker` usage
- Add arguments to `insertBucket`
- Fix `scala.MatchError` from `handleErrorWith`
- Add `delete` function to `GoogleTopicAdmin` trait and implementation

Add
- Add `GoogleDataproc` and `GoogleDataprocInterpreter`

SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.6-ca8509e"`
SBT dependency: `"org.broadinstitute.dsde.workbench" %% "workbench-google2" % "0.6-TRAVIS-REPLACE-ME"`


## 0.5

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ trait GooglePublisher[F[_]] {
* Watch out message size quota and limitations https://cloud.google.com/pubsub/quotas
*/
def publishString: Pipe[F, String, Unit]

}

object GooglePublisher {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ trait GoogleTopicAdmin[F[_]] {
*/
def create(projectTopicName: ProjectTopicName, traceId: Option[TraceId] = None): Stream[F, Unit]

def delete(projectTopicName: ProjectTopicName, traceId: Option[TraceId] = None): Stream[F, Unit]

/**
* @param projectTopicName
* @param members can have the following values
Expand Down Expand Up @@ -46,4 +48,4 @@ object GoogleTopicAdmin {
def fromServiceAccountCrendential[F[_]: Logger: Sync: Timer](serviceAccountCredentials: ServiceAccountCredentials, retryConfig: RetryConfig): Resource[F, GoogleTopicAdmin[F]] = for {
topicAdminClient <- topicAdminClientResource(serviceAccountCredentials)
} yield new GoogleTopicAdminInterpreter[F](topicAdminClient, retryConfig)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ class GoogleTopicAdminInterpreter[F[_]: Logger: Sync: Timer](topicAdminClient: T
retryHelper[Unit](createTopic, traceId, s"com.google.cloud.pubsub.v1.TopicAdminClient.createTopic($projectTopicName)")
}

def delete(projectTopicName: ProjectTopicName, traceId: Option[TraceId] = None): Stream[F, Unit] = {
val loggingCtx = Map("topic" -> projectTopicName.asJson, "traceId" -> traceId.asJson)
val deleteTopic = Sync[F].delay(topicAdminClient.deleteTopic(projectTopicName))

retryHelper[Unit](deleteTopic, traceId, s"com.google.cloud.pubsub.v1.TopicAdminClient.deleteTopic($projectTopicName)")
}

def createWithPublisherMembers(projectTopicName: ProjectTopicName, members: List[Identity], traceId: Option[TraceId] = None): Stream[F, Unit] = {
val loggingCtx = Map("topic" -> projectTopicName.asJson, "traceId" -> traceId.asJson)
val createTopic = Sync[F].delay(topicAdminClient.createTopic(projectTopicName)).void.onError {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,27 @@ class GoogleTopicAdminSpec extends FlatSpec with Matchers with WorkbenchTestSuit
}
}

"GoogleTopicAdminInterpreter" should "be able to delete topic" in {
forAll{
(topic: ProjectTopicName) =>
val result = localTopicAdmin.use {
topicAdmin =>
val googleTopicAdmin = new GoogleTopicAdminInterpreter[IO](topicAdmin, GoogleTopicAdminInterpreter.defaultRetryConfig)
val res = for {
_ <- googleTopicAdmin.create(topic)
_ <- googleTopicAdmin.delete(topic)
caught = the[com.google.api.gax.rpc.NotFoundException] thrownBy {
topicAdmin.getTopic(topic)
}
} yield (caught.getMessage should include("NOT_FOUND"))

res.compile.lastOrError
}

result.unsafeRunSync()
}
}

//pubsub getIamPolicy isn't implemented in emulator
}

Expand All @@ -45,4 +66,4 @@ object GoogleTopicAdminSpec {
.setCredentialsProvider(credentialsProvider)
.build())))(client => IO(client.shutdown()))
} yield topicClient
}
}

0 comments on commit c91d96b

Please sign in to comment.