Skip to content

Commit

Permalink
Port RocketMQSink to FLIP-143.
Browse files Browse the repository at this point in the history
  • Loading branch information
GOODBOY008 committed Nov 15, 2022
1 parent 9d0e78d commit 06e467b
Show file tree
Hide file tree
Showing 15 changed files with 1,312 additions and 0 deletions.
1 change: 1 addition & 0 deletions pom.xml
Expand Up @@ -38,6 +38,7 @@
<commons-lang.version>2.6</commons-lang.version>
<spotless.version>2.4.2</spotless.version>
<jaxb-api.version>2.3.1</jaxb-api.version>
<testcontainers.version>1.17.2</testcontainers.version>
<rocketmq.schema.registry.version>0.1.0</rocketmq.schema.registry.version>
</properties>

Expand Down
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.flink.legacy.example;

import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.flink.legacy.RocketMQConfig;
import org.apache.rocketmq.flink.legacy.common.serialization.SimpleTupleDeserializationSchema;
import org.apache.rocketmq.flink.sink2.RocketMQSinkBuilder;
import org.apache.rocketmq.flink.sink2.writer.serializer.RocketMQMessageSerializationSchema;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.nio.ByteBuffer;
import java.util.Properties;

import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_LATEST;
import static org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_CONSUMER_TAG;

/**
 * Example job that writes a small, fixed set of {@code Long} values to RocketMQ using the sink
 * produced by {@link RocketMQSinkBuilder}.
 *
 * <p>The job runs in a local environment by default; see the commented-out line in {@link
 * #main(String[])} for running on a cluster. The name server address and topic are hard-coded
 * sample values and need to be adapted to the target RocketMQ installation.
 */
public class RocketMQFlinkExample2 {

    /** Sample name server address shared by the consumer, producer and sink configuration. */
    private static final String EXAMPLE_NAME_SERVER_ADDR = "http://192.168.20.10:9876";

    /** Sample topic that the example reads from / writes to. */
    private static final String EXAMPLE_TOPIC = "TopicTest";

    /**
     * Source Config.
     *
     * <p>Not used by {@link #main(String[])}, which reads from an in-memory collection; kept as a
     * reference for wiring a RocketMQ source into the example.
     *
     * @return properties
     */
    private static Properties getConsumerProps() {
        Properties consumerProps = new Properties();
        consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, EXAMPLE_NAME_SERVER_ADDR);
        consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, "flink-consumer");
        consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, EXAMPLE_TOPIC);
        consumerProps.setProperty(RocketMQConfig.CONSUMER_TAG, DEFAULT_CONSUMER_TAG);
        consumerProps.setProperty(RocketMQConfig.CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST);
        consumerProps.setProperty(RocketMQConfig.ACCESS_CHANNEL, AccessChannel.LOCAL.name());
        return consumerProps;
    }

    /**
     * Sink2 Config.
     *
     * <p>Not used by {@link #main(String[])}, which configures the sink directly on the builder;
     * kept as a reference for property-based producer configuration.
     *
     * @return properties
     */
    private static Properties getProducerProps() {
        Properties producerProps = new Properties();
        producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, EXAMPLE_NAME_SERVER_ADDR);
        producerProps.setProperty(RocketMQConfig.PRODUCER_GROUP, "flink-sink2");
        producerProps.setProperty(RocketMQConfig.ACCESS_CHANNEL, AccessChannel.LOCAL.name());
        return producerProps;
    }

    /**
     * Builds and executes the example pipeline: ten {@code Long} elements are printed and written
     * to the sample topic through the RocketMQ sink.
     *
     * @param args command line arguments, exposed to the job as global job parameters
     * @throws Exception if the job execution fails
     */
    public static void main(String[] args) throws Exception {

        final ParameterTool params = ParameterTool.fromArgs(args);

        // for local
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        // for cluster
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.getConfig().setGlobalJobParameters(params);
        env.setStateBackend(new MemoryStateBackend());
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // start a checkpoint every 10s
        env.enableCheckpointing(10000);
        // advanced options:
        // set mode to exactly-once (this is the default); this matches the EXACTLY_ONCE delivery
        // guarantee requested from the sink below
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // checkpoints have to complete within one minute, or are discarded
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // make sure 500 ms of progress happen between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // allow only one checkpoint to be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // enable externalized checkpoints which are retained after job cancellation
        env.getCheckpointConfig()
                .enableExternalizedCheckpoints(
                        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        final DataStreamSource<Long> source =
                env.fromElements(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);

        source.print();

        source.sinkTo(
                        new RocketMQSinkBuilder<Long>()
                                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                                .setRecordSerializer(
                                        RocketMQMessageSerializationSchema.builder()
                                                .setTopic(EXAMPLE_TOPIC)
                                                .setValueSerializationSchema(new RecordSerializer())
                                                .build())
                                .setProducerGroup("rocket-flink")
                                .setBootstrapServers(EXAMPLE_NAME_SERVER_ADDR)
                                .build())
                .name("aiden")
                .setParallelism(2);

        env.execute("rocketmq-connect-flink");
    }

    /** Serializes a {@code Long} into its 8-byte big-endian representation. */
    private static class RecordSerializer implements SerializationSchema<Long> {

        private static final long serialVersionUID = 1L;

        @Override
        public byte[] serialize(Long element) {
            return ByteBuffer.allocate(Long.BYTES).putLong(element).array();
        }
    }
}
112 changes: 112 additions & 0 deletions src/main/java/org/apache/rocketmq/flink/sink2/RocketMQSink.java
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.flink.sink2;

