import java.time.Duration; import java.util.Collection; import java.util.Date; import java.util.concurrent.CompletableFuture; import com.microsoft.azure.servicebus.*; import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; import com.microsoft.azure.servicebus.primitives.ServiceBusException; import com.microsoft.azure.servicebus.primitives.MessageLockLostException; public class TopicSendReceiveTest { private static final int CONCURRENT_SESSIONS = 4; private static final int CONCURRENT_CALLS_PER_SESSION = 1; private static final int RENEW_TIME_MINUTES = 2; private static TopicClient topicClient; private static SubscriptionClient subscriptionClient; private static String nameSpace = "nameSpace"; private static String topicName = "topicName"; private static String subName = "topicName/subscriptions/subName"; private static String keyName = "keyName"; private static String key = "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz="; public static void main(String[] args) throws Exception { // Send System.out.println("Sending messages ..."); ConnectionStringBuilder conStrSend = new ConnectionStringBuilder(nameSpace, topicName, keyName, key); topicClient = new TopicClient(conStrSend.toString()); for(int i = 1; i <= 2; i ++) { Message message = new Message("Test"); message.setMessageId(Integer.toString(i)); message.setSessionId(Integer.toString(i*2)); topicClient.send(message); System.out.println("Message Sent - SessionId: " + message.getSessionId() + " MessageId: " + message.getMessageId()); } topicClient.close(); // Receive System.out.println("Receiving messages ..."); ConnectionStringBuilder conStrReceive = new ConnectionStringBuilder(nameSpace, subName, keyName, key); subscriptionClient = new SubscriptionClient(conStrReceive.toString(), ReceiveMode.PeekLock); // Collection sess= subscriptionClient.getMessageSessions(); // System.out.println("Sessions: " + sess.size()); receiveMessages(); Thread.sleep(10000 * 1000); subscriptionClient.close(); System.out.println("Receive sample completed."); System.exit(0); } private static void completeMessage(IMessageSession session, IMessage msg) throws InterruptedException, ServiceBusException { try { session.complete(msg.getLockToken()); } catch (MessageLockLostException ex) { System.out.println("Exception occurred: %s" + ex.getMessage()); } catch (Exception ex) { throw ex; } } private static void receiveMessages() throws InterruptedException, ServiceBusException { subscriptionClient.registerSessionHandler(new ISessionHandler(){ @Override public CompletableFuture onMessageAsync(IMessageSession session, IMessage message) { System.out.println("Message Received - SessionId: " + session.getSessionId() + " MessageId: " + message.getMessageId()); try { completeMessage(session, message); } catch (Exception ex) { System.out.println("Exception occurred: %s" + ex.getMessage()); } return CompletableFuture.completedFuture(null); } @Override public CompletableFuture OnCloseSessionAsync(IMessageSession session) { System.out.println("Session closed.- " + session.getSessionId() + " " + new Date().toString()); return CompletableFuture.completedFuture(null); } @Override public void notifyException(Throwable exception, ExceptionPhase phase) { System.out.println(phase + " encountered exception: " + exception.getMessage()); } }, new SessionHandlerOptions(CONCURRENT_SESSIONS, CONCURRENT_CALLS_PER_SESSION, false, Duration.ofMinutes(RENEW_TIME_MINUTES))); } }