Skip to content

Commit

Permalink
(mq) Add relation tracking between MQ messages for easier tracking an…
Browse files Browse the repository at this point in the history
…d debugging.

The change adds a new column to the MESSAGE_QUEUE table called AUDIT_RELATED_ID.  This field is populated transparently, using a dictionary mapping Thread IDs to Message IDs, populated by the inbox handlers.

The existing RELATED_ID field has too many semantics associated with it;
among other things, the FSM code uses this field to track state changes.

The change set also improves the consistency of inbox names.  The IndexClient was buggy and populated its outbox with a UUID.  This is fixed. All Service2Service outboxes are now prefixed with 'pp:' to make them even easier to differentiate.
  • Loading branch information
vlofgren committed Jan 18, 2024
1 parent 175bd31 commit 6271d5d
Show file tree
Hide file tree
Showing 14 changed files with 105 additions and 31 deletions.
Expand Up @@ -20,6 +20,7 @@
import java.util.List;

import javax.annotation.CheckReturnValue;
import java.util.ServiceConfigurationError;
import java.util.UUID;

@Singleton
Expand All @@ -39,7 +40,7 @@ public IndexClient(ServiceDescriptors descriptors,
this.messageQueueFactory = messageQueueFactory;

String inboxName = ServiceId.Index.name;
String outboxName = System.getProperty("service-name:"+nodeId, UUID.randomUUID().toString());
String outboxName = "pp:"+System.getProperty("service-name", UUID.randomUUID().toString());
outbox = messageQueueFactory.createOutbox(inboxName, nodeId, outboxName, nodeId, UUID.randomUUID());
setTimeout(30);
}
Expand Down
@@ -0,0 +1 @@
-- Adds the audit-only relation column. The type is BIGINT rather than LONG:
-- in MariaDB/MySQL, LONG is a synonym for MEDIUMTEXT, which would store the
-- message id as text even though the application reads and writes it as a
-- 64-bit integer. The default of -1 means "no originating message".
ALTER TABLE MESSAGE_QUEUE ADD COLUMN AUDIT_RELATED_ID BIGINT NOT NULL DEFAULT -1 COMMENT 'To be applied to any new messages created while handling a message';
Expand Up @@ -2,6 +2,7 @@

