Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1826 from srjinatl/fix/postgres-workflow-dao
Browse files Browse the repository at this point in the history
Postgres Workflow Metadata Update/Remove
  • Loading branch information
apanicker-nflx committed Aug 25, 2020
2 parents 1243bbb + 12837d1 commit 464a095
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 21 deletions.
Expand Up @@ -141,11 +141,18 @@ public Optional<WorkflowDef> getWorkflowDef(String name, int version) {
public void removeWorkflowDef(String name, Integer version) {
final String DELETE_WORKFLOW_QUERY = "DELETE from meta_workflow_def WHERE name = ? AND version = ?";

executeWithTransaction(DELETE_WORKFLOW_QUERY, q -> {
if (!q.addParameter(name).addParameter(version).executeDelete()) {
throw new ApplicationException(ApplicationException.Code.NOT_FOUND,
String.format("No such workflow definition: %s version: %d", name, version));
}
withTransaction( tx -> {
// remove specified workflow
execute(tx, DELETE_WORKFLOW_QUERY, q -> {
if (!q.addParameter(name).addParameter(version).executeDelete()) {
throw new ApplicationException(ApplicationException.Code.NOT_FOUND,
String.format("No such workflow definition: %s version: %d", name, version));
}
});
// reset latest version based on remaining rows for this workflow
Optional<Integer> maxVersion = getLatestVersion(tx, name);
maxVersion.ifPresent(newVersion -> updateLatestVersion(tx, name, newVersion));

});
}

Expand Down Expand Up @@ -314,18 +321,18 @@ private Boolean workflowExists(Connection connection, WorkflowDef def) {
}

/**
* Return the latest version that exists for the provided {@link WorkflowDef}.
* Return the latest version that exists for the provided {@code name}.
*
* @param tx The {@link Connection} to use for queries.
* @param def The {@code WorkflowDef} to check for.
* @param name The {@code name} to check for.
* @return {@code Optional.empty()} if no versions exist, otherwise the max {@link WorkflowDef#getVersion} found.
*/
private Optional<Integer> getLatestVersion(Connection tx, WorkflowDef def) {
private Optional<Integer> getLatestVersion(Connection tx, String name) {
final String GET_LATEST_WORKFLOW_DEF_VERSION = "SELECT max(version) AS version FROM meta_workflow_def WHERE " +
"name = ?";

Integer val = query(tx, GET_LATEST_WORKFLOW_DEF_VERSION, q -> {
q.addParameter(def.getName());
q.addParameter(name);
return q.executeAndFetch(rs -> {
if (!rs.next()) {
return null;
Expand All @@ -339,25 +346,25 @@ private Optional<Integer> getLatestVersion(Connection tx, WorkflowDef def) {
}

/**
* Update the latest version for the {@link WorkflowDef} to the version provided in {@literal def}.
* Update the latest version for the workflow with name {@code WorkflowDef} to the version provided in {@literal version}.
*
* @param tx The {@link Connection} to use for queries.
* @param def The {@code WorkflowDef} data to update to.
* @param version The new latest {@code version} value.
*/
private void updateLatestVersion(Connection tx, WorkflowDef def) {
private void updateLatestVersion(Connection tx, String name, int version) {
final String UPDATE_WORKFLOW_DEF_LATEST_VERSION_QUERY = "UPDATE meta_workflow_def SET latest_version = ? " +
"WHERE name = ?";

execute(tx, UPDATE_WORKFLOW_DEF_LATEST_VERSION_QUERY,
q -> q.addParameter(def.getVersion()).addParameter(def.getName()).executeUpdate());
q -> q.addParameter(version).addParameter(name).executeUpdate());
}

private void insertOrUpdateWorkflowDef(Connection tx, WorkflowDef def) {
final String INSERT_WORKFLOW_DEF_QUERY = "INSERT INTO meta_workflow_def (name, version, json_data) VALUES (?," +
" ?, ?)";

Optional<Integer> version = getLatestVersion(tx, def);
if (!version.isPresent() || version.get() < def.getVersion()) {
Optional<Integer> version = getLatestVersion(tx, def.getName());
if (!workflowExists(tx, def)) {
execute(tx, INSERT_WORKFLOW_DEF_QUERY, q -> q.addParameter(def.getName())
.addParameter(def.getVersion())
.addJsonParameter(def)
Expand All @@ -375,8 +382,12 @@ private void insertOrUpdateWorkflowDef(Connection tx, WorkflowDef def) {
.addParameter(def.getVersion())
.executeUpdate());
}
int maxVersion = def.getVersion();
if (version.isPresent() && version.get() > def.getVersion()) {
maxVersion = version.get();
}

updateLatestVersion(tx, def);
updateLatestVersion(tx, def.getName(), maxVersion);
}

/**
Expand Down
Expand Up @@ -92,7 +92,7 @@ public void testWorkflowDefOperations() throws Exception {
WorkflowDef found = dao.getWorkflowDef("test", 1).get();
assertTrue(EqualsBuilder.reflectionEquals(def, found));

def.setVersion(2);
def.setVersion(3);
dao.createWorkflowDef(def);

all = dao.getAllWorkflowDefs();
Expand All @@ -104,21 +104,21 @@ public void testWorkflowDefOperations() throws Exception {
found = dao.getLatestWorkflowDef(def.getName()).get();
assertEquals(def.getName(), found.getName());
assertEquals(def.getVersion(), found.getVersion());
assertEquals(2, found.getVersion());
assertEquals(3, found.getVersion());

all = dao.getAllLatest();
assertNotNull(all);
assertEquals(1, all.size());
assertEquals("test", all.get(0).getName());
assertEquals(2, all.get(0).getVersion());
assertEquals(3, all.get(0).getVersion());

all = dao.getAllVersions(def.getName());
assertNotNull(all);
assertEquals(2, all.size());
assertEquals("test", all.get(0).getName());
assertEquals("test", all.get(1).getName());
assertEquals(1, all.get(0).getVersion());
assertEquals(2, all.get(1).getVersion());
assertEquals(3, all.get(1).getVersion());

def.setDescription("updated");
dao.updateWorkflowDef(def);
Expand All @@ -130,9 +130,28 @@ public void testWorkflowDefOperations() throws Exception {
assertEquals(1, allnames.size());
assertEquals(def.getName(), allnames.get(0));

def.setVersion(2);
dao.createWorkflowDef(def);

found = dao.getLatestWorkflowDef(def.getName()).get();
assertEquals(def.getName(), found.getName());
assertEquals(3, found.getVersion());

dao.removeWorkflowDef("test", 3);
Optional<WorkflowDef> deleted = dao.getWorkflowDef("test", 3);
assertFalse(deleted.isPresent());

found = dao.getLatestWorkflowDef(def.getName()).get();
assertEquals(def.getName(), found.getName());
assertEquals(2, found.getVersion());

dao.removeWorkflowDef("test", 1);
Optional<WorkflowDef> deleted = dao.getWorkflowDef("test", 1);
deleted = dao.getWorkflowDef("test", 1);
assertFalse(deleted.isPresent());

found = dao.getLatestWorkflowDef(def.getName()).get();
assertEquals(def.getName(), found.getName());
assertEquals(2, found.getVersion());
}

@Test
Expand Down

0 comments on commit 464a095

Please sign in to comment.