Skip to content

Commit

Permalink
feat: add pubsublite.ordering.mode to kafka connector (#228)
Browse files Browse the repository at this point in the history
* feat: add pubsublite.ordering.mode to kafka connector

This is useful for migration cases using the kafka wire protocol.

* feat: add pubsublite.ordering.mode to kafka connector

This is useful for migration cases using the kafka wire protocol.

Also clean up dependency management.

* feat: add pubsublite.ordering.mode to kafka connector

This is useful for migration cases using the kafka wire protocol.

Also clean up dependency management.

* feat: add pubsublite.ordering.mode to kafka connector

This is useful for migration cases using the kafka wire protocol.

Also clean up dependency management.

* feat: add pubsublite.ordering.mode to kafka connector

This is useful for migration cases using the kafka wire protocol.

Also clean up dependency management.

* feat: add pubsublite.ordering.mode to kafka connector

This is useful for migration cases using the kafka wire protocol.

Also clean up dependency management.
  • Loading branch information
dpcollins-google committed Mar 2, 2023
1 parent 423141f commit c499c39
Show file tree
Hide file tree
Showing 9 changed files with 238 additions and 190 deletions.
106 changes: 52 additions & 54 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,54 +15,73 @@
<parent>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-shared-config</artifactId>
<version>1.5.4</version>
<version>1.5.5</version>
</parent>

<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>

<kafka.version>3.3.1</kafka.version>
<pubsub.version>1.120.25</pubsub.version>
<pubsublite.version>1.8.0</pubsublite.version>
<cloud-compute.version>1.16.0</cloud-compute.version>
<protobuf-java.vesion>3.21.9</protobuf-java.vesion>
<gax.version>2.19.4</gax.version>
<slf4j.version>2.0.3</slf4j.version>
<kafka.version>3.4.0</kafka.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-shared-dependencies</artifactId>
<version>3.3.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
<version>26.8.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--TODO(dpcollins-google): remove this !-->
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>grpc-google-cloud-pubsublite-v1</artifactId>
<version>1.11.1</version>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-pubsublite-v1</artifactId>
<!--TODO(dpcollins-google): remove explicit version !-->
<version>1.11.1</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite</artifactId>
<!--TODO(dpcollins-google): remove explicit version !-->
<version>1.11.1</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>pubsublite-kafka</artifactId>
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<version>${pubsub.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf-java.vesion}</version>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-oauth2-http</artifactId>
<version>1.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>pubsublite-kafka</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-pubsublite-v1</artifactId>
<version>${pubsublite.version}</version>
</dependency>
<dependency>
<groupId>com.google.flogger</groupId>
<artifactId>google-extensions</artifactId>
Expand All @@ -71,62 +90,51 @@
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>3.0.2</version>
</dependency>
<dependency>
<groupId>com.google.api</groupId>
<artifactId>gax</artifactId>
<version>${gax.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<version>2.0.5</version>
</dependency>
<dependency>
<groupId>com.google.api</groupId>
<artifactId>gax-grpc</artifactId>
<version>${gax.version}</version>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-credentials</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-pubsub-v1</artifactId>
<version>1.102.25</version>
</dependency>
<dependency>
<groupId>com.google.api</groupId>
<artifactId>api-common</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite</artifactId>
<version>${pubsublite.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>
<dependency>
<groupId>org.threeten</groupId>
<artifactId>threetenbp</artifactId>
<version>1.6.4</version>
</dependency>
<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
<version>2.16</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>${protobuf-java.vesion}</version>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-oauth2-http</artifactId>
</dependency>

<!-- Provided dependencies -->
Expand All @@ -141,25 +149,24 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
<version>4.13.2</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>4.9.0</version>
<scope>test</scope>
<version>4.11.0</version>
</dependency>
<dependency>
<groupId>com.google.truth</groupId>
<artifactId>truth</artifactId>
<version>1.1.3</version>
<scope>test</scope>
<version>1.1.3</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-core</artifactId>
<version>2.8.28</version>
<scope>test</scope>
</dependency>
<dependency>
Expand All @@ -177,13 +184,11 @@
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-compute-v1</artifactId>
<version>${cloud-compute.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-compute</artifactId>
<version>${cloud-compute.version}</version>
<scope>test</scope>
</dependency>
<dependency>
Expand All @@ -195,13 +200,6 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-storage</artifactId>
<version>2.15.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ private ConfigDefs() {}
static final String PROJECT_FLAG = "pubsublite.project";
static final String LOCATION_FLAG = "pubsublite.location";
static final String TOPIC_NAME_FLAG = "pubsublite.topic";
static final String ORDERING_MODE_FLAG = "pubsublite.ordering.mode";

static ConfigDef config() {
return new ConfigDef()
Expand All @@ -42,6 +43,12 @@ static ConfigDef config() {
TOPIC_NAME_FLAG,
ConfigDef.Type.STRING,
Importance.HIGH,
"The name of the topic to which to publish.");
"The name of the topic to which to publish.")
.define(
ORDERING_MODE_FLAG,
ConfigDef.Type.STRING,
OrderingMode.DEFAULT.name(),
Importance.HIGH,
"The ordering mode to use for publishing to Pub/Sub Lite. If set to `KAFKA`, messages will be republished to the same partition index they were read from on the source topic. Note that this means the Pub/Sub Lite topic *must* have the same number of partitions as the source Kafka topic.");
}
}
2 changes: 0 additions & 2 deletions src/main/java/com/google/pubsublite/kafka/sink/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,4 @@ private Constants() {}
public static final String KAFKA_OFFSET_HEADER = "x-goog-pubsublite-source-kafka-offset";
public static final String KAFKA_EVENT_TIME_TYPE_HEADER =
"x-goog-pubsublite-source-kafka-event-time-type";
public static final String PUBSUBLITE_KAFKA_SINK_CONNECTOR_NAME =
"JAVA_PUBSUBLITE_KAFKA_SINK_CONNECTOR";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.pubsublite.kafka.sink;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;

import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.RoutingPolicy;
import com.google.cloud.pubsublite.proto.PubSubMessage;

/** A routing policy that extracts the original kafka partition and routes to that partition. */
class KafkaPartitionRoutingPolicy implements RoutingPolicy {
private final long numPartitions;

KafkaPartitionRoutingPolicy(long numPartitions) {
this.numPartitions = numPartitions;
}

@Override
public Partition route(PubSubMessage message) throws CheckedApiException {
Partition partition = getPartition(message);
if (partition.value() >= numPartitions) {
throw new CheckedApiException(
"Kafka topic has more partitions than Pub/Sub Lite topic. OrderingMode.KAFKA cannot be used.",
Code.FAILED_PRECONDITION);
}
return partition;
}

private Partition getPartition(PubSubMessage message) throws CheckedApiException {
try {
return Partition.of(
Long.parseLong(
message
.getAttributesOrThrow(Constants.KAFKA_PARTITION_HEADER)
.getValues(0)
.toStringUtf8()));
} catch (Throwable t) {
throw toCanonical(t);
}
}
}
23 changes: 23 additions & 0 deletions src/main/java/com/google/pubsublite/kafka/sink/OrderingMode.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.pubsublite.kafka.sink;

public enum OrderingMode {
/* Order based on the standard Pub/Sub Lite logic. */
DEFAULT,
/* Send messages to the same partition index they were from in Kafka. */
KAFKA
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
import static com.google.pubsublite.kafka.sink.Schemas.encodeToBytes;

import com.google.api.core.ApiService.State;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.proto.AttributeValues;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableListMultimap;
import com.google.protobuf.ByteString;
Expand Down Expand Up @@ -75,7 +76,7 @@ public void put(Collection<SinkRecord> collection) {
}
}
for (SinkRecord record : collection) {
Message.Builder message = Message.builder();
PubSubMessage.Builder message = PubSubMessage.newBuilder();
if (record.key() != null) {
message.setKey(encodeToBytes(record.keySchema(), record.key()));
}
Expand All @@ -89,6 +90,7 @@ public void put(Collection<SinkRecord> collection) {
header ->
attributes.put(
header.key(), Schemas.encodeToBytes(header.schema(), header.value())));

if (record.topic() != null) {
attributes.put(Constants.KAFKA_TOPIC_HEADER, ByteString.copyFromUtf8(record.topic()));
}
Expand All @@ -106,7 +108,13 @@ public void put(Collection<SinkRecord> collection) {
ByteString.copyFromUtf8(record.timestampType().name));
message.setEventTime(Timestamps.fromMillis(record.timestamp()));
}
message.setAttributes(attributes.build());
attributes
.build()
.asMap()
.forEach(
(key, values) ->
message.putAttributes(
key, AttributeValues.newBuilder().addAllValues(values).build()));
publisher.publish(message.build());
}
}
Expand Down

0 comments on commit c499c39

Please sign in to comment.