diff --git a/README.md b/README.md
index 45637239..56149d01 100644
--- a/README.md
+++ b/README.md
@@ -19,20 +19,20 @@ If you are using Maven, add this to your pom.xml file:
com.google.cloud
pubsublite-kafka
- 1.0.3
+ 1.0.4
```
If you are using Gradle without BOM, add this to your dependencies:
```Groovy
-implementation 'com.google.cloud:pubsublite-kafka:1.0.3'
+implementation 'com.google.cloud:pubsublite-kafka:1.0.4'
```
If you are using SBT, add this to your dependencies:
```Scala
-libraryDependencies += "com.google.cloud" % "pubsublite-kafka" % "1.0.3"
+libraryDependencies += "com.google.cloud" % "pubsublite-kafka" % "1.0.4"
```
## Authentication
@@ -160,6 +160,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/java-pubsublite-ka
| Sample | Source Code | Try it |
| --------------------------- | --------------------------------- | ------ |
| Consumer Example | [source code](https://github.com/googleapis/java-pubsublite-kafka/blob/main/samples/snippets/src/main/java/pubsublite/ConsumerExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsublite-kafka&page=editor&open_in_editor=samples/snippets/src/main/java/pubsublite/ConsumerExample.java) |
+| Kafka Producer Example | [source code](https://github.com/googleapis/java-pubsublite-kafka/blob/main/samples/snippets/src/main/java/pubsublite/KafkaProducerExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsublite-kafka&page=editor&open_in_editor=samples/snippets/src/main/java/pubsublite/KafkaProducerExample.java) |
| Producer Example | [source code](https://github.com/googleapis/java-pubsublite-kafka/blob/main/samples/snippets/src/main/java/pubsublite/ProducerExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsublite-kafka&page=editor&open_in_editor=samples/snippets/src/main/java/pubsublite/ProducerExample.java) |
diff --git a/kafka_gcp_credentials.py b/kafka_gcp_credentials.py
new file mode 100644
index 00000000..3a00c3b3
--- /dev/null
+++ b/kafka_gcp_credentials.py
@@ -0,0 +1,70 @@
+import base64
+import datetime
+import google.auth
+import google.auth.transport.urllib3
+import http.server
+import json
+import urllib3
+
+_credentials, _project = google.auth.default()
+_http_client = urllib3.PoolManager()
+
+
+def valid_credentials():
+ if not _credentials.valid:
+ _credentials.refresh(
+ google.auth.transport.urllib3.Request(_http_client))
+ return _credentials
+
+
+_HEADER = json.dumps(dict(typ='JWT', alg='GOOG_TOKEN'))
+
+
+def get_jwt(creds):
+ return json.dumps(dict(exp=creds.expiry.timestamp(),
+ iat=datetime.datetime.utcnow().timestamp(),
+ scope='pubsub',
+ sub='unused'))
+
+
+def b64_encode(source):
+ return base64.urlsafe_b64encode(source.encode('utf-8')).decode('utf-8')
+
+
+def get_kafka_access_token(creds):
+ return '.'.join([b64_encode(_HEADER), b64_encode(get_jwt(creds)),
+ b64_encode(creds.token)])
+
+
+def build_message():
+ creds = valid_credentials()
+ expiry_seconds = (creds.expiry - datetime.datetime.utcnow()).total_seconds()
+ return json.dumps(
+ dict(access_token=get_kafka_access_token(creds), token_type='bearer',
+ expires_in=expiry_seconds))
+
+
+class AuthHandler(http.server.BaseHTTPRequestHandler):
+ def _handle(self):
+ self.send_response(200)
+ self.send_header('Content-type', 'text/plain')
+ self.end_headers()
+ self.wfile.write(build_message().encode('utf-8'))
+
+ def do_GET(self):
+ self._handle()
+
+ def do_POST(self):
+ self._handle()
+
+
+def run_server():
+ server_address = ('localhost', 14293)
+ server = http.server.ThreadingHTTPServer(server_address, AuthHandler)
+ print("Serving on localhost:14293. This is not accessible outside of the "
+ "current machine.")
+ server.serve_forever()
+
+
+if __name__ == '__main__':
+ run_server()
diff --git a/pom.xml b/pom.xml
index aa8fdda9..196c3ccd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -15,6 +15,7 @@
Parent POM for Pub/Sub Lite Kafka Integrations
pubsublite-kafka
+ pubsublite-kafka-auth
1.9.2
diff --git a/pubsublite-kafka-auth/pom.xml b/pubsublite-kafka-auth/pom.xml
new file mode 100644
index 00000000..7604b184
--- /dev/null
+++ b/pubsublite-kafka-auth/pom.xml
@@ -0,0 +1,34 @@
+
+
+
+ com.google.cloud
+ pubsublite-kafka-parent
+ 1.0.5-SNAPSHOT
+
+ 4.0.0
+ com.google.cloud
+ pubsublite-kafka-auth
+ 1.0.5-SNAPSHOT
+ jar
+ Pub/Sub Lite Kafka Auth
+ https://github.com/googleapis/java-pubsublite-kafka
+ Kafka Auth Provider for Google Cloud Pub/Sub Lite
+
+
+ com.google.auth
+ google-auth-library-oauth2-http
+
+
+ com.google.cloud
+ google-cloud-pubsublite
+
+
+ com.google.code.gson
+ gson
+
+
+ com.google.guava
+ guava
+
+
+
diff --git a/pubsublite-kafka-auth/src/main/java/com/google/cloud/pubsublite/kafka/ClientParameters.java b/pubsublite-kafka-auth/src/main/java/com/google/cloud/pubsublite/kafka/ClientParameters.java
new file mode 100644
index 00000000..1f3cbf44
--- /dev/null
+++ b/pubsublite-kafka-auth/src/main/java/com/google/cloud/pubsublite/kafka/ClientParameters.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.kafka;
+
+import com.google.cloud.pubsublite.CloudRegion;
+import com.google.cloud.pubsublite.ProjectId;
+import com.google.cloud.pubsublite.ProjectIdOrNumber;
+import com.google.cloud.pubsublite.ProjectNumber;
+import com.google.cloud.pubsublite.kafka.internal.AuthServer;
+import java.util.HashMap;
+import java.util.Map;
+
+/** A class providing the correct parameters for connecting a Kafka client to Pub/Sub Lite. */
+public final class ClientParameters {
+ public static Map getProducerParams(ProjectId project, CloudRegion region) {
+ return getProducerParams(ProjectIdOrNumber.of(project), region);
+ }
+
+ public static Map getProducerParams(ProjectNumber project, CloudRegion region) {
+ return getProducerParams(ProjectIdOrNumber.of(project), region);
+ }
+
+ public static Map getProducerParams(
+ ProjectIdOrNumber project, CloudRegion region) {
+ HashMap params = new HashMap<>();
+ params.put("enable.idempotence", false);
+ params.put("bootstrap.servers", getEndpoint(region));
+ params.put("security.protocol", "SASL_SSL");
+ params.put("sasl.mechanism", "OAUTHBEARER");
+ params.put("sasl.oauthbearer.token.endpoint.url", "http://localhost:" + AuthServer.PORT);
+ params.put("sasl.jaas.config", getJaasConfig(project));
+ params.put(
+ "sasl.login.callback.handler.class",
+ "org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler");
+ return params;
+ }
+
+ private static String getEndpoint(CloudRegion region) {
+ return region.value() + "-kafka-pubsub.googleapis.com:443";
+ }
+
+ private static String getJaasConfig(ProjectIdOrNumber project) {
+ return String.format(
+ "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId=\"unused\" clientSecret=\"unused\" extension_pubsubProject=\"%s\";",
+ project);
+ }
+
+ private ClientParameters() {}
+}
diff --git a/pubsublite-kafka-auth/src/main/java/com/google/cloud/pubsublite/kafka/internal/AuthServer.java b/pubsublite-kafka-auth/src/main/java/com/google/cloud/pubsublite/kafka/internal/AuthServer.java
new file mode 100644
index 00000000..c13621e9
--- /dev/null
+++ b/pubsublite-kafka-auth/src/main/java/com/google/cloud/pubsublite/kafka/internal/AuthServer.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.kafka.internal;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.singletonList;
+
+import com.google.auth.oauth2.AccessToken;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.Gson;
+import com.sun.net.httpserver.HttpServer;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Base64;
+
+public class AuthServer {
+ public static int PORT = 14293;
+ public static InetSocketAddress ADDRESS =
+ new InetSocketAddress(InetAddress.getLoopbackAddress(), PORT);
+
+ private static final String HEADER =
+ new Gson().toJson(ImmutableMap.of("typ", "JWT", "alg", "GOOG_TOKEN"));
+
+ static {
+ spawnDaemon();
+ }
+
+ private static String b64Encode(String data) {
+ return Base64.getUrlEncoder().encodeToString(data.getBytes(UTF_8));
+ }
+
+ private static String getJwt(AccessToken token) {
+ return new Gson()
+ .toJson(
+ ImmutableMap.of(
+ "exp",
+ token.getExpirationTime().toInstant().getEpochSecond(),
+ "iat",
+ Instant.now().getEpochSecond(),
+ "scope",
+ "pubsub",
+ "sub",
+ "unused"));
+ }
+
+ private static String getKafkaAccessToken(AccessToken token) {
+ return String.join(
+ ".", b64Encode(HEADER), b64Encode(getJwt(token)), b64Encode(token.getTokenValue()));
+ }
+
+ private static String getResponse(GoogleCredentials creds) throws IOException {
+ creds.refreshIfExpired();
+ AccessToken token = creds.getAccessToken();
+ long exipiresInSeconds =
+ Duration.between(Instant.now(), token.getExpirationTime().toInstant()).getSeconds();
+ return new Gson()
+ .toJson(
+ ImmutableMap.of(
+ "access_token",
+ getKafkaAccessToken(token),
+ "token_type",
+ "bearer",
+ "expires_in",
+ Long.toString(exipiresInSeconds)));
+ }
+
+ private static void spawnDaemon() {
+ // Run spawn() in a daemon thread so the created threads are themselves daemons.
+ Thread thread = new Thread(AuthServer::spawn);
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ private static void spawn() {
+ try {
+ GoogleCredentials creds =
+ GoogleCredentials.getApplicationDefault()
+ .createScoped("https://www.googleapis.com/auth/cloud-platform");
+ HttpServer server = HttpServer.create(ADDRESS, 0);
+ server.createContext(
+ "/",
+ handler -> {
+ try {
+ byte[] response = getResponse(creds).getBytes(UTF_8);
+ handler.getResponseHeaders().put("Content-type", singletonList("text/plain"));
+ handler.sendResponseHeaders(200, response.length);
+ handler.getResponseBody().write(response);
+ handler.close();
+ } catch (Exception e) {
+ e.printStackTrace(System.err);
+ throw new RuntimeException(e);
+ }
+ });
+ server.start();
+ } catch (Exception e) {
+ e.printStackTrace(System.err);
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml
index b1579e32..d7045c0e 100644
--- a/samples/snapshot/pom.xml
+++ b/samples/snapshot/pom.xml
@@ -45,12 +45,28 @@
pubsublite-kafka
1.0.5-SNAPSHOT
+
+ com.google.cloud
+ pubsublite-kafka-auth
+ 1.0.5-SNAPSHOT
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ 2.14.1
+
org.apache.kafka
kafka-clients
3.3.1
+
+ org.slf4j
+ slf4j-simple
+ 1.7.36
+ test
+
junit
junit
diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml
index 2635992f..c321b313 100644
--- a/samples/snippets/pom.xml
+++ b/samples/snippets/pom.xml
@@ -47,6 +47,11 @@
pubsublite-kafka
1.0.3
+
+ com.google.cloud
+ pubsublite-kafka-auth
+ 1.0.5-SNAPSHOT
+
org.apache.kafka
kafka-clients
diff --git a/samples/snippets/src/main/java/pubsublite/KafkaProducerExample.java b/samples/snippets/src/main/java/pubsublite/KafkaProducerExample.java
new file mode 100644
index 00000000..cba9832f
--- /dev/null
+++ b/samples/snippets/src/main/java/pubsublite/KafkaProducerExample.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package pubsublite;
+
+// [START pubsublite_kafka_native_producer]
+import com.google.cloud.pubsublite.CloudRegion;
+import com.google.cloud.pubsublite.CloudZone;
+import com.google.cloud.pubsublite.ProjectNumber;
+import com.google.cloud.pubsublite.TopicName;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.kafka.ClientParameters;
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+public class KafkaProducerExample {
+
+ public static void main(String... args) throws Exception {
+ // TODO(developer): Replace these variables before running the sample.
+ String cloudRegion = "your-cloud-region";
+ char zoneId = 'b';
+ // Use an existing Pub/Sub Lite topic.
+ String topicId = "your-topic-id";
+ // Using the project number is required for constructing a Pub/Sub Lite
+ // topic path that the Kafka producer can use.
+ long projectNumber = Long.parseLong("123456789");
+
+ kafkaProducerExample(cloudRegion, zoneId, projectNumber, topicId);
+ }
+
+ public static void kafkaProducerExample(
+ String cloudRegion, char zoneId, long projectNumber, String topicId)
+ throws InterruptedException, ExecutionException {
+ ProjectNumber project = ProjectNumber.of(projectNumber);
+ CloudZone location = CloudZone.of(CloudRegion.of(cloudRegion), zoneId);
+ TopicPath topicPath =
+ TopicPath.newBuilder()
+ .setProject(project)
+ .setLocation(location)
+ .setName(TopicName.of(topicId))
+ .build();
+
+ ImmutableMap.Builder properties = ImmutableMap.builder();
+ properties.putAll(ClientParameters.getProducerParams(project, location.region()));
+
+ List> futures = new ArrayList<>();
+ try (KafkaProducer producer =
+ new KafkaProducer<>(
+ properties.build(), new ByteArraySerializer(), new ByteArraySerializer())) {
+ for (long i = 0L; i < 10L; i++) {
+ String key = "demo";
+ Future future =
+ producer.send(
+ new ProducerRecord<>(
+ topicPath.toString(), key.getBytes(), ("message-" + i).getBytes()));
+ futures.add(future);
+ }
+ for (Future future : futures) {
+ RecordMetadata meta = future.get();
+ System.out.println(meta.offset());
+ }
+ }
+ System.out.printf("Published 10 messages to %s%n", topicPath);
+ }
+}
+// [END pubsublite_kafka_native_producer]
diff --git a/samples/snippets/src/test/java/pubsublite/QuickStartIT.java b/samples/snippets/src/test/java/pubsublite/QuickStartIT.java
index 5bf55e5b..cbf109b7 100644
--- a/samples/snippets/src/test/java/pubsublite/QuickStartIT.java
+++ b/samples/snippets/src/test/java/pubsublite/QuickStartIT.java
@@ -24,6 +24,8 @@
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
+import com.google.cloud.pubsublite.ReservationName;
+import com.google.cloud.pubsublite.ReservationPath;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicName;
@@ -33,6 +35,7 @@
import com.google.cloud.pubsublite.proto.Subscription.DeliveryConfig.DeliveryRequirement;
import com.google.cloud.pubsublite.proto.Topic;
import com.google.cloud.pubsublite.proto.Topic.PartitionConfig;
+import com.google.cloud.pubsublite.proto.Topic.ReservationConfig;
import com.google.cloud.pubsublite.proto.Topic.RetentionConfig;
import com.google.protobuf.util.Durations;
import java.io.ByteArrayOutputStream;
@@ -43,18 +46,22 @@
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
public class QuickStartIT {
+
private ByteArrayOutputStream bout;
private PrintStream out;
private PrintStream console;
Random rand = new Random();
- private static final Long projectNumber =
- Long.parseLong(System.getenv("GOOGLE_CLOUD_PROJECT_NUMBER"));
- private String cloudRegion = "us-central1";
+ private static final ProjectNumber projectNumber =
+ ProjectNumber.of(Long.parseLong(System.getenv("GOOGLE_CLOUD_PROJECT_NUMBER")));
+ private final CloudRegion cloudRegion = CloudRegion.of("us-central1");
private final char zoneId = (char) (rand.nextInt(3) + 'a');
+
+ private final CloudZone cloudZone = CloudZone.of(cloudRegion, zoneId);
private static final String suffix = UUID.randomUUID().toString();
private static final String topicId = "lite-topic-" + suffix;
private static final String subscriptionId = "lite-subscription-" + suffix;
@@ -72,33 +79,35 @@ public static void checkRequirements() {
@Before
public void setUp() throws Exception {
+ ReservationPath reservationPath =
+ ReservationPath.newBuilder()
+ .setProject(projectNumber)
+ .setLocation(cloudRegion)
+ .setName(ReservationName.of("java-pubsublite-kafka-reservation"))
+ .build();
+
TopicPath topicPath =
TopicPath.newBuilder()
- .setProject(ProjectNumber.of(projectNumber))
- .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
+ .setProject(projectNumber)
+ .setLocation(cloudZone)
.setName(TopicName.of(topicId))
.build();
Topic topic =
Topic.newBuilder()
- .setPartitionConfig(
- PartitionConfig.newBuilder()
- .setCapacity(
- PartitionConfig.Capacity.newBuilder()
- .setPublishMibPerSec(4)
- .setSubscribeMibPerSec(4)
- .build())
- .setCount(1))
+ .setPartitionConfig(PartitionConfig.newBuilder().setCount(1))
.setRetentionConfig(
RetentionConfig.newBuilder()
.setPeriod(Durations.fromDays(1))
.setPerPartitionBytes(30 * 1024 * 1024 * 1024L))
+ .setReservationConfig(
+ ReservationConfig.newBuilder().setThroughputReservation(reservationPath.toString()))
.setName(topicPath.toString())
.build();
SubscriptionPath subscriptionPath =
SubscriptionPath.newBuilder()
- .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
- .setProject(ProjectNumber.of(projectNumber))
+ .setLocation(cloudZone)
+ .setProject(projectNumber)
.setName(SubscriptionName.of(subscriptionId))
.build();
@@ -112,7 +121,7 @@ public void setUp() throws Exception {
.build();
AdminClientSettings adminClientSettings =
- AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build();
+ AdminClientSettings.newBuilder().setRegion(cloudRegion).build();
try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
adminClient.createTopic(topic).get();
adminClient.createSubscription(subscription).get();
@@ -131,20 +140,20 @@ public void tearDown() throws Exception {
System.setOut(console);
TopicPath topicPath =
TopicPath.newBuilder()
- .setProject(ProjectNumber.of(projectNumber))
- .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
+ .setProject(projectNumber)
+ .setLocation(cloudZone)
.setName(TopicName.of(topicId))
.build();
SubscriptionPath subscriptionPath =
SubscriptionPath.newBuilder()
- .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
- .setProject(ProjectNumber.of(projectNumber))
+ .setLocation(cloudZone)
+ .setProject(projectNumber)
.setName(SubscriptionName.of(subscriptionId))
.build();
AdminClientSettings adminClientSettings =
- AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build();
+ AdminClientSettings.newBuilder().setRegion(cloudRegion).build();
try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
adminClient.deleteTopic(topicPath).get();
@@ -153,20 +162,40 @@ public void tearDown() throws Exception {
}
@Test
+ @Ignore
public void testQuickstart() throws ExecutionException, InterruptedException {
bout.reset();
// Publish.
- ProducerExample.producerExample(cloudRegion, zoneId, projectNumber, topicId);
+ ProducerExample.producerExample(cloudRegion.value(), zoneId, projectNumber.value(), topicId);
+ assertThat(bout.toString())
+ .contains(
+ String.format(
+ "Published 10 messages to projects/%s/locations/%s-%s/topics/%s",
+ projectNumber.value(), cloudRegion.value(), zoneId, topicId));
+
+ bout.reset();
+ // Subscribe.
+ ConsumerExample.consumerExample(
+ cloudRegion.value(), zoneId, projectNumber.value(), topicId, subscriptionId);
+ assertThat(bout.toString()).contains("Received 10 messages.");
+ }
+
+ @Test
+ public void testKafkaPublish() throws ExecutionException, InterruptedException {
+ bout.reset();
+ KafkaProducerExample.kafkaProducerExample(
+ cloudRegion.value(), zoneId, projectNumber.value(), topicId);
assertThat(bout.toString())
.contains(
String.format(
"Published 10 messages to projects/%s/locations/%s-%s/topics/%s",
- projectNumber, cloudRegion, zoneId, topicId));
+ projectNumber.value(), cloudRegion.value(), zoneId, topicId));
bout.reset();
// Subscribe.
- ConsumerExample.consumerExample(cloudRegion, zoneId, projectNumber, topicId, subscriptionId);
+ ConsumerExample.consumerExample(
+ cloudRegion.value(), zoneId, projectNumber.value(), topicId, subscriptionId);
assertThat(bout.toString()).contains("Received 10 messages.");
}
}