Skip to content

Commit

Permalink
Merge 64d008e into f3c021d
Browse files Browse the repository at this point in the history
  • Loading branch information
sabolm committed Apr 24, 2020
2 parents f3c021d + 64d008e commit 79329f3
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 19 deletions.
Expand Up @@ -30,7 +30,6 @@
import org.apache.camel.Header;
import org.apache.camel.Properties;

import org.openhubframework.openhub.api.entity.ExternalSystemExtEnum;
import org.openhubframework.openhub.api.entity.Message;
import org.openhubframework.openhub.api.entity.MessageFilter;
import org.openhubframework.openhub.api.entity.MsgStateEnum;
Expand Down Expand Up @@ -259,4 +258,14 @@ List<Message> getMessagesForGuaranteedOrderForFunnel(String funnelValue, Duratio
*/
@Nullable
Message findPartlyFailedMessage(Duration interval);

/**
* Finds one message in state {@link MsgStateEnum#POSTPONED} or {@link MsgStateEnum#PARTLY_FAILED}
*
* @param postponedInterval Interval (in seconds) after that can be postponed message processed again
* @param partiallyFailedInterval Interval (in seconds) after that can be partly failed message processed again
* @return message or null if there is no any message
*/
@Nullable
Message findPostponedOrPartlyFailedMessage(Duration postponedInterval, Duration partiallyFailedInterval);
}
Expand Up @@ -511,4 +511,13 @@ public Message findPartlyFailedMessage(Duration interval) {

return messageDao.findPartlyFailedMessage(interval);
}