import org.apache.rocketmq.flink.sink2.committer.RocketMQCommittable;
import org.apache.rocketmq.flink.sink2.committer.RocketMQCommittableSerializer;
import org.apache.rocketmq.flink.sink2.committer.RocketMQCommitter;
import org.apache.rocketmq.flink.sink2.writer.RocketMQWriter;
import org.apache.rocketmq.flink.sink2.writer.RocketMQWriterState;
import org.apache.rocketmq.flink.sink2.writer.RocketMQWriterStateSerializer;
import org.apache.rocketmq.flink.sink2.writer.serializer.RocketMQMessageSerializationSchema;

import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

/**
 * Sink that writes records to RocketMQ through {@link RocketMQWriter} and commits them through
 * {@link RocketMQCommitter}.
 *
 * <p>Instances are created with {@link RocketMQSinkBuilder}; use {@link #builder()} to obtain
 * one.
 *
 * @param <IN> type of incoming records
 */
public class RocketMQSink<IN>
        implements StatefulSink<IN, RocketMQWriterState>,
                TwoPhaseCommittingSink<IN, RocketMQCommittable> {

    /** Delivery guarantee handed to every writer created by this sink. */
    private final DeliveryGuarantee deliveryGuarantee;

    /** RocketMQ producer configuration handed to every writer created by this sink. */
    private final Properties rocketmqProducerConfig;

    /** Producer group handed to every writer created by this sink. */
    private final String producerGroup;

    /** Schema handed to every writer for turning incoming records into RocketMQ messages. */
    private final RocketMQMessageSerializationSchema<IN> recordSerializer;

    /**
     * Creates the sink. Package-private on purpose: construction goes through {@link
     * RocketMQSinkBuilder}.
     *
     * @param deliveryGuarantee delivery guarantee passed on to the writers
     * @param rocketmqProducerConfig producer configuration passed on to the writers
     * @param producerGroup producer group passed on to the writers
     * @param recordSerializer record serializer passed on to the writers
     */
    RocketMQSink(
            DeliveryGuarantee deliveryGuarantee,
            Properties rocketmqProducerConfig,
            String producerGroup,
            RocketMQMessageSerializationSchema<IN> recordSerializer) {
        this.deliveryGuarantee = deliveryGuarantee;
        this.rocketmqProducerConfig = rocketmqProducerConfig;
        this.producerGroup = producerGroup;
        this.recordSerializer = recordSerializer;
    }

    /**
     * Create a {@link RocketMQSinkBuilder} to construct a new {@link RocketMQSink}.
     *
     * @param <IN> type of incoming records
     * @return {@link RocketMQSinkBuilder}
     */
    public static <IN> RocketMQSinkBuilder<IN> builder() {
        return new RocketMQSinkBuilder<>();
    }

    /**
     * Creates a fresh writer, i.e. one that is given an empty collection of recovered state.
     *
     * @param context initialization context supplied by Flink
     * @return a new {@link RocketMQWriter}
     * @throws IOException declared by the sink interface
     */
    @Override
    public RocketMQWriter<IN> createWriter(InitContext context) throws IOException {
        return new RocketMQWriter<>(
                context,
                rocketmqProducerConfig,
                recordSerializer,
                context.asSerializationSchemaInitializationContext(),
                deliveryGuarantee,
                producerGroup,
                Collections.emptyList());
    }

    /**
     * Creates a writer that is given the state recovered from a previous execution.
     *
     * @param context initialization context supplied by Flink
     * @param recoveredState writer states to hand to the new writer
     * @return a new {@link RocketMQWriter}
     * @throws IOException declared by the sink interface
     */
    @Override
    public RocketMQWriter<IN> restoreWriter(
            InitContext context, Collection<RocketMQWriterState> recoveredState)
            throws IOException {
        return new RocketMQWriter<>(
                context,
                rocketmqProducerConfig,
                recordSerializer,
                context.asSerializationSchemaInitializationContext(),
                deliveryGuarantee,
                producerGroup,
                recoveredState);
    }

    /**
     * @return a new serializer for {@link RocketMQWriterState}
     */
    @Override
    public SimpleVersionedSerializer<RocketMQWriterState> getWriterStateSerializer() {
        return new RocketMQWriterStateSerializer();
    }

    /**
     * @return a new {@link RocketMQCommitter}
     * @throws IOException declared by the sink interface
     */
    @Override
    public Committer<RocketMQCommittable> createCommitter() throws IOException {
        return new RocketMQCommitter();
    }

    /**
     * @return a new serializer for {@link RocketMQCommittable}
     */
    @Override
    public SimpleVersionedSerializer<RocketMQCommittable> getCommittableSerializer() {
        return new RocketMQCommittableSerializer();
    }
}
125 changes: 125 additions & 0 deletions src/main/java/org/apache/rocketmq/flink/sink2/RocketMQSinkBuilder.java
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.flink.sink2;

