Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support querying workflows from both main tables and archive. #381

Merged
merged 50 commits into from
Jan 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
73598ae
Support querying workflows from both main tables and archive.
gmokki Feb 19, 2020
a0de2bd
Child workflows can be in different tables than the parent workflow
gmokki Feb 20, 2020
0ee87fe
Remove unused import
gmokki Feb 20, 2020
66e8cb7
Stream query workflows results out of rest api. (#382)
gmokki Feb 22, 2020
0e455e8
Support querying workflows from both main tables and archive. Stream …
gmokki Feb 19, 2020
e893985
Expose archived bit over rest api
gmokki Feb 22, 2020
9c15880
Rename field
gmokki Feb 23, 2020
b1324d2
Change default state of queryArchive in rest api, and instead make ex…
gmokki Feb 23, 2020
e731c78
Fix test
gmokki Feb 23, 2020
ceee9c3
Fix tests
gmokki Feb 23, 2020
cab7c12
Add missing double arguments to jdbc union
gmokki Feb 23, 2020
d051853
Add back change descriptions
gmokki Apr 18, 2020
91c16a2
Replace hardcoded wait with loop
gmokki Apr 18, 2020
85487a2
Review fixes
gmokki Apr 27, 2020
3b554fd
More wording improvements
gmokki Apr 27, 2020
bc2158e
Move default query-archived state to constant(s) in base class
gmokki Apr 27, 2020
ca823e4
Review refactorings
gmokki Apr 27, 2020
0463178
Review fixes
gmokki Apr 27, 2020
55c147f
Fix test failures after rebase
gmokki Jan 23, 2022
d5f574a
Improve javadoc
gmokki Jan 23, 2022
18dae84
Comment fixes
gmokki Jan 23, 2022
1ffc32b
Use while loop with configured timeout instead of fixed for loop
gmokki Jan 23, 2022
e6b9ff5
Make archive queries configureable in config.js
gmokki Jan 23, 2022
1bf212c
Update CHANGELOG.md
gmokki Jan 24, 2022
85c410d
Update CHANGELOG.md
gmokki Jan 24, 2022
c4fa421
Update CHANGELOG.md
gmokki Jan 24, 2022
40f0b95
Update nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/ListW…
gmokki Jan 24, 2022
6bdce8e
Update nflow-rest-api-common/src/main/java/io/nflow/rest/v1/ResourceB…
gmokki Jan 24, 2022
b3e56b2
Update nflow-explorer/src/config.js
gmokki Jan 24, 2022
4aa31c8
Update nflow-engine/src/main/java/io/nflow/engine/workflow/instance/W…
gmokki Jan 24, 2022
94f3851
Update nflow-engine/src/main/java/io/nflow/engine/workflow/instance/W…
gmokki Jan 24, 2022
b86c696
Apply suggestions from code review
gmokki Jan 24, 2022
3fbf7d6
Apply suggestions from code review
gmokki Jan 24, 2022
4724e9e
Reuse same WorkflowInstanceRowMapper and WorkflowInstanceActionRowMapper
gmokki Jan 24, 2022
91638a9
Refactor TablePrefix usage to be simpler
gmokki Jan 24, 2022
aeb90dd
Renable TablePrefix to NflowTables
gmokki Jan 24, 2022
2ace977
Remove internal method wrapper
gmokki Jan 24, 2022
da87b0f
Split method that was starting to get too big
gmokki Jan 24, 2022
c02ad70
refactor
efonsell Jan 24, 2022
147ace4
rename enum values, fix and add tests
efonsell Jan 24, 2022
6a3b10e
refactor
efonsell Jan 24, 2022
4048957
Fix wherecondition
gmokki Jan 24, 2022
14571d4
Some test fixes
gmokki Jan 24, 2022
0191977
revert some refactoring to fix tests, clean up sql a bit
efonsell Jan 24, 2022
0147791
move string manipulation from dao to enum, cleanup
efonsell Jan 24, 2022
61dd269
make sql segments more readable
efonsell Jan 24, 2022
aa76298
restore query.stateVariableKey related logic to one place
efonsell Jan 24, 2022
71669e6
move executorGroup condition adding back from helper and remove the d…
efonsell Jan 24, 2022
c59b1aa
Merge branch 'master' into query-archived
efonsell Jan 24, 2022
a8a984a
return archived=true/false is queryArchive is true, otherwise return …
efonsell Jan 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- BREAKING CHANGE: Remove `ListWorkflowDefinitionResponse.TransitionDelays.immediate` field, it is not used by nFlow anymore.
- Enable maintenance (archiving and deleting old workflow instances) by default.
- Enable workflow instance history clean-up (deleting old actions and state variables) by default.
- Add support to query also archived workflow instances.

**Details**
- `nflow-engine`
Expand All @@ -24,6 +25,7 @@
- Remove `WorkflowSettings.immediateTransitionDelay` field. It is not used by nFlow.
- Maintenance workflow instance is added to nFlow database by default in startup. Instances that have been in final state longer than 45 days are archived. Archived instances that have been in final state longer than one year are deleted. Maintenance is run every night. Use `nflow.maintenance` configuration options to change the defaults before startup, or update the maintenance workflow instance state variables after the instance has been created.
- Workflow instance actions and state variables that are older than 45 days are automatically cleaned up occasionally when the instance is processed. Use workflow settings to change the default time period (`setHistoryDeletableAfter`) and condition (`setDeleteHistoryCondition`) of the clean-up.
- Add support to query also archived workflow instances when not enough non-archived matches are found.
- Dependency updates
- logback-classic update to version 1.2.10
- http://mailman.qos.ch/pipermail/announce/2021/000164.html
Expand All @@ -39,6 +41,7 @@
- slf4j 1.7.33
- `nflow-rest-api`
- BREAKING CHANGE: Remove `ListWorkflowDefinitionResponse.TransitionDelays.immediate` field, it is not used by nFlow.
- Add `queryArchive=true` query parameter to query also archived workflow instances when not enough non-archived matches are found.
- Dependency updates
- swagger 1.6.4
- `nflow-jetty`
Expand All @@ -52,6 +55,7 @@
- Dependency updates
- metrics 4.2.7
- `nflow-explorer`
- Query and show also archived workflow instances when not enough non-archived matches are found. Querying and showing archived instances can be disabled in `config.js`.
- Dependency updates
- nodejs 16.13.2
- npm 8.1.2
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package io.nflow.engine.internal.dao;

import static io.nflow.engine.internal.dao.DaoUtil.ColumnNamesExtractor.columnNamesExtractor;
import static io.nflow.engine.internal.dao.TablePrefix.ARCHIVE;
import static io.nflow.engine.internal.dao.TablePrefix.MAIN;
import static io.nflow.engine.internal.dao.NflowTable.ACTION;
import static io.nflow.engine.internal.dao.NflowTable.WORKFLOW;
import static io.nflow.engine.internal.dao.NflowTable.STATE;
import static java.lang.System.currentTimeMillis;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Stream.generate;
Expand Down Expand Up @@ -69,8 +70,8 @@ private String getStateColumns() {
return stateColumns;
}

public List<Long> getOldWorkflowIds(TablePrefix table, DateTime before, int maxWorkflows, Set<String> workflowTypes) {
StringBuilder sql = new StringBuilder("select id from ").append(table.nameOf("workflow"))
public List<Long> getOldWorkflowIds(TableType type, DateTime before, int maxWorkflows, Set<String> workflowTypes) {
StringBuilder sql = new StringBuilder("select id from ").append(WORKFLOW.tableFor(type))
.append(" where next_activation is null and ").append(sqlVariants.dateLtEqDiff("modified", "?"));
List<Object> args = new ArrayList<>();
args.add(sqlVariants.toTimestampObject(before));
Expand All @@ -85,32 +86,32 @@ public List<Long> getOldWorkflowIds(TablePrefix table, DateTime before, int maxW
@Transactional
public int archiveWorkflows(Collection<Long> workflowIds) {
String workflowIdParams = params(workflowIds);
int archivedInstances = archiveTable("workflow", "id", getWorkflowColumns(), workflowIdParams);
int archivedActions = archiveTable("workflow_action", "workflow_id", getActionColumns(), workflowIdParams);
int archivedStates = archiveTable("workflow_state", "workflow_id", getStateColumns(), workflowIdParams);
int archivedInstances = archiveTable(WORKFLOW, "id", getWorkflowColumns(), workflowIdParams);
int archivedActions = archiveTable(ACTION, "workflow_id", getActionColumns(), workflowIdParams);
int archivedStates = archiveTable(STATE, "workflow_id", getStateColumns(), workflowIdParams);
logger.info("Archived {} workflow instances, {} actions and {} states.", archivedInstances, archivedActions, archivedStates);
deleteWorkflows(MAIN, workflowIdParams);
deleteWorkflows(TableType.MAIN, workflowIdParams);
return archivedInstances;
}

@Transactional
public int deleteWorkflows(TablePrefix table, Collection<Long> workflowIds) {
public int deleteWorkflows(TableType type, Collection<Long> workflowIds) {
String workflowIdParams = params(workflowIds);
return deleteWorkflows(table, workflowIdParams);
return deleteWorkflows(type, workflowIdParams);
}

private int archiveTable(String table, String workflowIdColumn, String columns, String workflowIdParams) {
return jdbc.update("insert into " + ARCHIVE.nameOf(table) + "(" + columns + ") " + "select " + columns + " from "
+ MAIN.nameOf(table) + sqlVariants.withUpdateSkipLocked() + " where " + workflowIdColumn + " in " + workflowIdParams
private int archiveTable(NflowTable table, String workflowIdColumn, String columns, String workflowIdParams) {
return jdbc.update("insert into " + table.archive + "(" + columns + ") " + "select " + columns + " from " + table.main
+ sqlVariants.withUpdateSkipLocked() + " where " + workflowIdColumn + " in " + workflowIdParams
+ sqlVariants.forUpdateSkipLocked());
}

private int deleteWorkflows(TablePrefix table, String workflowIdParams) {
int deletedStates = jdbc.update("delete from " + table.nameOf("workflow_state") + " where workflow_id in " + workflowIdParams);
int deletedActions = jdbc.update("delete from " + table.nameOf("workflow_action") + " where workflow_id in " + workflowIdParams);
int deletedInstances = jdbc.update("delete from " + table.nameOf("workflow") + " where id in " + workflowIdParams);
private int deleteWorkflows(TableType type, String workflowIdParams) {
int deletedStates = jdbc.update("delete from " + STATE.tableFor(type) + " where workflow_id in " + workflowIdParams);
int deletedActions = jdbc.update("delete from " + ACTION.tableFor(type) + " where workflow_id in " + workflowIdParams);
int deletedInstances = jdbc.update("delete from " + WORKFLOW.tableFor(type) + " where id in " + workflowIdParams);
logger.info("Deleted {} workflow instances, {} actions and {} states from {} tables.", deletedInstances, deletedActions,
deletedStates, table.name());
deletedStates, type.name());
return deletedInstances;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.nflow.engine.internal.dao;

import io.nflow.engine.workflow.instance.WorkflowInstance;

public enum NflowTable {
WORKFLOW("workflow"),
STATE("workflow_state"),
ACTION("workflow_action");

public String main;
public String archive;

NflowTable(String table) {
this.main = TableType.MAIN.prefix + table;
this.archive = TableType.ARCHIVE.prefix + table;
}

public String tableFor(TableType type) {
return type == TableType.MAIN ? main : archive;
}

public String tableFor(WorkflowInstance instance) {
return instance.isArchived ? archive : main;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ public void ensureCopyingPossible(String sourceTable, String destinationTable) {
Map<String, ColumnMetadata> sourceMetadataMap = getMetadata(sourceTable);
Map<String, ColumnMetadata> destMetadataMap = getMetadata(destinationTable);
if (destMetadataMap.size() < sourceMetadataMap.size()) {
throw new IllegalArgumentException(format("Source table %s has more columns than destination table %s", sourceTable,
destinationTable));
throw new IllegalArgumentException(
format("Source table %s has more columns than destination table %s", sourceTable, destinationTable));
}
Set<String> sourceKeySet = sourceMetadataMap.keySet();
Set<String> destKeySet = destMetadataMap.keySet();
Expand All @@ -52,7 +52,8 @@ public void ensureCopyingPossible(String sourceTable, String destinationTable) {
if (!sourceMetadata.typeName.equals(destMetadata.typeName)) {
throw new IllegalArgumentException(format(
"Source column %s.%s has type %s and destination column %s.%s has mismatching type %s", sourceTable,
sourceMetadata.columnName, sourceMetadata.typeName, destinationTable, destMetadata.columnName, destMetadata.typeName));
sourceMetadata.columnName, sourceMetadata.typeName, destinationTable, destMetadata.columnName,
destMetadata.typeName));
}
if (sourceMetadata.size > destMetadata.size) {
throw new IllegalArgumentException(format("Source column %s.%s has size %s and destination column %s.%s smaller size %s",
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.nflow.engine.internal.dao;

public enum TableType {
MAIN("nflow_"), ARCHIVE("nflow_archive_");

public String prefix;

TableType(String prefix) {
this.prefix = prefix;
}

static String convertMainToArchive(String sql) {
return sql.replaceAll(MAIN.prefix, ARCHIVE.prefix);
}
}
Loading