forked from youngj/EnvayaSMS
-
Notifications
You must be signed in to change notification settings - Fork 12
/
AmqpConsumerThread.java
105 lines (95 loc) · 4.05 KB
/
AmqpConsumerThread.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package org.btc4all.gateway;
import java.io.IOException;
import java.util.LinkedHashMap;
import com._37coins.EnvayaClient;
import com._37coins.EnvayaClientException;
import com._37coins.pojo.EnvayaEvent;
import com._37coins.pojo.EnvayaMessage;
import com._37coins.pojo.EnvayaEvent.Event;
import com._37coins.pojo.EnvayaRequest.Status;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.plivo.helper.api.client.RestAPI;
import com.plivo.helper.exception.PlivoException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
public class AmqpConsumerThread extends Thread {
private boolean isActive = true;
private String queueName;
private ConnectionFactory factory;
private ObjectMapper om;
private EnvayaClient client;
public AmqpConsumerThread(String queueName, ConnectionFactory factory, EnvayaClient client){
isActive = true;
this.queueName = queueName;
this.factory = factory;
this.om = new ObjectMapper();
this.client = client;
}
@Override
public void run() {
while (isActive){
Connection connection = null;
QueueingConsumer consumer = null;
Channel channel = null;
try {
factory.setAutomaticRecoveryEnabled(true);
connection = factory.newConnection();
channel = connection.createChannel();
consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
} catch (IOException e) {
e.printStackTrace();
isActive = false;
}
while (isActive&&connection.isOpen()) {
try {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
EnvayaEvent e = om.readValue(delivery.getBody(), EnvayaEvent.class);
System.out.println("event received: " + om.writeValueAsString(e));
if (e.getEvent()==Event.SEND){
EnvayaMessage message = e.getMessages().get(0);
LinkedHashMap<String, String> params = new LinkedHashMap<String, String>();
params.put("src", GatewayServletConfig.mobile);
params.put("dst", message.getTo());
params.put("text", message.getMessage());
try {
RestAPI restAPI = new RestAPI(GatewayServletConfig.plivoKey,
GatewayServletConfig.plivoSecret, "v1");
System.out.println("plivo api set up.");
restAPI.sendMessage(params);
System.out.println("sent message to plivo successfully.");
client.sendStatus(Status.SENT, message.getId(), null);
System.out.println("envaya notified");
} catch (PlivoException | EnvayaClientException ex) {
ex.printStackTrace();
}
}else{
System.out.println("undigested message: "+om.writeValueAsString(e));
}
} catch (ShutdownSignalException | ConsumerCancelledException| IOException e) {
e.printStackTrace();
} catch (InterruptedException e2){
e2.printStackTrace();
isActive = false;
} finally{
if (null!=connection){
try {
channel.close();
connection.close();
} catch (IOException e) {
}
}
}
}
System.out.println("amqp connection closed");
}
}
public void kill() {
isActive = false;
this.interrupt();
}
}