Skip to content

Commit

Permalink
[FLINK-8468][RabbitMQ] Make RabbitMQ connector to take advantage of A…
Browse files Browse the repository at this point in the history
…MQP features (routing key, exchange and message properties)

[FLINK-8468] Make the connector to take advantage of AMQP features (routing key, exchange and message properties) and Test

[FLINK-8468] Make the connector to take advantage of AMQP features (routing key, exchange and message properties) and Test

[FLINK-8468] Make the connector to take advantage of AMQP features (routing key, exchange and message properties) and Test

[FLINK-8468] Make the connector to take advantage of AMQP features (routing key, exchange and message properties) and Test

[FLINK-8468] Make the connector to take advantage of AMQP features (routing key, exchange and message properties) and Test

[FLINK-8468] Make the connector to take advantage of AMQP features (routing key, exchange and message properties) and Test

Fix test assertions and imports

Modified according to code reviews.

[FLINK-8468] Take advantage of AMQP features (routing key, exchange, flags, message properties and returnListener)

Update README.md

Update Readme.md

[FLINK-8468] Take advantage of AMQP features (routing key, exchange, flags, message properties and returnListener)

[FLINK-8468] Take advantage of AMQP features (routing key, exchange, flags, message properties and returnListener)

[FLINK-8468] Take advantage of AMQP features (routing key, exchange, flags, message properties and returnListener)

[FLINK-8468] Take advantage of AMQP features (routing key, exchange, flags, message properties and returnListener)
  • Loading branch information
Philippe Duveau authored and tillrohrmann committed Jun 27, 2018
1 parent 217c312 commit 32dea9a
Show file tree
Hide file tree
Showing 4 changed files with 297 additions and 5 deletions.
4 changes: 4 additions & 0 deletions flink-connectors/flink-connector-rabbitmq/README.md
Expand Up @@ -9,3 +9,7 @@ nor packages binaries from the "RabbitMQ AMQP Java Client".
Users that create and publish derivative work based on Flink's Users that create and publish derivative work based on Flink's
RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client") RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client")
must be aware that this may be subject to conditions declared in the Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL"). must be aware that this may be subject to conditions declared in the Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL").

# This version provides a mechanism to handle AMQ Messaging features

One of its Constructor uses an implemented interface object with five methods and an optionnal returned message handler. See RMQSinkFeatureTest class to get a sample of the methods to implement. The returned message handler is an implementation of the standard com.rabbitmq.client.ReturnListener interface. As this mechasnism uses RoutingKeys, queueName is null then the queue can not be declared to RabbitMQ during start.
Expand Up @@ -17,6 +17,7 @@


package org.apache.flink.streaming.connectors.rabbitmq; package org.apache.flink.streaming.connectors.rabbitmq;


import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
Expand All @@ -25,6 +26,7 @@
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection; import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ReturnListener;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand All @@ -47,15 +49,58 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
protected SerializationSchema<IN> schema; protected SerializationSchema<IN> schema;
private boolean logFailuresOnly = false; private boolean logFailuresOnly = false;


private final RMQSinkPublishOptions<IN> publishOptions;
private final ReturnListener returnListener;

/** /**
* @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}. * @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}.
* @param queueName The queue to publish messages to. * @param queueName The queue to publish messages to.
* @param schema A {@link SerializationSchema} for turning the Java objects received into bytes * @param schema A {@link SerializationSchema} for turning the Java objects received into bytes
* @param publishOptions A {@link RMQSinkPublishOptions} for providing message's routing key and/or properties
* @param returnListener A ReturnListener implementation object to handle returned message event
*/ */
public RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema<IN> schema) { private RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema<IN> schema,
RMQSinkPublishOptions<IN> publishOptions, ReturnListener returnListener) {
this.rmqConnectionConfig = rmqConnectionConfig; this.rmqConnectionConfig = rmqConnectionConfig;
this.queueName = queueName; this.queueName = queueName;
this.schema = schema; this.schema = schema;
this.publishOptions = publishOptions;
this.returnListener = returnListener;
}

/**
* @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}.
* @param queueName The queue to publish messages to.
* @param schema A {@link SerializationSchema} for turning the Java objects received into bytes
*/
@PublicEvolving
public RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema<IN> schema) {
this(rmqConnectionConfig, queueName, schema, null, null);
}

