Skip to content

Commit

Permalink
[OHFJIRA-110] : guaranteed order messages query optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
sabolm committed May 4, 2020
1 parent c376f08 commit 114d301
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 11 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright 2014 the original author or authors.
* Copyright 2014-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -208,10 +208,25 @@ void setStateFailed(@Header(MSG_HEADER) Message msg,
* @param funnelValue the funnel value
* @param excludeFailedState {@link MsgStateEnum#FAILED FAILED} state is used by default;
* use {@code true} if you want to exclude FAILED state
* @return list of messages ordered by {@link Message#getMsgTimestamp() message timestamp}
* @return list of all messages ordered by {@link Message#getMsgTimestamp() message timestamp}
* @deprecated use {@link #getMessagesForGuaranteedOrderForRoute(String, boolean, long)} instead, which returns
* specified number of records
*/
@Deprecated
List<Message> getMessagesForGuaranteedOrderForRoute(String funnelValue, boolean excludeFailedState);

/**
* Gets list of messages with specified funnel value for guaranteed processing order of whole routes.
*
* @param funnelValue the funnel value
* @param excludeFailedState {@link MsgStateEnum#FAILED FAILED} state is used by default;
* use {@code true} if you want to exclude FAILED state
* @param limit the limit of message count
* @return list of messages ordered by {@link Message#getMsgTimestamp() message timestamp} and
* {@link Message#getMsgId() message id}
*/
List<Message> getMessagesForGuaranteedOrderForRoute(String funnelValue, boolean excludeFailedState, long limit);

/**
* Gets list of messages with specified funnel value for guaranteed processing order of messages
* for specified funnel.
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2014 the original author or authors.
* Copyright 2014-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -85,6 +85,8 @@ public class AsynchInMessageRoute extends AbstractBasicRoute {

static final String ROUTE_ID_GUARANTEED_ORDER = "guaranteedOrder" + AbstractBasicRoute.ROUTE_SUFFIX;

private static final long GUARANTEED_ORDER_MESSAGES_LIMIT = 2L;

@Autowired
private ThrottlingProcessor throttlingProcessor;

Expand Down Expand Up @@ -269,7 +271,7 @@ public boolean isMsgInGuaranteedOrder(@Body Message msg) {
} else {
// guaranteed order => is the message in the right order?
List<Message> messages = getBean(MessageService.class)
.getMessagesForGuaranteedOrderForRoute(msg.getFunnelValue(), msg.isExcludeFailedState());
.getMessagesForGuaranteedOrderForRoute(msg.getFunnelValue(), msg.isExcludeFailedState(), GUARANTEED_ORDER_MESSAGES_LIMIT);

if (messages.size() == 1) {
LOG.debug("There is only one processing message with funnel value: " + msg.getFunnelValue()
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2014 the original author or authors.
* Copyright 2014-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -452,6 +452,11 @@ public List<Message> getMessagesForGuaranteedOrderForRoute(String funnelValue, b
return messageDao.getMessagesForGuaranteedOrderForRoute(funnelValue, excludeFailedState);
}

@Override
public List<Message> getMessagesForGuaranteedOrderForRoute(String funnelValue, boolean excludeFailedState, long limit) {
return messageDao.getMessagesForGuaranteedOrderForRoute(funnelValue, excludeFailedState, limit);
}

@Override
public List<Message> getMessagesForGuaranteedOrderForFunnel(String funnelValue, Duration idleInterval,
boolean excludeFailedState, String funnelCompId) {
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2014 the original author or authors.
* Copyright 2014-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -61,6 +61,8 @@ public class MessagePollExecutor implements Runnable {

private static final int LOCK_FAILURE_LIMIT = 5;

private static final long GUARANTEED_ORDER_MESSAGES_LIMIT = 2L;

@Autowired
private MessagesPool messagesPool;

Expand Down Expand Up @@ -175,7 +177,7 @@ private boolean isMsgInGuaranteedOrder(Message msg) {
} else {
// guaranteed order => is the message in the right order?
List<Message> messages = messageService.getMessagesForGuaranteedOrderForRoute(msg.getFunnelValue(),
msg.isExcludeFailedState());
msg.isExcludeFailedState(), GUARANTEED_ORDER_MESSAGES_LIMIT);

if (messages.size() == 1) {
LOG.debug("There is only one processing message with funnel value: " + msg.getFunnelValue()
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2014 the original author or authors.
* Copyright 2014-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -190,10 +190,25 @@ public interface MessageDao {
* @param funnelValue the funnel value
* @param excludeFailedState {@link MsgStateEnum#FAILED FAILED} state is used by default;
* use {@code true} if you want to exclude FAILED state
* @return list of messages ordered by {@link Message#getMsgTimestamp() message timestamp}
* @return list of all messages ordered by {@link Message#getMsgTimestamp() message timestamp}
* @deprecated use {@link #getMessagesForGuaranteedOrderForRoute(String, boolean, long)} instead, which returns
* specified number of records
*/
@Deprecated
List<Message> getMessagesForGuaranteedOrderForRoute(String funnelValue, boolean excludeFailedState);

/**
* Gets list of messages with specified funnel value for guaranteed processing order of whole routes.
*
* @param funnelValue the funnel value
* @param excludeFailedState {@link MsgStateEnum#FAILED FAILED} state is used by default;
* use {@code true} if you want to exclude FAILED state
* @param limit the limit of message count
* @return list of messages ordered by {@link Message#getMsgTimestamp() message timestamp} and
* {@link Message#getMsgId() message id}
*/
List<Message> getMessagesForGuaranteedOrderForRoute(String funnelValue, boolean excludeFailedState, long limit);

/**
* Gets list of messages with specified funnel value for guaranteed processing order of messages
* for specified funnel.
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2014 the original author or authors.
* Copyright 2014-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -399,6 +399,32 @@ public List<Message> getMessagesForGuaranteedOrderForRoute(String funnelValue, b
return q.getResultList();
}

@Override
public List<Message> getMessagesForGuaranteedOrderForRoute(String funnelValue, boolean excludeFailedState, long limit) {
String jSql = "SELECT m "
+ "FROM " + Message.class.getName() + " m "
+ "WHERE (m.state = '" + MsgStateEnum.PROCESSING + "' "
+ " OR m.state = '" + MsgStateEnum.IN_QUEUE + "'"
+ " OR m.state = '" + MsgStateEnum.NEW + "'"
+ " OR m.state = '" + MsgStateEnum.WAITING + "'"
+ " OR m.state = '" + MsgStateEnum.PARTLY_FAILED + "'"
+ " OR m.state = '" + MsgStateEnum.POSTPONED + "'";

if (!excludeFailedState) {
jSql += " OR m.state = '" + MsgStateEnum.FAILED + "'";
}

jSql += " OR m.state = '" + MsgStateEnum.WAITING_FOR_RES + "')"
+ " AND m.funnelValue = '" + funnelValue + "'"
+ " AND m.guaranteedOrder is true"
+ " ORDER BY m.msgTimestamp, m.msgId DESC";

TypedQuery<Message> q = em.createQuery(jSql, Message.class);
q.setMaxResults((int) limit);

return q.getResultList();
}

@Override
public List<Message> getMessagesForGuaranteedOrderForFunnel(String funnelValue, Duration idleInterval,
boolean excludeFailedState, String funnelCompId) {
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -231,6 +231,100 @@ public void testFindPostponedOrPartlyFailedMessage() {
assertThat(message.getCorrelationId(), is("id2"));
}

@Test
public void testGetMessagesForGuaranteedOrderForRoute() {
// prepare message
createAndSaveMessages(10, (message, order) -> {
message.setGuaranteedOrder(true);
message.setFunnelValue("funnel");
message.setState(MsgStateEnum.POSTPONED);
message.setStartProcessTimestamp(Instant.now());
if (order == 4) {
message.setCorrelationId("id1");
message.setMsgTimestamp(Instant.now().minusSeconds(60));
}
});

List<Message> dbMessages = messageService.getMessagesForGuaranteedOrderForRoute("funnel", false);
assertThat(dbMessages.size(), is(10)); // all messages
assertThat(dbMessages.get(0).getCorrelationId(), is("id1"));
}

@Test
public void testGetMessagesForGuaranteedOrderForRoute_limit() {
// prepare message
final Instant initMsgTimestamp = Instant.now().minusSeconds(60);
createAndSaveMessages(15, (message, order) -> {
message.setGuaranteedOrder(true);
message.setFunnelValue("funnel");
message.setState(MsgStateEnum.POSTPONED);
message.setStartProcessTimestamp(Instant.now());
switch (order) {
case 4:
message.setCorrelationId("id1");
message.setMsgTimestamp(initMsgTimestamp);
break;
case 7:
message.setCorrelationId("id2");
message.setMsgTimestamp(initMsgTimestamp);
break;
}
});

List<Message> dbMessages = messageService.getMessagesForGuaranteedOrderForRoute("funnel", false, 10L);
assertThat(dbMessages.size(), is(10)); // limit is set to 10
assertThat(dbMessages.get(0).getCorrelationId(), is("id2"));
assertThat(dbMessages.get(1).getCorrelationId(), is("id1"));
}

@Test
public void testGetMessagesForGuaranteedOrderForRoute_excludeFailed() {
// prepare message
createAndSaveMessages(10, (message, order) -> {
message.setGuaranteedOrder(true);
message.setFunnelValue("funnel");
message.setStartProcessTimestamp(Instant.now());
switch (order) {
case 1:
case 4:
case 5:
message.setState(MsgStateEnum.FAILED);
break;
case 2:
message.setCorrelationId("id2");
message.setMsgTimestamp(Instant.now().minusSeconds(120));
break;
case 7:
message.setFunnelValue("otherFunnel");
break;
}
});

List<Message> dbMessages = messageService.getMessagesForGuaranteedOrderForRoute("funnel", true, 10L);
assertThat(dbMessages.size(), is(6));
assertThat(dbMessages.get(0).getCorrelationId(), is("id2"));
}

@Test
public void testGetMessagesForGuaranteedOrderForFunnel() {
// prepare message
createAndSaveMessages(10, (message, order) -> {
message.setFunnelValue("funnel");
message.setFunnelComponentId("funnelComp");
message.setState(MsgStateEnum.WAITING);
message.setStartProcessTimestamp(Instant.now());
if (order == 4) {
message.setCorrelationId("id1");
message.setMsgTimestamp(Instant.now().minusSeconds(60));
}
});

List<Message> dbMessages =
messageService.getMessagesForGuaranteedOrderForFunnel("funnel", Seconds.of(60).toDuration(), false, "funnelComp");
assertThat(dbMessages.size(), is(10));
assertThat(dbMessages.get(0).getCorrelationId(), is("id1"));
}

@Test
public void testFindMessagesByFilter_minimalOk() throws Exception {
// prepare message
Expand Down

0 comments on commit 114d301

Please sign in to comment.