/
SpringRabbitMQEndpoint.java
782 lines (667 loc) · 33.6 KB
/
SpringRabbitMQEndpoint.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.springrabbit;
import java.util.Map;
import org.apache.camel.AsyncEndpoint;
import org.apache.camel.Category;
import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
import org.apache.camel.support.DefaultEndpoint;
import org.apache.camel.util.PropertiesHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;
import static org.apache.camel.component.springrabbit.SpringRabbitMQConstants.DIRECT_MESSAGE_LISTENER_CONTAINER;
/**
 * Send and receive messages from RabbitMQ using the Spring RabbitMQ client.
 * <p>
 * Besides holding the endpoint configuration, this class is the factory for the consumer, polling consumer and
 * producer, builds the {@link RabbitTemplate}s used for sending, and (when auto-declare is enabled) declares the
 * exchange, queue(s), bindings and optional dead letter elements through an {@link AmqpAdmin}.
 */
@UriEndpoint(firstVersion = "3.8.0", scheme = "spring-rabbitmq", title = "Spring RabbitMQ",
             syntax = "spring-rabbitmq:exchangeName",
             category = { Category.MESSAGING }, headersClass = SpringRabbitMQConstants.class)
public class SpringRabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {

    // prefixes used to partition the multi-valued "args" option per RabbitMQ concept
    public static final String ARG_PREFIX = "arg.";
    public static final String CONSUMER_ARG_PREFIX = "consumer.";
    public static final String EXCHANGE_ARG_PREFIX = "exchange.";
    public static final String QUEUE_ARG_PREFIX = "queue.";
    public static final String BINDING_ARG_PREFIX = "binding.";
    public static final String DLQ_EXCHANGE_ARG_PREFIX = "dlq.exchange.";
    public static final String DLQ_QUEUE_ARG_PREFIX = "dlq.queue.";
    public static final String DLQ_BINDING_PREFIX = "dlq.binding.";

    private static final Logger LOG = LoggerFactory.getLogger(SpringRabbitMQEndpoint.class);

