Skip to content

Commit

Permalink
[FLINK-8468][RabbitMQ] Improve RabbitMQ connector
Browse files Browse the repository at this point in the history
Remove text from README.md because it is covered by Javadocs.

[FLINK-8468][RabbitMQ] Introduce SerializableReturnListener interface.

[FLINK-8468][RabbitMQ] Update RMQSinkPublishOptions.

Improve Javadocs.
Add default implementation for methods computeMandatory and computeImmediate.

[FLINK-8468][RabbitMQ] Replace statements with Preconditions.checkState

This closes #5410.
  • Loading branch information
GJL authored and tillrohrmann committed Jun 27, 2018
1 parent 32dea9a commit a161606
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 103 deletions.
4 changes: 0 additions & 4 deletions flink-connectors/flink-connector-rabbitmq/README.md
Expand Up @@ -9,7 +9,3 @@ nor packages binaries from the "RabbitMQ AMQP Java Client".
Users that create and publish derivative work based on Flink's Users that create and publish derivative work based on Flink's
RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client") RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client")
must be aware that this may be subject to conditions declared in the Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL"). must be aware that this may be subject to conditions declared in the Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL").

# This version provides a mechanism to handle AMQ Messaging features

One of its Constructor uses an implemented interface object with five methods and an optionnal returned message handler. See RMQSinkFeatureTest class to get a sample of the methods to implement. The returned message handler is an implementation of the standard com.rabbitmq.client.ReturnListener interface. As this mechasnism uses RoutingKeys, queueName is null then the queue can not be declared to RabbitMQ during start.
Expand Up @@ -22,14 +22,16 @@
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.util.Preconditions;


import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection; import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ReturnListener;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import javax.annotation.Nullable;

import java.io.IOException; import java.io.IOException;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;


Expand All @@ -42,25 +44,34 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {


private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class); private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class);


@Nullable
protected final String queueName; protected final String queueName;

private final RMQConnectionConfig rmqConnectionConfig; private final RMQConnectionConfig rmqConnectionConfig;
protected transient Connection connection; protected transient Connection connection;
protected transient Channel channel; protected transient Channel channel;
protected SerializationSchema<IN> schema; protected SerializationSchema<IN> schema;
private boolean logFailuresOnly = false; private boolean logFailuresOnly = false;


@Nullable
private final RMQSinkPublishOptions<IN> publishOptions; private final RMQSinkPublishOptions<IN> publishOptions;
private final ReturnListener returnListener;
@Nullable
private final SerializableReturnListener returnListener;


/** /**
* @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}. * @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}.
* @param queueName The queue to publish messages to. * @param queueName The queue to publish messages to.
* @param schema A {@link SerializationSchema} for turning the Java objects received into bytes * @param schema A {@link SerializationSchema} for turning the Java objects received into bytes
* @param publishOptions A {@link RMQSinkPublishOptions} for providing message's routing key and/or properties * @param publishOptions A {@link RMQSinkPublishOptions} for providing message's routing key and/or properties
* @param returnListener A ReturnListener implementation object to handle returned message event * @param returnListener A SerializableReturnListener implementation object to handle returned message event
*/ */
private RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema<IN> schema, private RMQSink(
RMQSinkPublishOptions<IN> publishOptions, ReturnListener returnListener) { RMQConnectionConfig rmqConnectionConfig,
@Nullable String queueName,
SerializationSchema<IN> schema,
@Nullable RMQSinkPublishOptions<IN> publishOptions,
@Nullable SerializableReturnListener returnListener) {
this.rmqConnectionConfig = rmqConnectionConfig; this.rmqConnectionConfig = rmqConnectionConfig;
this.queueName = queueName; this.queueName = queueName;
this.schema = schema; this.schema = schema;
Expand Down Expand Up @@ -95,11 +106,11 @@ public RMQSink(RMQConnectionConfig rmqConnectionConfig, SerializationSchema<IN>
* @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}. * @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}.
* @param schema A {@link SerializationSchema} for turning the Java objects received into bytes * @param schema A {@link SerializationSchema} for turning the Java objects received into bytes
* @param publishOptions A {@link RMQSinkPublishOptions} for providing message's routing key and/or properties * @param publishOptions A {@link RMQSinkPublishOptions} for providing message's routing key and/or properties
* @param returnListener A ReturnListener implementation object to handle returned message event * @param returnListener A SerializableReturnListener implementation object to handle returned message event
*/ */
@PublicEvolving @PublicEvolving
public RMQSink(RMQConnectionConfig rmqConnectionConfig, SerializationSchema<IN> schema, public RMQSink(RMQConnectionConfig rmqConnectionConfig, SerializationSchema<IN> schema,
RMQSinkPublishOptions<IN> publishOptions, ReturnListener returnListener) { RMQSinkPublishOptions<IN> publishOptions, SerializableReturnListener returnListener) {
this(rmqConnectionConfig, null, schema, publishOptions, returnListener); this(rmqConnectionConfig, null, schema, publishOptions, returnListener);
} }