@Nullable
@Override
public Message findPostponedOrPartlyFailedMessage(Duration postponedInterval, Duration partiallyFailedInterval) {
Assert.notNull(postponedInterval, "postponed interval must not be null");
Assert.notNull(partiallyFailedInterval, "partly failed interval must not be null");

return messageDao.findPostponedOrPartlyFailedMessage(postponedInterval, partiallyFailedInterval);
}
}
Expand Up @@ -64,12 +64,7 @@ public Message getNextMessage() {
// is there next message for processing?

// firstly try postponed messages
Message msg = findPostponedMessage();

// then partly failed messages
if (msg == null) {
msg = findPartlyFailedMessage();
}
Message msg = findPostponedOrPartlyFailedMessage();

if (msg == null) {
LOG.debug("No POSTPONED and PARTLY_FAILED message found for re-processing.");
Expand All @@ -86,12 +81,8 @@ public Message getNextMessage() {
}

@Nullable
private Message findPostponedMessage() {
return messageService.findPostponedMessage(postponedInterval.getValue().toDuration());
}

@Nullable
private Message findPartlyFailedMessage() {
return messageService.findPartlyFailedMessage(partlyFailedInterval.getValue().toDuration());
private Message findPostponedOrPartlyFailedMessage() {
return messageService.findPostponedOrPartlyFailedMessage(postponedInterval.getValue().toDuration(),
partlyFailedInterval.getValue().toDuration());
}
}
Expand Up @@ -125,6 +125,16 @@ public interface MessageDao {
@Nullable
Message findPostponedMessage(Duration interval);

/**
* Finds ONE message in state {@link MsgStateEnum#POSTPONED} or {@link MsgStateEnum#PARTLY_FAILED}
*
* @param postponedInterval Interval (in seconds) after that can be postponed message processed again
* @param partiallyFailedInterval Interval (in seconds) after that can be partly failed message processed again
* @return message or null if there is no any message
*/
@Nullable
Message findPostponedOrPartlyFailedMessage(Duration postponedInterval, Duration partiallyFailedInterval);

/**
* Updates {@link Message} into state {@link MsgStateEnum#PROCESSING} (set start timestamp of processing)
* - gets lock for message.
Expand Down
Expand Up @@ -207,6 +207,32 @@ public Message findPostponedMessage(Duration interval) {
}
}

@Nullable
@Override
public Message findPostponedOrPartlyFailedMessage(Duration postponedInterval, Duration partiallyFailedInterval) {
// find message that was lastly processed before specified intervals

String jSql = "SELECT m "
+ "FROM " + Message.class.getName() + " m "
+ "WHERE (m.state = '" + MsgStateEnum.POSTPONED + "'"
+ " AND m.lastUpdateTimestamp < :lastTimePostponed)"
+ " OR (m.state = '" + MsgStateEnum.PARTLY_FAILED + "'"
+ " AND m.lastUpdateTimestamp < :lastTimePartlyFailed)"
+ " ORDER BY m.msgTimestamp";

TypedQuery<Message> q = em.createQuery(jSql, Message.class);
q.setParameter("lastTimePostponed", Instant.now().minus(postponedInterval));
q.setParameter("lastTimePartlyFailed", Instant.now().minus(partiallyFailedInterval));
q.setMaxResults(1);
List<Message> messages = q.getResultList();

if (messages.isEmpty()) {
return null;
} else {
return messages.get(0);
}
}

@Override
@Transactional(propagation = Propagation.MANDATORY)
public boolean updateMessageProcessingUnderLock(Message msg, Node processingNode) {
Expand Down
Expand Up @@ -46,7 +46,7 @@
* @author Petr Juza
*/
@Transactional
public class PartlyFailedMessagesPoolDbTest extends AbstractCoreDbTest {
public class MessagesPoolDbTest extends AbstractCoreDbTest {

@Autowired
private MessagesPool messagesPool;
Expand All @@ -56,14 +56,15 @@ public class PartlyFailedMessagesPoolDbTest extends AbstractCoreDbTest {

@Before
public void prepareData() {
// set failed limit
// set partly failed and postponed limit
setPrivateField(messagesPool, "partlyFailedInterval", new FixedConfigurationItem<>(Seconds.ZERO));
setPrivateField(messagesPool, "postponedInterval", new FixedConfigurationItem<>(Seconds.ZERO));
}

@Test
public void testGetNextMessage() {
// add one message and try to lock it
insertNewMessage("1234_4567", MsgStateEnum.PARTLY_FAILED);
insertNewMessage("1234_4567", MsgStateEnum.PARTLY_FAILED, Instant.now());

Message nextMsg = messagesPool.getNextMessage();
assertThat(nextMsg, notNullValue());
Expand All @@ -84,15 +85,57 @@ public void testGetNextMessage_noNextMessage() {
assertThat(nextMsg, nullValue());
}

private void insertNewMessage(String correlationId, MsgStateEnum state) {
@Test
public void testGetNextMessage_noFailedMessage() {
insertNewMessage("id1", MsgStateEnum.PARTLY_FAILED, Instant.now());
insertNewMessage("id2", MsgStateEnum.FAILED, Instant.now());
insertNewMessage("id3", MsgStateEnum.POSTPONED, Instant.now());

Message nextMsg = messagesPool.getNextMessage();
assertThat(nextMsg, notNullValue());
assertThat(nextMsg.getState(), is(MsgStateEnum.IN_QUEUE));
assertThat(nextMsg.getCorrelationId(), is("id1"));

nextMsg = messagesPool.getNextMessage();
assertThat(nextMsg, notNullValue());
assertThat(nextMsg.getState(), is(MsgStateEnum.IN_QUEUE));
assertThat(nextMsg.getCorrelationId(), is("id3"));

nextMsg = messagesPool.getNextMessage();
assertThat(nextMsg, nullValue());
}

@Test
public void testGetNextMessage_byPoolIntervals() {
setPrivateField(messagesPool, "partlyFailedInterval", new FixedConfigurationItem<>(Seconds.of(300)));
setPrivateField(messagesPool, "postponedInterval", new FixedConfigurationItem<>(Seconds.of(120)));

insertNewMessage("id1", MsgStateEnum.POSTPONED, Instant.now());
insertNewMessage("id2", MsgStateEnum.POSTPONED, Instant.now().minusSeconds(150));
insertNewMessage("id3", MsgStateEnum.PARTLY_FAILED, Instant.now().minusSeconds(150));
insertNewMessage("id4", MsgStateEnum.PARTLY_FAILED, Instant.now().minusSeconds(400));

Message nextMsg = messagesPool.getNextMessage();
assertThat(nextMsg, notNullValue());
assertThat(nextMsg.getCorrelationId(), is("id2"));

nextMsg = messagesPool.getNextMessage();
assertThat(nextMsg, notNullValue());
assertThat(nextMsg.getCorrelationId(), is("id4"));

nextMsg = messagesPool.getNextMessage();
assertThat(nextMsg, nullValue());
}

private void insertNewMessage(String correlationId, MsgStateEnum state, Instant lastUpdateTimestamp) {
Instant currDate = Instant.now();

Message msg = new Message();
msg.setState(state);

msg.setMsgTimestamp(currDate);
msg.setReceiveTimestamp(currDate);
msg.setLastUpdateTimestamp(currDate);
msg.setLastUpdateTimestamp(lastUpdateTimestamp);
msg.setSourceSystem(ExternalSystemTestEnum.CRM);
msg.setCorrelationId(correlationId);

Expand Down

0 comments on commit 79329f3

Please sign in to comment.