diff --git a/core-spi/src/main/java/org/openhubframework/openhub/spi/msg/MessageService.java b/core-spi/src/main/java/org/openhubframework/openhub/spi/msg/MessageService.java index b38f2bf1..b040feb9 100644 --- a/core-spi/src/main/java/org/openhubframework/openhub/spi/msg/MessageService.java +++ b/core-spi/src/main/java/org/openhubframework/openhub/spi/msg/MessageService.java @@ -30,7 +30,6 @@ import org.apache.camel.Header; import org.apache.camel.Properties; -import org.openhubframework.openhub.api.entity.ExternalSystemExtEnum; import org.openhubframework.openhub.api.entity.Message; import org.openhubframework.openhub.api.entity.MessageFilter; import org.openhubframework.openhub.api.entity.MsgStateEnum; @@ -259,4 +258,14 @@ List getMessagesForGuaranteedOrderForFunnel(String funnelValue, Duratio */ @Nullable Message findPartlyFailedMessage(Duration interval); + + /** + * Finds one message in state {@link MsgStateEnum#POSTPONED} or {@link MsgStateEnum#PARTLY_FAILED} + * + * @param postponedInterval Interval (in seconds) after that can be postponed message processed again + * @param partiallyFailedInterval Interval (in seconds) after that can be partly failed message processed again + * @return message or null if there is no any message + */ + @Nullable + Message findPostponedOrPartlyFailedMessage(Duration postponedInterval, Duration partiallyFailedInterval); } diff --git a/core/src/main/java/org/openhubframework/openhub/core/common/asynch/msg/MessageServiceImpl.java b/core/src/main/java/org/openhubframework/openhub/core/common/asynch/msg/MessageServiceImpl.java index 0fbbdb31..5971c71f 100644 --- a/core/src/main/java/org/openhubframework/openhub/core/common/asynch/msg/MessageServiceImpl.java +++ b/core/src/main/java/org/openhubframework/openhub/core/common/asynch/msg/MessageServiceImpl.java @@ -511,4 +511,13 @@ public Message findPartlyFailedMessage(Duration interval) { return messageDao.findPartlyFailedMessage(interval); } + + @Nullable + @Override + public Message findPostponedOrPartlyFailedMessage(Duration postponedInterval, Duration partiallyFailedInterval) { + Assert.notNull(postponedInterval, "postponed interval must not be null"); + Assert.notNull(partiallyFailedInterval, "partly failed interval must not be null"); + + return messageDao.findPostponedOrPartlyFailedMessage(postponedInterval, partiallyFailedInterval); + } } diff --git a/core/src/main/java/org/openhubframework/openhub/core/common/asynch/queue/MessagesPoolImpl.java b/core/src/main/java/org/openhubframework/openhub/core/common/asynch/queue/MessagesPoolImpl.java index 99a3e8ac..2fff7ebd 100644 --- a/core/src/main/java/org/openhubframework/openhub/core/common/asynch/queue/MessagesPoolImpl.java +++ b/core/src/main/java/org/openhubframework/openhub/core/common/asynch/queue/MessagesPoolImpl.java @@ -64,12 +64,7 @@ public Message getNextMessage() { // is there next message for processing? // firstly try postponed messages - Message msg = findPostponedMessage(); - - // then partly failed messages - if (msg == null) { - msg = findPartlyFailedMessage(); - } + Message msg = findPostponedOrPartlyFailedMessage(); if (msg == null) { LOG.debug("No POSTPONED and PARTLY_FAILED message found for re-processing."); @@ -86,12 +81,8 @@ public Message getNextMessage() { } @Nullable - private Message findPostponedMessage() { - return messageService.findPostponedMessage(postponedInterval.getValue().toDuration()); - } - - @Nullable - private Message findPartlyFailedMessage() { - return messageService.findPartlyFailedMessage(partlyFailedInterval.getValue().toDuration()); + private Message findPostponedOrPartlyFailedMessage() { + return messageService.findPostponedOrPartlyFailedMessage(postponedInterval.getValue().toDuration(), + partlyFailedInterval.getValue().toDuration()); } } diff --git a/core/src/main/java/org/openhubframework/openhub/core/common/dao/MessageDao.java b/core/src/main/java/org/openhubframework/openhub/core/common/dao/MessageDao.java index 388723b7..26c8a975 100644 --- a/core/src/main/java/org/openhubframework/openhub/core/common/dao/MessageDao.java +++ b/core/src/main/java/org/openhubframework/openhub/core/common/dao/MessageDao.java @@ -125,6 +125,16 @@ public interface MessageDao { @Nullable Message findPostponedMessage(Duration interval); + /** + * Finds ONE message in state {@link MsgStateEnum#POSTPONED} or {@link MsgStateEnum#PARTLY_FAILED} + * + * @param postponedInterval Interval (in seconds) after that can be postponed message processed again + * @param partiallyFailedInterval Interval (in seconds) after that can be partly failed message processed again + * @return message or null if there is no any message + */ + @Nullable + Message findPostponedOrPartlyFailedMessage(Duration postponedInterval, Duration partiallyFailedInterval); + /** * Updates {@link Message} into state {@link MsgStateEnum#PROCESSING} (set start timestamp of processing) * - gets lock for message. diff --git a/core/src/main/java/org/openhubframework/openhub/core/common/dao/MessageDaoJpaImpl.java b/core/src/main/java/org/openhubframework/openhub/core/common/dao/MessageDaoJpaImpl.java index b3215c81..f49639f7 100644 --- a/core/src/main/java/org/openhubframework/openhub/core/common/dao/MessageDaoJpaImpl.java +++ b/core/src/main/java/org/openhubframework/openhub/core/common/dao/MessageDaoJpaImpl.java @@ -207,6 +207,32 @@ public Message findPostponedMessage(Duration interval) { } } + @Nullable + @Override + public Message findPostponedOrPartlyFailedMessage(Duration postponedInterval, Duration partiallyFailedInterval) { + // find message that was lastly processed before specified intervals + + String jSql = "SELECT m " + + "FROM " + Message.class.getName() + " m " + + "WHERE (m.state = '" + MsgStateEnum.POSTPONED + "'" + + " AND m.lastUpdateTimestamp < :lastTimePostponed)" + + " OR (m.state = '" + MsgStateEnum.PARTLY_FAILED + "'" + + " AND m.lastUpdateTimestamp < :lastTimePartlyFailed)" + + " ORDER BY m.msgTimestamp"; + + TypedQuery q = em.createQuery(jSql, Message.class); + q.setParameter("lastTimePostponed", Instant.now().minus(postponedInterval)); + q.setParameter("lastTimePartlyFailed", Instant.now().minus(partiallyFailedInterval)); + q.setMaxResults(1); + List messages = q.getResultList(); + + if (messages.isEmpty()) { + return null; + } else { + return messages.get(0); + } + } + @Override @Transactional(propagation = Propagation.MANDATORY) public boolean updateMessageProcessingUnderLock(Message msg, Node processingNode) { diff --git a/core/src/test/java/org/openhubframework/openhub/core/common/asynch/db/PartlyFailedMessagesPoolDbTest.java b/core/src/test/java/org/openhubframework/openhub/core/common/asynch/db/MessagesPoolDbTest.java similarity index 61% rename from core/src/test/java/org/openhubframework/openhub/core/common/asynch/db/PartlyFailedMessagesPoolDbTest.java rename to core/src/test/java/org/openhubframework/openhub/core/common/asynch/db/MessagesPoolDbTest.java index 614a2906..395406fc 100644 --- a/core/src/test/java/org/openhubframework/openhub/core/common/asynch/db/PartlyFailedMessagesPoolDbTest.java +++ b/core/src/test/java/org/openhubframework/openhub/core/common/asynch/db/MessagesPoolDbTest.java @@ -46,7 +46,7 @@ * @author Petr Juza */ @Transactional -public class PartlyFailedMessagesPoolDbTest extends AbstractCoreDbTest { +public class MessagesPoolDbTest extends AbstractCoreDbTest { @Autowired private MessagesPool messagesPool; @@ -56,14 +56,15 @@ public class PartlyFailedMessagesPoolDbTest extends AbstractCoreDbTest { @Before public void prepareData() { - // set failed limit + // set partly failed and postponed limit setPrivateField(messagesPool, "partlyFailedInterval", new FixedConfigurationItem<>(Seconds.ZERO)); + setPrivateField(messagesPool, "postponedInterval", new FixedConfigurationItem<>(Seconds.ZERO)); } @Test public void testGetNextMessage() { // add one message and try to lock it - insertNewMessage("1234_4567", MsgStateEnum.PARTLY_FAILED); + insertNewMessage("1234_4567", MsgStateEnum.PARTLY_FAILED, Instant.now()); Message nextMsg = messagesPool.getNextMessage(); assertThat(nextMsg, notNullValue()); @@ -84,7 +85,49 @@ public void testGetNextMessage_noNextMessage() { assertThat(nextMsg, nullValue()); } - private void insertNewMessage(String correlationId, MsgStateEnum state) { + @Test + public void testGetNextMessage_noFailedMessage() { + insertNewMessage("id1", MsgStateEnum.PARTLY_FAILED, Instant.now()); + insertNewMessage("id2", MsgStateEnum.FAILED, Instant.now()); + insertNewMessage("id3", MsgStateEnum.POSTPONED, Instant.now()); + + Message nextMsg = messagesPool.getNextMessage(); + assertThat(nextMsg, notNullValue()); + assertThat(nextMsg.getState(), is(MsgStateEnum.IN_QUEUE)); + assertThat(nextMsg.getCorrelationId(), is("id1")); + + nextMsg = messagesPool.getNextMessage(); + assertThat(nextMsg, notNullValue()); + assertThat(nextMsg.getState(), is(MsgStateEnum.IN_QUEUE)); + assertThat(nextMsg.getCorrelationId(), is("id3")); + + nextMsg = messagesPool.getNextMessage(); + assertThat(nextMsg, nullValue()); + } + + @Test + public void testGetNextMessage_byPoolIntervals() { + setPrivateField(messagesPool, "partlyFailedInterval", new FixedConfigurationItem<>(Seconds.of(300))); + setPrivateField(messagesPool, "postponedInterval", new FixedConfigurationItem<>(Seconds.of(120))); + + insertNewMessage("id1", MsgStateEnum.POSTPONED, Instant.now()); + insertNewMessage("id2", MsgStateEnum.POSTPONED, Instant.now().minusSeconds(150)); + insertNewMessage("id3", MsgStateEnum.PARTLY_FAILED, Instant.now().minusSeconds(150)); + insertNewMessage("id4", MsgStateEnum.PARTLY_FAILED, Instant.now().minusSeconds(400)); + + Message nextMsg = messagesPool.getNextMessage(); + assertThat(nextMsg, notNullValue()); + assertThat(nextMsg.getCorrelationId(), is("id2")); + + nextMsg = messagesPool.getNextMessage(); + assertThat(nextMsg, notNullValue()); + assertThat(nextMsg.getCorrelationId(), is("id4")); + + nextMsg = messagesPool.getNextMessage(); + assertThat(nextMsg, nullValue()); + } + + private void insertNewMessage(String correlationId, MsgStateEnum state, Instant lastUpdateTimestamp) { Instant currDate = Instant.now(); Message msg = new Message(); @@ -92,7 +135,7 @@ private void insertNewMessage(String correlationId, MsgStateEnum state) { msg.setMsgTimestamp(currDate); msg.setReceiveTimestamp(currDate); - msg.setLastUpdateTimestamp(currDate); + msg.setLastUpdateTimestamp(lastUpdateTimestamp); msg.setSourceSystem(ExternalSystemTestEnum.CRM); msg.setCorrelationId(correlationId);