Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge branch 'dev' into feature/add_workflow_completion_listener-AL
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex committed Nov 2, 2018
2 parents 8c544c6 + 591e4d3 commit 7ef5806
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 38 deletions.
Expand Up @@ -355,23 +355,20 @@ private void pollForTask(Worker worker) {
private void execute(Worker worker, Task task) {
String taskType = task.getTaskDefName();
try {

if(!worker.preAck(task)) {
logger.debug("Worker decided not to ack the task {}, taskId = {}", taskType, task.getTaskId());
return;
}

if (!taskClient.ack(task.getTaskId(), worker.getIdentity())) {
WorkflowTaskMetrics.incrementTaskAckFailedCount(worker.getTaskDefName());
logger.error("Ack failed for {}, taskId = {}", taskType, task.getTaskId());
returnTask(worker, task);
return;
}
logger.debug("Ack successful for {}, taskId = {}", taskType, task.getTaskId());

} catch (Exception e) {
logger.error(String.format("ack exception for task %s, taskId = %s in worker - %s", task.getTaskDefName(), task.getTaskId(), worker.getIdentity()), e);
WorkflowTaskMetrics.incrementTaskAckErrorCount(worker.getTaskDefName(), e);
returnTask(worker, task);
return;
}

Expand Down
@@ -1,4 +1,4 @@
/**
/*
* Copyright 2017 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -13,9 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
*
*/
package com.netflix.conductor.client.task;

import com.google.common.collect.ImmutableList;
Expand All @@ -24,6 +21,7 @@
import com.netflix.conductor.client.worker.Worker;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;

Expand Down Expand Up @@ -54,7 +52,7 @@ public void testNoWorkersException() {

@Test
public void testThreadPool() {
Worker worker = Worker.create("test", (Task task)-> new TaskResult(task));
Worker worker = Worker.create("test", TaskResult::new);
WorkflowTaskCoordinator coordinator = new WorkflowTaskCoordinator.Builder().withWorkers(worker, worker, worker).withTaskClient(new TaskClient()).build();
assertEquals(-1, coordinator.getThreadCount()); //Not initialized yet
coordinator.init();
Expand Down Expand Up @@ -114,7 +112,7 @@ public void testTaskException() {
}

@Test
public void testReturnTaskWhenAckFailed() {
public void testNoOpWhenAckFailed() {
Worker worker = mock(Worker.class);
when(worker.getPollingInterval()).thenReturn(1000);
when(worker.getPollCount()).thenReturn(1);
Expand All @@ -135,27 +133,16 @@ public void testReturnTaskWhenAckFailed() {
testTask.setStatus(Task.Status.IN_PROGRESS);
when(client.batchPollTasksInDomain(anyString(), anyString(), anyString(), anyInt(), anyInt())).thenReturn(ImmutableList.of(testTask));
when(client.ack(anyString(), anyString())).thenReturn(false);
CountDownLatch latch = new CountDownLatch(1);

doAnswer(invocation -> {
Object[] args = invocation.getArguments();
TaskResult result = (TaskResult) args[0];
assertEquals(TaskResult.Status.IN_PROGRESS, result.getStatus());
latch.countDown();
return null;
}
).when(client).updateTask(any(), anyString());

coordinator.init();
Uninterruptibles.awaitUninterruptibly(latch);

// then worker.execute must not be called and task must be updated with IN_PROGRESS status
verify(worker, never()).execute(any());
Mockito.verify(client).updateTask(any(), anyString());
verify(client, never()).updateTask(any(), any());
}

@Test
public void testReturnTaskWhenAckThrowsException() {
public void testNoOpWhenAckThrowsException() {
Worker worker = mock(Worker.class);
when(worker.getPollingInterval()).thenReturn(1000);
when(worker.getPollCount()).thenReturn(1);
Expand All @@ -176,24 +163,12 @@ public void testReturnTaskWhenAckThrowsException() {
testTask.setStatus(Task.Status.IN_PROGRESS);
when(client.batchPollTasksInDomain(anyString(), anyString(), anyString(), anyInt(), anyInt())).thenReturn(ImmutableList.of(testTask));
when(client.ack(anyString(), anyString())).thenThrow(new RuntimeException("Ack failed"));
CountDownLatch latch = new CountDownLatch(1);

doAnswer(invocation -> {
assertEquals("test-worker-0", Thread.currentThread().getName());
Object[] args = invocation.getArguments();
TaskResult result = (TaskResult) args[0];
assertEquals(TaskResult.Status.IN_PROGRESS, result.getStatus());
latch.countDown();
return null;
}
).when(client).updateTask(any(), anyString());

coordinator.init();
Uninterruptibles.awaitUninterruptibly(latch);

// then worker.execute must not be called and task must be updated with IN_PROGRESS status
verify(worker, never()).execute(any());
Mockito.verify(client).updateTask(any(), anyString());
verify(client, never()).updateTask(any(), any());
}

@Test
Expand Down
Expand Up @@ -152,7 +152,16 @@ public String updateTask(TaskResult taskResult) {
public String ackTaskReceived(String taskId, String workerId) {
ServiceUtils.checkNotNullOrEmpty(taskId, "TaskId cannot be null or empty.");
LOGGER.debug("Ack received for task: {} from worker: {}", taskId, workerId);
return String.valueOf(executionService.ackTaskReceived(taskId));
boolean ackResult;
try {
ackResult = executionService.ackTaskReceived(taskId);
} catch (Exception e) {
// safe to ignore exception here, since the task will not be processed by the worker due to ack failure
// The task will eventually be available to be polled again after the unack timeout
LOGGER.error("Exception when trying to ack task {} from worker {}", taskId, workerId, e);
ackResult = false;
}
return String.valueOf(ackResult);
}

/**
Expand Down
Expand Up @@ -54,7 +54,7 @@ public List<AbstractModule> get() {
}

private List<AbstractModule> selectModulesToLoad() {
Configuration.DB database = null;
Configuration.DB database;
List<AbstractModule> modules = new ArrayList<>();

try {
Expand Down

0 comments on commit 7ef5806

Please sign in to comment.