Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-26203] Basic table factory for Pulsar connector #56

Merged
merged 10 commits into from Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
49 changes: 49 additions & 0 deletions flink-connector-pulsar/pom.xml
Expand Up @@ -51,6 +51,17 @@ under the License.
<scope>provided</scope>
</dependency>

<!-- Table ecosystem -->

<!-- Projects depending on this project won't depend on flink-table-*. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>

<!-- Formats -->

<dependency>
Expand Down Expand Up @@ -107,13 +118,51 @@ under the License.
<scope>test</scope>
</dependency>

<!-- Table factory testing -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<!-- Pulsar testing environment -->

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>pulsar</artifactId>
<scope>test</scope>
</dependency>

<!-- Pulsar SQL IT test with formats -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Expand Up @@ -44,6 +44,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.shade.com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -199,6 +200,14 @@ private TypedMessageBuilder<?> createMessageBuilder(
builder.key(key);
}

if (message.isBase64EncodedKey()) {
// HACK - otherwise we should hold both keys and keyBytes fields which
// is more confusing.
((TypedMessageBuilderImpl<?>) builder)
.getMetadataBuilder()
.setPartitionKeyB64Encoded(true);
}

long eventTime = message.getEventTime();
if (eventTime > 0) {
builder.eventTime(eventTime);
Expand Down
Expand Up @@ -39,6 +39,7 @@ public class PulsarMessage<T> {

@Nullable private final byte[] orderingKey;
@Nullable private final String key;
private final boolean isBase64EncodedKey;
private final long eventTime;
@Nullable private final Schema<T> schema;
@Nullable private final T value;
Expand All @@ -57,6 +58,7 @@ public class PulsarMessage<T> {
PulsarMessage(
@Nullable byte[] orderingKey,
@Nullable String key,
boolean isBase64EncodedKey,
long eventTime,
@Nullable Schema<T> schema,
@Nullable T value,
Expand All @@ -66,6 +68,7 @@ public class PulsarMessage<T> {
boolean disableReplication) {
this.orderingKey = orderingKey;
this.key = key;
this.isBase64EncodedKey = isBase64EncodedKey;
this.eventTime = eventTime;
this.schema = schema;
this.value = value;
Expand Down Expand Up @@ -117,6 +120,10 @@ public String getKey() {
return key;
}

public boolean isBase64EncodedKey() {
return isBase64EncodedKey;
}

public long getEventTime() {
return eventTime;
}
Expand Down
Expand Up @@ -26,6 +26,7 @@

import javax.annotation.Nullable;

import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -39,6 +40,7 @@ public class PulsarMessageBuilder<T> {

private byte[] orderingKey;
private String key;
private boolean isBase64EncodedKey;
private long eventTime;
@Nullable private final Schema<T> schema;
@Nullable private final T value;
Expand Down Expand Up @@ -68,6 +70,17 @@ public PulsarMessageBuilder<T> orderingKey(byte[] orderingKey) {
*/
public PulsarMessageBuilder<T> key(String key) {
this.key = checkNotNull(key);
this.isBase64EncodedKey = false;
return this;
}

/**
* Method wrapper of {@link TypedMessageBuilder#keyBytes(byte[])}. This key would also be used
* in {@link KeyHashTopicRouter}.
*/
public PulsarMessageBuilder<T> keyBytes(byte[] keyBytes) {
this.key = Base64.getEncoder().encodeToString(keyBytes);
this.isBase64EncodedKey = true;
return this;
}

Expand Down Expand Up @@ -115,6 +128,7 @@ public PulsarMessage<T> build() {
return new PulsarMessage<>(
orderingKey,
key,
isBase64EncodedKey,
eventTime,
schema,
value,
Expand Down
Expand Up @@ -24,6 +24,8 @@
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.MessageId;

import java.util.Objects;

/** This cursor would leave pulsar start consuming from a specific message id. */
public class MessageIdStartCursor implements StartCursor {
private static final long serialVersionUID = -8057345435887170111L;
Expand All @@ -50,4 +52,21 @@ public MessageIdStartCursor(MessageId messageId, boolean inclusive) {
public CursorPosition position(String topic, int partitionId) {
return new CursorPosition(messageId, inclusive);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MessageIdStartCursor that = (MessageIdStartCursor) o;
return Objects.equals(messageId, that.messageId);
}

@Override
public int hashCode() {
return Objects.hash(messageId);
}
}
Expand Up @@ -21,6 +21,8 @@
import org.apache.flink.connector.pulsar.source.enumerator.cursor.CursorPosition;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;

import java.util.Objects;

/** This cursor would left pulsar start consuming from a specific publish timestamp. */
public class TimestampStartCursor implements StartCursor {
private static final long serialVersionUID = 5170578885838095320L;
Expand All @@ -35,4 +37,21 @@ public TimestampStartCursor(long timestamp, boolean inclusive) {
public CursorPosition position(String topic, int partitionId) {
return new CursorPosition(timestamp);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TimestampStartCursor that = (TimestampStartCursor) o;
return timestamp == that.timestamp;
}

@Override
public int hashCode() {
return Objects.hash(timestamp);
}
}
Expand Up @@ -22,6 +22,8 @@

import org.apache.pulsar.client.api.Message;

import java.util.Objects;

/** Stop consuming message at the given event time. */
public class EventTimestampStopCursor implements StopCursor {
private static final long serialVersionUID = 2391576769339369027L;
Expand All @@ -39,4 +41,21 @@ public StopCondition shouldStop(Message<?> message) {
long eventTime = message.getEventTime();
return StopCondition.compare(timestamp, eventTime, inclusive);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
EventTimestampStopCursor that = (EventTimestampStopCursor) o;
return timestamp == that.timestamp && inclusive == that.inclusive;
}

@Override
public int hashCode() {
return Objects.hash(timestamp, inclusive);
}
}
Expand Up @@ -26,6 +26,8 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;

import java.util.Objects;

/**
* A stop cursor that initialize the position to the latest message id. The offsets initialization
* are taken care of by the {@code PulsarPartitionSplitReaderBase} instead of by the {@code
Expand Down Expand Up @@ -54,4 +56,21 @@ public void open(PulsarAdmin admin, TopicPartition partition) throws PulsarAdmin
this.messageId = admin.topics().getLastMessageId(topic);
}
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
LatestMessageStopCursor that = (LatestMessageStopCursor) o;
return inclusive == that.inclusive && Objects.equals(messageId, that.messageId);
}

@Override
public int hashCode() {
return Objects.hash(messageId, inclusive);
}
}
Expand Up @@ -23,6 +23,8 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;

import java.util.Objects;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.pulsar.client.api.MessageId.earliest;
import static org.apache.pulsar.client.api.MessageId.latest;
Expand Down Expand Up @@ -51,4 +53,21 @@ public StopCondition shouldStop(Message<?> message) {
MessageId current = message.getMessageId();
return StopCondition.compare(messageId, current, inclusive);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MessageIdStopCursor that = (MessageIdStopCursor) o;
return inclusive == that.inclusive && Objects.equals(messageId, that.messageId);
}

@Override
public int hashCode() {
return Objects.hash(messageId, inclusive);
}
}
Expand Up @@ -22,12 +22,26 @@

import org.apache.pulsar.client.api.Message;

/** A implementation which wouldn't stop forever. */
/** An implementation which wouldn't stop forever. */
public class NeverStopCursor implements StopCursor {
private static final long serialVersionUID = -3113601090292771786L;

@Override
public StopCondition shouldStop(Message<?> message) {
return StopCondition.CONTINUE;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}

return o != null && getClass() == o.getClass();
}

@Override
public int hashCode() {
return 31;
}
}
Expand Up @@ -22,6 +22,8 @@

import org.apache.pulsar.client.api.Message;

import java.util.Objects;

/** Stop consuming message at the given publish time. */
public class PublishTimestampStopCursor implements StopCursor {
private static final long serialVersionUID = 4386276745339324527L;
Expand All @@ -39,4 +41,21 @@ public StopCondition shouldStop(Message<?> message) {
long publishTime = message.getPublishTime();
return StopCondition.compare(timestamp, publishTime, inclusive);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PublishTimestampStopCursor that = (PublishTimestampStopCursor) o;
return timestamp == that.timestamp && inclusive == that.inclusive;
}

@Override
public int hashCode() {
return Objects.hash(timestamp, inclusive);
}
}