diff --git a/flink-connectors/flink-connector-rabbitmq/README.md b/flink-connectors/flink-connector-rabbitmq/README.md index de8d1d89c69be..b270d05eab54e 100644 --- a/flink-connectors/flink-connector-rabbitmq/README.md +++ b/flink-connectors/flink-connector-rabbitmq/README.md @@ -9,3 +9,7 @@ nor packages binaries from the "RabbitMQ AMQP Java Client". Users that create and publish derivative work based on Flink's RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client") must be aware that this may be subject to conditions declared in the Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL"). + +# This version provides a mechanism to handle AMQ Messaging features + +One of its Constructor uses an implemented interface object with five methods and an optionnal returned message handler. See RMQSinkFeatureTest class to get a sample of the methods to implement. The returned message handler is an implementation of the standard com.rabbitmq.client.ReturnListener interface. As this mechasnism uses RoutingKeys, queueName is null then the queue can not be declared to RabbitMQ during start. diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java index c1118ed6b0e54..0b447c1f2c5b7 100644 --- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java +++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.connectors.rabbitmq; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; @@ -25,6 +26,7 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.ReturnListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,15 +49,58 @@ public class RMQSink extends RichSinkFunction { protected SerializationSchema schema; private boolean logFailuresOnly = false; + private final RMQSinkPublishOptions publishOptions; + private final ReturnListener returnListener; + /** * @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}. * @param queueName The queue to publish messages to. * @param schema A {@link SerializationSchema} for turning the Java objects received into bytes + * @param publishOptions A {@link RMQSinkPublishOptions} for providing message's routing key and/or properties + * @param returnListener A ReturnListener implementation object to handle returned message event */ - public RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema schema) { + private RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema schema, + RMQSinkPublishOptions publishOptions, ReturnListener returnListener) { this.rmqConnectionConfig = rmqConnectionConfig; this.queueName = queueName; this.schema = schema; + this.publishOptions = publishOptions; + this.returnListener = returnListener; + } + + /** + * @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}. + * @param queueName The queue to publish messages to. + * @param schema A {@link SerializationSchema} for turning the Java objects received into bytes + */ + @PublicEvolving + public RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema schema) { + this(rmqConnectionConfig, queueName, schema, null, null); + } + + /** + * @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}. + * @param schema A {@link SerializationSchema} for turning the Java objects received into bytes + * @param publishOptions A {@link RMQSinkPublishOptions} for providing message's routing key and/or properties + * In this case the computeMandatoy or computeImmediate MUST return false otherwise an + * IllegalStateException is raised during runtime. + */ + @PublicEvolving + public RMQSink(RMQConnectionConfig rmqConnectionConfig, SerializationSchema schema, + RMQSinkPublishOptions publishOptions) { + this(rmqConnectionConfig, null, schema, publishOptions, null); + } + + /** + * @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}. + * @param schema A {@link SerializationSchema} for turning the Java objects received into bytes + * @param publishOptions A {@link RMQSinkPublishOptions} for providing message's routing key and/or properties + * @param returnListener A ReturnListener implementation object to handle returned message event + */ + @PublicEvolving + public RMQSink(RMQConnectionConfig rmqConnectionConfig, SerializationSchema schema, + RMQSinkPublishOptions publishOptions, ReturnListener returnListener) { + this(rmqConnectionConfig, null, schema, publishOptions, returnListener); } /** @@ -64,7 +109,9 @@ public RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, Serial * defining custom queue parameters) */ protected void setupQueue() throws IOException { - channel.queueDeclare(queueName, false, false, false, null); + if (queueName != null) { + channel.queueDeclare(queueName, false, false, false, null); + } } /** @@ -89,6 +136,9 @@ public void open(Configuration config) throws Exception { throw new RuntimeException("None of RabbitMQ channels are available"); } setupQueue(); + if (returnListener != null) { + channel.addReturnListener(returnListener); + } } catch (IOException e) { throw new RuntimeException("Error while creating the channel", e); } @@ -105,7 +155,29 @@ public void invoke(IN value) { try { byte[] msg = schema.serialize(value); - channel.basicPublish("", queueName, null, msg); + if (publishOptions == null) { + channel.basicPublish("", queueName, null, msg); + } else { + boolean mandatory = publishOptions.computeMandatory(value); + boolean immediate = publishOptions.computeImmediate(value); + + if (returnListener == null && (mandatory || immediate)) { + throw new IllegalStateException("Setting mandatory and/or immediate flags to true requires a ReturnListener."); + } else { + String rk = publishOptions.computeRoutingKey(value); + if (rk == null) { + throw new NullPointerException("computeRoutingKey returned an anormal 'null' value."); + } + String exchange = publishOptions.computeExchange(value); + if (exchange == null) { + throw new NullPointerException("computeExchange returned an anormal 'null' value."); + } + + channel.basicPublish(exchange, rk, mandatory, immediate, + publishOptions.computeProperties(value), msg); + } + } + } catch (IOException e) { if (logFailuresOnly) { LOG.error("Cannot send RMQ message {} at {}", queueName, rmqConnectionConfig.getHost(), e); diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java new file mode 100644 index 0000000000000..62a0832349320 --- /dev/null +++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkPublishOptions.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq; + +import org.apache.flink.annotation.PublicEvolving; + +import com.rabbitmq.client.AMQP.BasicProperties; + +/** + * The message computation provides methods to compute the message routing key and/or the properties. + * + * @param The type of the data used by the sink. + */ +@PublicEvolving +public interface RMQSinkPublishOptions extends java.io.Serializable { + + /** + * Compute the message's routing key from the data. + * @param a The data used by the sink + * @return The routing key of the message + * null will raise a NullPointerException + */ + String computeRoutingKey(IN a); + + /** + * Compute the message's properties from the data. + * @param a The data used by the sink + * @return The message's properties (can be null) + */ + BasicProperties computeProperties(IN a); + + /** + * Compute the exchange from the data. + * @param a The data used by the sink + * @return The exchange to publish the message to + * null will raise a NullPointerException + */ + String computeExchange(IN a); + + /** + * Compute the mandatory flag used in basic.publish method + * See AMQP API help for values. + * A ReturnListener is mandatory if this flag can be true (if not it is ignored and forced to false) + * @param a The data used by the sink + * @return The mandatory flag + */ + boolean computeMandatory(IN a); + + /** + * Compute the immediate flag + * See AMQP API help for values. + * A ReturnListener is mandatory if this flag can be true (if not it is ignored and forced to false) + * @param a The data used by the sink + * @return The mandatory flag + */ + boolean computeImmediate(IN a); +} diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java index 53b834d2ead45..011ffcc5faffe 100644 --- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java +++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java @@ -22,17 +22,22 @@ import org.apache.flink.streaming.api.functions.sink.SinkContextUtil; import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.ReturnListener; import org.junit.Before; import org.junit.Test; import java.io.IOException; +import java.util.Collections; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -43,14 +48,23 @@ public class RMQSinkTest { private static final String QUEUE_NAME = "queue"; + private static final String EXCHANGE = "exchange"; + private static final String ROUTING_KEY = "application.component.error"; + private static final String EXPIRATION = "10000"; private static final String MESSAGE_STR = "msg"; private static final byte[] MESSAGE = new byte[1]; + private static AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() + .headers(Collections.singletonMap("Test", "My Value")) + .expiration(EXPIRATION) + .build(); private RMQConnectionConfig rmqConnectionConfig; private ConnectionFactory connectionFactory; private Connection connection; private Channel channel; private SerializationSchema serializationSchema; + private DummyPublishOptions publishOptions; + private DummyReturnHandler returnListener; @Before public void before() throws Exception { @@ -66,12 +80,19 @@ public void before() throws Exception { } @Test - public void openCallDeclaresQueue() throws Exception { + public void openCallDeclaresQueueInStandardMode() throws Exception { createRMQSink(); verify(channel).queueDeclare(QUEUE_NAME, false, false, false, null); } + @Test + public void openCallDontDeclaresQueueInWithOptionsMode() throws Exception { + createRMQSinkWithOptions(false, false); + + verify(channel, never()).queueDeclare(null, false, false, false, null); + } + @Test public void throwExceptionIfChannelIsNull() throws Exception { when(connection.createChannel()).thenReturn(null); @@ -83,7 +104,22 @@ public void throwExceptionIfChannelIsNull() throws Exception { } private RMQSink createRMQSink() throws Exception { - RMQSink rmqSink = new RMQSink(rmqConnectionConfig, QUEUE_NAME, serializationSchema); + RMQSink rmqSink = new RMQSink<>(rmqConnectionConfig, QUEUE_NAME, serializationSchema); + rmqSink.open(new Configuration()); + return rmqSink; + } + + private RMQSink createRMQSinkWithOptions(boolean mandatory, boolean immediate) throws Exception { + publishOptions = new DummyPublishOptions(mandatory, immediate); + RMQSink rmqSink = new RMQSink<>(rmqConnectionConfig, serializationSchema, publishOptions); + rmqSink.open(new Configuration()); + return rmqSink; + } + + private RMQSink createRMQSinkWithOptionsAndReturnHandler(boolean mandatory, boolean immediate) throws Exception { + publishOptions = new DummyPublishOptions(mandatory, immediate); + returnListener = new DummyReturnHandler(); + RMQSink rmqSink = new RMQSink<>(rmqConnectionConfig, serializationSchema, publishOptions, returnListener); rmqSink.open(new Configuration()); return rmqSink; } @@ -124,7 +160,115 @@ public void closeAllResources() throws Exception { verify(connection).close(); } + @Test + public void invokePublishBytesToQueueWithOptions() throws Exception { + RMQSink rmqSink = createRMQSinkWithOptions(false, false); + + rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0)); + verify(serializationSchema).serialize(MESSAGE_STR); + verify(channel).basicPublish(EXCHANGE, ROUTING_KEY, false, false, + publishOptions.computeProperties(""), MESSAGE); + } + + @Test(expected = IllegalStateException.class) + public void invokePublishBytesToQueueWithOptionsMandatory() throws Exception { + RMQSink rmqSink = createRMQSinkWithOptions(true, false); + + rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0)); + } + + @Test(expected = IllegalStateException.class) + public void invokePublishBytesToQueueWithOptionsImmediate() throws Exception { + RMQSink rmqSink = createRMQSinkWithOptions(false, true); + + rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0)); + } + + @Test + public void invokePublishBytesToQueueWithOptionsMandatoryReturnHandler() throws Exception { + RMQSink rmqSink = createRMQSinkWithOptionsAndReturnHandler(true, false); + + rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0)); + verify(serializationSchema).serialize(MESSAGE_STR); + verify(channel).basicPublish(EXCHANGE, ROUTING_KEY, true, false, + publishOptions.computeProperties(""), MESSAGE); + } + + @Test + public void invokePublishBytesToQueueWithOptionsImmediateReturnHandler() throws Exception { + RMQSink rmqSink = createRMQSinkWithOptionsAndReturnHandler(false, true); + + rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0)); + verify(serializationSchema).serialize(MESSAGE_STR); + verify(channel).basicPublish(EXCHANGE, ROUTING_KEY, false, true, + publishOptions.computeProperties(""), MESSAGE); + } + + @Test(expected = RuntimeException.class) + public void exceptionDuringWithOptionsPublishingIsNotIgnored() throws Exception { + RMQSink rmqSink = createRMQSinkWithOptions(false, false); + + doThrow(IOException.class).when(channel).basicPublish(EXCHANGE, ROUTING_KEY, false, false, + publishOptions.computeProperties(""), MESSAGE); + rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0)); + } + + @Test + public void exceptionDuringWithOptionsPublishingIsIgnoredIfLogFailuresOnly() throws Exception { + RMQSink rmqSink = createRMQSinkWithOptions(false, false); + rmqSink.setLogFailuresOnly(true); + + doThrow(IOException.class).when(channel).basicPublish(EXCHANGE, ROUTING_KEY, false, false, + publishOptions.computeProperties(""), MESSAGE); + rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0)); + } + + private class DummyPublishOptions implements RMQSinkPublishOptions { + private static final long serialVersionUID = 1L; + private boolean mandatory = false; + private boolean immediate = false; + + public DummyPublishOptions(boolean mandatory, boolean immediate) { + this.mandatory = mandatory; + this.immediate = immediate; + } + + @Override + public String computeRoutingKey(String a) { + return ROUTING_KEY; + } + + @Override + public BasicProperties computeProperties(String a) { + return props; + } + + @Override + public String computeExchange(String a) { + return EXCHANGE; + } + + @Override + public boolean computeMandatory(String a) { + return mandatory; + } + + @Override + public boolean computeImmediate(String a) { + return immediate; + } + } + + private class DummyReturnHandler implements ReturnListener { + @Override + public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5) + throws IOException { + } + } + private class DummySerializationSchema implements SerializationSchema { + private static final long serialVersionUID = 1L; + @Override public byte[] serialize(String element) { return MESSAGE;