Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1366 from Netflix/cassandra_execution_dao_invalid…
Browse files Browse the repository at this point in the history
…_checks

fix exceptions and errors for invalid inputs in cassandra execution dao
  • Loading branch information
apanicker-nflx committed Oct 26, 2019
2 parents 1b5ca23 + f2aaf1e commit 5530e06
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.common.utils.RetryUtil;
import com.netflix.conductor.core.execution.ApplicationException;
import com.netflix.conductor.core.execution.ApplicationException.Code;
import com.netflix.conductor.dao.ExecutionDAO;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.util.Statements;
Expand Down Expand Up @@ -258,15 +259,19 @@ public Task getTask(String taskId) {
}
// TODO: implement for query against multiple shards

ResultSet resultSet = session.execute(selectTaskStatement.bind(UUID.fromString(workflowId), DEFAULT_SHARD_ID, taskId));
ResultSet resultSet = session
.execute(selectTaskStatement.bind(UUID.fromString(workflowId), DEFAULT_SHARD_ID, taskId));
return Optional.ofNullable(resultSet.one())
.map(row -> {
Task task = readValue(row.getString(PAYLOAD_KEY), Task.class);
recordCassandraDaoRequests("getTask", task.getTaskType(), task.getWorkflowType());
recordCassandraDaoPayloadSize("getTask", toJson(task).length(), task.getTaskType(), task.getWorkflowType());
return task;
})
.orElse(null);
.map(row -> {
Task task = readValue(row.getString(PAYLOAD_KEY), Task.class);
recordCassandraDaoRequests("getTask", task.getTaskType(), task.getWorkflowType());
recordCassandraDaoPayloadSize("getTask", toJson(task).length(), task.getTaskType(),
task.getWorkflowType());
return task;
})
.orElse(null);
} catch (ApplicationException ae) {
throw ae;
} catch (Exception e) {
Monitors.error(CLASS_NAME, "getTask");
String errorMsg = String.format("Error getting task by id: %s", taskId);
Expand Down Expand Up @@ -421,6 +426,11 @@ public Workflow getWorkflow(String workflowId, boolean includeTasks) {
return workflow;
} catch (ApplicationException e) {
throw e;
} catch (IllegalArgumentException e) {
Monitors.error(CLASS_NAME, "getWorkflow");
String errorMsg = String.format("Invalid workflow id: %s", workflowId);
LOGGER.error(errorMsg, e);
throw new ApplicationException(Code.INVALID_INPUT, errorMsg, e);
} catch (Exception e) {
Monitors.error(CLASS_NAME, "getWorkflow");
String errorMsg = String.format("Failed to get workflow: %s", workflowId);
Expand Down Expand Up @@ -634,8 +644,13 @@ String lookupWorkflowIdFromTaskId(String taskId) {
try {
ResultSet resultSet = session.execute(selectTaskLookupStatement.bind(UUID.fromString(taskId)));
return Optional.ofNullable(resultSet.one())
.map(row -> row.getUUID(WORKFLOW_ID_KEY).toString())
.orElse(null);
.map(row -> row.getUUID(WORKFLOW_ID_KEY).toString())
.orElse(null);
} catch (IllegalArgumentException iae) {
Monitors.error(CLASS_NAME, "lookupWorkflowIdFromTaskId");
String errorMsg = String.format("Invalid task id: %s", taskId);
LOGGER.error(errorMsg, iae);
throw new ApplicationException(Code.INVALID_INPUT, errorMsg, iae);
} catch (Exception e) {
Monitors.error(CLASS_NAME, "lookupWorkflowIdFromTaskId");
String errorMsg = String.format("Failed to lookup workflowId from taskId: %s", taskId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.HashMap;
import java.util.List;

import static com.netflix.conductor.core.execution.ApplicationException.Code.INVALID_INPUT;
import static com.netflix.conductor.dao.cassandra.CassandraBaseDAO.WorkflowMetadata;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -328,4 +329,31 @@ public void testTaskDefLimitCRUD() {
executionDAO.updateTaskDefLimit(newTask, true);
assertFalse(executionDAO.exceedsInProgressLimit(task));
}

@Test
public void testInvalid() {
Task task = null;
String id = "invalid_id";
try {
task = executionDAO.getTask(id);
} catch (ApplicationException e) {
assertEquals(INVALID_INPUT, e.getCode());
}
assertNull(task);

Workflow workflow = null;
try {
workflow = executionDAO.getWorkflow(id, true);
} catch (ApplicationException e) {
assertEquals(INVALID_INPUT, e.getCode());
}
assertNull(workflow);

id = IDGenerator.generate();
task = executionDAO.getTask(id);
assertNull(task);

workflow = executionDAO.getWorkflow(id, true);
assertNull(workflow);
}
}

0 comments on commit 5530e06

Please sign in to comment.