Skip to content

Commit

Permalink
polish
Browse files Browse the repository at this point in the history
  • Loading branch information
GOODBOY008 committed Nov 3, 2022
1 parent 34e6be6 commit 21e95fd
Show file tree
Hide file tree
Showing 11 changed files with 22 additions and 30 deletions.
Expand Up @@ -20,8 +20,8 @@
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.flink.legacy.RocketMQConfig;
import org.apache.rocketmq.flink.legacy.common.serialization.SimpleTupleDeserializationSchema;
import org.apache.rocketmq.flink.sink2.RocketMQMessageSerializationSchema;
import org.apache.rocketmq.flink.sink2.RocketMQSinkBuilder;
import org.apache.rocketmq.flink.sink2.writer.serializer.RocketMQMessageSerializationSchema;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.utils.ParameterTool;
Expand Down
Expand Up @@ -21,6 +21,9 @@
import org.apache.rocketmq.flink.sink2.committer.RocketMQCommittableSerializer;
import org.apache.rocketmq.flink.sink2.committer.RocketMQCommitter;
import org.apache.rocketmq.flink.sink2.writer.RocketMQWriter;
import org.apache.rocketmq.flink.sink2.writer.RocketMQWriterState;
import org.apache.rocketmq.flink.sink2.writer.RocketMQWriterStateSerializer;
import org.apache.rocketmq.flink.sink2.writer.serializer.RocketMQMessageSerializationSchema;

import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.StatefulSink;
Expand Down
Expand Up @@ -18,6 +18,7 @@
package org.apache.rocketmq.flink.sink2;

import org.apache.rocketmq.flink.legacy.RocketMQConfig;
import org.apache.rocketmq.flink.sink2.writer.serializer.RocketMQMessageSerializationSchema;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.ClosureCleaner;
Expand Down Expand Up @@ -71,7 +72,7 @@ public RocketMQSinkBuilder<IN> setRocketmqProducerConfig(Properties props) {
}

/**
* Sets the Kafka bootstrap servers.
* Sets the RocketMQ bootstrap servers.
*
* @param nameServerAddr a comma separated list of valid URIs to reach the rocketmq broker
* @return {@link RocketMQSinkBuilder}
Expand Down Expand Up @@ -113,19 +114,7 @@ public RocketMQSinkBuilder<IN> setRecordSerializer(
}

private void sanityCheck() {
// checkNotNull(
// kafkaProducerConfig.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
// "bootstrapServers");
// if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
// checkState(
// transactionalIdPrefix != null,
// "EXACTLY_ONCE delivery guarantee requires a transactionIdPrefix to be
// set
// to provide unique transaction names across multiple KafkaSinks writing
// to the same Kafka
// cluster.");
// }
// checkNotNull(recordSerializer, "recordSerializer");
// TODO: 2022/11/3 add sanity check
}

public RocketMQSink build() {
Expand Down
Expand Up @@ -15,12 +15,13 @@
* limitations under the License.
*/

package org.apache.rocketmq.flink.sink2;
package org.apache.rocketmq.flink.sink2.writer;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.flink.legacy.RocketMQConfig;
import org.apache.rocketmq.flink.sink2.writer.serializer.RocketMQMessageSerializationSchema;

import java.util.Comparator;
import java.util.HashMap;
Expand Down
Expand Up @@ -24,11 +24,9 @@
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.flink.legacy.RocketMQConfig;
import org.apache.rocketmq.flink.sink2.DefaultRocketMQSinkContext;
import org.apache.rocketmq.flink.sink2.RocketMQMessageSerializationSchema;
import org.apache.rocketmq.flink.sink2.RocketMQWriterState;
import org.apache.rocketmq.flink.sink2.committer.RocketMQCommittable;
import org.apache.rocketmq.flink.sink2.committer.RocketMQCommitter;
import org.apache.rocketmq.flink.sink2.writer.serializer.RocketMQMessageSerializationSchema;
import org.apache.rocketmq.remoting.exception.RemotingException;

import org.apache.flink.api.common.operators.MailboxExecutor;
Expand Down Expand Up @@ -111,7 +109,7 @@ public RocketMQWriter(
producerGroup, RocketMQConfig.buildAclRPCHook(rabbitmqProducerConfig));
} else {
throw new UnsupportedOperationException(
"Unsupported Kafka writer semantic " + this.deliveryGuarantee);
"Unsupported RocketMQ writer semantic " + this.deliveryGuarantee);
}

RocketMQConfig.buildProducerConfigs(rabbitmqProducerConfig, currentProducer);
Expand Down
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.rocketmq.flink.sink2;
package org.apache.rocketmq.flink.sink2.writer;

import java.util.ArrayDeque;

Expand Down
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.rocketmq.flink.sink2;
package org.apache.rocketmq.flink.sink2.writer;

import org.apache.flink.core.io.SimpleVersionedSerializer;

Expand Down
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.rocketmq.flink.sink2;
package org.apache.rocketmq.flink.sink2.writer.serializer;

import org.apache.rocketmq.common.message.Message;

Expand Down Expand Up @@ -55,7 +55,7 @@ default void open(
*/
Message serialize(T element, RocketMQSinkContext context, Long timestamp);

/** Context providing information of the kafka record target location. */
/** Context providing information of the RocketMQ record target location. */
@Internal
interface RocketMQSinkContext {

Expand All @@ -77,7 +77,7 @@ interface RocketMQSinkContext {
* <p>After the first retrieval the returned partitions are cached. If the partitions are
* updated the job has to be restarted to make the change visible.
*
* @param topic kafka topic with partitions
* @param topic RocketMQ topic with partitions
* @return the ids of the currently available partitions
*/
int[] getPartitionsForTopic(String topic);
Expand Down
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.rocketmq.flink.sink2;
package org.apache.rocketmq.flink.sink2.writer.serializer;

import org.apache.rocketmq.common.message.Message;

Expand Down Expand Up @@ -62,13 +62,14 @@ public RocketMQMessageSerializationSchema<IN> build() {
}

/**
* Sets a custom partitioner determining the target partition of the target topic.
* Sets a custom tag determining the target partition of the target topic.
*
* @param tagSelector
* @return {@code this}
*/
public <T extends IN> RocketMQMessageSerializationSchemaBuilder<T> setPartitioner(
public <T extends IN> RocketMQMessageSerializationSchemaBuilder<T> setTagSelector(
RocketMQTagSelector<? super T> tagSelector) {
checkState(this.topicSelector == null, "Topic selector already set.");
RocketMQMessageSerializationSchemaBuilder<T> self = self();
self.tagSelector = checkNotNull(tagSelector);
return self;
Expand Down
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.rocketmq.flink.sink2;
package org.apache.rocketmq.flink.sink2.writer.serializer;

import java.io.Serializable;

Expand Down
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.rocketmq.flink.sink2;
package org.apache.rocketmq.flink.sink2.writer.serializer;

import java.io.Serializable;
import java.util.function.Function;
Expand Down

0 comments on commit 21e95fd

Please sign in to comment.