diff --git a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java index 58447beb6..8b747a85c 100644 --- a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java +++ b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java @@ -20,6 +20,7 @@ import java.util.List; import javax.annotation.CheckReturnValue; +import java.util.ServiceConfigurationError; import java.util.UUID; @Singleton @@ -39,7 +40,7 @@ public IndexClient(ServiceDescriptors descriptors, this.messageQueueFactory = messageQueueFactory; String inboxName = ServiceId.Index.name; - String outboxName = System.getProperty("service-name:"+nodeId, UUID.randomUUID().toString()); + String outboxName = "pp:"+System.getProperty("service-name", UUID.randomUUID().toString()); outbox = messageQueueFactory.createOutbox(inboxName, nodeId, outboxName, nodeId, UUID.randomUUID()); setTimeout(30); } diff --git a/code/common/db/src/main/resources/db/migration/V24_01_0_003__mqaudit.sql b/code/common/db/src/main/resources/db/migration/V24_01_0_003__mqaudit.sql new file mode 100644 index 000000000..62bc939ce --- /dev/null +++ b/code/common/db/src/main/resources/db/migration/V24_01_0_003__mqaudit.sql @@ -0,0 +1 @@ +ALTER TABLE MESSAGE_QUEUE ADD COLUMN AUDIT_RELATED_ID LONG NOT NULL DEFAULT -1 COMMENT 'To be applied to any new messages created while handling a message'; \ No newline at end of file diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqAsynchronousInbox.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqAsynchronousInbox.java index 7411b6e24..049f2bb2b 100644 --- a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqAsynchronousInbox.java +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqAsynchronousInbox.java @@ -2,6 +2,7 @@ import nu.marginalia.mq.MqMessage; import nu.marginalia.mq.MqMessageState; +import nu.marginalia.mq.persistence.MqMessageHandlerRegistry; import nu.marginalia.mq.persistence.MqPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -114,7 +115,9 @@ private void notifySubscribers() { for (var eventSubscriber : eventSubscribers) { if (eventSubscriber.filter(msg)) { + handleMessageWithSubscriber(eventSubscriber, msg); + handled = true; break; } @@ -136,16 +139,21 @@ private void handleMessageWithSubscriber(MqSubscription subscriber, MqMessage ms private void respondToMessage(MqSubscription subscriber, MqMessage msg) { try { + MqMessageHandlerRegistry.register(msg.msgId()); final var rsp = subscriber.onRequest(msg); + if (msg.expectsResponse()) { sendResponse(msg, rsp.state(), rsp.message()); } else { registerResponse(msg, rsp.state()); } + } catch (Exception ex) { logger.error("Message Queue subscriber threw exception", ex); registerResponse(msg, MqMessageState.ERR); + } finally { + MqMessageHandlerRegistry.deregister(); } } diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSingleShotInbox.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSingleShotInbox.java index 19645c647..8b7613837 100644 --- a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSingleShotInbox.java +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSingleShotInbox.java @@ -3,6 +3,7 @@ import lombok.SneakyThrows; import nu.marginalia.mq.MqMessage; import nu.marginalia.mq.MqMessageState; +import nu.marginalia.mq.persistence.MqMessageHandlerRegistry; import nu.marginalia.mq.persistence.MqPersistence; import java.sql.SQLException; @@ -45,7 +46,11 @@ public Optional waitForMessage(long timeout, TimeUnit unit) throws In var messages = persistence.pollInbox(inboxName, instanceUUID, i, 1); - if (messages.size() > 0) { + if (!messages.isEmpty()) { + for (var message : messages) { + MqMessageHandlerRegistry.register(message.msgId()); + } + return Optional.of(messages.iterator().next()); } diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSynchronousInbox.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSynchronousInbox.java index 9fa0ef4db..99a81c1e6 100644 --- a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSynchronousInbox.java +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSynchronousInbox.java @@ -2,6 +2,7 @@ import nu.marginalia.mq.MqMessage; import nu.marginalia.mq.MqMessageState; +import nu.marginalia.mq.persistence.MqMessageHandlerRegistry; import nu.marginalia.mq.persistence.MqPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,7 +82,10 @@ public void stop() throws InterruptedException { private void handleMessageWithSubscriber(MqSubscription subscriber, MqMessage msg) { try { + MqMessageHandlerRegistry.register(msg.msgId()); + final var rsp = subscriber.onRequest(msg); + if (msg.expectsResponse()) { sendResponse(msg, rsp.state(), rsp.message()); } @@ -92,6 +96,9 @@ private void handleMessageWithSubscriber(MqSubscription subscriber, MqMessage ms logger.error("Message Queue subscriber threw exception", ex); registerResponse(msg, MqMessageState.ERR); } + finally { + MqMessageHandlerRegistry.deregister(); + } } private void registerResponse(MqMessage msg, MqMessageState state) { diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java index f144e300d..b1f0a44b8 100644 --- a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java @@ -122,6 +122,7 @@ public long sendAsync(Object request) throws Exception { gson.toJson(request), null); } + /** Blocks until a response arrives for the given message id (possibly forever) */ public MqMessage waitResponse(long id) throws Exception { synchronized (pendingResponses) { diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqMessageHandlerRegistry.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqMessageHandlerRegistry.java new file mode 100644 index 000000000..b3acc3946 --- /dev/null +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqMessageHandlerRegistry.java @@ -0,0 +1,35 @@ +package nu.marginalia.mq.persistence; + +import java.util.concurrent.ConcurrentHashMap; + +/** Keeps track of which thread is handling a message, to be able to + * paint outgoing messages with a AUDIT_RELATED_ID to relate the + * outgoing message to the incoming message that triggered it. + *

+ * This is a pure audit field, a weaker version of the RELATED_ID, + * which is used by e.g. state machines to relate a series of messages to each other. + *

+ * The class is thread-safe, and tracks the thread ID of the thread that + * is currently handling a message. It can be cleaned up by calling + * deregister() when the message has been handled. + */ +public class MqMessageHandlerRegistry { + // There is some small risk of a memory leak here, if the registry entries aren't cleaned up properly, + // but due to the low volume of messages being sent, this is not a big concern. Since the average + // message rate is less than 1 per second, even if the process ran for 60 years, and we leaked every ID + // put in, the total amount of memory leaked would only be about of order 2 MB. + + private static final ConcurrentHashMap handlerRegistry = new ConcurrentHashMap<>(); + + public static void register(long msgId) { + handlerRegistry.put(Thread.currentThread().threadId(), msgId); + } + + public static long getOriginMessage() { + return handlerRegistry.getOrDefault(Thread.currentThread().threadId(), -1L); + } + + public static void deregister() { + handlerRegistry.remove(Thread.currentThread().threadId()); + } +} diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java index f32213b44..02740232b 100644 --- a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java @@ -55,8 +55,8 @@ public long sendNewMessage(String recipientInboxName, ) throws Exception { try (var conn = dataSource.getConnection(); var stmt = conn.prepareStatement(""" - INSERT INTO MESSAGE_QUEUE(RECIPIENT_INBOX, SENDER_INBOX, RELATED_ID, FUNCTION, PAYLOAD, TTL) - VALUES(?, ?, ?, ?, ?, ?) + INSERT INTO MESSAGE_QUEUE(RECIPIENT_INBOX, SENDER_INBOX, RELATED_ID, AUDIT_RELATED_ID, FUNCTION, PAYLOAD, TTL) + VALUES(?, ?, ?, ?, ?, ?, ?) """); var lastIdQuery = conn.prepareStatement("SELECT LAST_INSERT_ID()")) { @@ -67,11 +67,12 @@ INSERT INTO MESSAGE_QUEUE(RECIPIENT_INBOX, SENDER_INBOX, RELATED_ID, FUNCTION, P // Translate null to -1, as 0 is a valid id stmt.setLong(3, Objects.requireNonNullElse(relatedMessageId, -1L)); + stmt.setLong(4, MqMessageHandlerRegistry.getOriginMessage()); - stmt.setString(4, function); - stmt.setString(5, payload); - if (ttl == null) stmt.setNull(6, java.sql.Types.BIGINT); - else stmt.setLong(6, ttl.toSeconds()); + stmt.setString(5, function); + stmt.setString(6, payload); + if (ttl == null) stmt.setNull(7, java.sql.Types.BIGINT); + else stmt.setLong(7, ttl.toSeconds()); stmt.executeUpdate(); diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/model/MessageQueueEntry.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/model/MessageQueueEntry.java index b63a614c2..61a842f3e 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/model/MessageQueueEntry.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/model/MessageQueueEntry.java @@ -5,6 +5,7 @@ public record MessageQueueEntry ( long id, long relatedId, + long auditRelatedId, String senderInbox, String recipientInbox, String function, @@ -20,6 +21,9 @@ public record MessageQueueEntry ( public boolean hasRelatedMessage() { return relatedId > 0; } + public boolean hasAuditRelation() { + return auditRelatedId > 0; + } public String stateCode() { if (state == null) { diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/ControlSysActionsService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/ControlSysActionsService.java index 270607390..cfef65773 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/ControlSysActionsService.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/ControlSysActionsService.java @@ -57,7 +57,7 @@ public ControlSysActionsService(MessageQueueFactory mqFactory, */ private MqOutbox createApiOutbox(MessageQueueFactory mqFactory) { String inboxName = ServiceId.Api.name + ":" + "0"; - String outboxName = System.getProperty("service-name", UUID.randomUUID().toString()); + String outboxName = "pp:"+System.getProperty("service-name", UUID.randomUUID().toString()); return mqFactory.createOutbox(inboxName, 0, outboxName, 0, UUID.randomUUID()); } diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/MessageQueueService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/MessageQueueService.java index 7e74ed94c..922d4a206 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/MessageQueueService.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/MessageQueueService.java @@ -180,7 +180,7 @@ public Object editMessageState(Request request, Response response) throws SQLExc public List getLastEntries(int n) { try (var conn = dataSource.getConnection(); var query = conn.prepareStatement(""" - SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL + SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL FROM MESSAGE_QUEUE ORDER BY ID DESC LIMIT ? @@ -202,7 +202,7 @@ public List getLastEntries(int n) { public MessageQueueEntry getMessage(long id) { try (var conn = dataSource.getConnection(); var query = conn.prepareStatement(""" - SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL + SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL FROM MESSAGE_QUEUE WHERE ID=? """)) { @@ -223,7 +223,7 @@ public MessageQueueEntry getMessage(long id) { public Object getLastEntriesForInbox(String inbox, int n) { try (var conn = dataSource.getConnection(); var query = conn.prepareStatement(""" - SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL + SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL FROM MESSAGE_QUEUE WHERE RECIPIENT_INBOX=? ORDER BY ID DESC @@ -247,7 +247,7 @@ public Object getLastEntriesForInbox(String inbox, int n) { public List getEntriesForInbox(String inbox, long afterId, int n) { try (var conn = dataSource.getConnection(); var query = conn.prepareStatement(""" - SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL + SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL FROM MESSAGE_QUEUE WHERE ID < ? AND (RECIPIENT_INBOX = ? OR SENDER_INBOX = ?) ORDER BY ID DESC @@ -274,7 +274,7 @@ public List getEntriesForInbox(String inbox, long afterId, in public List getEntriesForInstance(String instance, long afterId, int n) { try (var conn = dataSource.getConnection(); var query = conn.prepareStatement(""" - SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL + SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL FROM MESSAGE_QUEUE WHERE ID < ? AND OWNER_INSTANCE = ? ORDER BY ID DESC @@ -300,7 +300,7 @@ public List getEntriesForInstance(String instance, long after public List getEntries(long afterId, int n) { try (var conn = dataSource.getConnection(); var query = conn.prepareStatement(""" - SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL + SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL FROM MESSAGE_QUEUE WHERE ID < ? ORDER BY ID DESC @@ -335,9 +335,10 @@ public List getRelatedMessages(long relatedId) { // and only available within the operator user interface. try (var conn = dataSource.getConnection(); var ps = conn.prepareStatement(""" - SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL + SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL FROM MESSAGE_QUEUE - WHERE ID = ? OR RELATED_ID = ? + WHERE (ID = ? OR RELATED_ID = ? OR AUDIT_RELATED_ID = ?) + AND STATE != 'DEAD' AND FUNCTION != 'MONITOR' ORDER BY ID DESC """)) { @@ -349,6 +350,7 @@ public List getRelatedMessages(long relatedId) { ps.setLong(1, nextId); ps.setLong(2, nextId); + ps.setLong(3, nextId); var rs = ps.executeQuery(); while (rs.next()) { @@ -361,6 +363,8 @@ public List getRelatedMessages(long relatedId) { newRelatedIds.add(entry.id()); if (entry.hasRelatedMessage() && !queriedIds.contains(entry.relatedId())) newRelatedIds.add(entry.relatedId()); + if (entry.hasAuditRelation() && !queriedIds.contains(entry.auditRelatedId())) + newRelatedIds.add(entry.auditRelatedId()); } } } @@ -376,6 +380,7 @@ private MessageQueueEntry newEntry(ResultSet rs) throws SQLException { return new MessageQueueEntry( rs.getLong("ID"), rs.getLong("RELATED_ID"), + rs.getLong("AUDIT_RELATED_ID"), rs.getString("SENDER_INBOX"), rs.getString("RECIPIENT_INBOX"), rs.getString("FUNCTION"), diff --git a/code/services-core/control-service/src/main/resources/templates/control/partials/message-queue-table.hdb b/code/services-core/control-service/src/main/resources/templates/control/partials/message-queue-table.hdb index a31bc1cef..2959fa49c 100644 --- a/code/services-core/control-service/src/main/resources/templates/control/partials/message-queue-table.hdb +++ b/code/services-core/control-service/src/main/resources/templates/control/partials/message-queue-table.hdb @@ -50,11 +50,9 @@ {{ttl}} - {{#if hasRelatedMessage}} - {{relatedId}} - {{else}} - {{relatedId}} - {{/if}} + {{#if hasRelatedMessage}}{{relatedId}}{{else}}{{relatedId}}{{/if}} + / + {{#if hasAuditRelation}}{{auditRelatedId}}{{else}}{{auditRelatedId}}{{/if}} {{senderInbox}} {{payload}} diff --git a/code/services-core/control-service/src/main/resources/templates/control/sys/view-message.hdb b/code/services-core/control-service/src/main/resources/templates/control/sys/view-message.hdb index 69686f051..0cda4d5c6 100644 --- a/code/services-core/control-service/src/main/resources/templates/control/sys/view-message.hdb +++ b/code/services-core/control-service/src/main/resources/templates/control/sys/view-message.hdb @@ -22,11 +22,10 @@ state{{state}}[Edit State] senderInbox{{senderInbox}}{{#if senderInbox}}[Reply]{{/if}} relatedId - {{#if hasRelatedMessage}} - {{relatedId}} - {{else}} - {{relatedId}} - {{/if}} + {{#if hasRelatedMessage}} {{relatedId}} {{else}} {{relatedId}} {{/if}} + + auditRelatedId + {{#if hasAuditRelation}}{{auditRelatedId}}{{else}}{{auditRelatedId}}{{/if}} function{{function}} payload @@ -42,20 +41,25 @@

Related Messages

- + - + + + {{#each relatedMessages}} - + - + {{/each}}
IDID/RelatedID Recipient InboxSender Inbox Function State
Payload
{{id}} + {{id}} + {{#if hasRelatedMessage}}/ {{relatedId}}{{/if}} + {{recipientInbox}}{{senderInbox}} {{function}} {{state}}
{{payload}}
{{/if}} diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/monitor/AbstractProcessSpawnerActor.java b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/monitor/AbstractProcessSpawnerActor.java index a05700fef..cb5ad1bd5 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/monitor/AbstractProcessSpawnerActor.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/monitor/AbstractProcessSpawnerActor.java @@ -5,6 +5,7 @@ import com.google.inject.Singleton; import nu.marginalia.actor.prototype.RecordActorPrototype; import nu.marginalia.actor.state.*; +import nu.marginalia.mq.persistence.MqMessageHandlerRegistry; import nu.marginalia.process.ProcessService; import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.persistence.MqPersistence; @@ -59,6 +60,9 @@ case Monitor (int errorAttempts) -> { } // else continue } else { + // Special: Associate this thread with the message so that we can get tracking + MqMessageHandlerRegistry.register(messages.getFirst().msgId()); + yield new Run(0); } }