From 6c99767e4ba841ec0d45e20a7f07003e2f7e3a42 Mon Sep 17 00:00:00 2001
From: dpcollins-google <40498610+dpcollins-google@users.noreply.github.com>
Date: Fri, 6 Jan 2023 12:53:22 -0500
Subject: [PATCH] feat: add pubsublite-kafka-auth module (#363)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* feat: Create AuthServer/ClientParameters

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

Co-authored-by: Owl Bot
---
 README.md                                     |   7 +-
 kafka_gcp_credentials.py                      |  70 +++++++++++
 pom.xml                                       |   1 +
 pubsublite-kafka-auth/pom.xml                 |  34 +++++
 .../pubsublite/kafka/ClientParameters.java    |  63 ++++++++++
 .../pubsublite/kafka/internal/AuthServer.java | 118 ++++++++++++++++++
 samples/snapshot/pom.xml                      |  16 +++
 samples/snippets/pom.xml                      |   5 +
 .../java/pubsublite/KafkaProducerExample.java |  86 +++++++++++++
 .../test/java/pubsublite/QuickStartIT.java    |  77 ++++++++----
 10 files changed, 450 insertions(+), 27 deletions(-)
 create mode 100644 kafka_gcp_credentials.py
 create mode 100644 pubsublite-kafka-auth/pom.xml
 create mode 100644 pubsublite-kafka-auth/src/main/java/com/google/cloud/pubsublite/kafka/ClientParameters.java
 create mode 100644 pubsublite-kafka-auth/src/main/java/com/google/cloud/pubsublite/kafka/internal/AuthServer.java
 create mode 100644 samples/snippets/src/main/java/pubsublite/KafkaProducerExample.java

diff --git a/README.md b/README.md
index 4563723..56149d0 100644
--- a/README.md
+++ b/README.md
@@ -19,20 +19,20 @@ If you are using Maven, add this to your pom.xml file:
 <dependency>
   <groupId>com.google.cloud</groupId>
   <artifactId>pubsublite-kafka</artifactId>
-  <version>1.0.3</version>
+  <version>1.0.4</version>
 </dependency>
 ```

 If you are using Gradle without BOM, add this to your dependencies:

 ```Groovy
-implementation 'com.google.cloud:pubsublite-kafka:1.0.3'
+implementation 'com.google.cloud:pubsublite-kafka:1.0.4'
 ```

 If you are using SBT, add this to your dependencies:

 ```Scala
-libraryDependencies += "com.google.cloud" % "pubsublite-kafka" % "1.0.3"
+libraryDependencies += "com.google.cloud" % "pubsublite-kafka" % "1.0.4"
 ```

 ## Authentication
@@ -160,6 +160,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/java-pubsublite-ka
 | Sample | Source Code | Try it |
 | --------------------------- | --------------------------------- | ------ |
 | Consumer Example | [source code](https://github.com/googleapis/java-pubsublite-kafka/blob/main/samples/snippets/src/main/java/pubsublite/ConsumerExample.java) | [![Open in Cloud
Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsublite-kafka&page=editor&open_in_editor=samples/snippets/src/main/java/pubsublite/ConsumerExample.java) | +| Kafka Producer Example | [source code](https://github.com/googleapis/java-pubsublite-kafka/blob/main/samples/snippets/src/main/java/pubsublite/KafkaProducerExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsublite-kafka&page=editor&open_in_editor=samples/snippets/src/main/java/pubsublite/KafkaProducerExample.java) | | Producer Example | [source code](https://github.com/googleapis/java-pubsublite-kafka/blob/main/samples/snippets/src/main/java/pubsublite/ProducerExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsublite-kafka&page=editor&open_in_editor=samples/snippets/src/main/java/pubsublite/ProducerExample.java) | diff --git a/kafka_gcp_credentials.py b/kafka_gcp_credentials.py new file mode 100644 index 0000000..3a00c3b --- /dev/null +++ b/kafka_gcp_credentials.py @@ -0,0 +1,70 @@ +import base64 +import datetime +import google.auth +import google.auth.transport.urllib3 +import http.server +import json +import urllib3 + +_credentials, _project = google.auth.default() +_http_client = urllib3.PoolManager() + + +def valid_credentials(): + if not _credentials.valid: + _credentials.refresh( + google.auth.transport.urllib3.Request(_http_client)) + return _credentials + + +_HEADER = json.dumps(dict(typ='JWT', alg='GOOG_TOKEN')) + + +def get_jwt(creds): + return json.dumps(dict(exp=creds.expiry.timestamp(), + iat=datetime.datetime.utcnow().timestamp(), + scope='pubsub', + sub='unused')) + + +def b64_encode(source): + return base64.urlsafe_b64encode(source.encode('utf-8')).decode('utf-8') + + +def get_kafka_access_token(creds): + return '.'.join([b64_encode(_HEADER), b64_encode(get_jwt(creds)), + b64_encode(creds.token)]) + + +def build_message(): + creds = valid_credentials() + expiry_seconds = (creds.expiry - datetime.datetime.utcnow()).total_seconds() + return json.dumps( + dict(access_token=get_kafka_access_token(creds), token_type='bearer', + expires_in=expiry_seconds)) + + +class AuthHandler(http.server.BaseHTTPRequestHandler): + def _handle(self): + self.send_response(200) + self.send_header('Content-type', 'text/plain') + self.end_headers() + self.wfile.write(build_message().encode('utf-8')) + + def do_GET(self): + self._handle() + + def do_POST(self): + self._handle() + + +def run_server(): + server_address = ('localhost', 14293) + server = http.server.ThreadingHTTPServer(server_address, AuthHandler) + print("Serving on localhost:14293. 
This is not accessible outside of the " + "current machine.") + server.serve_forever() + + +if __name__ == '__main__': + run_server() diff --git a/pom.xml b/pom.xml index aa8fdda..196c3cc 100644 --- a/pom.xml +++ b/pom.xml @@ -15,6 +15,7 @@ Parent POM for Pub/Sub Lite Kafka Integrations pubsublite-kafka + pubsublite-kafka-auth 1.9.2 diff --git a/pubsublite-kafka-auth/pom.xml b/pubsublite-kafka-auth/pom.xml new file mode 100644 index 0000000..7604b18 --- /dev/null +++ b/pubsublite-kafka-auth/pom.xml @@ -0,0 +1,34 @@ + + + + com.google.cloud + pubsublite-kafka-parent + 1.0.5-SNAPSHOT + + 4.0.0 + com.google.cloud + pubsublite-kafka-auth + 1.0.5-SNAPSHOT + jar + Pub/Sub Lite Kafka Auth + https://github.com/googleapis/java-pubsublite-kafka + Kafka Auth Provider for Google Cloud Pub/Sub Lite + + + com.google.auth + google-auth-library-oauth2-http + + + com.google.cloud + google-cloud-pubsublite + + + com.google.code.gson + gson + + + com.google.guava + guava + + + diff --git a/pubsublite-kafka-auth/src/main/java/com/google/cloud/pubsublite/kafka/ClientParameters.java b/pubsublite-kafka-auth/src/main/java/com/google/cloud/pubsublite/kafka/ClientParameters.java new file mode 100644 index 0000000..1f3cbf4 --- /dev/null +++ b/pubsublite-kafka-auth/src/main/java/com/google/cloud/pubsublite/kafka/ClientParameters.java @@ -0,0 +1,63 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.kafka; + +import com.google.cloud.pubsublite.CloudRegion; +import com.google.cloud.pubsublite.ProjectId; +import com.google.cloud.pubsublite.ProjectIdOrNumber; +import com.google.cloud.pubsublite.ProjectNumber; +import com.google.cloud.pubsublite.kafka.internal.AuthServer; +import java.util.HashMap; +import java.util.Map; + +/** A class providing the correct parameters for connecting a Kafka client to Pub/Sub Lite. 
 */
+public final class ClientParameters {
+  public static Map<String, Object> getProducerParams(ProjectId project, CloudRegion region) {
+    return getProducerParams(ProjectIdOrNumber.of(project), region);
+  }
+
+  public static Map<String, Object> getProducerParams(ProjectNumber project, CloudRegion region) {
+    return getProducerParams(ProjectIdOrNumber.of(project), region);
+  }
+
+  public static Map<String, Object> getProducerParams(
+      ProjectIdOrNumber project, CloudRegion region) {
+    HashMap<String, Object> params = new HashMap<>();
+    params.put("enable.idempotence", false);
+    params.put("bootstrap.servers", getEndpoint(region));
+    params.put("security.protocol", "SASL_SSL");
+    params.put("sasl.mechanism", "OAUTHBEARER");
+    params.put("sasl.oauthbearer.token.endpoint.url", "http://localhost:" + AuthServer.PORT);
+    params.put("sasl.jaas.config", getJaasConfig(project));
+    params.put(
+        "sasl.login.callback.handler.class",
+        "org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler");
+    return params;
+  }
+
+  private static String getEndpoint(CloudRegion region) {
+    return region.value() + "-kafka-pubsub.googleapis.com:443";
+  }
+
+  private static String getJaasConfig(ProjectIdOrNumber project) {
+    return String.format(
+        "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId=\"unused\" clientSecret=\"unused\" extension_pubsubProject=\"%s\";",
+        project);
+  }
+
+  private ClientParameters() {}
+}
diff --git a/pubsublite-kafka-auth/src/main/java/com/google/cloud/pubsublite/kafka/internal/AuthServer.java b/pubsublite-kafka-auth/src/main/java/com/google/cloud/pubsublite/kafka/internal/AuthServer.java
new file mode 100644
index 0000000..c13621e
--- /dev/null
+++ b/pubsublite-kafka-auth/src/main/java/com/google/cloud/pubsublite/kafka/internal/AuthServer.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.kafka.internal;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.singletonList;
+
+import com.google.auth.oauth2.AccessToken;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.Gson;
+import com.sun.net.httpserver.HttpServer;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Base64;
+
+public class AuthServer {
+  public static int PORT = 14293;
+  public static InetSocketAddress ADDRESS =
+      new InetSocketAddress(InetAddress.getLoopbackAddress(), PORT);
+
+  private static final String HEADER =
+      new Gson().toJson(ImmutableMap.of("typ", "JWT", "alg", "GOOG_TOKEN"));
+
+  static {
+    spawnDaemon();
+  }
+
+  private static String b64Encode(String data) {
+    return Base64.getUrlEncoder().encodeToString(data.getBytes(UTF_8));
+  }
+
+  private static String getJwt(AccessToken token) {
+    return new Gson()
+        .toJson(
+            ImmutableMap.of(
+                "exp",
+                token.getExpirationTime().toInstant().getEpochSecond(),
+                "iat",
+                Instant.now().getEpochSecond(),
+                "scope",
+                "pubsub",
+                "sub",
+                "unused"));
+  }
+
+  private static String getKafkaAccessToken(AccessToken token) {
+    return String.join(
+        ".", b64Encode(HEADER), b64Encode(getJwt(token)), b64Encode(token.getTokenValue()));
+  }
+
+  private static String getResponse(GoogleCredentials creds) throws IOException {
+    creds.refreshIfExpired();
+    AccessToken token = creds.getAccessToken();
+    long expiresInSeconds =
+        Duration.between(Instant.now(), token.getExpirationTime().toInstant()).getSeconds();
+    return new Gson()
+        .toJson(
+            ImmutableMap.of(
+                "access_token",
+                getKafkaAccessToken(token),
+                "token_type",
+                "bearer",
+                "expires_in",
+                Long.toString(expiresInSeconds)));
+  }
+
+  private static void spawnDaemon() {
+    // Run spawn() in a daemon thread so the created threads are themselves daemons.
+ Thread thread = new Thread(AuthServer::spawn); + thread.setDaemon(true); + thread.start(); + } + + private static void spawn() { + try { + GoogleCredentials creds = + GoogleCredentials.getApplicationDefault() + .createScoped("https://www.googleapis.com/auth/cloud-platform"); + HttpServer server = HttpServer.create(ADDRESS, 0); + server.createContext( + "/", + handler -> { + try { + byte[] response = getResponse(creds).getBytes(UTF_8); + handler.getResponseHeaders().put("Content-type", singletonList("text/plain")); + handler.sendResponseHeaders(200, response.length); + handler.getResponseBody().write(response); + handler.close(); + } catch (Exception e) { + e.printStackTrace(System.err); + throw new RuntimeException(e); + } + }); + server.start(); + } catch (Exception e) { + e.printStackTrace(System.err); + throw new RuntimeException(e); + } + } +} diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml index b1579e3..d7045c0 100644 --- a/samples/snapshot/pom.xml +++ b/samples/snapshot/pom.xml @@ -45,12 +45,28 @@ pubsublite-kafka 1.0.5-SNAPSHOT + + com.google.cloud + pubsublite-kafka-auth + 1.0.5-SNAPSHOT + + + com.fasterxml.jackson.core + jackson-databind + 2.14.1 + org.apache.kafka kafka-clients 3.3.1 + + org.slf4j + slf4j-simple + 1.7.36 + test + junit junit diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml index 2635992..c321b31 100644 --- a/samples/snippets/pom.xml +++ b/samples/snippets/pom.xml @@ -47,6 +47,11 @@ pubsublite-kafka 1.0.3 + + com.google.cloud + pubsublite-kafka-auth + 1.0.5-SNAPSHOT + org.apache.kafka kafka-clients diff --git a/samples/snippets/src/main/java/pubsublite/KafkaProducerExample.java b/samples/snippets/src/main/java/pubsublite/KafkaProducerExample.java new file mode 100644 index 0000000..cba9832 --- /dev/null +++ b/samples/snippets/src/main/java/pubsublite/KafkaProducerExample.java @@ -0,0 +1,86 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsublite; + +// [START pubsublite_kafka_native_producer] +import com.google.cloud.pubsublite.CloudRegion; +import com.google.cloud.pubsublite.CloudZone; +import com.google.cloud.pubsublite.ProjectNumber; +import com.google.cloud.pubsublite.TopicName; +import com.google.cloud.pubsublite.TopicPath; +import com.google.cloud.pubsublite.kafka.ClientParameters; +import com.google.common.collect.ImmutableMap; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.ByteArraySerializer; + +public class KafkaProducerExample { + + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. 
+ String cloudRegion = "your-cloud-region"; + char zoneId = 'b'; + // Use an existing Pub/Sub Lite topic. + String topicId = "your-topic-id"; + // Using the project number is required for constructing a Pub/Sub Lite + // topic path that the Kafka producer can use. + long projectNumber = Long.parseLong("123456789"); + + kafkaProducerExample(cloudRegion, zoneId, projectNumber, topicId); + } + + public static void kafkaProducerExample( + String cloudRegion, char zoneId, long projectNumber, String topicId) + throws InterruptedException, ExecutionException { + ProjectNumber project = ProjectNumber.of(projectNumber); + CloudZone location = CloudZone.of(CloudRegion.of(cloudRegion), zoneId); + TopicPath topicPath = + TopicPath.newBuilder() + .setProject(project) + .setLocation(location) + .setName(TopicName.of(topicId)) + .build(); + + ImmutableMap.Builder properties = ImmutableMap.builder(); + properties.putAll(ClientParameters.getProducerParams(project, location.region())); + + List> futures = new ArrayList<>(); + try (KafkaProducer producer = + new KafkaProducer<>( + properties.build(), new ByteArraySerializer(), new ByteArraySerializer())) { + for (long i = 0L; i < 10L; i++) { + String key = "demo"; + Future future = + producer.send( + new ProducerRecord<>( + topicPath.toString(), key.getBytes(), ("message-" + i).getBytes())); + futures.add(future); + } + for (Future future : futures) { + RecordMetadata meta = future.get(); + System.out.println(meta.offset()); + } + } + System.out.printf("Published 10 messages to %s%n", topicPath); + } +} +// [END pubsublite_kafka_native_producer] diff --git a/samples/snippets/src/test/java/pubsublite/QuickStartIT.java b/samples/snippets/src/test/java/pubsublite/QuickStartIT.java index 5bf55e5..cbf109b 100644 --- a/samples/snippets/src/test/java/pubsublite/QuickStartIT.java +++ b/samples/snippets/src/test/java/pubsublite/QuickStartIT.java @@ -24,6 +24,8 @@ import com.google.cloud.pubsublite.CloudRegion; import com.google.cloud.pubsublite.CloudZone; import com.google.cloud.pubsublite.ProjectNumber; +import com.google.cloud.pubsublite.ReservationName; +import com.google.cloud.pubsublite.ReservationPath; import com.google.cloud.pubsublite.SubscriptionName; import com.google.cloud.pubsublite.SubscriptionPath; import com.google.cloud.pubsublite.TopicName; @@ -33,6 +35,7 @@ import com.google.cloud.pubsublite.proto.Subscription.DeliveryConfig.DeliveryRequirement; import com.google.cloud.pubsublite.proto.Topic; import com.google.cloud.pubsublite.proto.Topic.PartitionConfig; +import com.google.cloud.pubsublite.proto.Topic.ReservationConfig; import com.google.cloud.pubsublite.proto.Topic.RetentionConfig; import com.google.protobuf.util.Durations; import java.io.ByteArrayOutputStream; @@ -43,18 +46,22 @@ import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; public class QuickStartIT { + private ByteArrayOutputStream bout; private PrintStream out; private PrintStream console; Random rand = new Random(); - private static final Long projectNumber = - Long.parseLong(System.getenv("GOOGLE_CLOUD_PROJECT_NUMBER")); - private String cloudRegion = "us-central1"; + private static final ProjectNumber projectNumber = + ProjectNumber.of(Long.parseLong(System.getenv("GOOGLE_CLOUD_PROJECT_NUMBER"))); + private final CloudRegion cloudRegion = CloudRegion.of("us-central1"); private final char zoneId = (char) (rand.nextInt(3) + 'a'); + + private final CloudZone cloudZone = CloudZone.of(cloudRegion, zoneId); private 
static final String suffix = UUID.randomUUID().toString(); private static final String topicId = "lite-topic-" + suffix; private static final String subscriptionId = "lite-subscription-" + suffix; @@ -72,33 +79,35 @@ public static void checkRequirements() { @Before public void setUp() throws Exception { + ReservationPath reservationPath = + ReservationPath.newBuilder() + .setProject(projectNumber) + .setLocation(cloudRegion) + .setName(ReservationName.of("java-pubsublite-kafka-reservation")) + .build(); + TopicPath topicPath = TopicPath.newBuilder() - .setProject(ProjectNumber.of(projectNumber)) - .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId)) + .setProject(projectNumber) + .setLocation(cloudZone) .setName(TopicName.of(topicId)) .build(); Topic topic = Topic.newBuilder() - .setPartitionConfig( - PartitionConfig.newBuilder() - .setCapacity( - PartitionConfig.Capacity.newBuilder() - .setPublishMibPerSec(4) - .setSubscribeMibPerSec(4) - .build()) - .setCount(1)) + .setPartitionConfig(PartitionConfig.newBuilder().setCount(1)) .setRetentionConfig( RetentionConfig.newBuilder() .setPeriod(Durations.fromDays(1)) .setPerPartitionBytes(30 * 1024 * 1024 * 1024L)) + .setReservationConfig( + ReservationConfig.newBuilder().setThroughputReservation(reservationPath.toString())) .setName(topicPath.toString()) .build(); SubscriptionPath subscriptionPath = SubscriptionPath.newBuilder() - .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId)) - .setProject(ProjectNumber.of(projectNumber)) + .setLocation(cloudZone) + .setProject(projectNumber) .setName(SubscriptionName.of(subscriptionId)) .build(); @@ -112,7 +121,7 @@ public void setUp() throws Exception { .build(); AdminClientSettings adminClientSettings = - AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build(); + AdminClientSettings.newBuilder().setRegion(cloudRegion).build(); try (AdminClient adminClient = AdminClient.create(adminClientSettings)) { adminClient.createTopic(topic).get(); adminClient.createSubscription(subscription).get(); @@ -131,20 +140,20 @@ public void tearDown() throws Exception { System.setOut(console); TopicPath topicPath = TopicPath.newBuilder() - .setProject(ProjectNumber.of(projectNumber)) - .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId)) + .setProject(projectNumber) + .setLocation(cloudZone) .setName(TopicName.of(topicId)) .build(); SubscriptionPath subscriptionPath = SubscriptionPath.newBuilder() - .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId)) - .setProject(ProjectNumber.of(projectNumber)) + .setLocation(cloudZone) + .setProject(projectNumber) .setName(SubscriptionName.of(subscriptionId)) .build(); AdminClientSettings adminClientSettings = - AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build(); + AdminClientSettings.newBuilder().setRegion(cloudRegion).build(); try (AdminClient adminClient = AdminClient.create(adminClientSettings)) { adminClient.deleteTopic(topicPath).get(); @@ -153,20 +162,40 @@ public void tearDown() throws Exception { } @Test + @Ignore public void testQuickstart() throws ExecutionException, InterruptedException { bout.reset(); // Publish. 
- ProducerExample.producerExample(cloudRegion, zoneId, projectNumber, topicId); + ProducerExample.producerExample(cloudRegion.value(), zoneId, projectNumber.value(), topicId); + assertThat(bout.toString()) + .contains( + String.format( + "Published 10 messages to projects/%s/locations/%s-%s/topics/%s", + projectNumber.value(), cloudRegion.value(), zoneId, topicId)); + + bout.reset(); + // Subscribe. + ConsumerExample.consumerExample( + cloudRegion.value(), zoneId, projectNumber.value(), topicId, subscriptionId); + assertThat(bout.toString()).contains("Received 10 messages."); + } + + @Test + public void testKafkaPublish() throws ExecutionException, InterruptedException { + bout.reset(); + KafkaProducerExample.kafkaProducerExample( + cloudRegion.value(), zoneId, projectNumber.value(), topicId); assertThat(bout.toString()) .contains( String.format( "Published 10 messages to projects/%s/locations/%s-%s/topics/%s", - projectNumber, cloudRegion, zoneId, topicId)); + projectNumber.value(), cloudRegion.value(), zoneId, topicId)); bout.reset(); // Subscribe. - ConsumerExample.consumerExample(cloudRegion, zoneId, projectNumber, topicId, subscriptionId); + ConsumerExample.consumerExample( + cloudRegion.value(), zoneId, projectNumber.value(), topicId, subscriptionId); assertThat(bout.toString()).contains("Received 10 messages."); } }
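A minimal usage sketch of the new module, assuming the `pubsublite-kafka-auth` artifact added above is on the classpath; the class name and the project number, region, and topic path below are placeholders rather than values from this patch. The map returned by `ClientParameters.getProducerParams` already carries `bootstrap.servers`, the `SASL_SSL`/`OAUTHBEARER` settings, and the `http://localhost:14293` token endpoint served by the bundled `AuthServer`, so an application only layers serializers and a Pub/Sub Lite topic path on top of it, much as `samples/snippets/src/main/java/pubsublite/KafkaProducerExample.java` does:

```java
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.kafka.ClientParameters;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class ProducerAuthSketch {
  public static void main(String[] args) throws Exception {
    // Placeholders: substitute a real project number, region, and Pub/Sub Lite topic path.
    ProjectNumber project = ProjectNumber.of(123456789L);
    CloudRegion region = CloudRegion.of("us-central1");
    String topic = "projects/123456789/locations/us-central1-b/topics/your-topic";

    // getProducerParams supplies bootstrap.servers, SASL_SSL/OAUTHBEARER, and the
    // localhost token endpoint backed by the module's AuthServer.
    Map<String, Object> config =
        new HashMap<>(ClientParameters.getProducerParams(project, region));

    try (KafkaProducer<byte[], byte[]> producer =
        new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer())) {
      // Blocks until the publish is acknowledged by Pub/Sub Lite.
      producer.send(new ProducerRecord<>(topic, "key".getBytes(), "value".getBytes())).get();
    }
  }
}
```

Non-Java Kafka clients can get the equivalent behavior by running `kafka_gcp_credentials.py` and pointing `sasl.oauthbearer.token.endpoint.url` at `http://localhost:14293`, the same loopback endpoint the Java `AuthServer` exposes.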