/**
* @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}.
* @param schema A {@link SerializationSchema} for turning the Java objects received into bytes
* @param publishOptions A {@link RMQSinkPublishOptions} for providing message's routing key and/or properties
* In this case the computeMandatoy or computeImmediate MUST return false otherwise an
* IllegalStateException is raised during runtime.
*/
@PublicEvolving
public RMQSink(RMQConnectionConfig rmqConnectionConfig, SerializationSchema<IN> schema,
RMQSinkPublishOptions<IN> publishOptions) {
this(rmqConnectionConfig, null, schema, publishOptions, null);
}

/**
* @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}.
* @param schema A {@link SerializationSchema} for turning the Java objects received into bytes
* @param publishOptions A {@link RMQSinkPublishOptions} for providing message's routing key and/or properties
* @param returnListener A ReturnListener implementation object to handle returned message event
*/
@PublicEvolving
public RMQSink(RMQConnectionConfig rmqConnectionConfig, SerializationSchema<IN> schema,
RMQSinkPublishOptions<IN> publishOptions, ReturnListener returnListener) {
this(rmqConnectionConfig, null, schema, publishOptions, returnListener);
} }


/** /**
Expand All @@ -64,7 +109,9 @@ public RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, Serial
* defining custom queue parameters) * defining custom queue parameters)
*/ */
protected void setupQueue() throws IOException { protected void setupQueue() throws IOException {
channel.queueDeclare(queueName, false, false, false, null); if (queueName != null) {
channel.queueDeclare(queueName, false, false, false, null);
}
} }


/** /**
Expand All @@ -89,6 +136,9 @@ public void open(Configuration config) throws Exception {
throw new RuntimeException("None of RabbitMQ channels are available"); throw new RuntimeException("None of RabbitMQ channels are available");
} }
setupQueue(); setupQueue();
if (returnListener != null) {
channel.addReturnListener(returnListener);
}
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException("Error while creating the channel", e); throw new RuntimeException("Error while creating the channel", e);
} }
Expand All @@ -105,7 +155,29 @@ public void invoke(IN value) {
try { try {
byte[] msg = schema.serialize(value); byte[] msg = schema.serialize(value);


channel.basicPublish("", queueName, null, msg); if (publishOptions == null) {
channel.basicPublish("", queueName, null, msg);
} else {
boolean mandatory = publishOptions.computeMandatory(value);
boolean immediate = publishOptions.computeImmediate(value);

if (returnListener == null && (mandatory || immediate)) {
throw new IllegalStateException("Setting mandatory and/or immediate flags to true requires a ReturnListener.");
} else {
String rk = publishOptions.computeRoutingKey(value);
if (rk == null) {
throw new NullPointerException("computeRoutingKey returned an anormal 'null' value.");
}
String exchange = publishOptions.computeExchange(value);
if (exchange == null) {
throw new NullPointerException("computeExchange returned an anormal 'null' value.");
}

channel.basicPublish(exchange, rk, mandatory, immediate,
publishOptions.computeProperties(value), msg);
}
}

} catch (IOException e) { } catch (IOException e) {
if (logFailuresOnly) { if (logFailuresOnly) {
LOG.error("Cannot send RMQ message {} at {}", queueName, rmqConnectionConfig.getHost(), e); LOG.error("Cannot send RMQ message {} at {}", queueName, rmqConnectionConfig.getHost(), e);
Expand Down
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.rabbitmq;

import org.apache.flink.annotation.PublicEvolving;

import com.rabbitmq.client.AMQP.BasicProperties;

/**
* The message computation provides methods to compute the message routing key and/or the properties.
*
* @param <IN> The type of the data used by the sink.
*/
@PublicEvolving
public interface RMQSinkPublishOptions<IN> extends java.io.Serializable {

/**
* Compute the message's routing key from the data.
* @param a The data used by the sink
* @return The routing key of the message
* null will raise a NullPointerException
*/
String computeRoutingKey(IN a);

/**
* Compute the message's properties from the data.
* @param a The data used by the sink
* @return The message's properties (can be null)
*/
BasicProperties computeProperties(IN a);

/**
* Compute the exchange from the data.
* @param a The data used by the sink
* @return The exchange to publish the message to
* null will raise a NullPointerException
*/
String computeExchange(IN a);

/**
* Compute the mandatory flag used in basic.publish method
* See AMQP API help for values.
* A ReturnListener is mandatory if this flag can be true (if not it is ignored and forced to false)
* @param a The data used by the sink
* @return The mandatory flag
*/
boolean computeMandatory(IN a);

/**
* Compute the immediate flag
* See AMQP API help for values.
* A ReturnListener is mandatory if this flag can be true (if not it is ignored and forced to false)
* @param a The data used by the sink
* @return The mandatory flag
*/
boolean computeImmediate(IN a);
}

0 comments on commit 32dea9a

Please sign in to comment.