|
16 | 16 | */
|
17 | 17 | package org.apache.activemq.artemis.jms.example;
|
18 | 18 |
|
19 |
| - |
20 |
| -import org.fusesource.mqtt.client.*; |
| 19 | +import org.fusesource.mqtt.client.BlockingConnection; |
| 20 | +import org.fusesource.mqtt.client.MQTT; |
| 21 | +import org.fusesource.mqtt.client.Message; |
| 22 | +import org.fusesource.mqtt.client.QoS; |
| 23 | +import org.fusesource.mqtt.client.Topic; |
21 | 24 |
|
22 | 25 | import java.util.concurrent.TimeUnit;
|
23 | 26 |
|
|
27 | 30 | */
|
28 | 31 | public class ClusteredQueueMQTTExample {
|
29 | 32 |
|
| 33 | + public static void main(final String[] args) throws Exception { |
| 34 | + // Create a new MQTT connection to the broker. We are not setting the client ID. The broker will pick one for us. |
| 35 | + System.out.println("Connecting to Artemis using MQTT"); |
| 36 | + BlockingConnection connection1 = retrieveMQTTConnection("tcp://localhost:1883"); |
| 37 | + BlockingConnection connection2 = retrieveMQTTConnection("tcp://localhost:1884"); |
| 38 | + System.out.println("Connected to Artemis 1 "); |
30 | 39 |
|
31 |
| - public static void main(final String[] args) throws Exception { |
32 |
| - // Create a new MQTT connection to the broker. We are not setting the client ID. The broker will pick one for us. |
33 |
| - System.out.println("Connecting to Artemis using MQTT"); |
34 |
| - BlockingConnection connection1 = retrieveMQTTConnection("tcp://localhost:1883"); |
35 |
| - BlockingConnection connection2 = retrieveMQTTConnection("tcp://localhost:1884"); |
36 |
| - System.out.println("Connected to Artemis 1 "); |
37 |
| - |
38 |
| - // Subscribe to topics |
39 |
| - Topic[] topics = {new Topic("test/+/some/#", QoS.AT_MOST_ONCE)}; |
40 |
| - connection1.subscribe(topics); |
41 |
| - connection2.subscribe(topics); |
42 |
| - System.out.println("Subscribed to topics."); |
| 40 | + // Subscribe to topics |
| 41 | + Topic[] topics = {new Topic("test/+/some/#", QoS.AT_MOST_ONCE)}; |
| 42 | + connection1.subscribe(topics); |
| 43 | + connection2.subscribe(topics); |
| 44 | + System.out.println("Subscribed to topics."); |
43 | 45 |
|
44 |
| - // Publish Messages |
45 |
| - String payload1 = "This is message 1"; |
46 |
| - String payload2 = "This is message 2"; |
47 |
| - String payload3 = "This is message 3"; |
| 46 | + // Publish Messages |
| 47 | + String payload1 = "This is message 1"; |
| 48 | + String payload2 = "This is message 2"; |
| 49 | + String payload3 = "This is message 3"; |
48 | 50 |
|
49 |
| - connection1.publish("test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); |
50 |
| - connection1.publish("test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); |
51 |
| - connection1.publish("test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); |
52 |
| - System.out.println("Sent messages."); |
| 51 | + connection1.publish("test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); |
| 52 | + connection1.publish("test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); |
| 53 | + connection1.publish("test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); |
| 54 | + System.out.println("Sent messages."); |
53 | 55 |
|
54 |
| - Message message1 = connection1.receive(5, TimeUnit.SECONDS); |
55 |
| - Message message2 = connection2.receive(5, TimeUnit.SECONDS); |
56 |
| - Message message3 = connection1.receive(5, TimeUnit.SECONDS); |
57 |
| - System.out.println("Received messages."); |
| 56 | + Message message1 = connection1.receive(5, TimeUnit.SECONDS); |
| 57 | + Message message2 = connection2.receive(5, TimeUnit.SECONDS); |
| 58 | + Message message3 = connection1.receive(5, TimeUnit.SECONDS); |
| 59 | + System.out.println("Received messages."); |
58 | 60 |
|
59 |
| - System.out.println("Broker 1: " + new String(message1.getPayload())); |
60 |
| - System.out.println("Broker 2: " + new String(message2.getPayload())); |
61 |
| - System.out.println("Broker 1: " + new String(message3.getPayload())); |
62 |
| - } |
| 61 | + System.out.println("Broker 1: " + new String(message1.getPayload())); |
| 62 | + System.out.println("Broker 2: " + new String(message2.getPayload())); |
| 63 | + System.out.println("Broker 1: " + new String(message3.getPayload())); |
| 64 | + } |
63 | 65 |
|
64 |
| - private static BlockingConnection retrieveMQTTConnection(String host) throws Exception { |
65 |
| - MQTT mqtt = new MQTT(); |
66 |
| - mqtt.setHost(host); |
67 |
| - mqtt.setUserName("admin"); |
68 |
| - mqtt.setPassword("admin"); |
69 |
| - BlockingConnection connection = mqtt.blockingConnection(); |
70 |
| - connection.connect(); |
71 |
| - return connection; |
72 |
| - } |
| 66 | + private static BlockingConnection retrieveMQTTConnection(String host) throws Exception { |
| 67 | + MQTT mqtt = new MQTT(); |
| 68 | + mqtt.setHost(host); |
| 69 | + mqtt.setUserName("admin"); |
| 70 | + mqtt.setPassword("admin"); |
| 71 | + BlockingConnection connection = mqtt.blockingConnection(); |
| 72 | + connection.connect(); |
| 73 | + return connection; |
| 74 | + } |
73 | 75 |
|
74 | 76 | }
|
0 commit comments