From 1f6bad2992a45354bb87bb1f184f4d1b34023f7e Mon Sep 17 00:00:00 2001 From: Kishore Date: Tue, 18 Feb 2020 11:21:12 -0800 Subject: [PATCH] Feature/queue message postpone (#1537) * Adding message postpone to queueDAO. * Update QueueDAO setOffsetTime to resetOffsetTime. --- .../core/execution/WorkflowExecutor.java | 10 ++++------ .../com/netflix/conductor/dao/QueueDAO.java | 20 ++++++++++++++++--- .../conductor/dao/mysql/MySQLQueueDAO.java | 3 ++- .../dao/postgres/PostgresQueueDAO.java | 3 ++- .../dao/dynomite/queue/DynoQueueDAO.java | 4 ++-- 5 files changed, 27 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index d0d41228e9..9930aa18ac 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -436,7 +436,7 @@ public void resetCallbacksForInProgressTasks(String workflowId) { // Get tasks that have callbackAfterSeconds > 0 and set the callbackAfterSeconds to 0 for (Task task : workflow.getTasks()) { if (task.getCallbackAfterSeconds() > 0) { - if (queueDAO.setOffsetTime(QueueUtils.getQueueName(task), task.getTaskId(), 0)) { + if (queueDAO.resetOffsetTime(QueueUtils.getQueueName(task), task.getTaskId())) { task.setCallbackAfterSeconds(0); executionDAOFacade.updateTask(task); } @@ -859,12 +859,10 @@ public void updateTask(TaskResult taskResult) { break; case IN_PROGRESS: case SCHEDULED: - // put it back in queue based on callbackAfterSeconds + // postpone based on callbackAfterSeconds long callBack = taskResult.getCallbackAfterSeconds(); - queueDAO.remove(taskQueueName, task.getTaskId()); - LOGGER.debug("Task: {} removed from taskQueue: {} since the task status is {}", task, taskQueueName, task.getStatus().name()); - queueDAO.push(taskQueueName, task.getTaskId(), task.getWorkflowPriority(), callBack); // Milliseconds - LOGGER.debug("Task: {} pushed back to taskQueue: {} since the task status is {} with callbackAfterSeconds: {}", task, taskQueueName, task.getStatus().name(), callBack); + queueDAO.postpone(taskQueueName, task.getTaskId(), task.getWorkflowPriority(), callBack); + LOGGER.debug("Task: {} postponed in taskQueue: {} since the task status is {} with callbackAfterSeconds: {}", task, taskQueueName, task.getStatus().name(), callBack); break; default: break; diff --git a/core/src/main/java/com/netflix/conductor/dao/QueueDAO.java b/core/src/main/java/com/netflix/conductor/dao/QueueDAO.java index d905bbc943..a4c98a2462 100644 --- a/core/src/main/java/com/netflix/conductor/dao/QueueDAO.java +++ b/core/src/main/java/com/netflix/conductor/dao/QueueDAO.java @@ -167,11 +167,25 @@ default void processUnacks(String queueName) { } /** - * Sets the offset time without pulling out the message from the queue + * Resets the offsetTime on a message to 0, without pulling out the message from the queue * @param queueName name of the queue * @param id message id - * @param offsetTimeInSecond time in seconds, after which the message should be marked visible. (for timed queues) * @return true if the message is in queue and the change was successful else returns false */ - boolean setOffsetTime(String queueName, String id, long offsetTimeInSecond); + boolean resetOffsetTime(String queueName, String id); + + /** + * Postpone a given message with postponeDurationInSeconds, so that the message won't be available for further polls + * until expiry. + * By default, the message is removed and pushed backed with postponeDurationInSeconds to be backwards compatible. + * @param queueName + * @param messageId + * @param priority + * @param postponeDurationInSeconds + */ + default boolean postpone(String queueName, String messageId, int priority, long postponeDurationInSeconds) { + remove(queueName, messageId); + push(queueName, messageId, priority, postponeDurationInSeconds); + return true; + } } diff --git a/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLQueueDAO.java b/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLQueueDAO.java index 492af776f0..5b40a4468b 100644 --- a/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLQueueDAO.java +++ b/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLQueueDAO.java @@ -171,7 +171,8 @@ public void processUnacks(String queueName) { } @Override - public boolean setOffsetTime(String queueName, String messageId, long offsetTimeInSecond) { + public boolean resetOffsetTime(String queueName, String messageId) { + long offsetTimeInSecond = 0; // Reset to 0 final String SET_OFFSET_TIME = "UPDATE queue_message SET offset_time_seconds = ?, deliver_on = TIMESTAMPADD(SECOND,?,CURRENT_TIMESTAMP) \n" + "WHERE queue_name = ? AND message_id = ?"; diff --git a/postgres-persistence/src/main/java/com/netflix/conductor/dao/postgres/PostgresQueueDAO.java b/postgres-persistence/src/main/java/com/netflix/conductor/dao/postgres/PostgresQueueDAO.java index a317034e5c..d5d2ee2b2d 100644 --- a/postgres-persistence/src/main/java/com/netflix/conductor/dao/postgres/PostgresQueueDAO.java +++ b/postgres-persistence/src/main/java/com/netflix/conductor/dao/postgres/PostgresQueueDAO.java @@ -182,7 +182,8 @@ public void processUnacks(String queueName) { } @Override - public boolean setOffsetTime(String queueName, String messageId, long offsetTimeInSecond) { + public boolean resetOffsetTime(String queueName, String messageId) { + long offsetTimeInSecond = 0; // Reset to 0 final String SET_OFFSET_TIME = "UPDATE queue_message SET offset_time_seconds = ?, deliver_on = (current_timestamp + (? ||' seconds')::interval) \n" + "WHERE queue_name = ? AND message_id = ?"; diff --git a/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/queue/DynoQueueDAO.java b/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/queue/DynoQueueDAO.java index 805cd44276..d1d3fb4a91 100644 --- a/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/queue/DynoQueueDAO.java +++ b/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/queue/DynoQueueDAO.java @@ -249,9 +249,9 @@ public void processUnacks(String queueName) { } @Override - public boolean setOffsetTime(String queueName, String id, long offsetTimeInSecond) { + public boolean resetOffsetTime(String queueName, String id) { DynoQueue queue = queues.get(queueName); - return queue.setTimeout(id, offsetTimeInSecond); + return queue.setTimeout(id, 0); } }