Skip to content

Commit 557e5c0

Browse files
authored
Merge pull request rabbitmq#115 from monosoul/updated-tutorial-six-java
Updated tutorial six Java code examples due to QueueingConsumer deprecation
2 parents 7116bd0 + 615cc3c commit 557e5c0

File tree

2 files changed

+86
-77
lines changed

2 files changed

+86
-77
lines changed

java/RPCClient.java

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,57 @@
11
import com.rabbitmq.client.ConnectionFactory;
22
import com.rabbitmq.client.Connection;
33
import com.rabbitmq.client.Channel;
4-
import com.rabbitmq.client.QueueingConsumer;
5-
import com.rabbitmq.client.AMQP.BasicProperties;
4+
import com.rabbitmq.client.DefaultConsumer;
5+
import com.rabbitmq.client.AMQP;
6+
import com.rabbitmq.client.Envelope;
7+
8+
import java.io.IOException;
69
import java.util.UUID;
10+
import java.util.concurrent.ArrayBlockingQueue;
11+
import java.util.concurrent.BlockingQueue;
12+
import java.util.concurrent.TimeoutException;
713

814
public class RPCClient {
915

1016
private Connection connection;
1117
private Channel channel;
1218
private String requestQueueName = "rpc_queue";
1319
private String replyQueueName;
14-
private QueueingConsumer consumer;
1520

16-
public RPCClient() throws Exception {
21+
public RPCClient() throws IOException, TimeoutException {
1722
ConnectionFactory factory = new ConnectionFactory();
1823
factory.setHost("localhost");
24+
1925
connection = factory.newConnection();
2026
channel = connection.createChannel();
2127

2228
replyQueueName = channel.queueDeclare().getQueue();
23-
consumer = new QueueingConsumer(channel);
24-
channel.basicConsume(replyQueueName, true, consumer);
2529
}
2630

27-
public String call(String message) throws Exception {
28-
String response = null;
31+
public String call(String message) throws IOException, InterruptedException {
2932
String corrId = UUID.randomUUID().toString();
3033

31-
BasicProperties props = new BasicProperties
32-
.Builder()
33-
.correlationId(corrId)
34-
.replyTo(replyQueueName)
35-
.build();
34+
AMQP.BasicProperties props = new AMQP.BasicProperties
35+
.Builder()
36+
.correlationId(corrId)
37+
.replyTo(replyQueueName)
38+
.build();
3639

3740
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
3841

39-
while (true) {
40-
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
41-
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
42-
response = new String(delivery.getBody(),"UTF-8");
43-
break;
42+
final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
43+
44+
channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
45+
@Override
46+
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
47+
response.offer(new String(body, "UTF-8"));
4448
}
45-
}
49+
});
4650

47-
return response;
51+
return response.take();
4852
}
4953

50-
public void close() throws Exception {
54+
public void close() throws IOException {
5155
connection.close();
5256
}
5357

@@ -61,15 +65,15 @@ public static void main(String[] argv) {
6165
response = fibonacciRpc.call("30");
6266
System.out.println(" [.] Got '" + response + "'");
6367
}
64-
catch (Exception e) {
68+
catch (IOException | TimeoutException | InterruptedException e) {
6569
e.printStackTrace();
6670
}
6771
finally {
6872
if (fibonacciRpc!= null) {
6973
try {
7074
fibonacciRpc.close();
7175
}
72-
catch (Exception ignore) {}
76+
catch (IOException _ignore) {}
7377
}
7478
}
7579
}

java/RPCServer.java

Lines changed: 59 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,78 +1,83 @@
11
import com.rabbitmq.client.ConnectionFactory;
22
import com.rabbitmq.client.Connection;
33
import com.rabbitmq.client.Channel;
4-
import com.rabbitmq.client.QueueingConsumer;
5-
import com.rabbitmq.client.AMQP.BasicProperties;
6-
4+
import com.rabbitmq.client.Consumer;
5+
import com.rabbitmq.client.DefaultConsumer;
6+
import com.rabbitmq.client.AMQP;
7+
import com.rabbitmq.client.Envelope;
8+
9+
import java.io.IOException;
10+
import java.util.concurrent.TimeoutException;
11+
712
public class RPCServer {
8-
13+
914
private static final String RPC_QUEUE_NAME = "rpc_queue";
10-
15+
1116
private static int fib(int n) {
1217
if (n ==0) return 0;
1318
if (n == 1) return 1;
1419
return fib(n-1) + fib(n-2);
1520
}
16-
21+
1722
public static void main(String[] argv) {
23+
ConnectionFactory factory = new ConnectionFactory();
24+
factory.setHost("localhost");
25+
1826
Connection connection = null;
19-
Channel channel = null;
2027
try {
21-
ConnectionFactory factory = new ConnectionFactory();
22-
factory.setHost("localhost");
23-
24-
connection = factory.newConnection();
25-
channel = connection.createChannel();
26-
28+
connection = factory.newConnection();
29+
Channel channel = connection.createChannel();
30+
2731
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
28-
32+
2933
channel.basicQos(1);
30-
31-
QueueingConsumer consumer = new QueueingConsumer(channel);
32-
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
33-
34+
3435
System.out.println(" [x] Awaiting RPC requests");
35-
36-
while (true) {
37-
String response = null;
38-
39-
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
40-
41-
BasicProperties props = delivery.getProperties();
42-
BasicProperties replyProps = new BasicProperties
43-
.Builder()
44-
.correlationId(props.getCorrelationId())
45-
.build();
46-
47-
try {
48-
String message = new String(delivery.getBody(),"UTF-8");
49-
int n = Integer.parseInt(message);
50-
51-
System.out.println(" [.] fib(" + message + ")");
52-
response = "" + fib(n);
53-
}
54-
catch (Exception e){
55-
System.out.println(" [.] " + e.toString());
56-
response = "";
57-
}
58-
finally {
59-
channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));
60-
61-
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
36+
37+
Consumer consumer = new DefaultConsumer(channel) {
38+
@Override
39+
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
40+
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
41+
.Builder()
42+
.correlationId(properties.getCorrelationId())
43+
.build();
44+
45+
String response = "";
46+
47+
try {
48+
String message = new String(body,"UTF-8");
49+
int n = Integer.parseInt(message);
50+
51+
System.out.println(" [.] fib(" + message + ")");
52+
response += fib(n);
53+
}
54+
catch (RuntimeException e){
55+
System.out.println(" [.] " + e.toString());
56+
}
57+
finally {
58+
channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
59+
60+
channel.basicAck(envelope.getDeliveryTag(), false);
61+
}
6262
}
63+
};
64+
65+
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
66+
67+
//loop to prevent reaching finally block
68+
while(true) {
69+
try {
70+
Thread.sleep(100);
71+
} catch (InterruptedException _ignore) {}
6372
}
64-
}
65-
catch (Exception e) {
73+
} catch (IOException | TimeoutException e) {
6674
e.printStackTrace();
6775
}
6876
finally {
69-
if (connection != null) {
77+
if (connection != null)
7078
try {
7179
connection.close();
72-
}
73-
catch (Exception ignore) {}
74-
}
75-
}
80+
} catch (IOException _ignore) {}
81+
}
7682
}
77-
}
78-
83+
}

0 commit comments

Comments
 (0)