    @UriPath
    @Metadata(required = true,
              description = "The exchange name determines the exchange to which the produced messages will be sent to."
                            + " In the case of consumers, the exchange name determines the exchange the queue will be bound to."
                            + " Note: to use default exchange then do not use empty name, but use default instead.")
    private String exchangeName;
    @UriParam(label = "consumer", defaultValue = "direct", enums = "direct,fanout,headers,topic",
              description = "The type of the exchange")
    private String exchangeType = "direct";
    @UriParam(label = "common",
              description = "The value of a routing key to use. Default is empty which is not helpful when using the default (or any direct) exchange, but fine if the exchange is a headers exchange for instance.")
    private String routingKey = "";
    @UriParam(label = "common",
              description = "The connection factory to be use. A connection factory must be configured either on the component or endpoint.")
    private ConnectionFactory connectionFactory;
    @UriParam(label = "common",
              description = "The queue(s) to use for consuming or producing messages. Multiple queue names can be separated by comma."
                            + " If none has been configured then Camel will generate an unique id as the queue name.")
    private String queues;
    @UriParam(label = "consumer", defaultValue = "true",
              description = "Specifies whether the consumer container should auto-startup.")
    private boolean autoStartup = true;
    @UriParam(label = "consumer", defaultValue = "true",
              description = "Specifies whether the consumer should auto declare binding between exchange, queue and routing key when starting.")
    private boolean autoDeclare = true;
    @UriParam(label = "consumer",
              description = "Whether the consumer processes the Exchange asynchronously."
                            + " If enabled then the consumer may pickup the next message from the queue,"
                            + " while the previous message is being processed asynchronously (by the Asynchronous Routing Engine)."
                            + " This means that messages may be processed not 100% strictly in order. If disabled (as default)"
                            + " then the Exchange is fully processed before the consumer will pickup the next message from the queue.")
    private boolean asyncConsumer;
    // description previously referred to JMS (copied from the JMS component); this endpoint talks to RabbitMQ
    @UriParam(description = "Specifies whether to test the connection on startup."
                            + " This ensures that when Camel starts that all the consumers have a valid connection to the RabbitMQ broker."
                            + " If a connection cannot be granted then Camel throws an exception on startup."
                            + " This ensures that Camel is not started with failed connections."
                            + " The producers are tested as well.")
    private boolean testConnectionOnStartup;
    @UriParam(label = "advanced",
              description = "To use a custom MessageConverter so you can be in control how to map to/from a org.springframework.amqp.core.Message.")
    private MessageConverter messageConverter;
    @UriParam(label = "advanced",
              description = "To use a custom MessagePropertiesConverter so you can be in control how to map to/from a org.springframework.amqp.core.MessageProperties.")
    private MessagePropertiesConverter messagePropertiesConverter;
    @UriParam(label = "advanced", prefix = ARG_PREFIX, multiValue = true,
              description = "Specify arguments for configuring the different RabbitMQ concepts, a different prefix is required for each element:"
                            + " arg.consumer. arg.exchange. arg.queue. arg.binding. arg.dlq.exchange. arg.dlq.queue. arg.dlq.binding."
                            + " For example to declare a queue with message ttl argument: args=arg.queue.x-message-ttl=60000")
    private Map<String, Object> args;
    @UriParam(label = "consumer",
              description = "Flag controlling the behaviour of the container with respect to message acknowledgement. The most common usage is to let the container handle the acknowledgements"
                            + " (so the listener doesn't need to know about the channel or the message)."
                            + " Set to AcknowledgeMode.MANUAL if the listener will send the acknowledgements itself using Channel.basicAck(long, boolean). Manual acks are consistent with either a transactional or non-transactional channel,"
                            + " but if you are doing no other work on the channel at the same other than receiving a single message then the transaction is probably unnecessary."
                            + " Set to AcknowledgeMode.NONE to tell the broker not to expect any acknowledgements, and it will assume all messages are acknowledged as soon as they are sent (this is autoack in native Rabbit broker terms)."
                            + " If AcknowledgeMode.NONE then the channel cannot be transactional (so the container will fail on start up if that flag is accidentally set).")
    private AcknowledgeMode acknowledgeMode = AcknowledgeMode.AUTO;
    @UriParam(label = "consumer", description = "Set to true for an exclusive consumer")
    private boolean exclusive;
    @UriParam(label = "consumer", description = "Set to true for an no-local consumer")
    private boolean noLocal;
    @UriParam(label = "common", description = "The name of the dead letter exchange")
    private String deadLetterExchange;
    @UriParam(label = "common", description = "The name of the dead letter queue")
    private String deadLetterQueue;
    @UriParam(label = "common", description = "The routing key for the dead letter exchange")
    private String deadLetterRoutingKey;
    @UriParam(label = "common", defaultValue = "direct", enums = "direct,fanout,headers,topic",
              description = "The type of the dead letter exchange")
    private String deadLetterExchangeType = "direct";
    @UriParam(label = "common",
              description = "Specifies whether Camel ignores the ReplyTo header in messages. If true, Camel does not send a reply back to"
                            + " the destination specified in the ReplyTo header. You can use this option if you want Camel to consume from a"
                            + " route and you do not want Camel to automatically send back a reply message because another component in your code"
                            + " handles the reply message. You can also use this option if you want to use Camel as a proxy between different"
                            + " message brokers and you want to route message from one system to another.")
    private boolean disableReplyTo;
    @UriParam(label = "producer",
              description = "Specifies whether the producer should auto declare binding between exchange, queue and routing key when starting.")
    private boolean autoDeclareProducer;
    @UriParam(label = "producer", javaType = "java.time.Duration", defaultValue = "30000",
              description = "Specify the timeout in milliseconds to be used when waiting for a reply message when doing request/reply (InOut) messaging."
                            + " The default value is 30 seconds. A negative value indicates an indefinite timeout (Beware that this will cause a memory leak if a reply is not received).")
    private long replyTimeout = 30000;
    @UriParam(label = "producer", javaType = "java.time.Duration", defaultValue = "5000",
              description = "Specify the timeout in milliseconds to be used when waiting for a message sent to be confirmed by RabbitMQ when doing send only messaging (InOnly)."
                            + " The default value is 5 seconds. A negative value indicates an indefinite timeout.")
    private long confirmTimeout = 5000;
    @UriParam(label = "producer", enums = "auto,enabled,disabled", defaultValue = "auto",
              description = "Controls whether to wait for confirms. The connection factory must be configured for publisher confirms and this method."
                            + " auto = Camel detects if the connection factory uses confirms or not. disabled = Confirms is disabled. enabled = Confirms is enabled.")
    private String confirm = "auto";
    @UriParam(label = "producer", defaultValue = "false",
              description = "Use a separate connection for publishers and consumers")
    private boolean usePublisherConnection;
    @UriParam(label = "producer", defaultValue = "false",
              description = "Whether to allow sending messages with no body. If this option is false and the message body is null, then an MessageConversionException is thrown.")
    private boolean allowNullBody;
    @UriParam(defaultValue = "false", label = "advanced",
              description = "Sets whether synchronous processing should be strictly used")
    private boolean synchronous;
    @UriParam(label = "consumer,advanced",
              description = "Tell the broker how many messages to send in a single request. Often this can be set quite high to improve throughput.")
    private Integer prefetchCount;
    @UriParam(label = "consumer,advanced", defaultValue = DIRECT_MESSAGE_LISTENER_CONTAINER, enums = "DMLC,SMLC",
              description = "The type of the MessageListenerContainer")
    private String messageListenerContainerType = DIRECT_MESSAGE_LISTENER_CONTAINER;
    @UriParam(label = "consumer,advanced", description = "The number of consumers")
    private Integer concurrentConsumers;
    @UriParam(label = "consumer,advanced", description = "The maximum number of consumers (available only with SMLC)")
    private Integer maxConcurrentConsumers;
    @UriParam(label = "consumer,advanced", description = "Custom retry configuration to use. "
                                                         + "If this is configured then the other settings such as maximumRetryAttempts for retry are not in use.")
    private RetryOperationsInterceptor retry;
    @UriParam(label = "consumer", defaultValue = "5",
              description = "How many times a Rabbitmq consumer will retry the same message if Camel failed to process the message")
    private int maximumRetryAttempts = 5;
    @UriParam(label = "consumer", defaultValue = "1000",
              description = "Delay in millis a Rabbitmq consumer will wait before redelivering a message that Camel failed to process")
    private int retryDelay = 1000;
    @UriParam(label = "consumer", defaultValue = "true",
              description = "Whether a Rabbitmq consumer should reject the message without requeuing. This enables failed messages to be sent to a Dead Letter Exchange/Queue, if the broker is so configured.")
    private boolean rejectAndDontRequeue = true;

