-
-
Notifications
You must be signed in to change notification settings - Fork 22
/
MqAsynchronousInbox.java
214 lines (173 loc) · 6.68 KB
/
MqAsynchronousInbox.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
package nu.marginalia.mq.inbox;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
import nu.marginalia.mq.persistence.MqPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.sql.SQLException;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/** Message queue inbox that spawns news threads for each message */
public class MqAsynchronousInbox implements MqInboxIf {
private final Logger logger = LoggerFactory.getLogger(MqAsynchronousInbox.class);
private final String inboxName;
private final String instanceUUID;
private final ExecutorService threadPool;
private final MqPersistence persistence;
private volatile boolean run = true;
private final int pollIntervalMs = Integer.getInteger("mq.inbox.poll-interval-ms", 1000);
private final int maxPollCount = Integer.getInteger("mq.inbox.max-poll-count", 10);
private final List<MqSubscription> eventSubscribers = new ArrayList<>();
private final LinkedBlockingQueue<MqMessage> queue = new LinkedBlockingQueue<>(32);
private Thread pollDbThread;
private Thread notifyThread;
public MqAsynchronousInbox(MqPersistence persistence,
String inboxName,
UUID instanceUUID)
{
this(persistence, inboxName, instanceUUID, Executors.newCachedThreadPool());
}
public MqAsynchronousInbox(MqPersistence persistence,
String inboxName,
UUID instanceUUID,
ExecutorService executorService)
{
this.threadPool = executorService;
this.persistence = persistence;
this.inboxName = inboxName;
this.instanceUUID = instanceUUID.toString();
}
/** Subscribe to messages on this inbox. Must be run before start()! */
@Override
public void subscribe(MqSubscription subscription) {
eventSubscribers.add(subscription);
}
/** Start receiving messages. <p>
* <b>Note:</b> Subscribe to messages before calling this method.
* </p> */
@Override
public void start() {
run = true;
if (eventSubscribers.isEmpty()) {
logger.error("No subscribers for inbox {}, registering shredder", inboxName);
}
// Add a final handler that fails any message that is not handled
eventSubscribers.add(new MqInboxShredder());
pollDbThread = new Thread(this::pollDb, "mq-inbox-update-thread:"+inboxName);
pollDbThread.setDaemon(true);
pollDbThread.start();
notifyThread = new Thread(this::notifySubscribers, "mq-inbox-notify-thread:"+inboxName);
notifyThread.setDaemon(true);
notifyThread.start();
}
/** Stop receiving messages and shut down all threads */
@Override
public void stop() throws InterruptedException {
if (!run)
return;
logger.info("Shutting down inbox {}", inboxName);
run = false;
pollDbThread.join();
notifyThread.join();
threadPool.shutdownNow();
while (!threadPool.awaitTermination(5, TimeUnit.SECONDS));
}
private void notifySubscribers() {
try {
while (run) {
MqMessage msg = queue.poll(pollIntervalMs, TimeUnit.MILLISECONDS);
if (msg == null)
continue;
logger.info("Notifying subscribers of message {}", msg.msgId());
boolean handled = false;
for (var eventSubscriber : eventSubscribers) {
if (eventSubscriber.filter(msg)) {
handleMessageWithSubscriber(eventSubscriber, msg);
handled = true;
break;
}
}
if (!handled) {
logger.error("No subscriber wanted to handle message {}", msg.msgId());
}
}
}
catch (InterruptedException ex) {
logger.error("MQ inbox notify thread interrupted", ex);
}
}
private void handleMessageWithSubscriber(MqSubscription subscriber, MqMessage msg) {
threadPool.execute(() -> respondToMessage(subscriber, msg));
}
private void respondToMessage(MqSubscription subscriber, MqMessage msg) {
try {
MqMessageHandlerRegistry.register(msg.msgId());
final var rsp = subscriber.onRequest(msg);
if (msg.expectsResponse()) {
sendResponse(msg, rsp.state(), rsp.message());
}
else {
registerResponse(msg, rsp.state());
}
} catch (Exception ex) {
logger.error("Message Queue subscriber threw exception", ex);
registerResponse(msg, MqMessageState.ERR);
} finally {
MqMessageHandlerRegistry.deregister();
}
}
private void registerResponse(MqMessage msg, MqMessageState state) {
try {
persistence.updateMessageState(msg.msgId(), state);
}
catch (SQLException ex) {
logger.error("Failed to update message state", ex);
}
}
private void sendResponse(MqMessage msg, MqMessageState mqMessageState, String response) {
try {
persistence.sendResponse(msg.msgId(), mqMessageState, response);
}
catch (SQLException ex) {
logger.error("Failed to update message state", ex);
}
}
private void pollDb() {
try {
for (long tick = 1; run; tick++) {
queue.addAll(pollInbox(tick));
TimeUnit.MILLISECONDS.sleep(pollIntervalMs);
}
}
catch (InterruptedException ex) {
logger.error("MQ inbox update thread interrupted", ex);
}
}
private Collection<MqMessage> pollInbox(long tick) {
try {
return persistence.pollInbox(inboxName, instanceUUID, tick, maxPollCount);
}
catch (SQLException ex) {
logger.error("Failed to poll inbox", ex);
return List.of();
}
}
/** Retrieve the last N messages from the inbox. */
@Override
public List<MqMessage> replay(int lastN) {
try {
return persistence.lastNMessages(inboxName, lastN);
}
catch (SQLException ex) {
logger.error("Failed to replay inbox", ex);
return List.of();
}
}
}