Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

cassandra metadata dao modifications #1567

Merged
merged 2 commits into from
Mar 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public class CassandraMetadataDAO extends CassandraBaseDAO implements MetadataDA
private final PreparedStatement selectTaskDefStatement;
private final PreparedStatement selectAllTaskDefsStatement;

private final PreparedStatement updateWorkflowDefStatement;

private final PreparedStatement deleteWorkflowDefStatement;
private final PreparedStatement deleteWorkflowDefIndexStatement;
private final PreparedStatement deleteTaskDefStatement;
Expand Down Expand Up @@ -94,6 +96,9 @@ public CassandraMetadataDAO(Session session, ObjectMapper objectMapper, Cassandr
this.selectAllTaskDefsStatement = session.prepare(statements.getSelectAllTaskDefsStatement())
.setConsistencyLevel(config.getReadConsistencyLevel());

this.updateWorkflowDefStatement = session.prepare(statements.getUpdateWorkflowDefStatement())
.setConsistencyLevel(config.getWriteConsistencyLevel());

this.deleteWorkflowDefStatement = session.prepare(statements.getDeleteWorkflowDefStatement())
.setConsistencyLevel(config.getWriteConsistencyLevel());
this.deleteWorkflowDefIndexStatement = session.prepare(statements.getDeleteWorkflowDefIndexStatement())
Expand All @@ -108,13 +113,11 @@ public CassandraMetadataDAO(Session session, ObjectMapper objectMapper, Cassandr

@Override
public void createTaskDef(TaskDef taskDef) {
taskDef.setCreateTime(System.currentTimeMillis());
insertOrUpdateTaskDef(taskDef);
}

@Override
public String updateTaskDef(TaskDef taskDef) {
taskDef.setUpdateTime(System.currentTimeMillis());
return insertOrUpdateTaskDef(taskDef);
}

Expand Down Expand Up @@ -148,18 +151,47 @@ public void removeTaskDef(String name) {

@Override
public void createWorkflowDef(WorkflowDef workflowDef) {
if (workflowDefExists(workflowDef)) {
throw new ApplicationException(Code.CONFLICT, String.format("Workflow: %s, version: %s already exists!",
workflowDef.getName(), workflowDef.getVersion()));
try {
String workflowDefinition = toJson(workflowDef);
if (!session.execute(insertWorkflowDefStatement.bind(workflowDef.getName(), workflowDef.getVersion(),
workflowDefinition)).wasApplied()) {
throw new ApplicationException(Code.CONFLICT, String.format("Workflow: %s, version: %s already exists!",
workflowDef.getName(), workflowDef.getVersion()));
}
String workflowDefIndex = getWorkflowDefIndexValue(workflowDef.getName(), workflowDef.getVersion());
session.execute(insertWorkflowDefVersionIndexStatement.bind(workflowDefIndex, workflowDefIndex));
recordCassandraDaoRequests("createWorkflowDef");
recordCassandraDaoPayloadSize("createWorkflowDef", workflowDefinition.length(), "n/a",
workflowDef.getName());
} catch (ApplicationException ae) {
throw ae;
} catch (Exception e) {
Monitors.error(CLASS_NAME, "createWorkflowDef");
String errorMsg = String.format("Error creating workflow definition: %s/%d", workflowDef.getName(),
workflowDef.getVersion());
LOGGER.error(errorMsg, e);
throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, errorMsg, e);
}
workflowDef.setCreateTime(System.currentTimeMillis());
insertOrUpdateWorkflowDef(workflowDef);
}

@Override
public void updateWorkflowDef(WorkflowDef workflowDef) {
workflowDef.setUpdateTime(System.currentTimeMillis());
insertOrUpdateWorkflowDef(workflowDef);
try {
String workflowDefinition = toJson(workflowDef);
session.execute(updateWorkflowDefStatement.bind(workflowDefinition, workflowDef.getName(),
workflowDef.getVersion()));
String workflowDefIndex = getWorkflowDefIndexValue(workflowDef.getName(), workflowDef.getVersion());
session.execute(insertWorkflowDefVersionIndexStatement.bind(workflowDefIndex, workflowDefIndex));
recordCassandraDaoRequests("updateWorkflowDef");
recordCassandraDaoPayloadSize("updateWorkflowDef", workflowDefinition.length(), "n/a",
workflowDef.getName());
} catch (Exception e) {
Monitors.error(CLASS_NAME, "updateWorkflowDef");
String errorMsg = String.format("Error updating workflow definition: %s/%d", workflowDef.getName(),
workflowDef.getVersion());
LOGGER.error(errorMsg, e);
throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, errorMsg, e);
}
}

@Override
Expand Down Expand Up @@ -313,28 +345,7 @@ private String insertOrUpdateTaskDef(TaskDef taskDef) {
}

@VisibleForTesting
boolean workflowDefExists(WorkflowDef workflowDef) {
return getWorkflowDef(workflowDef.getName(), workflowDef.getVersion()).isPresent();
}

String getWorkflowDefIndexValue(String name, int version) {
return name + INDEX_DELIMITER + version;
}

private void insertOrUpdateWorkflowDef(WorkflowDef def) {
try {
String workflowDefinition = toJson(def);
session.execute(insertWorkflowDefStatement.bind(def.getName(), def.getVersion(), workflowDefinition));
String workflowDefIndex = getWorkflowDefIndexValue(def.getName(), def.getVersion());
session.execute(insertWorkflowDefVersionIndexStatement.bind(workflowDefIndex, workflowDefIndex));
recordCassandraDaoRequests("storeWorkflowDef");
recordCassandraDaoPayloadSize("storeWorkflwoDef", workflowDefinition.length(), "n/a", def.getName());
} catch (Exception e) {
Monitors.error(CLASS_NAME, "insertOrUpdateWorkflowDef");
String errorMsg = String.format("Error creating/updating workflow definition: %s/%d",
def.getName(), def.getVersion());
LOGGER.error(errorMsg, e);
throw new ApplicationException(Code.BACKEND_ERROR, errorMsg, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
*
* <em>MetadataDAO</em>
* <ul>
* <li>INSERT INTO conductor.workflow_definitions (workflow_def_name,version,workflow_definition) VALUES (?,?,?); </li>
* <li>INSERT INTO conductor.workflow_definitions (workflow_def_name,version,workflow_definition) VALUES (?,?,?) IF NOT EXISTS; </li>
* <li>INSERT INTO conductor.workflow_defs_index (workflow_def_version_index,workflow_def_name_version, workflow_def_index_value) VALUES ('workflow_def_version_index',?,?); </li>
* <li>INSERT INTO conductor.task_definitions (task_defs,task_def_name,task_definition) VALUES ('task_defs',?,?); </li>
*
Expand All @@ -63,6 +63,8 @@
* <li>SELECT task_definition FROM conductor.task_definitions WHERE task_defs='task_defs' AND task_def_name=?; </li>
* <li>SELECT * FROM conductor.task_definitions WHERE task_defs=?; </li>
*
* <li>UPDATE conductor.workflow_definitions SET workflow_definition=? WHERE workflow_def_name=? AND version=?;</li>
*
* <li>DELETE FROM conductor.workflow_definitions WHERE workflow_def_name=? AND version=?; </li>
* <li>DELETE FROM conductor.workflow_defs_index WHERE workflow_def_version_index=? AND workflow_def_name_version=?; </li>
* <li>DELETE FROM conductor.task_definitions WHERE task_defs='task_defs' AND task_def_name=?; </li>
Expand Down Expand Up @@ -118,6 +120,7 @@ public String getInsertWorkflowDefStatement() {
.value(WORKFLOW_DEF_NAME_KEY, bindMarker())
.value(WORKFLOW_VERSION_KEY, bindMarker())
.value(WORKFLOW_DEFINITION_KEY, bindMarker())
.ifNotExists()
.getQueryString();
}

Expand Down Expand Up @@ -202,6 +205,19 @@ public String getSelectAllTaskDefsStatement() {
.getQueryString();
}

// Update Statement

/**
* @return cql query statement to update a workflow definitinos in the "workflow_definitions" table
*/
public String getUpdateWorkflowDefStatement() {
return QueryBuilder.update(keyspace, TABLE_WORKFLOW_DEFS)
.with(set(WORKFLOW_DEFINITION_KEY, bindMarker()))
.where(eq(WORKFLOW_DEF_NAME_KEY, bindMarker()))
.and(eq(WORKFLOW_VERSION_KEY, bindMarker()))
.getQueryString();
}

// Delete Statements

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package com.netflix.conductor.dao.cassandra;

import static com.netflix.conductor.core.execution.ApplicationException.Code.INVALID_INPUT;
import static com.netflix.conductor.util.Constants.TABLE_WORKFLOW_DEFS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
Expand Down Expand Up @@ -53,6 +54,7 @@ public class CassandraDAOTest {
private final ObjectMapper objectMapper = new JsonMapperProvider().get();

private static EmbeddedCassandra embeddedCassandra;
private Session session;

private CassandraMetadataDAO metadataDAO;
private CassandraExecutionDAO executionDAO;
Expand All @@ -68,7 +70,7 @@ public static void init() throws Exception {

@Before
public void setUp() {
Session session = embeddedCassandra.getSession();
session = embeddedCassandra.getSession();
Statements statements = new Statements(testConfiguration);
metadataDAO = new CassandraMetadataDAO(session, objectMapper, testConfiguration, statements);
executionDAO = new CassandraExecutionDAO(session, objectMapper, testConfiguration, statements);
Expand All @@ -81,7 +83,7 @@ public static void teardown() {
}

@Test
public void testWorkflowDefCRUD() {
public void testWorkflowDefCRUD() throws Exception {
String name = "workflow_def_1";
int version = 1;

Expand All @@ -90,33 +92,22 @@ public void testWorkflowDefCRUD() {
workflowDef.setVersion(version);
workflowDef.setOwnerEmail("test@junit.com");

// register the workflow definition
metadataDAO.createWorkflowDef(workflowDef);

// check if workflow definition exists
assertTrue(metadataDAO.workflowDefExists(workflowDef));
// create workflow definition explicitly in test
// since, embedded Cassandra server does not support LWT required for this API.
addWorkflowDefinition(workflowDef);

// fetch the workflow definition
Optional<WorkflowDef> defOptional = metadataDAO.getWorkflowDef(name, version);
assertTrue(defOptional.isPresent());
assertEquals(workflowDef, defOptional.get());

// get all workflow definitions
List<WorkflowDef> workflowDefs = metadataDAO.getAllWorkflowDefs();
assertNotNull(workflowDefs);
assertEquals(1, workflowDefs.size());
assertEquals(workflowDef, workflowDefs.get(0));

// register a higher version
int higherVersion = 2;
workflowDef.setVersion(higherVersion);
workflowDef.setDescription("higher version");

// register the higher version definition
metadataDAO.createWorkflowDef(workflowDef);

// check if workflow definition exists
assertTrue(metadataDAO.workflowDefExists(workflowDef));
addWorkflowDefinition(workflowDef);

// fetch the higher version
defOptional = metadataDAO.getWorkflowDef(name, higherVersion);
Expand All @@ -128,11 +119,6 @@ public void testWorkflowDefCRUD() {
assertTrue(defOptional.isPresent());
assertEquals(workflowDef, defOptional.get());

// get all workflow definitions
workflowDefs = metadataDAO.getAllWorkflowDefs();
assertNotNull(workflowDefs);
assertEquals(2, workflowDefs.size());

// modify the definition
workflowDef.setOwnerEmail("junit@test.com");
metadataDAO.updateWorkflowDef(workflowDef);
Expand All @@ -142,23 +128,10 @@ public void testWorkflowDefCRUD() {
assertTrue(defOptional.isPresent());
assertEquals(workflowDef, defOptional.get());

// register same definition again
expectedException.expect(ApplicationException.class);
expectedException.expectMessage("Workflow: workflow_def_1, version: 2 already exists!");
metadataDAO.createWorkflowDef(workflowDef);

// delete workflow def
metadataDAO.removeWorkflowDef(name, higherVersion);
defOptional = metadataDAO.getWorkflowDef(name, higherVersion);
assertFalse(defOptional.isPresent());

// get all workflow definitions
workflowDefs = metadataDAO.getAllWorkflowDefs();
assertNotNull(workflowDefs);
assertEquals(1, workflowDefs.size());

// check if workflow definition exists
assertFalse(metadataDAO.workflowDefExists(workflowDef));
}

@Test
Expand Down Expand Up @@ -554,4 +527,15 @@ public void testEventHandlerCRUD() {
assertNotNull(handlers);
assertEquals(1, handlers.size());
}

private void addWorkflowDefinition(WorkflowDef workflowDef) throws Exception {
//INSERT INTO conductor.workflow_definitions (workflow_def_name,version,workflow_definition) VALUES (?,?,?);
String table = testConfiguration.getCassandraKeyspace() + "." + TABLE_WORKFLOW_DEFS;
String queryString = "UPDATE " + table
+ " SET workflow_definition='" + objectMapper.writeValueAsString(workflowDef)
+ "' WHERE workflow_def_name='" + workflowDef.getName()
+ "' AND version=" + workflowDef.getVersion()
+ ";";
session.execute(queryString);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
*/
package com.netflix.conductor.util;

import static org.junit.Assert.assertEquals;

import com.netflix.conductor.config.TestConfiguration;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class StatementsTest {

private final TestConfiguration testConfiguration = new TestConfiguration();
Expand All @@ -30,7 +30,7 @@ public void setUp() {

@Test
public void testGetInsertWorkflowDefStatement() {
String statement = "INSERT INTO junit.workflow_definitions (workflow_def_name,version,workflow_definition) VALUES (?,?,?);";
String statement = "INSERT INTO junit.workflow_definitions (workflow_def_name,version,workflow_definition) VALUES (?,?,?) IF NOT EXISTS;";
assertEquals(statement, statements.getInsertWorkflowDefStatement());
}

Expand Down Expand Up @@ -77,6 +77,12 @@ public void testGetSelectAllTaskDefsStatement() {
assertEquals(statement, statements.getSelectAllTaskDefsStatement());
}

@Test
public void testGetUpdateWorkflowDefStatement() {
String statement = "UPDATE junit.workflow_definitions SET workflow_definition=? WHERE workflow_def_name=? AND version=?;";
assertEquals(statement, statements.getUpdateWorkflowDefStatement());
}

@Test
public void testGetDeleteWorkflowDefStatement() {
String statement = "DELETE FROM junit.workflow_definitions WHERE workflow_def_name=? AND version=?;";
Expand Down