diff --git a/flink-connectors/flink-connector-rabbitmq/README.md b/flink-connectors/flink-connector-rabbitmq/README.md index b270d05eab54e..de8d1d89c69be 100644 --- a/flink-connectors/flink-connector-rabbitmq/README.md +++ b/flink-connectors/flink-connector-rabbitmq/README.md @@ -9,7 +9,3 @@ nor packages binaries from the "RabbitMQ AMQP Java Client". Users that create and publish derivative work based on Flink's RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client") must be aware that this may be subject to conditions declared in the Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL"). - -# This version provides a mechanism to handle AMQ Messaging features - -One of its Constructor uses an implemented interface object with five methods and an optionnal returned message handler. See RMQSinkFeatureTest class to get a sample of the methods to implement. The returned message handler is an implementation of the standard com.rabbitmq.client.ReturnListener interface. As this mechasnism uses RoutingKeys, queueName is null then the queue can not be declared to RabbitMQ during start. diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java index 0b447c1f2c5b7..b1665bd19ad30 100644 --- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java +++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java @@ -22,14 +22,16 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; +import org.apache.flink.util.Preconditions; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.ReturnListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.concurrent.TimeoutException; @@ -42,25 +44,34 @@ public class RMQSink extends RichSinkFunction { private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class); + @Nullable protected final String queueName; + private final RMQConnectionConfig rmqConnectionConfig; protected transient Connection connection; protected transient Channel channel; protected SerializationSchema schema; private boolean logFailuresOnly = false; + @Nullable private final RMQSinkPublishOptions publishOptions; - private final ReturnListener returnListener; + + @Nullable + private final SerializableReturnListener returnListener; /** * @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}. * @param queueName The queue to publish messages to. * @param schema A {@link SerializationSchema} for turning the Java objects received into bytes * @param publishOptions A {@link RMQSinkPublishOptions} for providing message's routing key and/or properties - * @param returnListener A ReturnListener implementation object to handle returned message event + * @param returnListener A SerializableReturnListener implementation object to handle returned message event */ - private RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema schema, - RMQSinkPublishOptions publishOptions, ReturnListener returnListener) { + private RMQSink( + RMQConnectionConfig rmqConnectionConfig, + @Nullable String queueName, + SerializationSchema schema, + @Nullable RMQSinkPublishOptions publishOptions, + @Nullable SerializableReturnListener returnListener) { this.rmqConnectionConfig = rmqConnectionConfig; this.queueName = queueName; this.schema = schema; @@ -95,11 +106,11 @@ public RMQSink(RMQConnectionConfig rmqConnectionConfig, SerializationSchema * @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}. * @param schema A {@link SerializationSchema} for turning the Java objects received into bytes * @param publishOptions A {@link RMQSinkPublishOptions} for providing message's routing key and/or properties - * @param returnListener A ReturnListener implementation object to handle returned message event + * @param returnListener A SerializableReturnListener implementation object to handle returned message event */ @PublicEvolving public RMQSink(RMQConnectionConfig rmqConnectionConfig, SerializationSchema schema, - RMQSinkPublishOptions publishOptions, ReturnListener returnListener) { + RMQSinkPublishOptions publishOptions, SerializableReturnListener returnListener) { this(rmqConnectionConfig, null, schema, publishOptions, returnListener); } @@ -161,23 +172,15 @@ public void invoke(IN value) { boolean mandatory = publishOptions.computeMandatory(value); boolean immediate = publishOptions.computeImmediate(value); - if (returnListener == null && (mandatory || immediate)) { - throw new IllegalStateException("Setting mandatory and/or immediate flags to true requires a ReturnListener."); - } else { - String rk = publishOptions.computeRoutingKey(value); - if (rk == null) { - throw new NullPointerException("computeRoutingKey returned an anormal 'null' value."); - } - String exchange = publishOptions.computeExchange(value); - if (exchange == null) { - throw new NullPointerException("computeExchange returned an anormal 'null' value."); - } - - channel.basicPublish(exchange, rk, mandatory, immediate, - publishOptions.computeProperties(value), msg); - } - } + Preconditions.checkState(!(returnListener == null && (mandatory || immediate)), + "Setting mandatory and/or immediate flags to true requires a ReturnListener."); + + String rk = publishOptions.computeRoutingKey(value); + String exchange = publishOptions.computeExchange(value); + channel.basicPublish(exchange, rk, mandatory, immediate, + publishOptions.computeProperties(value), msg); + } } catch (IOException e) { if (logFailuresOnly) { LOG.error("Cannot send RMQ message {} at {}", queueName, rmqConnectionConfig.getHost(), e); diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java index 62a0832349320..9e22e9a645a2a 100644 --- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java +++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java @@ -1,72 +1,74 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.rabbitmq; - -import org.apache.flink.annotation.PublicEvolving; - -import com.rabbitmq.client.AMQP.BasicProperties; - -/** - * The message computation provides methods to compute the message routing key and/or the properties. - * - * @param The type of the data used by the sink. - */ -@PublicEvolving -public interface RMQSinkPublishOptions extends java.io.Serializable { - - /** - * Compute the message's routing key from the data. - * @param a The data used by the sink - * @return The routing key of the message - * null will raise a NullPointerException - */ - String computeRoutingKey(IN a); - - /** - * Compute the message's properties from the data. - * @param a The data used by the sink - * @return The message's properties (can be null) - */ - BasicProperties computeProperties(IN a); - - /** - * Compute the exchange from the data. - * @param a The data used by the sink - * @return The exchange to publish the message to - * null will raise a NullPointerException - */ - String computeExchange(IN a); - - /** - * Compute the mandatory flag used in basic.publish method - * See AMQP API help for values. - * A ReturnListener is mandatory if this flag can be true (if not it is ignored and forced to false) - * @param a The data used by the sink - * @return The mandatory flag - */ - boolean computeMandatory(IN a); - - /** - * Compute the immediate flag - * See AMQP API help for values. - * A ReturnListener is mandatory if this flag can be true (if not it is ignored and forced to false) - * @param a The data used by the sink - * @return The mandatory flag - */ - boolean computeImmediate(IN a); -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq; + +import org.apache.flink.annotation.PublicEvolving; + +import com.rabbitmq.client.AMQP.BasicProperties; + +/** + * The message computation provides methods to compute the message routing key and/or the properties. + * + * @param The type of the data used by the sink. + */ +@PublicEvolving +public interface RMQSinkPublishOptions extends java.io.Serializable { + + /** + * Compute the message's routing key from the data. + * @param a The data used by the sink + * @return The routing key of the message + * null will raise a NullPointerException + */ + String computeRoutingKey(IN a); + + /** + * Compute the message's properties from the data. + * @param a The data used by the sink + * @return The message's properties (can be null) + */ + BasicProperties computeProperties(IN a); + + /** + * Compute the exchange from the data. + * @param a The data used by the sink + * @return The exchange to publish the message to + * null will raise a NullPointerException + */ + String computeExchange(IN a); + + /** + * Compute the mandatory flag passed to method {@link com.rabbitmq.client.Channel#basicPublish(String, String, boolean, boolean, BasicProperties, byte[])}. + * A {@link SerializableReturnListener} is mandatory if this flag can be true. + * @param a The data used by the sink + * @return The mandatory flag + */ + default boolean computeMandatory(IN a) { + return false; + } + + /** + * Compute the immediate flag passed to method {@link com.rabbitmq.client.Channel#basicPublish(String, String, boolean, boolean, BasicProperties, byte[])}. + * A {@link SerializableReturnListener} is mandatory if this flag can be true. + * @param a The data used by the sink + * @return The mandatory flag + */ + default boolean computeImmediate(IN a) { + return false; + } +} diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/SerializableReturnListener.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/SerializableReturnListener.java new file mode 100644 index 0000000000000..8c667076ca368 --- /dev/null +++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/SerializableReturnListener.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq; + +import com.rabbitmq.client.ReturnListener; + +import java.io.Serializable; + +/** + * A serializable {@link ReturnListener}. + */ +public interface SerializableReturnListener extends Serializable, ReturnListener { +} diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java index 011ffcc5faffe..9c3c9acf0f2c6 100644 --- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java +++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java @@ -27,7 +27,6 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.ReturnListener; import org.junit.Before; import org.junit.Test; @@ -259,10 +258,12 @@ public boolean computeImmediate(String a) { } } - private class DummyReturnHandler implements ReturnListener { + private class DummyReturnHandler implements SerializableReturnListener { + + private static final long serialVersionUID = 1L; + @Override - public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5) - throws IOException { + public void handleReturn(final int replyCode, final String replyText, final String exchange, final String routingKey, final BasicProperties properties, final byte[] body) { } }