Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Feature/queue message postpone (#1537)
Browse files Browse the repository at this point in the history
* Adding message postpone to queueDAO.

* Update QueueDAO setOffsetTime to resetOffsetTime.
  • Loading branch information
kishorebanala committed Feb 18, 2020
1 parent c732b86 commit 1f6bad2
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 13 deletions.
Expand Up @@ -436,7 +436,7 @@ public void resetCallbacksForInProgressTasks(String workflowId) {
// Get tasks that have callbackAfterSeconds > 0 and set the callbackAfterSeconds to 0
for (Task task : workflow.getTasks()) {
if (task.getCallbackAfterSeconds() > 0) {
if (queueDAO.setOffsetTime(QueueUtils.getQueueName(task), task.getTaskId(), 0)) {
if (queueDAO.resetOffsetTime(QueueUtils.getQueueName(task), task.getTaskId())) {
task.setCallbackAfterSeconds(0);
executionDAOFacade.updateTask(task);
}
Expand Down Expand Up @@ -859,12 +859,10 @@ public void updateTask(TaskResult taskResult) {
break;
case IN_PROGRESS:
case SCHEDULED:
// put it back in queue based on callbackAfterSeconds
// postpone based on callbackAfterSeconds
long callBack = taskResult.getCallbackAfterSeconds();
queueDAO.remove(taskQueueName, task.getTaskId());
LOGGER.debug("Task: {} removed from taskQueue: {} since the task status is {}", task, taskQueueName, task.getStatus().name());
queueDAO.push(taskQueueName, task.getTaskId(), task.getWorkflowPriority(), callBack); // Milliseconds
LOGGER.debug("Task: {} pushed back to taskQueue: {} since the task status is {} with callbackAfterSeconds: {}", task, taskQueueName, task.getStatus().name(), callBack);
queueDAO.postpone(taskQueueName, task.getTaskId(), task.getWorkflowPriority(), callBack);
LOGGER.debug("Task: {} postponed in taskQueue: {} since the task status is {} with callbackAfterSeconds: {}", task, taskQueueName, task.getStatus().name(), callBack);
break;
default:
break;
Expand Down
20 changes: 17 additions & 3 deletions core/src/main/java/com/netflix/conductor/dao/QueueDAO.java
Expand Up @@ -167,11 +167,25 @@ default void processUnacks(String queueName) {
}

/**
* Sets the offset time without pulling out the message from the queue
* Resets the offsetTime on a message to 0, without pulling out the message from the queue
* @param queueName name of the queue
* @param id message id
* @param offsetTimeInSecond time in seconds, after which the message should be marked visible. (for timed queues)
* @return true if the message is in queue and the change was successful else returns false
*/
boolean setOffsetTime(String queueName, String id, long offsetTimeInSecond);
boolean resetOffsetTime(String queueName, String id);

/**
* Postpone a given message with postponeDurationInSeconds, so that the message won't be available for further polls
* until expiry.
* By default, the message is removed and pushed backed with postponeDurationInSeconds to be backwards compatible.
* @param queueName
* @param messageId
* @param priority
* @param postponeDurationInSeconds
*/
default boolean postpone(String queueName, String messageId, int priority, long postponeDurationInSeconds) {
remove(queueName, messageId);
push(queueName, messageId, priority, postponeDurationInSeconds);
return true;
}
}
Expand Up @@ -171,7 +171,8 @@ public void processUnacks(String queueName) {
}

@Override
public boolean setOffsetTime(String queueName, String messageId, long offsetTimeInSecond) {
public boolean resetOffsetTime(String queueName, String messageId) {
long offsetTimeInSecond = 0; // Reset to 0
final String SET_OFFSET_TIME = "UPDATE queue_message SET offset_time_seconds = ?, deliver_on = TIMESTAMPADD(SECOND,?,CURRENT_TIMESTAMP) \n"
+ "WHERE queue_name = ? AND message_id = ?";

Expand Down
Expand Up @@ -182,7 +182,8 @@ public void processUnacks(String queueName) {
}

@Override
public boolean setOffsetTime(String queueName, String messageId, long offsetTimeInSecond) {
public boolean resetOffsetTime(String queueName, String messageId) {
long offsetTimeInSecond = 0; // Reset to 0
final String SET_OFFSET_TIME = "UPDATE queue_message SET offset_time_seconds = ?, deliver_on = (current_timestamp + (? ||' seconds')::interval) \n"
+ "WHERE queue_name = ? AND message_id = ?";

Expand Down
Expand Up @@ -249,9 +249,9 @@ public void processUnacks(String queueName) {
}

@Override
public boolean setOffsetTime(String queueName, String id, long offsetTimeInSecond) {
public boolean resetOffsetTime(String queueName, String id) {
DynoQueue queue = queues.get(queueName);
return queue.setTimeout(id, offsetTimeInSecond);
return queue.setTimeout(id, 0);

}
}

0 comments on commit 1f6bad2

Please sign in to comment.