[ISSUE #63] Support rich initialization modes of RocketMQSource & Fix some bugs #72

Status: Open. Wants to merge 1 commit into base: main.
42 changes: 42 additions & 0 deletions README.md
@@ -14,6 +14,7 @@ The RocketMQSourceFunction is based on RocketMQ pull consumer mode, and provides
Otherwise, the source doesn't provide any reliability guarantees.

### KeyValueDeserializationSchema

The main API for deserializing topic and tags is the `org.apache.rocketmq.flink.legacy.common.serialization.KeyValueDeserializationSchema` interface.
`rocketmq-flink` includes a general-purpose `KeyValueDeserializationSchema` implementation called `SimpleKeyValueDeserializationSchema`.

@@ -23,7 +24,16 @@ public interface KeyValueDeserializationSchema<T> extends ResultTypeQueryable<T> {
}
```
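A minimal sketch of a custom implementation follows. The stub interface below stands in for the real `KeyValueDeserializationSchema` (which also extends `ResultTypeQueryable<T>`) so that the snippet is self-contained; the method name `deserializeKeyAndValue` follows `SimpleKeyValueDeserializationSchema`, and `KeyValueStringSchema` itself is hypothetical:

```java
// Local stand-in for the interface shown above, so this sketch compiles on
// its own; the real one lives in
// org.apache.rocketmq.flink.legacy.common.serialization and also extends
// ResultTypeQueryable<T>.
interface KeyValueSchema<T> {
    T deserializeKeyAndValue(byte[] key, byte[] value);
}

// A hypothetical schema that joins key and value into a single "key:value"
// string, treating a missing key or value as the empty string.
class KeyValueStringSchema implements KeyValueSchema<String> {
    @Override
    public String deserializeKeyAndValue(byte[] key, byte[] value) {
        String k = key == null ? "" : new String(key, java.nio.charset.StandardCharsets.UTF_8);
        String v = value == null ? "" : new String(value, java.nio.charset.StandardCharsets.UTF_8);
        return k + ":" + v;
    }
}
```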

## RocketMQSource

RocketMQSource implements Flink's new Source interface, which provides unified stream and batch processing. An instance can now be constructed via `RocketMQSourceBuilder.build()`.

### RocketMQDeserializationSchema

The main API for deserializing topic and tags is the `org.apache.rocketmq.flink.source.reader.deserializer.RocketMQDeserializationSchema` interface. `rocketmq-flink` includes general-purpose `RocketMQDeserializationSchema` implementations called `RocketMQRowDeserializationSchema` and `SimpleStringSchema`. If you only care about the value of the message, you can wrap a plain deserialization schema in `RocketMQValueOnlyDeserializationSchemaWrapper`.

## RocketMQSink

To use the `RocketMQSink`, construct an instance of it by specifying `KeyValueSerializationSchema` and `TopicSelector` instances and a `Properties` instance that includes the RocketMQ configs.
`RocketMQSink(KeyValueSerializationSchema<IN> schema, TopicSelector<IN> topicSelector, Properties props)`
The RocketMQSink provides at-least-once reliability guarantees when checkpoints are enabled and `withBatchFlushOnCheckpoint(true)` is set.
@@ -57,6 +67,9 @@ public interface TopicSelector<T> extends Serializable {
```
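As a sketch of how a selector might route records, the stub interface below stands in for the real `TopicSelector` (the real interface may declare additional methods, such as tag selection) so the snippet is self-contained; `FieldTopicSelector` and its `"topic"` field name are hypothetical:

```java
// Local stand-in for org.apache.rocketmq.flink.legacy.common.selector.TopicSelector,
// reduced to the topic-selection method so this sketch compiles on its own.
interface SimpleTopicSelector<T> extends java.io.Serializable {
    String getTopic(T tuple);
}

// A hypothetical selector that routes each record to a topic read from the
// record itself, falling back to a default topic when the field is missing.
class FieldTopicSelector implements SimpleTopicSelector<java.util.Map<String, String>> {
    private final String defaultTopic;

    FieldTopicSelector(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }

    @Override
    public String getTopic(java.util.Map<String, String> record) {
        return record.getOrDefault("topic", defaultTopic);
    }
}
```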

## Examples

You can find more examples in the `org.apache.rocketmq.flink.legacy.example` package.

The following is an example which receives messages from RocketMQ brokers and sends messages to a broker after processing.

```java
@@ -119,7 +132,36 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm
}
```

The following is an example which uses the new source to fetch records and deserialize them into simple strings.

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30000L);

RocketMQSource<String> source =
RocketMQSource.<String>builder()
.setNameServerAddress(nameServerAddress)
.setTopic(topic)
.setConsumerGroup(consumerGroup)
.setStartFromEarliest()
.setDeserializer(
new RocketMQValueOnlyDeserializationSchemaWrapper<>(
new SimpleStringSchema()))
.build();

DataStreamSource<String> newSource =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "new source")
.setParallelism(4);

newSource.print().setParallelism(1);

env.execute();
```
## Configurations

The following configurations are all from the class `org.apache.rocketmq.flink.legacy.RocketMQConfig`.

### Producer Configurations
49 changes: 36 additions & 13 deletions pom.xml
@@ -42,6 +42,24 @@
</properties>

<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.10</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
@@ -114,11 +132,27 @@
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-namesrv</artifactId>
<version>${rocketmq.version}</version>
<exclusions>
<exclusion>
<artifactId>logback-core</artifactId>
<groupId>ch.qos.logback</groupId>
</exclusion>
<exclusion>
<artifactId>logback-classic</artifactId>
<groupId>ch.qos.logback</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-broker</artifactId>
<version>${rocketmq.version}</version>
<exclusions>
<exclusion>
<artifactId>logback-classic</artifactId>
<groupId>ch.qos.logback</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
@@ -142,7 +176,7 @@
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
<version>4.12</version>
<version>4.13.2</version>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
@@ -179,6 +213,7 @@
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>log4j.properties</exclude>
</excludes>
</filter>
</filters>
@@ -194,18 +229,6 @@
</execution>
</executions>
</plugin>

<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<compilerVersion>${maven.compiler.source}</compilerVersion>
<showDeprecation>true</showDeprecation>
<showWarnings>true</showWarnings>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
68 changes: 44 additions & 24 deletions src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java
@@ -18,11 +18,12 @@

package org.apache.rocketmq.flink.common;

import org.apache.rocketmq.flink.legacy.common.config.OffsetResetStrategy;
import org.apache.rocketmq.flink.legacy.common.config.StartupMode;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import static org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_START_MESSAGE_OFFSET;

/** Includes config options of RocketMQ connector type. */
public class RocketMQOptions {

@@ -44,25 +45,11 @@ public class RocketMQOptions {
public static final ConfigOption<String> OPTIONAL_SQL =
ConfigOptions.key("sql").stringType().noDefaultValue();

public static final ConfigOption<Long> OPTIONAL_START_MESSAGE_OFFSET =
ConfigOptions.key("startMessageOffset")
.longType()
.defaultValue(DEFAULT_START_MESSAGE_OFFSET);

public static final ConfigOption<Long> OPTIONAL_START_TIME_MILLS =
ConfigOptions.key("startTimeMs").longType().defaultValue(-1L);

public static final ConfigOption<String> OPTIONAL_START_TIME =
ConfigOptions.key("startTime").stringType().noDefaultValue();

public static final ConfigOption<String> OPTIONAL_END_TIME =
ConfigOptions.key("endTime").stringType().noDefaultValue();

public static final ConfigOption<String> OPTIONAL_TIME_ZONE =
ConfigOptions.key("timeZone").stringType().noDefaultValue();
public static final ConfigOption<Long> OPTIONAL_END_TIME_STAMP =
ConfigOptions.key("endTimestamp").longType().defaultValue(Long.MAX_VALUE);

public static final ConfigOption<Long> OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS =
ConfigOptions.key("partitionDiscoveryIntervalMs").longType().defaultValue(30000L);
ConfigOptions.key("partitionDiscoveryIntervalMs").longType().defaultValue(-1L);

public static final ConfigOption<Boolean> OPTIONAL_USE_NEW_API =
ConfigOptions.key("useNewApi").booleanType().defaultValue(true);
@@ -109,9 +96,42 @@ public class RocketMQOptions {
public static final ConfigOption<String> OPTIONAL_SECRET_KEY =
ConfigOptions.key("secretKey").stringType().noDefaultValue();

public static final ConfigOption<String> OPTIONAL_SCAN_STARTUP_MODE =
ConfigOptions.key("scanStartupMode").stringType().defaultValue("latest");

public static final ConfigOption<Long> OPTIONAL_OFFSET_FROM_TIMESTAMP =
ConfigOptions.key("offsetFromTimestamp").longType().noDefaultValue();
// --------------------------------------------------------------------------------------------
// Scan specific options
// --------------------------------------------------------------------------------------------

public static final ConfigOption<StartupMode> OPTIONAL_SCAN_STARTUP_MODE =
ConfigOptions.key("scan.startup.mode")
.enumType(StartupMode.class)
.defaultValue(StartupMode.GROUP_OFFSETS)
.withDescription("Startup mode for RocketMQ consumer.");

public static final ConfigOption<OffsetResetStrategy> OPTIONAL_SCAN_OFFSET_RESET_STRATEGY =
ConfigOptions.key("scan.offsetReset.strategy")
.enumType(OffsetResetStrategy.class)
.defaultValue(OffsetResetStrategy.LATEST)
                    .withDescription(
                            "The offsetReset strategy is only used when no committed group offset is found");

public static final ConfigOption<String> OPTIONAL_SCAN_STARTUP_SPECIFIC_OFFSETS =
ConfigOptions.key("scan.startup.specific-offsets")
.stringType()
.noDefaultValue()
.withDescription(
"Optional offsets used in case of \"specific-offsets\" startup mode");

public static final ConfigOption<Long> OPTIONAL_SCAN_STARTUP_TIMESTAMP_MILLIS =
ConfigOptions.key("scan.startup.timestamp-millis")
.longType()
.defaultValue(-1L)
.withDescription(
"Optional timestamp used in case of \"timestamp\" startup mode");

public static final ConfigOption<Boolean> OPTIONAL_COMMIT_OFFSET_AUTO =
ConfigOptions.key("commit.offset.auto")
.booleanType()
.defaultValue(false)
.withDescription(
                                "Commit the offset immediately after each message is fetched. "
                                        + "If Flink checkpointing is not enabled, make sure this option is set to true.");
}
@@ -166,9 +166,6 @@ public void open(Configuration parameters) throws Exception {
this.enableCheckpoint =
((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled();

if (offsetTable == null) {
offsetTable = new ConcurrentHashMap<>();
}
if (restoredOffsets == null) {
restoredOffsets = new ConcurrentHashMap<>();
}
@@ -247,7 +244,16 @@ public void open(Configuration parameters) throws Exception {
// If the job recovers from the state, the state has already contained the offsets of last
// commit.
if (!restored) {
initOffsets(messageQueues);
this.offsetTable =
RocketMQUtils.initOffsets(
messageQueues,
consumer,
startMode,
offsetResetStrategy,
specificTimeStamp,
specificStartupOffsets);
} else {
this.offsetTable = new ConcurrentHashMap<>();
}
}
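The `GROUP_OFFSETS` fallback that the reset strategy controls can be sketched in isolation. `OffsetReader` below is a local stub for the pull consumer's offset lookups (`fetchConsumeOffset`, `minOffset`, `maxOffset`), not a real rocketmq-flink type, and `GroupOffsetResolver` is hypothetical:

```java
// Minimal stubs so the sketch compiles without RocketMQ on the classpath.
enum OffsetResetStrategy { EARLIEST, LATEST }

interface OffsetReader {
    long fetchConsumeOffset(); // committed group offset; <= 0 when none exists
    long minOffset();
    long maxOffset();
}

class GroupOffsetResolver {
    // Resolve the starting offset for GROUP_OFFSETS mode: use the committed
    // offset when present, otherwise fall back to the configured reset strategy.
    static long resolve(OffsetReader reader, OffsetResetStrategy strategy) {
        long offset = reader.fetchConsumeOffset();
        if (offset <= 0) {
            offset = strategy == OffsetResetStrategy.EARLIEST
                    ? reader.minOffset()
                    : reader.maxOffset();
        }
        return offset;
    }
}
```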

@@ -394,76 +400,6 @@ private void awaitTermination() throws InterruptedException {
}
}

    /**
     * Only a Flink job that starts with no state can initialize offsets from the broker.
     *
     * @param messageQueues the message queues assigned to this subtask
     * @throws MQClientException if offsets cannot be fetched from the broker
     */
private void initOffsets(List<MessageQueue> messageQueues) throws MQClientException {
for (MessageQueue mq : messageQueues) {
long offset;
switch (startMode) {
case LATEST:
offset = consumer.maxOffset(mq);
break;
case EARLIEST:
offset = consumer.minOffset(mq);
break;
case GROUP_OFFSETS:
offset = consumer.fetchConsumeOffset(mq, false);
                    // The min offset is returned when the consumer group joins for the
                    // first time; a negative number is returned if an exception occurs
                    // while fetching from the broker.
                    // To consume from the earliest offset, use OffsetResetStrategy.EARLIEST.
if (offset <= 0) {
switch (offsetResetStrategy) {
case LATEST:
offset = consumer.maxOffset(mq);
log.info(
"current consumer thread:{} has no committed offset,use Strategy:{} instead",
mq,
offsetResetStrategy);
break;
case EARLIEST:
log.info(
"current consumer thread:{} has no committed offset,use Strategy:{} instead",
mq,
offsetResetStrategy);
offset = consumer.minOffset(mq);
break;
default:
break;
}
}
break;
case TIMESTAMP:
offset = consumer.searchOffset(mq, specificTimeStamp);
break;
case SPECIFIC_OFFSETS:
if (specificStartupOffsets == null) {
throw new RuntimeException(
                                "StartMode is specific_offsets, but no offsets have been specified");
}
Long specificOffset = specificStartupOffsets.get(mq);
if (specificOffset != null) {
offset = specificOffset;
} else {
offset = consumer.fetchConsumeOffset(mq, false);
}
break;
default:
throw new IllegalArgumentException(
                            "current startMode is not supported: " + startMode);
}
log.info(
"current consumer queue:{} start from offset of: {}",
mq.getBrokerName() + "-" + mq.getQueueId(),
offset);
offsetTable.put(mq, offset);
}
}

/** consume from the min offset at every restart with no state */
public RocketMQSourceFunction<OUT> setStartFromEarliest() {
this.startMode = StartupMode.EARLIEST;
@@ -19,9 +19,26 @@

/** RocketMQ startup mode. */
public enum StartupMode {
EARLIEST,
LATEST,
GROUP_OFFSETS,
TIMESTAMP,
SPECIFIC_OFFSETS
EARLIEST("earliest-offset", "Start from the earliest offset possible."),
LATEST("latest-offset", "Start from the latest offset."),
GROUP_OFFSETS(
"group-offsets",
"Start from committed offsets in brokers of a specific consumer group."),
TIMESTAMP("timestamp", "Start from user-supplied timestamp for each message queue."),
SPECIFIC_OFFSETS(
"specific-offsets",
"Start from user-supplied specific offsets for each message queue.");

private final String value;
private final String description;

StartupMode(String value, String description) {
this.value = value;
this.description = description;
}

@Override
public String toString() {
return value;
}
}
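A hypothetical reverse-lookup helper, not part of this PR, shows how the `value` strings above could be mapped back to constants; the enum is copied locally (as `StartupModeSketch`) so the sketch compiles on its own:

```java
// Local copy of the StartupMode enum from this PR, reduced to the value strings.
enum StartupModeSketch {
    EARLIEST("earliest-offset"),
    LATEST("latest-offset"),
    GROUP_OFFSETS("group-offsets"),
    TIMESTAMP("timestamp"),
    SPECIFIC_OFFSETS("specific-offsets");

    private final String value;

    StartupModeSketch(String value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return value;
    }

    // Hypothetical reverse lookup from the external string form, e.g. for
    // parsing a "scan.startup.mode" option value.
    static StartupModeSketch fromString(String value) {
        for (StartupModeSketch mode : values()) {
            if (mode.value.equals(value)) {
                return mode;
            }
        }
        throw new IllegalArgumentException("Unknown startup mode: " + value);
    }
}
```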