import org.apache.rocketmq.flink.legacy.RocketMQConfig;
import org.apache.rocketmq.flink.sink2.writer.serializer.RocketMQMessageSerializationSchema;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.connector.base.DeliveryGuarantee;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * Builder to construct a {@link RocketMQSink}.
 *
 * <p>A record serializer must be provided through {@link
 * #setRecordSerializer(RocketMQMessageSerializationSchema)} before calling {@link #build()}. The
 * delivery guarantee defaults to {@link DeliveryGuarantee#NONE} and the producer group defaults
 * to {@code "rocketmq-sink"}.
 *
 * @param <IN> type of the records written to RocketMQ
 */
public class RocketMQSinkBuilder<IN> {

    private static final Logger LOG = LoggerFactory.getLogger(RocketMQSinkBuilder.class);

    private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.NONE;

    private String producerGroup = "rocketmq-sink";

    private final Properties rocketmqProducerConfig;

    private RocketMQMessageSerializationSchema<IN> recordSerializer;

    /** Creates a builder with an empty producer configuration. */
    public RocketMQSinkBuilder() {
        rocketmqProducerConfig = new Properties();
    }

    /**
     * Sets the wanted {@link DeliveryGuarantee}. The default delivery guarantee is {@link
     * DeliveryGuarantee#NONE}.
     *
     * @param deliveryGuarantee the delivery guarantee, must not be null
     * @return {@link RocketMQSinkBuilder}
     */
    public RocketMQSinkBuilder<IN> setDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) {
        this.deliveryGuarantee = checkNotNull(deliveryGuarantee, "deliveryGuarantee");
        return this;
    }

    /**
     * Adds all entries of the given properties to the producer configuration that is handed to
     * the sink. Entries set earlier under the same key are overwritten.
     *
     * @param props producer properties, must not be null
     * @return {@link RocketMQSinkBuilder}
     */
    public RocketMQSinkBuilder<IN> setRocketmqProducerConfig(Properties props) {
        checkNotNull(props, "props");
        rocketmqProducerConfig.putAll(props);
        return this;
    }

    /**
     * Sets the RocketMQ name server address. The value is stored in the producer configuration
     * under {@link RocketMQConfig#NAME_SERVER_ADDR}.
     *
     * @param nameServerAddr address of the RocketMQ name server(s)
     * @return {@link RocketMQSinkBuilder}
     */
    public RocketMQSinkBuilder<IN> setBootstrapServers(String nameServerAddr) {
        return setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameServerAddr);
    }

    /**
     * Sets the RocketMQ producer group.
     *
     * @param producerGroup the producer group, must not be null
     * @return {@link RocketMQSinkBuilder}
     */
    public RocketMQSinkBuilder<IN> setProducerGroup(String producerGroup) {
        this.producerGroup = checkNotNull(producerGroup, "producerGroup");
        return this;
    }

    /**
     * Sets a single entry of the producer configuration.
     *
     * @param key property key, must not be null
     * @param value property value, must not be null
     * @return {@link RocketMQSinkBuilder}
     */
    public RocketMQSinkBuilder<IN> setProperty(String key, String value) {
        checkNotNull(key, "key");
        // Properties rejects null values anyway; checking here yields a descriptive message.
        checkNotNull(value, "value");
        rocketmqProducerConfig.setProperty(key, value);
        return this;
    }

    /**
     * Sets the {@link RocketMQMessageSerializationSchema} that transforms incoming records to
     * {@link org.apache.rocketmq.common.message.Message}s.
     *
     * @param recordSerializer the record serializer, must not be null
     * @return {@link RocketMQSinkBuilder}
     */
    public RocketMQSinkBuilder<IN> setRecordSerializer(
            RocketMQMessageSerializationSchema<IN> recordSerializer) {
        this.recordSerializer = checkNotNull(recordSerializer, "recordSerializer");
        ClosureCleaner.clean(
                this.recordSerializer, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
        return this;
    }

    /** Verifies that all mandatory settings have been provided before the sink is created. */
    private void sanityCheck() {
        checkNotNull(
                recordSerializer,
                "recordSerializer must be set via setRecordSerializer(...) before build()");
    }

    /**
     * Constructs the {@link RocketMQSink} with the properties configured on this builder.
     *
     * <p>The sink receives a snapshot of the producer configuration, so modifying or reusing
     * this builder afterwards does not affect sinks that were already built.
     *
     * @return {@link RocketMQSink}
     * @throws NullPointerException if no record serializer has been set
     */
    public RocketMQSink<IN> build() {
        sanityCheck();
        Properties producerConfig = new Properties();
        producerConfig.putAll(rocketmqProducerConfig);
        return new RocketMQSink<>(
                deliveryGuarantee, producerConfig, producerGroup, recordSerializer);
    }
}

0 comments on commit 06e467b

Please sign in to comment.