-
Notifications
You must be signed in to change notification settings - Fork 2k
/
ReceiveSingleSessionAsyncSample.java
104 lines (89 loc) · 4.89 KB
/
ReceiveSingleSessionAsyncSample.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.messaging.servicebus;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Demonstrates how to receive from the first available session.
 *
 * <p>Configuration is read from the environment variables
 * {@code AZURE_SERVICEBUS_NAMESPACE_CONNECTION_STRING}, {@code AZURE_SERVICEBUS_SAMPLE_TOPIC_NAME} and
 * {@code AZURE_SERVICEBUS_SAMPLE_SESSION_SUBSCRIPTION_NAME}.</p>
 */
public class ReceiveSingleSessionAsyncSample {
    // Connection settings, read from the environment when the sample is instantiated.
    String connectionString = System.getenv("AZURE_SERVICEBUS_NAMESPACE_CONNECTION_STRING");
    String topicName = System.getenv("AZURE_SERVICEBUS_SAMPLE_TOPIC_NAME");
    String subscriptionName = System.getenv("AZURE_SERVICEBUS_SAMPLE_SESSION_SUBSCRIPTION_NAME");

    /**
     * Main method to invoke this demo on how to receive messages from the first available session in a Service Bus
     * topic subscription.
     *
     * @param args Unused arguments to the program.
     *
     * @throws InterruptedException If the thread is interrupted while waiting for the operations to complete.
     */
    public static void main(String[] args) throws InterruptedException {
        ReceiveSingleSessionAsyncSample sample = new ReceiveSingleSessionAsyncSample();
        sample.run();
    }

    /**
     * Runs this demo on how to receive messages from the first available session in a Service Bus topic
     * subscription.
     *
     * <p>The method waits for up to 30 seconds while messages are printed. It returns earlier if the receive
     * pipeline terminates (with an error or by completing). The subscription and the session receiver are always
     * released, even when the wait is interrupted.</p>
     *
     * @throws InterruptedException If the thread is interrupted while waiting for the operations to complete.
     */
    @Test
    public void run() throws InterruptedException {
        AtomicBoolean sampleSuccessful = new AtomicBoolean(true);
        CountDownLatch countdownLatch = new CountDownLatch(1);

        // The connection string value can be obtained by:
        // 1. Going to your Service Bus namespace in Azure Portal.
        // 2. Go to "Shared access policies"
        // 3. Copy the connection string for the "RootManageSharedAccessKey" policy.
        // The 'connectionString' format is shown below.
        // 1. "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
        // 2. "{fully-qualified-namespace}" will look similar to "{your-namespace}.servicebus.windows.net"
        // 3. "topicName" will be the name of the Service Bus topic instance you created in the Service Bus namespace.
        // 4. "subscriptionName" will be the name of the session-enabled subscription.

        // Create a receiver.
        ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
            .connectionString(connectionString)
            .sessionReceiver()
            .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
            .topicName(topicName)
            .subscriptionName(subscriptionName)
            .buildAsyncClient();

        try {
            // Receiving messages from the first available session. It waits up to the
            // AmqpRetryOptions.getTryTimeout(). If no session is available within that operation timeout, it
            // completes with an error. Otherwise, a receiver is returned when a lock on the session is acquired.
            Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptNextSession();

            // If the session is successfully accepted, begin receiving messages from it.
            // Flux.usingWhen is used to dispose of the receiver after consuming messages completes.
            Disposable subscription = Flux.usingWhen(receiverMono,
                receiver -> receiver.receiveMessages(),
                receiver -> Mono.fromRunnable(receiver::close))
                .subscribe(message -> {
                    // Process message.
                    System.out.printf("Session: %s. Sequence #: %s. Contents: %s%n", message.getSessionId(),
                        message.getSequenceNumber(), message.getBody());

                    // When this message function completes, the message is automatically completed. If an
                    // exception is thrown in here, the message is abandoned.
                    // To disable this behaviour, toggle ServiceBusSessionReceiverClientBuilder.disableAutoComplete()
                    // when building the session receiver.
                }, error -> {
                    System.err.println("Error occurred: " + error);
                    sampleSuccessful.set(false);
                    // Nothing more will arrive after a terminal error, so stop waiting.
                    countdownLatch.countDown();
                }, countdownLatch::countDown); // Likewise stop waiting once the stream completes.

            try {
                // Subscribe is not a blocking call so we wait here so the program does not end.
                countdownLatch.await(30, TimeUnit.SECONDS);
            } finally {
                // Disposing of the subscription will cancel the receive() operation.
                subscription.dispose();
            }
        } finally {
            // Close the receiver, even if the wait above was interrupted or the subscription failed to start.
            sessionReceiver.close();
        }

        // This assertion is to ensure that samples are working. Users should remove this.
        Assertions.assertTrue(sampleSuccessful.get());
    }
}