/
BbbAppsIsAliveMonitorService.java
executable file
·138 lines (114 loc) · 4.51 KB
/
BbbAppsIsAliveMonitorService.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package org.bigbluebutton.red5.monitoring;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.bigbluebutton.common.messages.MessageHeader;
import org.bigbluebutton.common.messages.MessagingConstants;
import org.bigbluebutton.common.messages.PubSubPingMessage;
import org.bigbluebutton.common.messages.payload.PubSubPingMessagePayload;
import org.bigbluebutton.red5.client.messaging.ConnectionInvokerService;
import org.bigbluebutton.red5.client.messaging.DisconnectAllMessage;
import org.bigbluebutton.red5.pubsub.redis.MessageSender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
public class BbbAppsIsAliveMonitorService {
private static Logger log = LoggerFactory.getLogger(BbbAppsIsAliveMonitorService.class);
private static final Executor msgSenderExec = Executors.newFixedThreadPool(1);
private static final Executor runExec = Executors.newFixedThreadPool(1);
private ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1);
private BlockingQueue<IKeepAliveMessage> messages = new LinkedBlockingQueue<IKeepAliveMessage>();
private volatile boolean processMessages = false;
private KeepAliveTask task = new KeepAliveTask();
private ConnectionInvokerService service;
private Long lastKeepAliveMessage = 0L;
private MessageSender sender;
private final String SYSTEM_NAME = "BbbAppsRed5";
public void setMessageSender(MessageSender sender) {
this.sender = sender;
}
public void setConnectionInvokerService(ConnectionInvokerService s) {
this.service = s;
}
public void start() {
scheduledThreadPool.scheduleWithFixedDelay(task, 5000, 10000, TimeUnit.MILLISECONDS);
processKeepAliveMessage();
}
public void stop() {
processMessages = false;
scheduledThreadPool.shutdownNow();
}
public void handleKeepAliveMessage(String system, Long timestamp) {
if (system.equals(SYSTEM_NAME)) {
queueMessage(new KeepAliveMessage(system, timestamp));
}
}
private void queueMessage(IKeepAliveMessage msg) {
messages.add(msg);
}
private void processKeepAliveMessage() {
processMessages = true;
Runnable sender = new Runnable() {
public void run() {
while (processMessages) {
IKeepAliveMessage message;
try {
message = messages.take();
processMessage(message);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (Exception e) {
//log.error("Catching exception [{}]", e.toString());
}
}
}
};
msgSenderExec.execute(sender);
}
private void processMessage(final IKeepAliveMessage msg) {
Runnable task = new Runnable() {
public void run() {
if (msg instanceof KeepAliveMessage) {
processKeepAliveMessage((KeepAliveMessage) msg);
} else if (msg instanceof CheckIsAliveTimer) {
processCheckIsAliveTimer((CheckIsAliveTimer) msg);
}
}
};
runExec.execute(task);
}
private void processKeepAliveMessage(KeepAliveMessage msg) {
//log.info("BBB Apps Red5 pubsub pong!" + msg.system);
lastKeepAliveMessage = System.currentTimeMillis();
}
private void processCheckIsAliveTimer(CheckIsAliveTimer msg) {
Long now = System.currentTimeMillis();
if (lastKeepAliveMessage != 0 && (now - lastKeepAliveMessage > 30000)) {
log.error("BBB Apps Red5 pubsub error!");
service.sendMessage(new DisconnectAllMessage());
}
}
class KeepAliveTask implements Runnable {
public void run() {
CheckIsAliveTimer ping = new CheckIsAliveTimer();
queueMessage(ping);
PubSubPingMessage msg = new PubSubPingMessage();
MessageHeader header = new MessageHeader();
header.name = PubSubPingMessage.PUBSUB_PING;
header.timestamp = System.nanoTime();
header.replyTo = "BbbRed5";
header.version = "0.0.1";
PubSubPingMessagePayload payload = new PubSubPingMessagePayload();
payload.system = SYSTEM_NAME;
payload.timestamp = System.currentTimeMillis();
msg.header = header;
msg.payload = payload;
Gson gson = new Gson();
sender.send(MessagingConstants.TO_SYSTEM_CHANNEL, gson.toJson(msg));
}
}
}