Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Cleanup to support generic queue implementations. #1481

Merged
merged 3 commits into from Jan 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -638,8 +638,6 @@ public static class DeciderOutcome {

List<Task> tasksToBeUpdated = new LinkedList<>();

List<Task> tasksToBeRequeued = new LinkedList<>();

boolean isComplete;

private DeciderOutcome() {
Expand Down
Expand Up @@ -552,6 +552,8 @@ public void retry(String workflowId) {
// Update Workflow with new status.
// This should load Workflow from archive, if archived.
workflow.setStatus(WorkflowStatus.RUNNING);
// Add to decider queue
queueDAO.push(DECIDER_QUEUE, workflow.getWorkflowId(), workflow.getPriority(), config.getSweepFrequency());
executionDAOFacade.updateWorkflow(workflow);

// taskToBeRescheduled would set task `retried` to true, and hence it's important to updateTasks after obtaining task copy from taskToBeRescheduled.
Expand Down Expand Up @@ -952,6 +954,10 @@ public boolean decide(String workflowId) {
// This code will be removed in a future version.
workflow = metadataMapperService.populateWorkflowWithDefinitions(workflow);

if (workflow.getStatus().isTerminal()) {
return true;
}

try {
DeciderService.DeciderOutcome outcome = deciderService.decide(workflow);
if (outcome.isComplete) {
Expand All @@ -962,13 +968,8 @@ public boolean decide(String workflowId) {
List<Task> tasksToBeScheduled = outcome.tasksToBeScheduled;
setTaskDomains(tasksToBeScheduled, workflow);
List<Task> tasksToBeUpdated = outcome.tasksToBeUpdated;
List<Task> tasksToBeRequeued = outcome.tasksToBeRequeued;
boolean stateChanged = false;

if (!tasksToBeRequeued.isEmpty()) {
addTaskToQueue(tasksToBeRequeued);
}

tasksToBeScheduled = dedupAndAddTasks(workflow, tasksToBeScheduled);

for (Task task : outcome.tasksToBeScheduled) {
Expand Down Expand Up @@ -1005,7 +1006,6 @@ public boolean decide(String workflowId) {
if (!outcome.tasksToBeUpdated.isEmpty() || !tasksToBeScheduled.isEmpty()) {
executionDAOFacade.updateTasks(tasksToBeUpdated);
executionDAOFacade.updateWorkflow(workflow);
queueDAO.push(DECIDER_QUEUE, workflow.getWorkflowId(), workflow.getPriority(), config.getSweepFrequency());
}

stateChanged = scheduleTask(workflow, tasksToBeScheduled) || stateChanged;
Expand Down Expand Up @@ -1135,7 +1135,6 @@ public Workflow getWorkflow(String workflowId, boolean includeTasks) {
public void addTaskToQueue(Task task) {
// put in queue
String taskQueueName = QueueUtils.getQueueName(task);
queueDAO.remove(taskQueueName, task.getTaskId());
if (task.getCallbackAfterSeconds() > 0) {
queueDAO.push(taskQueueName, task.getTaskId(), task.getWorkflowPriority(), task.getCallbackAfterSeconds());
} else {
Expand Down Expand Up @@ -1191,7 +1190,6 @@ public void executeSystemTask(WorkflowSystemTask systemTask, String taskId, int
}

LOGGER.debug("Executing {}/{}-{}", task.getTaskType(), task.getTaskId(), task.getStatus());
queueDAO.setUnackTimeout(QueueUtils.getQueueName(task), task.getTaskId(), systemTask.getRetryTimeInSecond() * 1000);
if (task.getStatus() == SCHEDULED || !systemTask.isAsyncComplete(task)) {
task.setPollCount(task.getPollCount() + 1);
executionDAOFacade.updateTask(task);
Expand All @@ -1201,7 +1199,7 @@ public void executeSystemTask(WorkflowSystemTask systemTask, String taskId, int

// Stop polling for asyncComplete system tasks that are not in SCHEDULED state
if (systemTask.isAsyncComplete(task) && task.getStatus() != SCHEDULED) {
queueDAO.ack(QueueUtils.getQueueName(task), task.getTaskId());
queueDAO.remove(QueueUtils.getQueueName(task), task.getTaskId());
return;
}

Expand Down Expand Up @@ -1476,6 +1474,8 @@ private boolean rerunWF(String workflowId, String taskId, Map<String, Object> ta
if (workflowInput != null) {
workflow.setInput(workflowInput);
}
// Add to decider queue
queueDAO.push(DECIDER_QUEUE, workflow.getWorkflowId(), workflow.getPriority(), config.getSweepFrequency());
executionDAOFacade.updateWorkflow(workflow);
//update tasks in datastore to update workflow-tasks relationship for archived workflows
executionDAOFacade.updateTasks(workflow.getTasks());
Expand Down
Expand Up @@ -26,6 +26,7 @@
import com.netflix.conductor.core.execution.ApplicationException.Code;
import com.netflix.conductor.dao.ExecutionDAO;
import com.netflix.conductor.dao.IndexDAO;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.dao.RateLimitingDao;
import com.netflix.conductor.metrics.Monitors;
import java.io.IOException;
Expand All @@ -40,6 +41,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.netflix.conductor.core.execution.WorkflowExecutor.DECIDER_QUEUE;

/**
* Service that acts as a facade for accessing execution data from the {@link ExecutionDAO}, {@link RateLimitingDao} and {@link IndexDAO} storage layers
*/
Expand All @@ -51,6 +54,7 @@ public class ExecutionDAOFacade {
private static final String RAW_JSON_FIELD = "rawJSON";

private final ExecutionDAO executionDAO;
private final QueueDAO queueDAO;
private final IndexDAO indexDAO;
private final RateLimitingDao rateLimitingDao;
private final ObjectMapper objectMapper;
Expand All @@ -59,9 +63,14 @@ public class ExecutionDAOFacade {
private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;

@Inject
public ExecutionDAOFacade(ExecutionDAO executionDAO, IndexDAO indexDAO, RateLimitingDao rateLimitingDao,
ObjectMapper objectMapper, Configuration config) {
public ExecutionDAOFacade(ExecutionDAO executionDAO,
QueueDAO queueDAO,
IndexDAO indexDAO,
RateLimitingDao rateLimitingDao,
ObjectMapper objectMapper,
Configuration config) {
this.executionDAO = executionDAO;
this.queueDAO = queueDAO;
this.indexDAO = indexDAO;
this.rateLimitingDao = rateLimitingDao;
this.objectMapper = objectMapper;
Expand Down Expand Up @@ -184,6 +193,8 @@ public long getPendingWorkflowCount(String workflowName) {
public String createWorkflow(Workflow workflow) {
workflow.setCreateTime(System.currentTimeMillis());
executionDAO.createWorkflow(workflow);
// Add to decider queue
queueDAO.push(DECIDER_QUEUE, workflow.getWorkflowId(), workflow.getPriority(), config.getSweepFrequency());
if (config.enableAsyncIndexing()) {
indexDAO.asyncIndexWorkflow(workflow);
} else {
Expand Down
64 changes: 44 additions & 20 deletions core/src/main/java/com/netflix/conductor/dao/QueueDAO.java
Expand Up @@ -21,15 +21,15 @@
import java.util.Map;

/**
*
*
* @author Viren
 * DAO responsible for managing the queues used by tasks.
*
*/
public interface QueueDAO {

/**
*
*
* @param queueName name of the queue
* @param id message id
* @param offsetTimeInSecond time in seconds, after which the message should be marked visible. (for timed queues)
Expand All @@ -52,7 +52,7 @@ public interface QueueDAO {
void push(String queueName, List<Message> messages);

/**
*
*
* @param queueName Name of the queue
* @param id message id
* @param offsetTimeInSecond time in seconds, after which the message should be marked visible. (for timed queues)
Expand All @@ -71,46 +71,70 @@ public interface QueueDAO {
boolean pushIfNotExists(String queueName, String id, int priority, long offsetTimeInSecond);

/**
*
*
* @param queueName Name of the queue
* @param count number of messages to be read from the queue
* @param timeout timeout in milliseconds
* @return list of elements from the named queue
*/
List<String> pop(String queueName, int count, int timeout);


/**
*
 * Pop messages from the queue with the given lease duration.
* @param queueName Name of the queue
* @param count number of messages to be read from the queue
* @param timeout timeout in milliseconds
* @param leaseDurationSeconds Message lease duration in seconds
* @return list of elements from the named queue
*/
default List<String> pop(String queueName, int count, int timeout, long leaseDurationSeconds) {
return pop(queueName, count, timeout);
}


/**
*
* @param queueName Name of the queue
* @param count number of messages to be read from the queue
* @param timeout timeout in milliseconds
* @return list of elements from the named queue
*/
List<Message> pollMessages(String queueName, int count, int timeout);


/**
 * Pop messages from the queue with the given lease duration.
* @param queueName Name of the queue
* @param count number of messages to be read from the queue
* @param timeout timeout in milliseconds
* @param leaseDurationSeconds Message lease duration in seconds
* @return list of Messages from the named queue
*/
default List<Message> pollMessages(String queueName, int count, int timeout, long leaseDurationSeconds) {
return pollMessages(queueName, count, timeout);
}

/**
*
*
* @param queueName Name of the queue
* @param messageId Message id
*/
void remove(String queueName, String messageId);

/**
*
*
* @param queueName Name of the queue
* @return size of the queue
*/
int getSize(String queueName);

/**
*
*
* @param queueName Name of the queue
* @param messageId Message Id
* @return true if the message was found and ack'ed
*/
boolean ack(String queueName, String messageId);

/**
* Extend the lease of the unacknowledged message for longer period.
* @param queueName Name of the queue
Expand All @@ -121,33 +145,33 @@ public interface QueueDAO {
boolean setUnackTimeout(String queueName, String messageId, long unackTimeout);

/**
*
*
* @param queueName Name of the queue
*/
void flush(String queueName);

/**
*
*
* @return key : queue name, value: size of the queue
*/
Map<String, Long> queuesDetail();

/**
*
*
* @return key : queue name, value: map of shard name to size and unack queue size
*/
Map<String, Map<String, Map<String, Long>>> queuesDetailVerbose();

default void processUnacks(String queueName) {

}

/**
* Sets the offset time without pulling out the message from the queue
* Sets the offset time without pulling out the message from the queue
* @param queueName name of the queue
* @param id message id
* @param offsetTimeInSecond time in seconds, after which the message should be marked visible. (for timed queues)
* @return true if the message is in queue and the change was successful else returns false
* @return true if the message is in queue and the change was successful else returns false
*/
boolean setOffsetTime(String queueName, String id, long offsetTimeInSecond);
}
}
Expand Up @@ -683,7 +683,6 @@ public void testDecideSuccessfulWorkflow() {
assertEquals("s1", deciderOutcome.tasksToBeUpdated.get(0).getReferenceTaskName());
assertEquals(1, deciderOutcome.tasksToBeScheduled.size());
assertEquals("s2", deciderOutcome.tasksToBeScheduled.get(0).getReferenceTaskName());
assertEquals(0, deciderOutcome.tasksToBeRequeued.size());
assertFalse(deciderOutcome.isComplete);

Task task2 = new Task();
Expand All @@ -702,7 +701,6 @@ public void testDecideSuccessfulWorkflow() {
assertEquals(1, deciderOutcome.tasksToBeUpdated.size());
assertEquals("s2", deciderOutcome.tasksToBeUpdated.get(0).getReferenceTaskName());
assertEquals(0, deciderOutcome.tasksToBeScheduled.size());
assertEquals(0, deciderOutcome.tasksToBeRequeued.size());
assertTrue(deciderOutcome.isComplete);
}

Expand Down Expand Up @@ -733,7 +731,6 @@ public void testDecideWithLoopTask() {
assertEquals("s1", deciderOutcome.tasksToBeUpdated.get(0).getReferenceTaskName());
assertEquals(1, deciderOutcome.tasksToBeScheduled.size());
assertEquals("s2__1", deciderOutcome.tasksToBeScheduled.get(0).getReferenceTaskName());
assertEquals(0, deciderOutcome.tasksToBeRequeued.size());
assertFalse(deciderOutcome.isComplete);
}

Expand Down Expand Up @@ -769,7 +766,6 @@ public void testDecideFailedTask() {
assertEquals("s1", deciderOutcome.tasksToBeUpdated.get(0).getReferenceTaskName());
assertEquals(1, deciderOutcome.tasksToBeScheduled.size());
assertEquals("s1", deciderOutcome.tasksToBeScheduled.get(0).getReferenceTaskName());
assertEquals(0, deciderOutcome.tasksToBeRequeued.size());
assertFalse(deciderOutcome.isComplete);
}

Expand Down
Expand Up @@ -24,6 +24,7 @@
import com.netflix.conductor.core.execution.TestDeciderService;
import com.netflix.conductor.dao.ExecutionDAO;
import com.netflix.conductor.dao.IndexDAO;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.dao.RateLimitingDao;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -50,6 +51,7 @@
public class ExecutionDAOFacadeTest {

private ExecutionDAO executionDAO;
private QueueDAO queueDAO;
private IndexDAO indexDAO;
private ObjectMapper objectMapper;
private ExecutionDAOFacade executionDAOFacade;
Expand All @@ -58,11 +60,12 @@ public class ExecutionDAOFacadeTest {
@Before
public void setUp() {
executionDAO = mock(ExecutionDAO.class);
queueDAO = mock(QueueDAO.class);
indexDAO = mock(IndexDAO.class);
rateLimitingDao = mock(RateLimitingDao.class);
objectMapper = new JsonMapperProvider().get();
Configuration configuration = new TestConfiguration();
executionDAOFacade = new ExecutionDAOFacade(executionDAO, indexDAO, rateLimitingDao, objectMapper, configuration);
executionDAOFacade = new ExecutionDAOFacade(executionDAO, queueDAO, indexDAO, rateLimitingDao, objectMapper, configuration);
}

@Test
Expand Down Expand Up @@ -136,4 +139,4 @@ public void testAddEventExecution() {
assertTrue(added);
verify(indexDAO, times(1)).asyncAddEventExecution(any());
}
}
}