    /**
     * Creates the endpoint.
     *
     * @param endpointUri  the endpoint uri
     * @param component    the owning component
     * @param exchangeName the name of the exchange this endpoint is addressed to
     */
    public SpringRabbitMQEndpoint(String endpointUri, Component component, String exchangeName) {
        super(endpointUri, component);
        this.exchangeName = exchangeName;
    }

    @Override
    public SpringRabbitMQComponent getComponent() {
        return (SpringRabbitMQComponent) super.getComponent();
    }

    /**
     * Initializes the endpoint; when {@code allowNullBody} is enabled the configured message converter is wrapped in
     * an {@link AllowNullBodyMessageConverter}.
     */
    @Override
    protected void doInit() throws Exception {
        // let the base endpoint run its own initialization first
        super.doInit();

        if (allowNullBody) {
            // need to wrap message converter in allow null
            messageConverter = new AllowNullBodyMessageConverter(messageConverter);
        }
    }

    // ---------------------------------------------------------------------
    // Plain accessors for the endpoint options
    // ---------------------------------------------------------------------

    public String getExchangeName() {
        return exchangeName;
    }

    public ConnectionFactory getConnectionFactory() {
        return connectionFactory;
    }

    public void setConnectionFactory(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    public String getQueues() {
        return queues;
    }

    public void setQueues(String queues) {
        this.queues = queues;
    }

    public boolean isAutoStartup() {
        return autoStartup;
    }

    public void setAutoStartup(boolean autoStartup) {
        this.autoStartup = autoStartup;
    }

    public boolean isAutoDeclare() {
        return autoDeclare;
    }

    public void setAutoDeclare(boolean autoDeclare) {
        this.autoDeclare = autoDeclare;
    }

    public boolean isAutoDeclareProducer() {
        return autoDeclareProducer;
    }

    public void setAutoDeclareProducer(boolean autoDeclareProducer) {
        this.autoDeclareProducer = autoDeclareProducer;
    }

    public boolean isAsyncConsumer() {
        return asyncConsumer;
    }

    public void setAsyncConsumer(boolean asyncConsumer) {
        this.asyncConsumer = asyncConsumer;
    }

    public String getRoutingKey() {
        return routingKey;
    }

    public void setRoutingKey(String routingKey) {
        this.routingKey = routingKey;
    }

    public boolean isTestConnectionOnStartup() {
        return testConnectionOnStartup;
    }

    public void setTestConnectionOnStartup(boolean testConnectionOnStartup) {
        this.testConnectionOnStartup = testConnectionOnStartup;
    }

    public MessageConverter getMessageConverter() {
        return messageConverter;
    }

    public void setMessageConverter(MessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }

    public MessagePropertiesConverter getMessagePropertiesConverter() {
        return messagePropertiesConverter;
    }

    public void setMessagePropertiesConverter(MessagePropertiesConverter messagePropertiesConverter) {
        this.messagePropertiesConverter = messagePropertiesConverter;
    }

    public String getExchangeType() {
        return exchangeType;
    }

    public void setExchangeType(String exchangeType) {
        this.exchangeType = exchangeType;
    }

    public Map<String, Object> getArgs() {
        return args;
    }

    public void setArgs(Map<String, Object> args) {
        this.args = args;
    }

    public AcknowledgeMode getAcknowledgeMode() {
        return acknowledgeMode;
    }

    public void setAcknowledgeMode(AcknowledgeMode acknowledgeMode) {
        this.acknowledgeMode = acknowledgeMode;
    }

    public boolean isExclusive() {
        return exclusive;
    }

    public void setExclusive(boolean exclusive) {
        this.exclusive = exclusive;
    }

    public boolean isNoLocal() {
        return noLocal;
    }

    public void setNoLocal(boolean noLocal) {
        this.noLocal = noLocal;
    }

    public String getDeadLetterExchange() {
        return deadLetterExchange;
    }

    public void setDeadLetterExchange(String deadLetterExchange) {
        this.deadLetterExchange = deadLetterExchange;
    }

    public String getDeadLetterQueue() {
        return deadLetterQueue;
    }

    public void setDeadLetterQueue(String deadLetterQueue) {
        this.deadLetterQueue = deadLetterQueue;
    }

    public String getDeadLetterRoutingKey() {
        return deadLetterRoutingKey;
    }

    public void setDeadLetterRoutingKey(String deadLetterRoutingKey) {
        this.deadLetterRoutingKey = deadLetterRoutingKey;
    }

    public String getDeadLetterExchangeType() {
        return deadLetterExchangeType;
    }

    public void setDeadLetterExchangeType(String deadLetterExchangeType) {
        this.deadLetterExchangeType = deadLetterExchangeType;
    }

    public boolean isDisableReplyTo() {
        return disableReplyTo;
    }

    public void setDisableReplyTo(boolean disableReplyTo) {
        this.disableReplyTo = disableReplyTo;
    }

    public long getReplyTimeout() {
        return replyTimeout;
    }

    public void setReplyTimeout(long replyTimeout) {
        this.replyTimeout = replyTimeout;
    }

    public long getConfirmTimeout() {
        return confirmTimeout;
    }

    public void setConfirmTimeout(long confirmTimeout) {
        this.confirmTimeout = confirmTimeout;
    }

    public String getConfirm() {
        return confirm;
    }

    public void setConfirm(String confirm) {
        this.confirm = confirm;
    }

    public boolean isUsePublisherConnection() {
        return usePublisherConnection;
    }

    public void setUsePublisherConnection(boolean usePublisherConnection) {
        this.usePublisherConnection = usePublisherConnection;
    }

    public boolean isAllowNullBody() {
        return allowNullBody;
    }

    public void setAllowNullBody(boolean allowNullBody) {
        this.allowNullBody = allowNullBody;
    }

    public boolean isSynchronous() {
        return synchronous;
    }

    public void setSynchronous(boolean synchronous) {
        this.synchronous = synchronous;
    }

    public Integer getPrefetchCount() {
        return prefetchCount;
    }

    public void setPrefetchCount(Integer prefetchCount) {
        this.prefetchCount = prefetchCount;
    }

    public String getMessageListenerContainerType() {
        return messageListenerContainerType;
    }

    public void setMessageListenerContainerType(String messageListenerContainerType) {
        this.messageListenerContainerType = messageListenerContainerType;
    }

    public Integer getConcurrentConsumers() {
        return concurrentConsumers;
    }

    public void setConcurrentConsumers(Integer concurrentConsumers) {
        this.concurrentConsumers = concurrentConsumers;
    }

    public Integer getMaxConcurrentConsumers() {
        return maxConcurrentConsumers;
    }

    public void setMaxConcurrentConsumers(Integer maxConcurrentConsumers) {
        this.maxConcurrentConsumers = maxConcurrentConsumers;
    }

    public RetryOperationsInterceptor getRetry() {
        return retry;
    }

    public void setRetry(RetryOperationsInterceptor retry) {
        this.retry = retry;
    }

    public int getMaximumRetryAttempts() {
        return maximumRetryAttempts;
    }

    public void setMaximumRetryAttempts(int maximumRetryAttempts) {
        this.maximumRetryAttempts = maximumRetryAttempts;
    }

    public int getRetryDelay() {
        return retryDelay;
    }

    public void setRetryDelay(int retryDelay) {
        this.retryDelay = retryDelay;
    }

    public boolean isRejectAndDontRequeue() {
        return rejectAndDontRequeue;
    }

    public void setRejectAndDontRequeue(boolean rejectAndDontRequeue) {
        this.rejectAndDontRequeue = rejectAndDontRequeue;
    }

    // ---------------------------------------------------------------------
    // Consumer / producer factories
    // ---------------------------------------------------------------------

    /**
     * Creates a consumer backed by a listener container obtained from {@link #createMessageListenerContainer()}.
     */
    @Override
    public Consumer createConsumer(Processor processor) throws Exception {
        AbstractMessageListenerContainer listenerContainer = createMessageListenerContainer();
        SpringRabbitMQConsumer consumer = new SpringRabbitMQConsumer(this, processor, listenerContainer);
        configureConsumer(consumer);
        return consumer;
    }

    /**
     * Creates a polling consumer that uses a template from {@link #createInOnlyTemplate()}.
     */
    @Override
    public PollingConsumer createPollingConsumer() throws Exception {
        SpringRabbitPollingConsumer answer = new SpringRabbitPollingConsumer(this, createInOnlyTemplate());
        configurePollingConsumer(answer);
        return answer;
    }

    @Override
    public Producer createProducer() throws Exception {
        return new SpringRabbitMQProducer(this);
    }

    /**
     * Creates a Camel {@link Exchange} from a received Spring AMQP {@link Message}.
     * <p>
     * The body is produced by the configured message converter and the headers by the configured message properties
     * converter; headers are only set on the exchange message when the converted map is not empty.
     *
     * @param  message the received message
     * @return         the new exchange
     */
    public Exchange createExchange(Message message) {
        Object body = getMessageConverter().fromMessage(message);

        Exchange exchange = super.createExchange();
        exchange.getMessage().setBody(body);

        Map<String, Object> headers
                = getMessagePropertiesConverter().fromMessageProperties(message.getMessageProperties(), exchange);
        if (!headers.isEmpty()) {
            exchange.getMessage().setHeaders(headers);
        }
        return exchange;
    }

    // ---------------------------------------------------------------------
    // Views of the "args" option, one per prefix
    // ---------------------------------------------------------------------

    /** Returns the entries of {@code args} under the {@code consumer.} prefix, with known numeric/boolean values converted. */
    public Map<String, Object> getConsumerArgs() {
        return extractArgs(CONSUMER_ARG_PREFIX);
    }

    /** Returns the entries of {@code args} under the {@code exchange.} prefix, with known numeric/boolean values converted. */
    public Map<String, Object> getExchangeArgs() {
        return extractArgs(EXCHANGE_ARG_PREFIX);
    }

    /** Returns the entries of {@code args} under the {@code queue.} prefix, with known numeric/boolean values converted. */
    public Map<String, Object> getQueueArgs() {
        return extractArgs(QUEUE_ARG_PREFIX);
    }

    /** Returns the entries of {@code args} under the {@code binding.} prefix, with known numeric/boolean values converted. */
    public Map<String, Object> getBindingArgs() {
        return extractArgs(BINDING_ARG_PREFIX);
    }

    /** Returns the entries of {@code args} under the {@code dlq.exchange.} prefix, with known numeric/boolean values converted. */
    public Map<String, Object> getDlqExchangeArgs() {
        return extractArgs(DLQ_EXCHANGE_ARG_PREFIX);
    }

    /** Returns the entries of {@code args} under the {@code dlq.queue.} prefix, with known numeric/boolean values converted. */
    public Map<String, Object> getDlqQueueArgs() {
        return extractArgs(DLQ_QUEUE_ARG_PREFIX);
    }

    /** Returns the entries of {@code args} under the {@code dlq.binding.} prefix, with known numeric/boolean values converted. */
    public Map<String, Object> getDlqBindingArgs() {
        return extractArgs(DLQ_BINDING_PREFIX);
    }

    /**
     * Extracts (without removing) the entries of {@code args} that use the given prefix and normalizes them via
     * {@link #prepareArgs(Map)}.
     */
    private Map<String, Object> extractArgs(String prefix) {
        Map<String, Object> answer = PropertiesHelper.extractProperties(args, prefix, false);
        prepareArgs(answer);
        return answer;
    }

    /**
     * Factory method for creating a new template for InOnly message exchanges
     */
    public RabbitTemplate createInOnlyTemplate() {
        RabbitTemplate template = new RabbitTemplate(getConnectionFactory());
        template.setRoutingKey(getRoutingKey());
        template.setUsePublisherConnection(usePublisherConnection);
        return template;
    }

    /**
     * Factory method for creating a new template for InOut message exchanges
     */
    public AsyncRabbitTemplate createInOutTemplate() {
        RabbitTemplate template = new RabbitTemplate(getConnectionFactory());
        template.setRoutingKey(routingKey);
        template.setUsePublisherConnection(usePublisherConnection);
        AsyncRabbitTemplate asyncTemplate = new AsyncRabbitTemplate(template);
        // use receive timeout (for reply timeout) on the async template
        asyncTemplate.setReceiveTimeout(replyTimeout);
        return asyncTemplate;
    }

    /**
     * Creates the listener container by delegating to the listener container factory configured on the component.
     */
    public AbstractMessageListenerContainer createMessageListenerContainer() {
        return getComponent().getListenerContainerFactory().createListenerContainer(this);
    }

    /**
     * Copies the {@code asyncConsumer} and {@code disableReplyTo} options onto the given listener.
     */
    public void configureMessageListener(EndpointMessageListener listener) {
        listener.setAsync(isAsyncConsumer());
        listener.setDisableReplyTo(isDisableReplyTo());
    }

    /**
     * Removes {@code key} from the given map and converts its value to a boolean.
     *
     * @param  args         the map to read (and remove) the key from; a {@code null} map is treated as empty
     * @param  key          the key to look up
     * @param  defaultValue the value to use when the key is absent, may be {@code null}
     * @return              the converted value, or {@code false} when neither a value nor a default is available
     */
    protected boolean parseArgsBoolean(Map<String, Object> args, String key, String defaultValue) {
        Object answer = removeOrDefault(args, key, defaultValue);
        if (answer != null) {
            return getCamelContext().getTypeConverter().convertTo(boolean.class, answer);
        } else {
            return false;
        }
    }

    /**
     * Removes {@code key} from the given map and converts its value to a String.
     *
     * @param  args         the map to read (and remove) the key from; a {@code null} map is treated as empty
     * @param  key          the key to look up
     * @param  defaultValue the value to use when the key is absent, may be {@code null}
     * @return              the converted value, or {@code null} when neither a value nor a default is available
     */
    protected String parseArgsString(Map<String, Object> args, String key, String defaultValue) {
        Object answer = removeOrDefault(args, key, defaultValue);
        if (answer != null) {
            return getCamelContext().getTypeConverter().convertTo(String.class, answer);
        } else {
            return null;
        }
    }

    /**
     * Removes and returns the value for {@code key}, falling back to {@code defaultValue} when the map is
     * {@code null} or has no (non-null) value for the key.
     */
    private static Object removeOrDefault(Map<String, Object> args, String key, String defaultValue) {
        Object answer = args != null ? args.remove(key) : null;
        return answer != null ? answer : defaultValue;
    }

    /**
     * Declares the RabbitMQ elements for the given container, using the admin exposed by the container. Does nothing
     * when the container is not a {@link MessageListenerContainer}.
     */
    public void declareElements(AbstractMessageListenerContainer container) {
        if (container instanceof MessageListenerContainer) {
            AmqpAdmin admin = ((MessageListenerContainer) container).getAmqpAdmin();
            declareElements(container, admin);
        }
    }

    /**
     * Declares the dead letter elements (if configured), the endpoint exchange, and every configured queue together
     * with its binding to the exchange. Does nothing when {@code admin} is {@code null} or {@code autoDeclare} is
     * disabled.
     * <p>
     * When no queue names are configured a single broker-named queue is declared, and its generated name is set on the
     * container (when a container is given).
     *
     * @param container the listener container to inform about a generated queue name, may be {@code null}
     * @param admin     the admin used for declaring, may be {@code null}
     */
    public void declareElements(AbstractMessageListenerContainer container, AmqpAdmin admin) {
        if (admin == null || !autoDeclare) {
            return;
        }

        declareDeadLetterElements(admin);
        final org.springframework.amqp.core.Exchange rabbitExchange = declareEndpointExchange(admin);

        // if the consumer has no specific queue names then auto-create an unique queue
        String queuesToDeclare = queues;
        boolean generateUniqueQueue = false;
        if (queuesToDeclare == null) {
            // no explicit queue names so use a single blank so we can create a single new unique queue for the consumer
            queuesToDeclare = " ";
            generateUniqueQueue = true;
        }
        for (String queue : queuesToDeclare.split(",")) {
            declareAndBindQueue(container, admin, rabbitExchange.getName(), queue.trim(), generateUniqueQueue);
        }
    }

    /**
     * Declares the dead letter exchange and, when a dead letter queue is configured, the durable dead letter queue and
     * its binding to that exchange.
     */
    private void declareDeadLetterElements(AmqpAdmin admin) {
        if (deadLetterExchange == null) {
            return;
        }
        ExchangeBuilder eb = new ExchangeBuilder(deadLetterExchange, deadLetterExchangeType);
        eb.withArguments(getDlqExchangeArgs());
        final org.springframework.amqp.core.Exchange dlx = eb.build();
        admin.declareExchange(dlx);

        if (deadLetterQueue == null) {
            return;
        }
        QueueBuilder qb = QueueBuilder.durable(deadLetterQueue);
        qb.withArguments(getDlqQueueArgs());
        final Queue dlq = qb.build();
        admin.declareQueue(dlq);

        // bind with an empty key when no dead letter routing key is configured
        // NOTE(review): assumes the RabbitMQ client rejects a null routing key on queue.bind - confirm
        String bindingKey = deadLetterRoutingKey != null ? deadLetterRoutingKey : "";
        Binding binding = new Binding(
                dlq.getName(), Binding.DestinationType.QUEUE, dlx.getName(),
                bindingKey,
                getDlqBindingArgs());
        admin.declareBinding(binding);
        LOG.info("Auto-declaring durable DeadLetterExchange: {} routingKey: {}", deadLetterExchange,
                deadLetterRoutingKey);
    }

    /**
     * Declares the endpoint exchange. The {@code durable} (default true) and {@code autoDelete} (default false)
     * exchange arguments are consumed as builder flags; the remaining exchange arguments are passed through.
     *
     * @return the declared exchange
     */
    private org.springframework.amqp.core.Exchange declareEndpointExchange(AmqpAdmin admin) {
        Map<String, Object> exchangeArgs = getExchangeArgs();
        boolean durable = parseArgsBoolean(exchangeArgs, "durable", "true");
        boolean autoDelete = parseArgsBoolean(exchangeArgs, "autoDelete", "false");
        if (!durable || autoDelete) {
            LOG.info("Auto-declaring a non-durable or auto-delete Exchange ({}) durable:{}, auto-delete:{}. "
                     + "It will be deleted by the broker if it shuts down, and can be redeclared by closing and "
                     + "reopening the connection.",
                    exchangeName, durable, autoDelete);
        }

        // the default exchange is addressed by the empty name
        String en = SpringRabbitMQHelper.isDefaultExchange(getExchangeName()) ? "" : getExchangeName();
        ExchangeBuilder eb = new ExchangeBuilder(en, getExchangeType());
        eb.durable(durable);
        if (autoDelete) {
            eb.autoDelete();
        }
        eb.withArguments(exchangeArgs);
        final org.springframework.amqp.core.Exchange rabbitExchange = eb.build();
        admin.declareExchange(rabbitExchange);
        return rabbitExchange;
    }

    /**
     * Declares a single queue and binds it to the given exchange using the endpoint routing key.
     *
     * @param container           the container to inform about a generated queue name, may be {@code null}
     * @param admin               the admin used for declaring
     * @param exchange            the name of the exchange to bind to
     * @param queue               the trimmed queue name; empty to let the queue name be generated
     * @param generateUniqueQueue whether the declared queue name must be set on the container
     */
    private void declareAndBindQueue(
            AbstractMessageListenerContainer container, AmqpAdmin admin, String exchange, String queue,
            boolean generateUniqueQueue) {

        Map<String, Object> queueArgs = getQueueArgs();
        prepareDeadLetterQueueArgs(queueArgs);
        boolean durable = parseArgsBoolean(queueArgs, "durable", "false");
        boolean autoDelete = parseArgsBoolean(queueArgs, "autoDelete", "false");
        boolean exclusiveQueue = parseArgsBoolean(queueArgs, "exclusive", "false");

        QueueBuilder qb;
        if (queue.isEmpty()) {
            qb = durable ? QueueBuilder.durable() : QueueBuilder.nonDurable();
        } else {
            qb = durable ? QueueBuilder.durable(queue) : QueueBuilder.nonDurable(queue);
        }
        if (autoDelete) {
            qb.autoDelete();
        }
        if (exclusiveQueue) {
            qb.exclusive();
        }

        // setup DLQ: read from the per-queue argument copy (never from the endpoint's own args map, which may be
        // null and must not be mutated), falling back to the endpoint options
        String dle = parseArgsString(queueArgs, "x-dead-letter-exchange", deadLetterExchange);
        if (dle != null) {
            qb.deadLetterExchange(dle);
        }
        String dlrk = parseArgsString(queueArgs, "x-dead-letter-routing-key", deadLetterRoutingKey);
        if (dlrk != null) {
            qb.deadLetterRoutingKey(dlrk);
        }
        qb.withArguments(queueArgs);

        final Queue rabbitQueue = qb.build();
        if (!durable || autoDelete || exclusiveQueue) {
            LOG.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue ({}) "
                     + "durable:{}, auto-delete:{}, exclusive:{}. It will be redeclared if the broker stops and "
                     + "is restarted while the connection factory is alive, but all messages will be lost.",
                    rabbitQueue.getName(), durable, autoDelete, exclusiveQueue);
        }

        String qn = admin.declareQueue(rabbitQueue);

        // if we auto created a new unique queue then the container needs to know the queue name
        if (generateUniqueQueue && container != null) {
            container.setQueueNames(qn);
        }

        // bind queue to exchange
        Binding binding = new Binding(
                qn, Binding.DestinationType.QUEUE, exchange, routingKey,
                getBindingArgs());
        admin.declareBinding(binding);
    }