Expand Down Expand Up @@ -161,23 +172,15 @@ public void invoke(IN value) {
boolean mandatory = publishOptions.computeMandatory(value); boolean mandatory = publishOptions.computeMandatory(value);
boolean immediate = publishOptions.computeImmediate(value); boolean immediate = publishOptions.computeImmediate(value);


if (returnListener == null && (mandatory || immediate)) { Preconditions.checkState(!(returnListener == null && (mandatory || immediate)),
throw new IllegalStateException("Setting mandatory and/or immediate flags to true requires a ReturnListener."); "Setting mandatory and/or immediate flags to true requires a ReturnListener.");
} else {
String rk = publishOptions.computeRoutingKey(value); String rk = publishOptions.computeRoutingKey(value);
if (rk == null) { String exchange = publishOptions.computeExchange(value);
throw new NullPointerException("computeRoutingKey returned an anormal 'null' value.");
}
String exchange = publishOptions.computeExchange(value);
if (exchange == null) {
throw new NullPointerException("computeExchange returned an anormal 'null' value.");
}

channel.basicPublish(exchange, rk, mandatory, immediate,
publishOptions.computeProperties(value), msg);
}
}


channel.basicPublish(exchange, rk, mandatory, immediate,
publishOptions.computeProperties(value), msg);
}
} catch (IOException e) { } catch (IOException e) {
if (logFailuresOnly) { if (logFailuresOnly) {
LOG.error("Cannot send RMQ message {} at {}", queueName, rmqConnectionConfig.getHost(), e); LOG.error("Cannot send RMQ message {} at {}", queueName, rmqConnectionConfig.getHost(), e);
Expand Down
@@ -1,72 +1,74 @@
/* /*
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. * this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0 * The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with * (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at * the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */


package org.apache.flink.streaming.connectors.rabbitmq; package org.apache.flink.streaming.connectors.rabbitmq;


import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.PublicEvolving;


import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.AMQP.BasicProperties;


/** /**
* The message computation provides methods to compute the message routing key and/or the properties. * The message computation provides methods to compute the message routing key and/or the properties.
* *
* @param <IN> The type of the data used by the sink. * @param <IN> The type of the data used by the sink.
*/ */
@PublicEvolving @PublicEvolving
public interface RMQSinkPublishOptions<IN> extends java.io.Serializable { public interface RMQSinkPublishOptions<IN> extends java.io.Serializable {


/** /**
* Compute the message's routing key from the data. * Compute the message's routing key from the data.
* @param a The data used by the sink * @param a The data used by the sink
* @return The routing key of the message * @return The routing key of the message
* null will raise a NullPointerException * null will raise a NullPointerException
*/ */
String computeRoutingKey(IN a); String computeRoutingKey(IN a);


/** /**
* Compute the message's properties from the data. * Compute the message's properties from the data.
* @param a The data used by the sink * @param a The data used by the sink
* @return The message's properties (can be null) * @return The message's properties (can be null)
*/ */
BasicProperties computeProperties(IN a); BasicProperties computeProperties(IN a);


/** /**
* Compute the exchange from the data. * Compute the exchange from the data.
* @param a The data used by the sink * @param a The data used by the sink
* @return The exchange to publish the message to * @return The exchange to publish the message to
* null will raise a NullPointerException * null will raise a NullPointerException
*/ */
String computeExchange(IN a); String computeExchange(IN a);


/** /**
* Compute the mandatory flag used in basic.publish method * Compute the mandatory flag passed to method {@link com.rabbitmq.client.Channel#basicPublish(String, String, boolean, boolean, BasicProperties, byte[])}.
* See AMQP API help for values. * A {@link SerializableReturnListener} is mandatory if this flag can be true.
* A ReturnListener is mandatory if this flag can be true (if not it is ignored and forced to false) * @param a The data used by the sink
* @param a The data used by the sink * @return The mandatory flag
* @return The mandatory flag */
*/ default boolean computeMandatory(IN a) {
boolean computeMandatory(IN a); return false;

}
/**
* Compute the immediate flag /**
* See AMQP API help for values. * Compute the immediate flag passed to method {@link com.rabbitmq.client.Channel#basicPublish(String, String, boolean, boolean, BasicProperties, byte[])}.
* A ReturnListener is mandatory if this flag can be true (if not it is ignored and forced to false) * A {@link SerializableReturnListener} is mandatory if this flag can be true.
* @param a The data used by the sink * @param a The data used by the sink
* @return The mandatory flag * @return The mandatory flag
*/ */
boolean computeImmediate(IN a); default boolean computeImmediate(IN a) {
} return false;
}
}
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.rabbitmq;

import com.rabbitmq.client.ReturnListener;

import java.io.Serializable;

/**
* A serializable {@link ReturnListener}.
*/
public interface SerializableReturnListener extends Serializable, ReturnListener {
}
Expand Up @@ -27,7 +27,6 @@
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection; import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ReturnListener;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;


Expand Down Expand Up @@ -259,10 +258,12 @@ public boolean computeImmediate(String a) {
} }
} }


private class DummyReturnHandler implements ReturnListener { private class DummyReturnHandler implements SerializableReturnListener {

private static final long serialVersionUID = 1L;

@Override @Override
public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5) public void handleReturn(final int replyCode, final String replyText, final String exchange, final String routingKey, final BasicProperties properties, final byte[] body) {
throws IOException {
} }
} }


Expand Down

0 comments on commit a161606

Please sign in to comment.