-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
ServiceBusClientBuilder.java
2821 lines (2601 loc) · 148 KB
/
ServiceBusClientBuilder.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.messaging.servicebus;
import com.azure.core.amqp.AmqpClientOptions;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.amqp.ProxyOptions;
import com.azure.core.amqp.client.traits.AmqpTrait;
import com.azure.core.amqp.implementation.AzureTokenManagerProvider;
import com.azure.core.amqp.implementation.ConnectionOptions;
import com.azure.core.amqp.implementation.ConnectionStringProperties;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.ReactorConnectionCache;
import com.azure.core.amqp.implementation.ReactorHandlerProvider;
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.StringUtil;
import com.azure.core.amqp.implementation.TokenManagerProvider;
import com.azure.core.amqp.models.CbsAuthorizationType;
import com.azure.core.annotation.ServiceClientBuilder;
import com.azure.core.annotation.ServiceClientProtocol;
import com.azure.core.client.traits.AzureNamedKeyCredentialTrait;
import com.azure.core.client.traits.AzureSasCredentialTrait;
import com.azure.core.client.traits.ConfigurationTrait;
import com.azure.core.client.traits.ConnectionStringTrait;
import com.azure.core.client.traits.TokenCredentialTrait;
import com.azure.core.credential.AzureNamedKeyCredential;
import com.azure.core.credential.AzureSasCredential;
import com.azure.core.credential.TokenCredential;
import com.azure.core.exception.AzureException;
import com.azure.core.util.ClientOptions;
import com.azure.core.util.Configuration;
import com.azure.core.util.ConfigurationProperty;
import com.azure.core.util.ConfigurationPropertyBuilder;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.metrics.Meter;
import com.azure.core.util.metrics.MeterProvider;
import com.azure.core.util.tracing.Tracer;
import com.azure.core.util.tracing.TracerProvider;
import com.azure.messaging.servicebus.implementation.MessageUtils;
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
import com.azure.messaging.servicebus.implementation.ServiceBusAmqpConnection;
import com.azure.messaging.servicebus.implementation.ServiceBusAmqpLinkProvider;
import com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor;
import com.azure.messaging.servicebus.implementation.ServiceBusConstants;
import com.azure.messaging.servicebus.implementation.ServiceBusReactorAmqpConnection;
import com.azure.messaging.servicebus.implementation.instrumentation.ReceiverKind;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusSenderInstrumentation;
import com.azure.messaging.servicebus.implementation.ServiceBusSharedKeyCredential;
import com.azure.messaging.servicebus.implementation.ServiceBusProcessorClientOptions;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import com.azure.messaging.servicebus.models.SubQueue;
import org.apache.qpid.proton.engine.SslDomain;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import java.net.MalformedURLException;
import java.net.URL;
import java.time.Duration;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import static com.azure.core.amqp.implementation.ClientConstants.ENTITY_PATH_KEY;
import static com.azure.messaging.servicebus.ReceiverOptions.createNonSessionOptions;
import static com.azure.messaging.servicebus.ReceiverOptions.createUnnamedSessionOptions;
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_NAMESPACE_VALUE;
/**
* <p>This class provides a fluent builder API to aid the instantiation of clients to send and receive messages to/from
* Service Bus entities.</p>
*
* <p>
* <strong>Credentials are required</strong> to perform operations against Azure Service Bus. They can be set by using
* one of the following methods:
*
* <ul>
* <li>{@link #connectionString(String)} with a connection string to the Service Bus <i>namespace</i>.</li>
* <li>{@link #credential(String, TokenCredential)}, {@link #credential(String, AzureSasCredential)}, and
* {@link #credential(String, AzureNamedKeyCredential)} overloads can be used with the respective credentials
* that has access to the fully-qualified Service Bus namespace.</li>
* <li>{@link #credential(TokenCredential)}, {@link #credential(AzureSasCredential)}, and
* {@link #credential(AzureNamedKeyCredential)} overloads can be used with its respective credentials.
* {@link #fullyQualifiedNamespace(String)} <b>must be</b> set.</li>
* </ul>
*
* <p>The credential used in the following samples is {@code DefaultAzureCredential} for authentication. It is
* appropriate for most scenarios, including local development and production environments. Additionally, we recommend
* using
* <a href="https://learn.microsoft.com/azure/active-directory/managed-identities-azure-resources/">managed identity</a>
* for authentication in production environments. You can find more information on different ways of authenticating and
* their corresponding credential types in the
* <a href="https://learn.microsoft.com/java/api/overview/azure/identity-readme">Azure Identity documentation</a>.
* </p>
*
* <h2>Clients and sub-builders</h2>
*
* <p>{@link ServiceBusClientBuilder} can instantiate several clients. The client to instantiate depends on whether
* users are publishing or receiving messages and if the entity has
* <a href="https://learn.microsoft.com/azure/service-bus-messaging/message-sessions">Service Bus sessions</a> enabled.
* </p>
*
* <ul>
* <li><strong>Sending messages</strong>: Use the {@link #sender() sender()} sub-builder to create
* {@link ServiceBusSenderAsyncClient} and {@link ServiceBusSenderClient}.</li>
*
* <li><strong>Receiving messages</strong>: Use the {@link #receiver() receiver()} sub-builder to create
* {@link ServiceBusReceiverAsyncClient} and {@link ServiceBusReceiverClient}.</li>
*
* <li><strong>Receiving messages from a session-enabled Service Bus entity</strong>: Use the
* {@link #sessionReceiver() sessionReceiver()} sub-builder to create {@link ServiceBusSessionReceiverAsyncClient}
* and {@link ServiceBusSessionReceiverClient}.</li>
*
* <li><strong>Receiving messages using a callback-based processor</strong>: Use the
* {@link #processor() processor()} sub-builder to create {@link ServiceBusProcessorClient}.</li>
*
* <li><strong>Receiving messages from a session-enabled Service Bus entity using a callback-based processor
* </strong>: Use the {@link #sessionProcessor() sessionProcessor()} sub-builder to create
* {@link ServiceBusProcessorClient}.</li>
* </ul>
*
* <h2>Sending messages</h2>
*
* <p><strong>Sample: Instantiate a synchronous sender and send a message</strong></p>
*
* <p>The following code sample demonstrates the creation of the synchronous client {@link ServiceBusSenderClient}
* and sending a message. The {@code fullyQualifiedNamespace} is the Service Bus namespace's host name. It is listed
* under the "Essentials" panel after navigating to the Service Bus namespace via Azure Portal. The credential used is
* {@code DefaultAzureCredential} because it combines commonly used credentials in deployment and development and
* chooses the credential to use based on its running environment. When performance is important, consider using
* {@link com.azure.messaging.servicebus.ServiceBusMessageBatch} to publish multiple messages at once.</p>
*
* <!-- src_embed com.azure.messaging.servicebus.servicebussenderclient.instantiation -->
* <pre>
* TokenCredential credential = new DefaultAzureCredentialBuilder().build();
*
* // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
* ServiceBusSenderClient sender = new ServiceBusClientBuilder()
* .credential(fullyQualifiedNamespace, credential)
* .sender()
* .queueName(queueName)
* .buildClient();
*
* sender.sendMessage(new ServiceBusMessage("Foo bar"));
* </pre>
* <!-- end com.azure.messaging.servicebus.servicebussenderclient.instantiation -->
*
* <h2>Consuming messages</h2>
*
* <p>There are multiple clients for consuming messages from a Service Bus entity (that does not have
* <a href="https://learn.microsoft.com/azure/service-bus-messaging/message-sessions">Service Bus sessions</a>
* enabled).</p>
*
* <p><strong>Sample: Instantiate an asynchronous receiver</strong></p>
*
* <p>The code example below demonstrates creating an async receiver. The credential used is
* {@code DefaultAzureCredential} for authentication. It is appropriate for most scenarios, including local development
* and production environments. {@link ServiceBusReceiveMode#PEEK_LOCK} and
* {@link ServiceBusReceiverClientBuilder#disableAutoComplete() disableAutoComplete()} are <strong>strongly</strong>
* recommended so users have control over message settlement.</p>
*
* <!-- src_embed com.azure.messaging.servicebus.servicebusreceiverasyncclient.instantiation -->
* <pre>
* TokenCredential credential = new DefaultAzureCredentialBuilder().build();
*
* // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
* // 'disableAutoComplete' indicates that users will explicitly settle their message.
* ServiceBusReceiverAsyncClient asyncReceiver = new ServiceBusClientBuilder()
* .credential(fullyQualifiedNamespace, credential)
* .receiver()
* .disableAutoComplete()
* .queueName(queueName)
* .buildAsyncClient();
*
* // When users are done with the receiver, dispose of the receiver.
* // Clients should be long-lived objects as they require resources
* // and time to establish a connection to the service.
* asyncReceiver.close();
* </pre>
* <!-- end com.azure.messaging.servicebus.servicebusreceiverasyncclient.instantiation -->
*
* <p><strong>Sample: Instantiate {@link ServiceBusProcessorClient}</strong></p>
*
* <p>The code example below demonstrates creating a processor client. The processor client is recommended for most
* production scenarios because it offers connection recovery. The credential used is {@code DefaultAzureCredential}
* for authentication. It is appropriate for most scenarios, including local development and production environments.
* {@link ServiceBusReceiveMode#PEEK_LOCK} and
* {@link ServiceBusProcessorClientBuilder#disableAutoComplete() disableAutoComplete()} are <strong>strongly</strong>
* recommended so users have control over message settlement.</p>
*
* <!-- src_embed com.azure.messaging.servicebus.servicebusprocessorclient#receive-mode-peek-lock-instantiation -->
* <pre>
* // Function that gets called whenever a message is received.
* Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
* final ServiceBusReceivedMessage message = context.getMessage();
* // Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic
* // handling message reaches desired state such that it doesn't require Service Bus to redeliver
* // the same message, then context.complete() should be called otherwise context.abandon().
* final boolean success = Math.random() < 0.5;
* if (success) {
* try {
* context.complete();
* } catch (RuntimeException error) {
* System.out.printf("Completion of the message %s failed.%n Error: %s%n",
* message.getMessageId(), error);
* }
* } else {
* try {
* context.abandon();
* } catch (RuntimeException error) {
* System.out.printf("Abandoning of the message %s failed.%nError: %s%n",
* message.getMessageId(), error);
* }
* }
* };
*
* // Sample code that gets called if there's an error
* Consumer<ServiceBusErrorContext> processError = errorContext -> {
* if (errorContext.getException() instanceof ServiceBusException) {
* ServiceBusException exception = (ServiceBusException) errorContext.getException();
*
* System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
* exception.getReason());
* } else {
* System.out.printf("Error occurred: %s%n", errorContext.getException());
* }
* };
*
* TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
*
* // Create the processor client via the builder and its sub-builder
* // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
* ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
* .credential(fullyQualifiedNamespace, tokenCredential)
* .processor()
* .queueName(queueName)
* .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
* .disableAutoComplete() // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
* .processMessage(processMessage)
* .processError(processError)
* .disableAutoComplete()
* .buildProcessorClient();
*
* // Starts the processor in the background. Control returns immediately.
* processorClient.start();
*
* // Stop processor and dispose when done processing messages.
* processorClient.stop();
* processorClient.close();
* </pre>
* <!-- end com.azure.messaging.servicebus.servicebusprocessorclient#receive-mode-peek-lock-instantiation -->
*
* <h2>Consuming messages from a session-enabled Service Bus entity</h2>
*
* <p>Service Bus supports joint and ordered handling of unbounded sequences of messages through
* <a href="https://learn.microsoft.com/azure/service-bus-messaging/message-sessions">Service Bus sessions</a>.
* Sessions can be used as a first in, first out (FIFO) processing of messages. Queues and topics/subscriptions
* support Service Bus sessions, however, it must be
* <a href="https://learn.microsoft.com/azure/service-bus-messaging/enable-message-sessions">enabled at the time of
* entity creation</a>.</p>
*
* <p><strong>Sample: Sending a message to a session-enabled queue</strong></p>
*
* <p>The snippet below demonstrates sending a message to a
* <a href="https://learn.microsoft.com/azure/service-bus-messaging/message-sessions">Service Bus sessions</a>
* enabled queue. Setting {@link ServiceBusMessage#setSessionId(String)} property to "greetings" will send the message
* to a Service Bus session with an id of "greetings".</p>
*
* <!-- src_embed com.azure.messaging.servicebus.servicebussenderclient.sendMessage-session -->
* <pre>
* // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
* ServiceBusSenderClient sender = new ServiceBusClientBuilder()
* .credential(fullyQualifiedNamespace, new DefaultAzureCredentialBuilder().build())
* .sender()
* .queueName(sessionEnabledQueueName)
* .buildClient();
*
* // Setting sessionId publishes that message to a specific session, in this case, "greeting".
* ServiceBusMessage message = new ServiceBusMessage("Hello world")
* .setSessionId("greetings");
*
* sender.sendMessage(message);
*
* // Dispose of the sender.
* sender.close();
* </pre>
* <!-- end com.azure.messaging.servicebus.servicebussenderclient.sendMessage-session -->
*
* <p><strong>Sample: Receive messages from first available session</strong></p>
*
* <p>To process messages from the first available session, switch to {@link ServiceBusSessionReceiverClientBuilder}
* and build the session receiver client. Use
* {@link ServiceBusSessionReceiverAsyncClient#acceptNextSession() acceptNextSession()} to find the first available
* session to process messages from.</p>
*
* <!-- src_embed com.azure.messaging.servicebus.servicebusreceiverasyncclient.instantiation#nextsession -->
* <pre>
* TokenCredential credential = new DefaultAzureCredentialBuilder().build();
*
* // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
* // 'disableAutoComplete' indicates that users will explicitly settle their message.
* ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
* .credential(fullyQualifiedNamespace, credential)
* .sessionReceiver()
* .disableAutoComplete()
* .queueName(sessionEnabledQueueName)
* .buildAsyncClient();
*
* // Creates a client to receive messages from the first available session. It waits until
* // AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
* // completes with a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
* Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptNextSession();
*
* Flux<Void> receiveMessagesFlux = Flux.usingWhen(receiverMono,
* receiver -> receiver.receiveMessages().flatMap(message -> {
* System.out.println("Received message: " + message.getBody());
*
* // Explicitly settle the message via complete, abandon, defer, dead-letter, etc.
* if (isMessageProcessed) {
* return receiver.complete(message);
* } else {
* return receiver.abandon(message);
* }
* }),
* receiver -> Mono.fromRunnable(() -> {
* // Dispose of the receiver and sessionReceiver when done receiving messages.
* receiver.close();
* sessionReceiver.close();
* }));
*
* // This is a non-blocking call that moves onto the next line of code after setting up and starting the receive
* // operation. Customers can keep a reference to `subscription` and dispose of it when they want to stop
* // receiving messages.
* Disposable subscription = receiveMessagesFlux.subscribe(unused -> {
* }, error -> System.out.println("Error occurred: " + error),
* () -> System.out.println("Receiving complete."));
* </pre>
* <!-- end com.azure.messaging.servicebus.servicebusreceiverasyncclient.instantiation#nextsession -->
* <p><strong>Sample: Process messages from all sessions</strong></p>
*
* <p>The following code sample demonstrates the creation of the {@link ServiceBusProcessorClient} that processes all
* available sessions in the queue. {@link ServiceBusSessionProcessorClientBuilder#maxConcurrentSessions(int)}
* indicates how many sessions the processor will process at the same time. The credential used is
* {@code DefaultAzureCredential} for authentication. It is appropriate for most scenarios, including local development
* and production environments. {@link ServiceBusReceiveMode#PEEK_LOCK} and
* {@link ServiceBusSessionProcessorClientBuilder#disableAutoComplete() disableAutoComplete()} are <strong>strongly</strong>
* recommended so users have control over message settlement.</p>
*
* <!-- src_embed com.azure.messaging.servicebus.servicebusprocessorclient#session-instantiation -->
* <pre>
* // Function that gets called whenever a message is received.
* Consumer<ServiceBusReceivedMessageContext> onMessage = context -> {
* ServiceBusReceivedMessage message = context.getMessage();
* System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
* message.getSessionId(), message.getSequenceNumber(), message.getBody());
* };
*
* Consumer<ServiceBusErrorContext> onError = context -> {
* System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
* context.getFullyQualifiedNamespace(), context.getEntityPath());
*
* if (context.getException() instanceof ServiceBusException) {
* ServiceBusException exception = (ServiceBusException) context.getException();
*
* System.out.printf("Error source: %s, reason %s%n", context.getErrorSource(),
* exception.getReason());
* } else {
* System.out.printf("Error occurred: %s%n", context.getException());
* }
* };
*
* TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
*
* // Create the processor client via the builder and its sub-builder
* // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
* ServiceBusProcessorClient sessionProcessor = new ServiceBusClientBuilder()
* .credential(fullyQualifiedNamespace, tokenCredential)
* .sessionProcessor()
* .queueName(sessionEnabledQueueName)
* .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
* .disableAutoComplete()
* .maxConcurrentSessions(2)
* .processMessage(onMessage)
* .processError(onError)
* .buildProcessorClient();
*
* // Starts the processor in the background. Control returns immediately.
* sessionProcessor.start();
*
* // Stop processor and dispose when done processing messages.
* sessionProcessor.stop();
* sessionProcessor.close();
* </pre>
* <!-- end com.azure.messaging.servicebus.servicebusprocessorclient#session-instantiation -->
*
* <h2>Connection sharing</h2>
*
* <p>The creation of a connection to Service Bus requires resources. If your architecture allows, an application
* should share connection between clients which can be achieved by sharing the top level builder as shown below.</p>
*
* <p><strong>Sharing a connection between clients</strong></p>
*
* <!-- src_embed com.azure.messaging.servicebus.connection.sharing -->
* <pre>
* TokenCredential credential = new DefaultAzureCredentialBuilder().build();
*
* // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
* // Any clients created from this builder will share the underlying connection.
* ServiceBusClientBuilder sharedConnectionBuilder = new ServiceBusClientBuilder()
* .credential(fullyQualifiedNamespace, credential);
*
* // Create receiver and sender which will share the connection.
* ServiceBusReceiverClient receiver = sharedConnectionBuilder
* .receiver()
* .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
* .queueName(queueName)
* .buildClient();
* ServiceBusSenderClient sender = sharedConnectionBuilder
* .sender()
* .queueName(queueName)
* .buildClient();
*
* // Use the clients and finally close them.
* try {
* sender.sendMessage(new ServiceBusMessage("payload"));
* receiver.receiveMessages(1);
* } finally {
* // Clients should be long-lived objects as they require resources
* // and time to establish a connection to the service.
* sender.close();
* receiver.close();
* }
* </pre>
* <!-- end com.azure.messaging.servicebus.connection.sharing -->
*
*/
@ServiceClientBuilder(serviceClients = {ServiceBusReceiverAsyncClient.class, ServiceBusSenderAsyncClient.class,
ServiceBusSenderClient.class, ServiceBusReceiverClient.class, ServiceBusProcessorClient.class},
protocol = ServiceClientProtocol.AMQP)
public final class ServiceBusClientBuilder implements
TokenCredentialTrait<ServiceBusClientBuilder>,
AzureNamedKeyCredentialTrait<ServiceBusClientBuilder>,
ConnectionStringTrait<ServiceBusClientBuilder>,
AzureSasCredentialTrait<ServiceBusClientBuilder>,
AmqpTrait<ServiceBusClientBuilder>,
ConfigurationTrait<ServiceBusClientBuilder> {
// Retry options whose try timeout is ServiceBusConstants.OPERATION_TIMEOUT.
// NOTE(review): presumably the fallback when the caller supplies no retry options - confirm against usages.
private static final AmqpRetryOptions DEFAULT_RETRY =
new AmqpRetryOptions().setTryTimeout(ServiceBusConstants.OPERATION_TIMEOUT);
// Properties file read by the static initializer below to resolve the library name and version.
private static final String SERVICE_BUS_PROPERTIES_FILE = "azure-messaging-servicebus.properties";
// Format string for a subscription entity path.
// NOTE(review): presumably filled with the topic name followed by the subscription name - confirm against usages.
private static final String SUBSCRIPTION_ENTITY_PATH_FORMAT = "%s/subscriptions/%s";
// Entity path suffixes.
// NOTE(review): presumably appended to an entity path to address its dead-letter and transfer dead-letter
// sub-queues - confirm against usages.
private static final String DEAD_LETTER_QUEUE_NAME_SUFFIX = "/$deadletterqueue";
private static final String TRANSFER_DEAD_LETTER_QUEUE_NAME_SUFFIX = "/$Transfer/$deadletterqueue";
// Using 0 pre-fetch count for both receive modes, to avoid message lock lost exceptions in application
// receiving messages at a slow rate. Applications can set it to a higher value if they need better performance.
private static final int DEFAULT_PREFETCH_COUNT = 0;
// Keys looked up in the properties file; UNKNOWN is the fallback when a key is absent.
private static final String NAME_KEY = "name";
private static final String VERSION_KEY = "version";
private static final String UNKNOWN = "UNKNOWN";
// Assigned exactly once by the static initializer below.
private static final String LIBRARY_NAME;
private static final String LIBRARY_VERSION;
// NOTE(review): presumably the default maximum auto lock renewal duration for receivers - confirm against usages.
private static final Duration MAX_LOCK_RENEW_DEFAULT_DURATION = Duration.ofMinutes(5);
private static final ClientLogger LOGGER = new ClientLogger(ServiceBusClientBuilder.class);
// NOTE(review): presumably guards creation/teardown of sharedConnection - confirm against usages.
private final Object connectionLock = new Object();
private final MessageSerializer messageSerializer = new ServiceBusMessageSerializer();
// Builder state populated through the fluent setters of this class.
private ClientOptions clientOptions;
private Configuration configuration;
private ServiceBusConnectionProcessor sharedConnection;
// Entity path captured from the connection string, when the connection string contains one.
private String connectionStringEntityName;
private TokenCredential credentials;
private String fullyQualifiedNamespace;
private ProxyOptions proxyOptions;
private AmqpRetryOptions retryOptions;
private Scheduler scheduler;
// Transport defaults to AMQP until changed by the caller.
private AmqpTransportType transport = AmqpTransportType.AMQP;
private SslDomain.VerifyMode verifyMode;
private boolean crossEntityTransactions;
// Parsed form of the custom endpoint address; null when no custom endpoint is configured.
private URL customEndpointAddress;
private final V2StackSupport v2StackSupport = new V2StackSupport();
/**
* Keeps track of the open clients that were created from this builder when there is a shared connection.
*/
private final AtomicInteger openClients = new AtomicInteger();
// Resolve the library name and version once at class-load time, falling back to UNKNOWN for missing keys.
static {
final Map<String, String> properties = CoreUtils.getProperties(SERVICE_BUS_PROPERTIES_FILE);
LIBRARY_NAME = properties.getOrDefault(NAME_KEY, UNKNOWN);
LIBRARY_VERSION = properties.getOrDefault(VERSION_KEY, UNKNOWN);
}
/**
 * Constructs an empty builder. No credential, namespace or entity is configured yet; the transport type starts
 * out as {@link AmqpTransportType#AMQP}.
 */
public ServiceBusClientBuilder() {
    // Intentionally empty: all state is initialised by the field declarations.
}
/**
 * Configures the {@link ClientOptions} that clients created from this builder will use. These options allow
 * certain client properties to be customized and custom header information to be added; see the
 * {@link ClientOptions} documentation for details.
 *
 * <p>The supplied value is stored as-is, replacing any previously configured options.</p>
 *
 * @param clientOptions The options to apply to clients built from this builder.
 *
 * @return The updated {@link ServiceBusClientBuilder} object.
 */
@Override
public ServiceBusClientBuilder clientOptions(ClientOptions clientOptions) {
    // No validation is performed here; the value simply replaces the current one.
    this.clientOptions = clientOptions;

    return this;
}
/**
 * Sets the fully-qualified namespace for the Service Bus.
 *
 * <p>The argument is validated before the builder is modified, so a rejected value never replaces a previously
 * configured namespace.</p>
 *
 * @param fullyQualifiedNamespace The fully-qualified namespace for the Service Bus.
 *
 * @return The updated {@link ServiceBusClientBuilder} object.
 * @throws NullPointerException if {@code fullyQualifiedNamespace} is null.
 * @throws IllegalArgumentException if {@code fullyQualifiedNamespace} is an empty string.
 */
public ServiceBusClientBuilder fullyQualifiedNamespace(String fullyQualifiedNamespace) {
    Objects.requireNonNull(fullyQualifiedNamespace, "'fullyQualifiedNamespace' cannot be null.");

    if (CoreUtils.isNullOrEmpty(fullyQualifiedNamespace)) {
        throw LOGGER.logExceptionAsError(
            new IllegalArgumentException("'fullyQualifiedNamespace' cannot be an empty string."));
    }

    // Assign only after every check has passed; previously an empty string was stored before being rejected,
    // leaving the builder with an invalid namespace if the caller caught the exception.
    this.fullyQualifiedNamespace = fullyQualifiedNamespace;
    return this;
}
/**
 * Returns the configured fully-qualified namespace, failing if none has been set.
 *
 * @return The non-empty fully-qualified namespace held by this builder.
 * @throws IllegalArgumentException if the namespace is null or an empty string.
 */
private String getAndValidateFullyQualifiedNamespace() {
    final String namespace = fullyQualifiedNamespace;
    if (!CoreUtils.isNullOrEmpty(namespace)) {
        return namespace;
    }

    // The same message is used for both the null and the empty case.
    throw LOGGER.logExceptionAsError(
        new IllegalArgumentException("'fullyQualifiedNamespace' cannot be an empty string."));
}
/**
 * Configures a custom endpoint address to use when connecting to the Service Bus service. This is helpful when
 * the network does not permit connections to the standard Azure Service Bus endpoint address but does permit
 * connections through an intermediary, for example {@literal https://my.custom.endpoint.com:55300}.
 * <p>
 * When the address carries no port, the default port for the
 * {@link #transportType(AmqpTransportType) transport type} is used.
 * <p>
 * Passing {@code null} clears any previously configured custom endpoint.
 *
 * @param customEndpointAddress The custom endpoint address, or {@code null} to clear it.
 * @return The updated {@link ServiceBusClientBuilder} object.
 * @throws IllegalArgumentException if {@code customEndpointAddress} cannot be parsed into a valid {@link URL}.
 */
public ServiceBusClientBuilder customEndpointAddress(String customEndpointAddress) {
    URL parsedEndpoint = null;

    if (customEndpointAddress != null) {
        try {
            parsedEndpoint = new URL(customEndpointAddress);
        } catch (MalformedURLException e) {
            // The field is left untouched when parsing fails.
            final String message = String.format("(%s) : is not a valid URL,", customEndpointAddress);
            throw LOGGER.logExceptionAsError(new IllegalArgumentException(message, e));
        }
    }

    this.customEndpointAddress = parsedEndpoint;
    return this;
}
/**
 * Configures this builder from a connection string that targets either a Service Bus namespace or a specific
 * Service Bus resource.
 *
 * <p>The host of the connection string's endpoint becomes the fully-qualified namespace. When the connection
 * string also carries an entity path, that path is remembered as the connection string's entity name.</p>
 *
 * @param connectionString Connection string for a Service Bus namespace or a specific Service Bus resource.
 *
 * @return The updated {@link ServiceBusClientBuilder} object.
 * @throws AzureException if a credential cannot be created from the connection string.
 */
public ServiceBusClientBuilder connectionString(String connectionString) {
    final ConnectionStringProperties parsed = new ConnectionStringProperties(connectionString);

    final TokenCredential connectionStringCredential;
    try {
        connectionStringCredential = getTokenCredential(parsed);
    } catch (Exception e) {
        throw LOGGER.logExceptionAsError(
            new AzureException("Could not create the ServiceBusSharedKeyCredential.", e));
    }

    final String host = parsed.getEndpoint().getHost();
    this.fullyQualifiedNamespace = host;

    // Only record the entity when the connection string actually names one.
    final String entityFromConnectionString = parsed.getEntityPath();
    if (!CoreUtils.isNullOrEmpty(entityFromConnectionString)) {
        LOGGER.atInfo()
            .addKeyValue(ENTITY_PATH_KEY, entityFromConnectionString)
            .log("Setting entity from connection string.");

        this.connectionStringEntityName = entityFromConnectionString;
    }

    return credential(host, connectionStringCredential);
}
/**
* Enables cross-entity transactions on the connection to Service Bus. Use this feature only when your transaction
* scope spans across different Service Bus entities. This feature is achieved by routing all the messages through
* one 'send-via' entity on the server side, as explained next.
* <p>
* Once clients are created for multiple entities, the first entity that an operation occurs on becomes the
* entity through which all subsequent sends will be routed (the 'send-via' entity). This enables the service to
* perform a transaction that is meant to span multiple entities. This means that subsequent entities that perform
* their first operation need to either be senders, or if they are receivers they need to be on the same entity as
* the initial entity through which all sends are routed (otherwise the service would not be able to ensure
* that the transaction is committed because it cannot route a receive operation through a different entity). For
* instance, if you have SenderA (for entity A) and ReceiverB (for entity B) that are created from a client with
* cross-entity transactions enabled, you would need to receive first with ReceiverB to allow this to work. If you
* first send to entity A, and then attempt to receive from entity B, an exception would be thrown.
*
* <p><strong>Avoid using non-transaction API on this client</strong></p>
* Enabling this feature sets up the connection to Service Bus in a way that is optimised for cross-entity
* transactions. Once all the clients have been set up, the first receiver or sender used will initialize the
* 'send-via' queue as a single message transfer entity. All the messages will flow via this queue. Thus this
* client is not suitable for any non-transaction API.
*
* <p><strong>When not to enable this feature</strong></p>
* Do not enable this feature if your transaction involves only one Service Bus entity. For example, you are
* receiving from one queue/subscription and you want to settle your own messages which are part of one
* transaction.
*
* @return The updated {@link ServiceBusClientBuilder} object.
*
* @see <a href="https://docs.microsoft.com/azure/service-bus-messaging/service-bus-transactions#transfers-and-send-via">Service Bus transactions</a>
*/
public ServiceBusClientBuilder enableCrossEntityTransactions() {
// Only records the flag on the builder; this method offers no way to switch it back off.
this.crossEntityTransactions = true;
return this;
}
/**
 * Creates the shared-key credential described by a parsed connection string.
 *
 * @param properties The parsed connection string.
 * @return A {@link ServiceBusSharedKeyCredential} built from the shared access signature when the connection
 *     string has one; otherwise one built from the shared access key name and key, using
 *     {@code ServiceBusConstants.TOKEN_VALIDITY}.
 */
private TokenCredential getTokenCredential(ConnectionStringProperties properties) {
    if (properties.getSharedAccessSignature() != null) {
        return new ServiceBusSharedKeyCredential(properties.getSharedAccessSignature());
    }

    return new ServiceBusSharedKeyCredential(properties.getSharedAccessKeyName(),
        properties.getSharedAccessKey(), ServiceBusConstants.TOKEN_VALIDITY);
}
/**
* Sets the configuration store that is used during construction of the service client.
* <p>
* If not specified, the default configuration store is used to configure Service Bus clients. Use {@link
* Configuration#NONE} to bypass using configuration settings during construction.
*
* @param configuration The configuration store used to configure Service Bus clients. May be {@code null}, in
* which case a clone of the global configuration is used when the connection options are built.
*
* @return The updated {@link ServiceBusClientBuilder} object.
*/
@Override
public ServiceBusClientBuilder configuration(Configuration configuration) {
// Stored as-is; no validation is performed here.
this.configuration = configuration;
return this;
}
/**
 * Sets the credential by using a {@link TokenCredential} for the Service Bus resource.
 * <a href="https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/identity/azure-identity">
 * azure-identity</a> has multiple {@link TokenCredential} implementations that can be used to authenticate
 * the access to the Service Bus resource.
 *
 * @param fullyQualifiedNamespace The fully-qualified namespace for the Service Bus.
 * @param credential The token credential to use for authentication. Access controls may be specified by the
 * ServiceBus namespace or the requested Service Bus entity, depending on Azure configuration.
 *
 * @return The updated {@link ServiceBusClientBuilder} object.
 * @throws NullPointerException if {@code fullyQualifiedNamespace} or {@code credential} is {@code null}.
 * @throws IllegalArgumentException if {@code fullyQualifiedNamespace} is an empty string.
 */
public ServiceBusClientBuilder credential(String fullyQualifiedNamespace, TokenCredential credential) {
    // Check every argument before assigning anything, so a rejected call leaves the previously configured
    // namespace and credential untouched.
    Objects.requireNonNull(fullyQualifiedNamespace, "'fullyQualifiedNamespace' cannot be null.");
    Objects.requireNonNull(credential, "'credential' cannot be null.");
    if (CoreUtils.isNullOrEmpty(fullyQualifiedNamespace)) {
        throw LOGGER.logExceptionAsError(
            new IllegalArgumentException("'fullyQualifiedNamespace' cannot be an empty string."));
    }

    this.fullyQualifiedNamespace = fullyQualifiedNamespace;
    this.credentials = credential;
    return this;
}
/**
 * Sets the {@link TokenCredential} used to authorize requests sent to the service. See the Azure SDK for Java
 * <a href="https://aka.ms/azsdk/java/docs/identity">identity and authentication</a>
 * documentation for more details on proper usage of the {@link TokenCredential} type.
 * <p>
 * This overload only replaces the credential; the fully-qualified namespace is left unchanged.
 *
 * @param credential The token credential to use for authentication. Access controls may be specified by the
 * ServiceBus namespace or the requested Service Bus entity, depending on Azure configuration.
 *
 * @return The updated {@link ServiceBusClientBuilder} object.
 * @throws NullPointerException if {@code credential} is {@code null}.
 */
@Override
public ServiceBusClientBuilder credential(TokenCredential credential) {
    Objects.requireNonNull(credential, "'credential' cannot be null.");
    this.credentials = credential;
    return this;
}
/**
 * Sets the credential with the shared access policies for the Service Bus resource.
 * You can find the shared access policies on the Azure portal or Azure CLI.
 * For instance, on the portal, "Shared Access policies" has 'policy' and its 'Primary Key' and 'Secondary Key'.
 * The 'name' attribute of the {@link AzureNamedKeyCredential} is the 'policy' on portal and the 'key' attribute
 * can be either 'Primary Key' or 'Secondary Key'.
 * This method and {@link #connectionString(String)} take the same information in different forms. But it allows
 * you to update the name and key.
 *
 * @param fullyQualifiedNamespace The fully-qualified namespace for the Service Bus.
 * @param credential {@link AzureNamedKeyCredential} to be used for authentication.
 *
 * @return The updated {@link ServiceBusClientBuilder} object.
 * @throws NullPointerException if {@code fullyQualifiedNamespace} or {@code credential} is {@code null}.
 * @throws IllegalArgumentException if {@code fullyQualifiedNamespace} is an empty string.
 */
public ServiceBusClientBuilder credential(String fullyQualifiedNamespace, AzureNamedKeyCredential credential) {
    Objects.requireNonNull(fullyQualifiedNamespace, "'fullyQualifiedNamespace' cannot be null.");
    Objects.requireNonNull(credential, "'credential' cannot be null.");
    if (CoreUtils.isNullOrEmpty(fullyQualifiedNamespace)) {
        throw LOGGER.logExceptionAsError(
            new IllegalArgumentException("'fullyQualifiedNamespace' cannot be an empty string."));
    }

    // Build the shared-key credential before assigning any field, so that a failure while validating or
    // constructing it cannot leave the builder with a new namespace but the old credential.
    final ServiceBusSharedKeyCredential sharedKeyCredential = new ServiceBusSharedKeyCredential(
        credential.getAzureNamedKey().getName(), credential.getAzureNamedKey().getKey(),
        ServiceBusConstants.TOKEN_VALIDITY);

    this.fullyQualifiedNamespace = fullyQualifiedNamespace;
    this.credentials = sharedKeyCredential;
    return this;
}
/**
 * Sets the credential with the shared access policies for the Service Bus resource.
 * The shared access policies are listed on the Azure portal and are available through the Azure CLI.
 * On the portal, each entry under "Shared Access policies" has a 'policy' with a 'Primary Key' and a
 * 'Secondary Key'. The 'name' attribute of the {@link AzureNamedKeyCredential} corresponds to the 'policy', and the
 * 'key' attribute can be either the 'Primary Key' or the 'Secondary Key'.
 * This method and {@link #connectionString(String)} take the same information in different forms. But it allows
 * you to update the name and key.
 * <p>
 * This overload only replaces the credential; the fully-qualified namespace is left unchanged.
 *
 * @param credential {@link AzureNamedKeyCredential} to be used for authentication.
 *
 * @return The updated {@link ServiceBusClientBuilder} object.
 * @throws NullPointerException if {@code credential} is {@code null}.
 */
@Override
public ServiceBusClientBuilder credential(AzureNamedKeyCredential credential) {
    Objects.requireNonNull(credential, "'credential' cannot be null.");

    final ServiceBusSharedKeyCredential sharedKeyCredential = new ServiceBusSharedKeyCredential(
        credential.getAzureNamedKey().getName(), credential.getAzureNamedKey().getKey(),
        ServiceBusConstants.TOKEN_VALIDITY);
    this.credentials = sharedKeyCredential;
    return this;
}
/**
 * Sets the credential with Shared Access Signature for the Service Bus resource.
 * Refer to <a href="https://docs.microsoft.com/azure/service-bus-messaging/service-bus-sas">
 * Service Bus access control with Shared Access Signatures</a>.
 *
 * @param fullyQualifiedNamespace The fully-qualified namespace for the Service Bus.
 * @param credential {@link AzureSasCredential} to be used for authentication.
 *
 * @return The updated {@link ServiceBusClientBuilder} object.
 * @throws NullPointerException if {@code fullyQualifiedNamespace} or {@code credential} is {@code null}.
 * @throws IllegalArgumentException if {@code fullyQualifiedNamespace} is an empty string.
 */
public ServiceBusClientBuilder credential(String fullyQualifiedNamespace, AzureSasCredential credential) {
    Objects.requireNonNull(fullyQualifiedNamespace, "'fullyQualifiedNamespace' cannot be null.");
    Objects.requireNonNull(credential, "'credential' cannot be null.");
    if (CoreUtils.isNullOrEmpty(fullyQualifiedNamespace)) {
        throw LOGGER.logExceptionAsError(
            new IllegalArgumentException("'fullyQualifiedNamespace' cannot be an empty string."));
    }

    // Build the shared-key credential before assigning any field, so that a failure while validating or
    // constructing it cannot leave the builder with a new namespace but the old credential.
    final ServiceBusSharedKeyCredential sharedKeyCredential
        = new ServiceBusSharedKeyCredential(credential.getSignature());

    this.fullyQualifiedNamespace = fullyQualifiedNamespace;
    this.credentials = sharedKeyCredential;
    return this;
}
/**
 * Sets the credential with Shared Access Signature for the Service Bus resource.
 * See <a href="https://docs.microsoft.com/azure/service-bus-messaging/service-bus-sas">
 * Service Bus access control with Shared Access Signatures</a>.
 * <p>
 * This overload only replaces the credential; the fully-qualified namespace is left unchanged.
 *
 * @param credential {@link AzureSasCredential} to be used for authentication.
 *
 * @return The updated {@link ServiceBusClientBuilder} object.
 * @throws NullPointerException if {@code credential} is {@code null}.
 */
@Override
public ServiceBusClientBuilder credential(AzureSasCredential credential) {
    Objects.requireNonNull(credential, "'credential' cannot be null.");

    final ServiceBusSharedKeyCredential sharedKeyCredential
        = new ServiceBusSharedKeyCredential(credential.getSignature());
    this.credentials = sharedKeyCredential;
    return this;
}
/**
* Sets the proxy configuration to use for {@link ServiceBusSenderAsyncClient}. When a proxy is configured, {@link
* AmqpTransportType#AMQP_WEB_SOCKETS} must be used for the transport type.
*
* @param proxyOptions The proxy configuration to use.
*
* @return The updated {@link ServiceBusClientBuilder} object.
*/
@Override
public ServiceBusClientBuilder proxyOptions(ProxyOptions proxyOptions) {
// Stored as-is; the transport-type requirement described above is not checked in this method.
this.proxyOptions = proxyOptions;
return this;
}
/**
* Package-private method that sets the verify mode for this connection.
*
* @param verifyMode The verification mode.
* @return The updated {@link ServiceBusClientBuilder} object.
*/
ServiceBusClientBuilder verifyMode(SslDomain.VerifyMode verifyMode) {
// Stored as-is; no validation is performed here.
this.verifyMode = verifyMode;
return this;
}
/**
* Sets the retry options for Service Bus clients. If not specified, the default retry options are used.
*
* @param retryOptions The retry options to use. May be {@code null}, in which case the builder's default retry
* options are substituted when the connection options are built.
*
* @return The updated {@link ServiceBusClientBuilder} object.
*/
@Override
public ServiceBusClientBuilder retryOptions(AmqpRetryOptions retryOptions) {
// Stored as-is; no validation is performed here.
this.retryOptions = retryOptions;
return this;
}
/**
* Package-private method that sets the scheduler to use.
*
* @param scheduler Scheduler to be used. May be {@code null}, in which case
* {@code Schedulers.boundedElastic()} is substituted when the connection options are built.
*
* @return The updated {@link ServiceBusClientBuilder} object.
*/
ServiceBusClientBuilder scheduler(Scheduler scheduler) {
// Stored as-is; no validation is performed here.
this.scheduler = scheduler;
return this;
}
/**
* Sets the transport type by which all the communication with Azure Service Bus occurs. Default value is {@link
* AmqpTransportType#AMQP}.
*
* @param transportType The transport type to use.
*
* @return The updated {@link ServiceBusClientBuilder} object.
*/
@Override
public ServiceBusClientBuilder transportType(AmqpTransportType transportType) {
// Note: the backing field is named 'transport', not 'transportType'.
this.transport = transportType;
return this;
}
/**
* Creates a new instance of {@link ServiceBusSenderClientBuilder} used to configure Service Bus message senders.
* Every call returns a separate builder instance.
*
* @return A new instance of {@link ServiceBusSenderClientBuilder}.
*/
public ServiceBusSenderClientBuilder sender() {
return new ServiceBusSenderClientBuilder();
}
/**
* Creates a new instance of {@link ServiceBusReceiverClientBuilder} used to configure Service Bus message
* receivers. Every call returns a separate builder instance.
*
* @return A new instance of {@link ServiceBusReceiverClientBuilder}.
*/
public ServiceBusReceiverClientBuilder receiver() {
return new ServiceBusReceiverClientBuilder();
}
/**
* Creates a new instance of {@link ServiceBusSessionReceiverClientBuilder} used to configure <b>session aware</b>
* Service Bus message receivers. Every call returns a separate builder instance.
*
* @return A new instance of {@link ServiceBusSessionReceiverClientBuilder}.
*/
public ServiceBusSessionReceiverClientBuilder sessionReceiver() {
return new ServiceBusSessionReceiverClientBuilder();
}
/**
* Creates a new instance of {@link ServiceBusProcessorClientBuilder} used to configure a
* {@link ServiceBusProcessorClient} instance. Every call returns a separate builder instance.
*
* @return A new instance of {@link ServiceBusProcessorClientBuilder}.
*/
public ServiceBusProcessorClientBuilder processor() {
return new ServiceBusProcessorClientBuilder();
}
/**
* Creates a new instance of {@link ServiceBusSessionProcessorClientBuilder} used to configure a Service Bus
* processor instance that processes sessions. Every call returns a separate builder instance.
*
* @return A new instance of {@link ServiceBusSessionProcessorClientBuilder}.
*/
public ServiceBusSessionProcessorClientBuilder sessionProcessor() {
return new ServiceBusSessionProcessorClientBuilder();
}
/**
* Creates a new instance of {@link ServiceBusRuleManagerBuilder} used to configure a Service Bus rule manager
* instance. Every call returns a separate builder instance.
*
* @return A new instance of {@link ServiceBusRuleManagerBuilder}.
*/
public ServiceBusRuleManagerBuilder ruleManager() {
return new ServiceBusRuleManagerBuilder();
}
/**
 * Called when a child client is closed. Decrements the count of open clients and, once that count is no longer
 * positive, disposes of the shared connection and clears the reference to it. The whole operation runs while
 * holding {@code connectionLock}.
 */
void onClientClose() {
    synchronized (connectionLock) {
        final int remainingClients = openClients.decrementAndGet();
        LOGGER.atInfo()
            .addKeyValue("numberOfOpenClients", remainingClients)
            .log("Closing a dependent client.");

        if (remainingClients > 0) {
            // Other clients still depend on the shared connection.
            return;
        }

        if (remainingClients < 0) {
            // More closes than opens were recorded; warn, then still fall through to the dispose below.
            LOGGER.atWarning()
                .addKeyValue("numberOfOpenClients", remainingClients)
                .log("There should not be less than 0 clients.");
        }

        LOGGER.info("No more open clients, closing shared connection.");

        if (sharedConnection == null) {
            LOGGER.warning("Shared ServiceBusConnectionProcessor was already disposed.");
            return;
        }

        sharedConnection.dispose();
        sharedConnection = null;
    }
}
/**
 * Connection-caching for the V1-Stack: returns the shared connection processor, creating it on first use, and
 * counts the caller as one more open client.
 * <p>
 * The lookup or creation, the open-client increment and the read of the returned reference all happen while
 * holding {@code connectionLock}. {@link #onClientClose()} decrements the counter and disposes of the shared
 * connection under the same lock, so keeping these steps inside the lock prevents a concurrent close from
 * disposing (and nulling) the connection between the moment it is found here and the moment it is returned.
 *
 * @param serializer The serializer handed to every connection that is created.
 * @param meter The meter handed to the {@code ReactorHandlerProvider} of every connection that is created.
 * @return The shared {@link ServiceBusConnectionProcessor}.
 */
private ServiceBusConnectionProcessor getOrCreateConnectionProcessor(MessageSerializer serializer, Meter meter) {
    synchronized (connectionLock) {
        if (sharedConnection == null) {
            final ConnectionOptions connectionOptions = getConnectionOptions();

            // The callable builds one connection per subscription; repeat() re-subscribes to it, which
            // presumably is what lets the processor obtain a new connection after the current one ends.
            final Flux<ServiceBusAmqpConnection> connectionFlux = Mono.fromCallable(() -> {
                final String connectionId = StringUtil.getRandomString("MF");
                final ReactorProvider provider = new ReactorProvider();
                final ReactorHandlerProvider handlerProvider = new ReactorHandlerProvider(provider, meter);
                final TokenManagerProvider tokenManagerProvider = new AzureTokenManagerProvider(
                    connectionOptions.getAuthorizationType(), connectionOptions.getFullyQualifiedNamespace(),
                    connectionOptions.getAuthorizationScope());
                final ServiceBusAmqpLinkProvider linkProvider = new ServiceBusAmqpLinkProvider();
                // For the V1-Stack, tell the connection to continue creating receivers on v1 stack.
                final boolean isV2 = false;
                return (ServiceBusAmqpConnection) new ServiceBusReactorAmqpConnection(connectionId,
                    connectionOptions, provider, handlerProvider, linkProvider, tokenManagerProvider, serializer,
                    crossEntityTransactions, isV2);
            }).repeat();

            sharedConnection = connectionFlux.subscribeWith(new ServiceBusConnectionProcessor(
                connectionOptions.getFullyQualifiedNamespace(), connectionOptions.getRetry()));
        }

        // Count the new client and read the reference under the lock that onClientClose() uses to
        // decrement the counter and dispose of the connection.
        final int numberOfOpenClients = openClients.incrementAndGet();
        LOGGER.info("# of open clients with shared connection: {}", numberOfOpenClients);
        return sharedConnection;
    }
}
private ConnectionOptions getConnectionOptions() {
configuration = configuration == null ? Configuration.getGlobalConfiguration().clone() : configuration;
if (retryOptions == null) {
retryOptions = DEFAULT_RETRY;
}
if (scheduler == null) {
scheduler = Schedulers.boundedElastic();
}
if (credentials == null) {
throw LOGGER.logExceptionAsError(new IllegalArgumentException("Credentials have not been set. "
+ "They can be set using: connectionString(String), connectionString(String, String), "
+ "or credentials(String, String, TokenCredential)"