    /**
     * Adds the configured dead letter exchange (and routing key, when set) to the given queue arguments. Nothing is
     * added when no dead letter exchange is configured.
     */
    private void prepareDeadLetterQueueArgs(Map<String, Object> args) {
        if (deadLetterExchange != null) {
            args.put(SpringRabbitMQConstants.DEAD_LETTER_EXCHANGE, deadLetterExchange);

            if (deadLetterRoutingKey != null) {
                args.put(SpringRabbitMQConstants.DEAD_LETTER_ROUTING_KEY, deadLetterRoutingKey);
            }
        }
    }

    /**
     * Converts well-known arguments that were supplied as Strings into their numeric or boolean form; values that
     * are not Strings are left untouched.
     *
     * @throws NumberFormatException if a numeric argument cannot be parsed
     */
    private void prepareArgs(Map<String, Object> args) {
        // some arguments must be in numeric values so we need to fix this
        stringToLong(args, SpringRabbitMQConstants.MAX_LENGTH);
        stringToLong(args, SpringRabbitMQConstants.MAX_LENGTH_BYTES);
        stringToInt(args, SpringRabbitMQConstants.MAX_PRIORITY);
        stringToInt(args, SpringRabbitMQConstants.DELIVERY_LIMIT);
        stringToLong(args, SpringRabbitMQConstants.MESSAGE_TTL);
        stringToLong(args, SpringRabbitMQConstants.EXPIRES);

        Object arg = args.get(SpringRabbitMQConstants.SINGLE_ACTIVE_CONSUMER);
        if (arg instanceof String) {
            args.put(SpringRabbitMQConstants.SINGLE_ACTIVE_CONSUMER, Boolean.parseBoolean((String) arg));
        }
    }

    /** Replaces a String value stored under {@code key} with its parsed {@code long} value. */
    private static void stringToLong(Map<String, Object> args, String key) {
        Object arg = args.get(key);
        if (arg instanceof String) {
            args.put(key, Long.parseLong((String) arg));
        }
    }

    /** Replaces a String value stored under {@code key} with its parsed {@code int} value. */
    private static void stringToInt(Map<String, Object> args, String key) {
        Object arg = args.get(key);
        if (arg instanceof String) {
            args.put(key, Integer.parseInt((String) arg));
        }
    }

}