From 00a38fc4cf8cb05c2a22925328adcae6dd749f41 Mon Sep 17 00:00:00 2001
From: "Tzu-Li (Gordon) Tai"
Date: Tue, 17 Jan 2017 11:38:50 +0100
Subject: [PATCH] [FLINK-5512] [doc] Improve RabbitMQ documentation

---
 docs/dev/connectors/rabbitmq.md | 148 +++++++++++++++++++-------------
 1 file changed, 89 insertions(+), 59 deletions(-)

diff --git a/docs/dev/connectors/rabbitmq.md b/docs/dev/connectors/rabbitmq.md
index 1b621c02d2bdc..c9be4c7291fdc 100644
--- a/docs/dev/connectors/rabbitmq.md
+++ b/docs/dev/connectors/rabbitmq.md
@@ -40,88 +40,118 @@ Follow the instructions from the [RabbitMQ download page](http://www.rabbitmq.co
 #### RabbitMQ Source
-A class which provides an interface for receiving data from RabbitMQ.
-
-The followings have to be provided for the `RMQSource(…)` constructor in order:
-
-- RMQConnectionConfig.
-- queueName: The RabbitMQ queue name.
-- usesCorrelationId: `true` when correlation ids should be used, `false` otherwise (default is `false`).
-- deserializationSchema: Deserialization schema to turn messages into Java objects.
-
-This source can be operated in three different modes:
-
-1. Exactly-once (when checkpointed) with RabbitMQ transactions and messages with
-    unique correlation IDs.
-2. At-least-once (when checkpointed) with RabbitMQ transactions but no deduplication mechanism
-    (correlation id is not set).
-3. No strong delivery guarantees (without checkpointing) with RabbitMQ auto-commit mode.
-
-Correlation ids are a RabbitMQ application feature. You have to set it in the message properties
-when injecting messages into RabbitMQ. If you set `usesCorrelationId` to true and do not supply
-unique correlation ids, the source will throw an exception (if the correlation id is null) or ignore
-messages with non-unique correlation ids. If you set `usesCorrelationId` to false, then you don't
-have to supply correlation ids.
-
-Example:
+This connector provides a `RMQSource` class to consume messages from a RabbitMQ
+queue.
+This source provides three different levels of guarantees, depending
+on how it is configured with Flink:
+
+1. **Exactly-once**: In order to achieve exactly-once guarantees with the
+RabbitMQ source, the following is required -
+ - *Enable checkpointing*: With checkpointing enabled, messages are only
+ acknowledged (hence, removed from the RabbitMQ queue) when checkpoints
+ are completed.
+ - *Use correlation ids*: Correlation ids are a RabbitMQ application feature.
+ You have to set it in the message properties when injecting messages into RabbitMQ.
+ The correlation id is used by the source to deduplicate any messages that
+ have been reprocessed when restoring from a checkpoint.
+ - *Non-parallel source*: The source must be non-parallel (parallelism set
+ to 1) in order to achieve exactly-once. This limitation is mainly due to
+ RabbitMQ's approach to dispatching messages of a queue across multiple
+ consumers.
+
+
+2. **At-least-once**: When checkpointing is enabled, but correlation ids
+are not used or the source is parallel, the source only provides at-least-once
+guarantees.
+
+3. **No guarantee**: If checkpointing isn't enabled, the source does not
+have any strong delivery guarantees. Under this setting, instead of
+collaborating with Flink's checkpointing, messages will be automatically
+acknowledged once the source receives and processes them.
+
+Below is a code example for setting up an exactly-once RabbitMQ source.
+Inline comments explain which parts of the configuration can be ignored
+for more relaxed guarantees.
 {% highlight java %}
-RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
-.setHost("localhost").setPort(5000).setUserName(..)
-.setPassword(..).setVirtualHost("/").build();
-DataStream<String> streamWithoutCorrelationIds = env
-    .addSource(new RMQSource<String>(connectionConfig, "hello", new SimpleStringSchema()))
-    .print
-
-DataStream<String> streamWithCorrelationIds = env
-    .addSource(new RMQSource<String>(connectionConfig, "hello", true, new SimpleStringSchema()))
-    .print
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+// checkpointing is required for exactly-once or at-least-once guarantees
+env.enableCheckpointing(...);
+
+final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+    .setHost("localhost")
+    .setPort(5000)
+    ...
+    .build();
+
+final DataStream<String> stream = env
+    .addSource(new RMQSource<String>(
+        connectionConfig,            // config for the RabbitMQ connection
+        "queueName",                 // name of the RabbitMQ queue to consume
+        true,                        // use correlation ids; can be false if only at-least-once is required
+        new SimpleStringSchema()))   // deserialization schema to turn messages into Java objects
+    .setParallelism(1);              // non-parallel source is only required for exactly-once
 {% endhighlight %}
 {% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+// checkpointing is required for exactly-once or at-least-once guarantees
+env.enableCheckpointing(...)
+
 val connectionConfig = new RMQConnectionConfig.Builder()
-.setHost("localhost").setPort(5000).setUserName(..)
-.setPassword(..).setVirtualHost("/").build()
-streamWithoutCorrelationIds = env
-    .addSource(new RMQSource[String](connectionConfig, "hello", new SimpleStringSchema))
-    .print
-
-streamWithCorrelationIds = env
-    .addSource(new RMQSource[String](connectionConfig, "hello", true, new SimpleStringSchema))
-    .print
+    .setHost("localhost")
+    .setPort(5000)
+    ...
+    .build
+
+val stream = env
+    .addSource(new RMQSource[String](
+        connectionConfig,            // config for the RabbitMQ connection
+        "queueName",                 // name of the RabbitMQ queue to consume
+        true,                        // use correlation ids; can be false if only at-least-once is required
+        new SimpleStringSchema))     // deserialization schema to turn messages into Java objects
+    .setParallelism(1)               // non-parallel source is only required for exactly-once
 {% endhighlight %}
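The deduplication role that correlation ids play in the exactly-once mode can be illustrated without Flink or RabbitMQ. The following is a minimal sketch under stated assumptions, not the connector's actual implementation: the `Message` class, the `CorrelationIdDeduplicator` class, and its `accept` and `process` methods are hypothetical names introduced here, and the set of seen ids lives in plain memory rather than in checkpointed state. It mirrors the behavior described in the documentation text: a message without a correlation id is rejected with an exception, and a message whose id was already seen (for example, one redelivered after restoring from a checkpoint) is ignored.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CorrelationIdDeduplicator {

    /** Hypothetical message shape: a correlation id plus a payload. */
    public static final class Message {
        final String correlationId;
        final String payload;

        public Message(String correlationId, String payload) {
            this.correlationId = correlationId;
            this.payload = payload;
        }
    }

    // Ids of messages that were already emitted downstream.
    private final Set<String> seenIds = new HashSet<>();

    /** Returns true if the message is new, false if its id was seen before. */
    public boolean accept(Message message) {
        if (message.correlationId == null) {
            throw new IllegalArgumentException("correlation id must be set");
        }
        // Set.add returns false when the element is already present.
        return seenIds.add(message.correlationId);
    }

    /** Emits the payload of every message whose correlation id is new. */
    public List<String> process(List<Message> delivered) {
        List<String> emitted = new ArrayList<>();
        for (Message message : delivered) {
            if (accept(message)) {
                emitted.add(message.payload);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Message> delivered = new ArrayList<>();
        delivered.add(new Message("id-1", "a"));
        delivered.add(new Message("id-2", "b"));
        delivered.add(new Message("id-1", "a")); // simulated redelivery

        CorrelationIdDeduplicator deduplicator = new CorrelationIdDeduplicator();
        System.out.println(deduplicator.process(delivered)); // prints [a, b]
    }
}
```

The sketch also hints at why unique ids matter: two distinct messages that share a correlation id are indistinguishable from a redelivery, so the second one would be dropped.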
 #### RabbitMQ Sink
-A class providing an interface for sending data to RabbitMQ.
-
-The followings have to be provided for the `RMQSink(…)` constructor in order:
-
-1. RMQConnectionConfig
-2. The queue name
-3. Serialization schema
-
-Example:
+This connector provides a `RMQSink` class for sending messages to a RabbitMQ
+queue. Below is a code example for setting up a RabbitMQ sink.
 {% highlight java %}
-RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
-.setHost("localhost").setPort(5000).setUserName(..)
-.setPassword(..).setVirtualHost("/").build();
-stream.addSink(new RMQSink<String>(connectionConfig, "hello", new SimpleStringSchema()));
+final DataStream<String> stream = ...
+
+final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+    .setHost("localhost")
+    .setPort(5000)
+    ...
+    .build();
+
+stream.addSink(new RMQSink<String>(
+    connectionConfig,            // config for the RabbitMQ connection
+    "queueName",                 // name of the RabbitMQ queue to send messages to
+    new SimpleStringSchema()));  // serialization schema to turn Java objects to messages
 {% endhighlight %}
 {% highlight scala %}
+val stream: DataStream[String] = ...
+
 val connectionConfig = new RMQConnectionConfig.Builder()
-.setHost("localhost").setPort(5000).setUserName(..)
-.setPassword(..).setVirtualHost("/").build()
-stream.addSink(new RMQSink[String](connectionConfig, "hello", new SimpleStringSchema))
+    .setHost("localhost")
+    .setPort(5000)
+    ...
+    .build
+
+stream.addSink(new RMQSink[String](
+    connectionConfig,            // config for the RabbitMQ connection
+    "queueName",                 // name of the RabbitMQ queue to send messages to
+    new SimpleStringSchema))     // serialization schema to turn Java objects to messages
 {% endhighlight %}