Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Cleanup to support generic queue implementations. #1481

Merged
merged 3 commits into from Jan 24, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -638,8 +638,6 @@ public static class DeciderOutcome {

List<Task> tasksToBeUpdated = new LinkedList<>();

List<Task> tasksToBeRequeued = new LinkedList<>();

boolean isComplete;

private DeciderOutcome() {
Expand Down
Expand Up @@ -962,13 +962,8 @@ public boolean decide(String workflowId) {
List<Task> tasksToBeScheduled = outcome.tasksToBeScheduled;
setTaskDomains(tasksToBeScheduled, workflow);
List<Task> tasksToBeUpdated = outcome.tasksToBeUpdated;
List<Task> tasksToBeRequeued = outcome.tasksToBeRequeued;
boolean stateChanged = false;

if (!tasksToBeRequeued.isEmpty()) {
addTaskToQueue(tasksToBeRequeued);
}

tasksToBeScheduled = dedupAndAddTasks(workflow, tasksToBeScheduled);

for (Task task : outcome.tasksToBeScheduled) {
Expand Down Expand Up @@ -1005,7 +1000,6 @@ public boolean decide(String workflowId) {
if (!outcome.tasksToBeUpdated.isEmpty() || !tasksToBeScheduled.isEmpty()) {
executionDAOFacade.updateTasks(tasksToBeUpdated);
executionDAOFacade.updateWorkflow(workflow);
queueDAO.push(DECIDER_QUEUE, workflow.getWorkflowId(), workflow.getPriority(), config.getSweepFrequency());
}

stateChanged = scheduleTask(workflow, tasksToBeScheduled) || stateChanged;
Expand Down Expand Up @@ -1135,7 +1129,6 @@ public Workflow getWorkflow(String workflowId, boolean includeTasks) {
public void addTaskToQueue(Task task) {
// put in queue
String taskQueueName = QueueUtils.getQueueName(task);
queueDAO.remove(taskQueueName, task.getTaskId());
if (task.getCallbackAfterSeconds() > 0) {
queueDAO.push(taskQueueName, task.getTaskId(), task.getWorkflowPriority(), task.getCallbackAfterSeconds());
} else {
Expand Down
Expand Up @@ -26,6 +26,7 @@
import com.netflix.conductor.core.execution.ApplicationException.Code;
import com.netflix.conductor.dao.ExecutionDAO;
import com.netflix.conductor.dao.IndexDAO;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.dao.RateLimitingDao;
import com.netflix.conductor.metrics.Monitors;
import java.io.IOException;
Expand All @@ -40,6 +41,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.netflix.conductor.core.execution.WorkflowExecutor.DECIDER_QUEUE;

/**
* Service that acts as a facade for accessing execution data from the {@link ExecutionDAO}, {@link RateLimitingDao} and {@link IndexDAO} storage layers
*/
Expand All @@ -51,6 +54,7 @@ public class ExecutionDAOFacade {
private static final String RAW_JSON_FIELD = "rawJSON";

private final ExecutionDAO executionDAO;
private final QueueDAO queueDAO;
private final IndexDAO indexDAO;
private final RateLimitingDao rateLimitingDao;
private final ObjectMapper objectMapper;
Expand All @@ -59,9 +63,14 @@ public class ExecutionDAOFacade {
private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;

@Inject
public ExecutionDAOFacade(ExecutionDAO executionDAO, IndexDAO indexDAO, RateLimitingDao rateLimitingDao,
ObjectMapper objectMapper, Configuration config) {
public ExecutionDAOFacade(ExecutionDAO executionDAO,
QueueDAO queueDAO,
IndexDAO indexDAO,
RateLimitingDao rateLimitingDao,
ObjectMapper objectMapper,
Configuration config) {
this.executionDAO = executionDAO;
this.queueDAO = queueDAO;
this.indexDAO = indexDAO;
this.rateLimitingDao = rateLimitingDao;
this.objectMapper = objectMapper;
Expand Down Expand Up @@ -184,6 +193,8 @@ public long getPendingWorkflowCount(String workflowName) {
public String createWorkflow(Workflow workflow) {
workflow.setCreateTime(System.currentTimeMillis());
executionDAO.createWorkflow(workflow);
// Add to decider queue
queueDAO.push(DECIDER_QUEUE, workflow.getWorkflowId(), workflow.getPriority(), config.getSweepFrequency());
if (config.enableAsyncIndexing()) {
indexDAO.asyncIndexWorkflow(workflow);
} else {
Expand Down
64 changes: 44 additions & 20 deletions core/src/main/java/com/netflix/conductor/dao/QueueDAO.java
Expand Up @@ -21,15 +21,15 @@
import java.util.Map;

/**
*
*
* @author Viren
* DAO responsible for managing queuing for the tasks.
*
*/
public interface QueueDAO {

/**
*
*
* @param queueName name of the queue
* @param id message id
* @param offsetTimeInSecond time in seconds, after which the message should be marked visible. (for timed queues)
Expand All @@ -52,7 +52,7 @@ public interface QueueDAO {
void push(String queueName, List<Message> messages);

/**
*
*
* @param queueName Name of the queue
* @param id message id
* @param offsetTimeInSecond time in seconds, after which the message should be marked visible. (for timed queues)
Expand All @@ -71,46 +71,70 @@ public interface QueueDAO {
boolean pushIfNotExists(String queueName, String id, int priority, long offsetTimeInSecond);

/**
*
*
* @param queueName Name of the queue
* @param count number of messages to be read from the queue
* @param timeout timeout in milliseconds
* @return list of elements from the named queue
*/
List<String> pop(String queueName, int count, int timeout);


/**
*
* Pop the messages from queue with given leaseDuration.
* @param queueName Name of the queue
* @param count number of messages to be read from the queue
* @param timeout timeout in milliseconds
* @param leaseDurationSeconds Message lease duration in seconds
* @return list of elements from the named queue
*/
default List<String> pop(String queueName, int count, int timeout, long leaseDurationSeconds) {
return pop(queueName, count, timeout);
}


/**
*
* @param queueName Name of the queue
* @param count number of messages to be read from the queue
* @param timeout timeout in milliseconds
* @return list of elements from the named queue
*/
List<Message> pollMessages(String queueName, int count, int timeout);


/**
* Pop the messages from queue with given leaseDuration.
* @param queueName Name of the queue
* @param count number of messages to be read from the queue
* @param timeout timeout in milliseconds
* @param leaseDurationSeconds Message lease duration in seconds
* @return list of Messages from the named queue
*/
default List<Message> pollMessages(String queueName, int count, int timeout, long leaseDurationSeconds) {
return pollMessages(queueName, count, timeout);
}

/**
*
*
* @param queueName Name of the queue
* @param messageId Message id
*/
void remove(String queueName, String messageId);

/**
*
*
* @param queueName Name of the queue
* @return size of the queue
*/
int getSize(String queueName);

/**
*
*
* @param queueName Name of the queue
* @param messageId Message Id
* @return true if the message was found and ack'ed
*/
boolean ack(String queueName, String messageId);

/**
* Extend the lease of the unacknowledged message for longer period.
* @param queueName Name of the queue
Expand All @@ -121,33 +145,33 @@ public interface QueueDAO {
boolean setUnackTimeout(String queueName, String messageId, long unackTimeout);

/**
*
*
* @param queueName Name of the queue
*/
void flush(String queueName);

/**
*
*
* @return key : queue name, value: size of the queue
*/
Map<String, Long> queuesDetail();

/**
*
*
* @return key : queue name, value: map of shard name to size and unack queue size
*/
Map<String, Map<String, Map<String, Long>>> queuesDetailVerbose();

default void processUnacks(String queueName) {

}

/**
* Sets the offset time without pulling out the message from the queue
* Sets the offset time without pulling out the message from the queue
* @param queueName name of the queue
* @param id message id
* @param offsetTimeInSecond time in seconds, after which the message should be marked visible. (for timed queues)
* @return true if the message is in queue and the change was successful else returns false
* @return true if the message is in queue and the change was successful else returns false
*/
boolean setOffsetTime(String queueName, String id, long offsetTimeInSecond);
}
}
Expand Up @@ -683,7 +683,6 @@ public void testDecideSuccessfulWorkflow() {
assertEquals("s1", deciderOutcome.tasksToBeUpdated.get(0).getReferenceTaskName());
assertEquals(1, deciderOutcome.tasksToBeScheduled.size());
assertEquals("s2", deciderOutcome.tasksToBeScheduled.get(0).getReferenceTaskName());
assertEquals(0, deciderOutcome.tasksToBeRequeued.size());
assertFalse(deciderOutcome.isComplete);

Task task2 = new Task();
Expand All @@ -702,7 +701,6 @@ public void testDecideSuccessfulWorkflow() {
assertEquals(1, deciderOutcome.tasksToBeUpdated.size());
assertEquals("s2", deciderOutcome.tasksToBeUpdated.get(0).getReferenceTaskName());
assertEquals(0, deciderOutcome.tasksToBeScheduled.size());
assertEquals(0, deciderOutcome.tasksToBeRequeued.size());
assertTrue(deciderOutcome.isComplete);
}

Expand Down Expand Up @@ -733,7 +731,6 @@ public void testDecideWithLoopTask() {
assertEquals("s1", deciderOutcome.tasksToBeUpdated.get(0).getReferenceTaskName());
assertEquals(1, deciderOutcome.tasksToBeScheduled.size());
assertEquals("s2__1", deciderOutcome.tasksToBeScheduled.get(0).getReferenceTaskName());
assertEquals(0, deciderOutcome.tasksToBeRequeued.size());
assertFalse(deciderOutcome.isComplete);
}

Expand Down Expand Up @@ -769,7 +766,6 @@ public void testDecideFailedTask() {
assertEquals("s1", deciderOutcome.tasksToBeUpdated.get(0).getReferenceTaskName());
assertEquals(1, deciderOutcome.tasksToBeScheduled.size());
assertEquals("s1", deciderOutcome.tasksToBeScheduled.get(0).getReferenceTaskName());
assertEquals(0, deciderOutcome.tasksToBeRequeued.size());
assertFalse(deciderOutcome.isComplete);
}

Expand Down
Expand Up @@ -24,6 +24,7 @@
import com.netflix.conductor.core.execution.TestDeciderService;
import com.netflix.conductor.dao.ExecutionDAO;
import com.netflix.conductor.dao.IndexDAO;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.dao.RateLimitingDao;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -50,6 +51,7 @@
public class ExecutionDAOFacadeTest {

private ExecutionDAO executionDAO;
private QueueDAO queueDAO;
private IndexDAO indexDAO;
private ObjectMapper objectMapper;
private ExecutionDAOFacade executionDAOFacade;
Expand All @@ -58,11 +60,12 @@ public class ExecutionDAOFacadeTest {
@Before
public void setUp() {
executionDAO = mock(ExecutionDAO.class);
queueDAO = mock(QueueDAO.class);
indexDAO = mock(IndexDAO.class);
rateLimitingDao = mock(RateLimitingDao.class);
objectMapper = new JsonMapperProvider().get();
Configuration configuration = new TestConfiguration();
executionDAOFacade = new ExecutionDAOFacade(executionDAO, indexDAO, rateLimitingDao, objectMapper, configuration);
executionDAOFacade = new ExecutionDAOFacade(executionDAO, queueDAO, indexDAO, rateLimitingDao, objectMapper, configuration);
}

@Test
Expand Down Expand Up @@ -136,4 +139,4 @@ public void testAddEventExecution() {
assertTrue(added);
verify(indexDAO, times(1)).asyncAddEventExecution(any());
}
}
}