import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
import nu.marginalia.mq.persistence.MqPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -114,7 +115,9 @@ private void notifySubscribers() {

for (var eventSubscriber : eventSubscribers) {
if (eventSubscriber.filter(msg)) {

handleMessageWithSubscriber(eventSubscriber, msg);

handled = true;
break;
}
Expand All @@ -136,16 +139,21 @@ private void handleMessageWithSubscriber(MqSubscription subscriber, MqMessage ms

/** Runs the subscriber's request handler for the message and records the outcome.
 * <p></p>
 * The message id is registered with MqMessageHandlerRegistry for the duration
 * of the handler call and deregistered afterwards, whether or not the handler
 * threw. If anything in the handling throws, the error is logged and the
 * message is registered with state ERR.
 */
private void respondToMessage(MqSubscription subscriber, MqMessage msg) {
    try {
        MqMessageHandlerRegistry.register(msg.msgId());
        final var reply = subscriber.onRequest(msg);

        if (!msg.expectsResponse()) {
            // Nobody is waiting for a reply; only record the resulting state
            registerResponse(msg, reply.state());
        }
        else {
            sendResponse(msg, reply.state(), reply.message());
        }

    }
    catch (Exception e) {
        logger.error("Message Queue subscriber threw exception", e);
        registerResponse(msg, MqMessageState.ERR);
    }
    finally {
        // Always clear the thread's association with the message
        MqMessageHandlerRegistry.deregister();
    }
}

Expand Down
Expand Up @@ -3,6 +3,7 @@
import lombok.SneakyThrows;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
import nu.marginalia.mq.persistence.MqPersistence;

import java.sql.SQLException;
Expand Down Expand Up @@ -45,7 +46,11 @@ public Optional<MqMessage> waitForMessage(long timeout, TimeUnit unit) throws In

var messages = persistence.pollInbox(inboxName, instanceUUID, i, 1);

if (messages.size() > 0) {
if (!messages.isEmpty()) {
for (var message : messages) {
MqMessageHandlerRegistry.register(message.msgId());
}

return Optional.of(messages.iterator().next());
}

Expand Down
Expand Up @@ -2,6 +2,7 @@

import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
import nu.marginalia.mq.persistence.MqPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -81,7 +82,10 @@ public void stop() throws InterruptedException {

private void handleMessageWithSubscriber(MqSubscription subscriber, MqMessage msg) {
try {
MqMessageHandlerRegistry.register(msg.msgId());

final var rsp = subscriber.onRequest(msg);

if (msg.expectsResponse()) {
sendResponse(msg, rsp.state(), rsp.message());
}
Expand All @@ -92,6 +96,9 @@ private void handleMessageWithSubscriber(MqSubscription subscriber, MqMessage ms
logger.error("Message Queue subscriber threw exception", ex);
registerResponse(msg, MqMessageState.ERR);
}
finally {
MqMessageHandlerRegistry.deregister();
}
}

private void registerResponse(MqMessage msg, MqMessageState state) {
Expand Down
Expand Up @@ -122,6 +122,7 @@ public long sendAsync(Object request) throws Exception {
gson.toJson(request),
null);
}

/** Blocks until a response arrives for the given message id (possibly forever) */
public MqMessage waitResponse(long id) throws Exception {
synchronized (pendingResponses) {
Expand Down
@@ -0,0 +1,35 @@
package nu.marginalia.mq.persistence;

import java.util.concurrent.ConcurrentHashMap;

/** Keeps track of which message the current thread is handling, to be able to
 * paint outgoing messages with an AUDIT_RELATED_ID that relates the
 * outgoing message to the incoming message that triggered it.
 * <p></p>
 * This is a pure audit field, a weaker version of the RELATED_ID,
 * which is used by e.g. state machines to relate a series of messages to each other.
 * <p></p>
 * The class is thread-safe: every thread only ever sees the message id it
 * registered itself. The association should be cleaned up by calling
 * deregister() when the message has been handled.
 */
public final class MqMessageHandlerRegistry {
    /** Returned by getOriginMessage() when the calling thread has not registered a message. */
    private static final long NO_ORIGIN_MESSAGE = -1L;

    // The association is kept in a ThreadLocal rather than in a shared map keyed
    // by thread id.  The semantics are the same (one message id per thread), but
    // a value that is never deregistered is released together with its thread
    // instead of lingering in a static map for the lifetime of the process.
    private static final ThreadLocal<Long> currentMessageId = new ThreadLocal<>();

    private MqMessageHandlerRegistry() {
        // static utility class, not meant to be instantiated
    }

    /** Associates the calling thread with the message it is about to handle,
     * replacing any message previously registered by the same thread.
     *
     * @param msgId id of the message being handled
     */
    public static void register(long msgId) {
        currentMessageId.set(msgId);
    }

    /** Looks up the message most recently registered by the calling thread.
     *
     * @return the registered message id, or -1 if the calling thread has no registered message
     */
    public static long getOriginMessage() {
        final Long msgId = currentMessageId.get();
        return msgId != null ? msgId : NO_ORIGIN_MESSAGE;
    }

    /** Removes the calling thread's association, if it has one. */
    public static void deregister() {
        currentMessageId.remove();
    }
}
Expand Up @@ -55,8 +55,8 @@ public long sendNewMessage(String recipientInboxName,
) throws Exception {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
INSERT INTO MESSAGE_QUEUE(RECIPIENT_INBOX, SENDER_INBOX, RELATED_ID, FUNCTION, PAYLOAD, TTL)
VALUES(?, ?, ?, ?, ?, ?)
INSERT INTO MESSAGE_QUEUE(RECIPIENT_INBOX, SENDER_INBOX, RELATED_ID, AUDIT_RELATED_ID, FUNCTION, PAYLOAD, TTL)
VALUES(?, ?, ?, ?, ?, ?, ?)
""");
var lastIdQuery = conn.prepareStatement("SELECT LAST_INSERT_ID()")) {

Expand All @@ -67,11 +67,12 @@ INSERT INTO MESSAGE_QUEUE(RECIPIENT_INBOX, SENDER_INBOX, RELATED_ID, FUNCTION, P

// Translate null to -1, as 0 is a valid id
stmt.setLong(3, Objects.requireNonNullElse(relatedMessageId, -1L));
stmt.setLong(4, MqMessageHandlerRegistry.getOriginMessage());

stmt.setString(4, function);
stmt.setString(5, payload);
if (ttl == null) stmt.setNull(6, java.sql.Types.BIGINT);
else stmt.setLong(6, ttl.toSeconds());
stmt.setString(5, function);
stmt.setString(6, payload);
if (ttl == null) stmt.setNull(7, java.sql.Types.BIGINT);
else stmt.setLong(7, ttl.toSeconds());

stmt.executeUpdate();

Expand Down
Expand Up @@ -5,6 +5,7 @@
public record MessageQueueEntry (
long id,
long relatedId,
long auditRelatedId,
String senderInbox,
String recipientInbox,
String function,
Expand All @@ -20,6 +21,9 @@ public record MessageQueueEntry (
/** Returns true when relatedId is strictly positive, i.e. the entry is
 * considered to point at another message.
 * <p></p>
 * NOTE(review): an id of 0 is treated as "no related message" here — confirm
 * that 0 can never be a real message id.
 */
public boolean hasRelatedMessage() {
return relatedId > 0;
}
/** Returns true when auditRelatedId is strictly positive, i.e. the entry
 * carries an audit relation to another message.
 * <p></p>
 * NOTE(review): an id of 0 is treated as "no audit relation" here — confirm
 * that 0 can never be a real message id.
 */
public boolean hasAuditRelation() {
return auditRelatedId > 0;
}

public String stateCode() {
if (state == null) {
Expand Down
Expand Up @@ -57,7 +57,7 @@ public ControlSysActionsService(MessageQueueFactory mqFactory,
*/
private MqOutbox createApiOutbox(MessageQueueFactory mqFactory) {
String inboxName = ServiceId.Api.name + ":" + "0";
String outboxName = System.getProperty("service-name", UUID.randomUUID().toString());
String outboxName = "pp:"+System.getProperty("service-name", UUID.randomUUID().toString());
return mqFactory.createOutbox(inboxName, 0, outboxName, 0, UUID.randomUUID());
}

Expand Down
Expand Up @@ -180,7 +180,7 @@ public Object editMessageState(Request request, Response response) throws SQLExc
public List<MessageQueueEntry> getLastEntries(int n) {
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
FROM MESSAGE_QUEUE
ORDER BY ID DESC
LIMIT ?
Expand All @@ -202,7 +202,7 @@ public List<MessageQueueEntry> getLastEntries(int n) {
public MessageQueueEntry getMessage(long id) {
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
FROM MESSAGE_QUEUE
WHERE ID=?
""")) {
Expand All @@ -223,7 +223,7 @@ public MessageQueueEntry getMessage(long id) {
public Object getLastEntriesForInbox(String inbox, int n) {
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
FROM MESSAGE_QUEUE
WHERE RECIPIENT_INBOX=?
ORDER BY ID DESC
Expand All @@ -247,7 +247,7 @@ public Object getLastEntriesForInbox(String inbox, int n) {
public List<MessageQueueEntry> getEntriesForInbox(String inbox, long afterId, int n) {
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
FROM MESSAGE_QUEUE
WHERE ID < ? AND (RECIPIENT_INBOX = ? OR SENDER_INBOX = ?)
ORDER BY ID DESC
Expand All @@ -274,7 +274,7 @@ public List<MessageQueueEntry> getEntriesForInbox(String inbox, long afterId, in
public List<MessageQueueEntry> getEntriesForInstance(String instance, long afterId, int n) {
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
FROM MESSAGE_QUEUE
WHERE ID < ? AND OWNER_INSTANCE = ?
ORDER BY ID DESC
Expand All @@ -300,7 +300,7 @@ public List<MessageQueueEntry> getEntriesForInstance(String instance, long after
public List<MessageQueueEntry> getEntries(long afterId, int n) {
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
FROM MESSAGE_QUEUE
WHERE ID < ?
ORDER BY ID DESC
Expand Down Expand Up @@ -335,9 +335,10 @@ public List<MessageQueueEntry> getRelatedMessages(long relatedId) {
// and only available within the operator user interface.
try (var conn = dataSource.getConnection();
var ps = conn.prepareStatement("""
SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
FROM MESSAGE_QUEUE
WHERE ID = ? OR RELATED_ID = ?
WHERE (ID = ? OR RELATED_ID = ? OR AUDIT_RELATED_ID = ?)
AND STATE != 'DEAD' AND FUNCTION != 'MONITOR'
ORDER BY ID DESC
""")) {

Expand All @@ -349,6 +350,7 @@ public List<MessageQueueEntry> getRelatedMessages(long relatedId) {

ps.setLong(1, nextId);
ps.setLong(2, nextId);
ps.setLong(3, nextId);

var rs = ps.executeQuery();
while (rs.next()) {
Expand All @@ -361,6 +363,8 @@ public List<MessageQueueEntry> getRelatedMessages(long relatedId) {
newRelatedIds.add(entry.id());
if (entry.hasRelatedMessage() && !queriedIds.contains(entry.relatedId()))
newRelatedIds.add(entry.relatedId());
if (entry.hasAuditRelation() && !queriedIds.contains(entry.auditRelatedId()))
newRelatedIds.add(entry.auditRelatedId());
}
}
}
Expand All @@ -376,6 +380,7 @@ private MessageQueueEntry newEntry(ResultSet rs) throws SQLException {
return new MessageQueueEntry(
rs.getLong("ID"),
rs.getLong("RELATED_ID"),
rs.getLong("AUDIT_RELATED_ID"),
rs.getString("SENDER_INBOX"),
rs.getString("RECIPIENT_INBOX"),
rs.getString("FUNCTION"),
Expand Down
Expand Up @@ -50,11 +50,9 @@
<tr>
<td>{{ttl}}</td>
<td>
{{#if hasRelatedMessage}}
<a href="/message-queue/{{relatedId}}">{{relatedId}}</a>
{{else}}
{{relatedId}}
{{/if}}
{{#if hasRelatedMessage}}<a href="/message-queue/{{relatedId}}">{{relatedId}}</a>{{else}}{{relatedId}}{{/if}}
/
{{#if hasAuditRelation}}<a href="/message-queue/{{auditRelatedId}}">{{auditRelatedId}}</a>{{else}}{{auditRelatedId}}{{/if}}
</td>
<td><a href="/message-queue?inbox={{senderInbox}}">{{senderInbox}}</a></td>
<td style="word-break: break-all; font-family: monospace;">{{payload}}</td>
Expand Down
Expand Up @@ -22,11 +22,10 @@
<tr><td>state</td><td>{{state}}</td><td><a href="{{id}}/edit">[Edit&nbsp;State]</a></td></tr>
<tr><td>senderInbox</td><td><a href="/message-queue?inbox={{senderInbox}}">{{senderInbox}}</a></td><td>{{#if senderInbox}}<a href="{{id}}/reply">[Reply]</a>{{/if}}</td></tr>
<tr><td>relatedId</td><td>
{{#if hasRelatedMessage}}
<a href="{{relatedId}}">{{relatedId}}</a>
{{else}}
{{relatedId}}
{{/if}}
{{#if hasRelatedMessage}} <a href="{{relatedId}}">{{relatedId}}</a> {{else}} {{relatedId}} {{/if}}
</td><td></td></tr>
<tr><td>auditRelatedId</td><td>
{{#if hasAuditRelation}}<a href="/message-queue/{{auditRelatedId}}">{{auditRelatedId}}</a>{{else}}{{auditRelatedId}}{{/if}}
</td><td></td></tr>
<tr><td>function</td><td>{{function}}</td><td></td></tr>
<tr><td>payload</td><td>
Expand All @@ -42,20 +41,25 @@
<h2>Related Messages</h2>
<table class="table">
<tr>
<th>ID</th>
<th>ID/RelatedID</th>
<th>Recipient Inbox</th>
<th>Sender Inbox</th>
<th>Function</th>
<th>State</th>
</tr>
<tr>
<th colspan="4">Payload</th>
</tr>
{{#each relatedMessages}}
<tr>
<td><a href="{{id}}">{{id}}</a></td>
<td>
<a {{#eq id message.id}}style="font-weight: bold"{{/eq}} href="{{id}}">{{id}}</a>
{{#if hasRelatedMessage}}/ <a href="{{relatedId}}" {{#eq relatedId message.id}}style="font-weight: bold"{{/eq}}>{{relatedId}}</a>{{/if}}
</td>
<td><a href="/message-queue?inbox={{recipientInbox}}">{{recipientInbox}}</a></td>
<td><a href="/message-queue?inbox={{senderInbox}}">{{senderInbox}}</a></td>
<td>{{function}}</td>
<td>{{state}}</td>
</tr>
<tr><td colspan="4" style="word-break: break-all; font-family: monospace;">{{payload}}</td> </tr>
{{/each}}
</table>
{{/if}}
Expand Down
Expand Up @@ -5,6 +5,7 @@
import com.google.inject.Singleton;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.*;
import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
import nu.marginalia.process.ProcessService;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqPersistence;
Expand Down Expand Up @@ -59,6 +60,9 @@ case Monitor (int errorAttempts) -> {
}
// else continue
} else {
// Special: Associate this thread with the message so that we can get tracking
MqMessageHandlerRegistry.register(messages.getFirst().msgId());

yield new Run(0);
}
}
Expand Down

0 comments on commit 6271d5d

Please sign in to comment.