Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add priority to workflow instance #354

Merged
merged 16 commits into from
Nov 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## 6.0.0-SNAPSHOT (future release)

**Highlights**
- Add priority to workflow instances
- Use constructor injection instead of field or setter injection in nFlow classes
- Separate workflow definition scanning from `WorkflowDefinitionService`
- Remove deprecated `WorkflowInstanceInclude.STARTED` enum value
Expand All @@ -14,6 +15,7 @@

**Details**
- `nflow-engine`
- Add `priority` two byte integer to the `nflow_workflow` table. When an executor chooses from many available scheduled workflow instances it primarily (unfairly) schedules the workflow instance with the larger priority value, and for workflows with the same priority, the one scheduled first. Priority defaults to 0 and can also be negative. Requires database migration.
efonsell marked this conversation as resolved.
Show resolved Hide resolved
- Separate workflow definition scanning from `WorkflowDefinitionService` by introducing `WorkflowDefinitionSpringBeanScanner` and `WorkflowDefinitionClassNameScanner`. This allows breaking the circular dependency when a workflow definition uses `WorkflowInstanceService` (which depends on `WorkflowDefinitionService`, which depended on all workflow definitions). This enabled using constructor injection in all nFlow classes.
- Add `disableMariaDbDriver` to default MySQL JDBC URL so that in case there are both MySQL and MariaDB JDBC drivers in the classpath then MariaDB will not steal the MySQL URL.
- Add support for `nflow.db.mariadb` profile.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ private long insertWorkflowInstanceWithCte(WorkflowInstance instance) {
try {
StringBuilder sqlb = new StringBuilder(256);
sqlb.append("with wf as (").append(insertWorkflowInstanceSql()).append(" returning id)");
Object[] instanceValues = new Object[] { instance.type, instance.rootWorkflowId, instance.parentWorkflowId,
Object[] instanceValues = new Object[] { instance.type, getInstancePriority(instance),
instance.rootWorkflowId, instance.parentWorkflowId,
instance.parentActionId, instance.businessKey, instance.externalId, executorInfo.getExecutorGroup(),
instance.status.name(), instance.state, abbreviate(instance.stateText, getInstanceStateTextLength()),
toTimestamp(instance.nextActivation), instance.signal.orElse(null) };
Expand All @@ -186,13 +187,17 @@ private long insertWorkflowInstanceWithCte(WorkflowInstance instance) {
}
}

private short getInstancePriority(WorkflowInstance instance) {
return instance.priority != null ? instance.priority.shortValue() : 0;
}

boolean useBatchUpdate() {
return !disableBatchUpdates && sqlVariants.useBatchUpdate();
}

String insertWorkflowInstanceSql() {
return "insert into nflow_workflow(type, root_workflow_id, parent_workflow_id, parent_action_id, business_key, external_id, "
+ "executor_group, status, state, state_text, next_activation, workflow_signal) values (?, ?, ?, ?, ?, ?, ?, "
return "insert into nflow_workflow(type, priority, root_workflow_id, parent_workflow_id, parent_action_id, business_key, external_id, "
+ "executor_group, status, state, state_text, next_activation, workflow_signal) values (?, ?, ?, ?, ?, ?, ?, ?, "
+ sqlVariants.workflowStatus() + ", ?, ?, ?, ?)";
}

Expand All @@ -210,6 +215,7 @@ private long insertWorkflowInstanceWithTransaction(final WorkflowInstance instan
int p = 1;
PreparedStatement ps = connection.prepareStatement(insertWorkflowInstanceSql(), new String[] { "id" });
ps.setString(p++, instance.type);
ps.setInt(p++, getInstancePriority(instance));
ps.setObject(p++, instance.rootWorkflowId);
ps.setObject(p++, instance.parentWorkflowId);
ps.setObject(p++, instance.parentActionId);
Expand Down Expand Up @@ -525,7 +531,7 @@ String updateInstanceForExecutionQuery() {
String whereConditionForInstanceUpdate() {
return "where executor_id is null and status in (" + sqlVariants.workflowStatus(created) + ", "
+ sqlVariants.workflowStatus(inProgress) + ") and " + sqlVariants.dateLtEqDiff("next_activation", "current_timestamp")
+ " and " + executorInfo.getExecutorGroupCondition() + " order by next_activation asc";
+ " and " + executorInfo.getExecutorGroupCondition() + " order by priority desc, next_activation asc";
}

private List<Long> pollNextWorkflowInstanceIdsWithUpdateReturning(int batchSize) {
Expand Down Expand Up @@ -762,6 +768,7 @@ public WorkflowInstance.Builder mapRow(ResultSet rs, int rowNum) throws SQLExcep
.setParentActionId(getLong(rs, "parent_action_id")) //
.setStatus(WorkflowInstanceStatus.valueOf(rs.getString("status"))) //
.setType(rs.getString("type")) //
.setPriority(rs.getShort("priority")) //
.setBusinessKey(rs.getString("business_key")) //
.setExternalId(rs.getString("external_id")) //
.setState(rs.getString("state")) //
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ public enum WorkflowInstanceStatus {
*/
public final String type;

/**
* The priority of the workflow instance. When an executor chooses from many available scheduled
* workflow instances it primarily (unfairly) schedules the workflow instance with the larger
* priority value, and for workflows with the same priority, the one scheduled first. Priority
* defaults to 0 and can also be negative.
*/
public final Short priority;

/**
* Business key.
*/
Expand Down Expand Up @@ -161,6 +169,7 @@ public enum WorkflowInstanceStatus {
this.parentActionId = builder.parentActionId;
this.status = builder.status;
this.type = builder.type;
this.priority = builder.priority;
this.businessKey = builder.businessKey;
this.externalId = builder.externalId;
this.state = builder.state;
Expand Down Expand Up @@ -235,6 +244,7 @@ public static class Builder {
Long parentActionId;
WorkflowInstanceStatus status;
String type;
Short priority;
String businessKey;
String externalId;
String state;
Expand Down Expand Up @@ -278,6 +288,7 @@ public Builder(WorkflowInstance copy) {
this.parentActionId = copy.parentActionId;
this.status = copy.status;
this.type = copy.type;
this.priority = copy.priority;
this.businessKey = copy.businessKey;
this.externalId = copy.externalId;
this.state = copy.state;
Expand Down Expand Up @@ -366,6 +377,19 @@ public Builder setType(String type) {
return this;
}

/**
* Set the priority of the workflow instance. When an executor chooses from many available
* scheduled workflow instances it primarily (unfairly) schedules the workflow instance with
* the larger priority value, and for workflows with the same priority, the one scheduled
* first. Priority defaults to 0 and can also be negative.
* @param priority The priority.
ttiurani marked this conversation as resolved.
Show resolved Hide resolved
* @return this.
*/
public Builder setPriority(Short priority) {
this.priority = priority;
return this;
}

/**
* Set the business key.
* @param businessKey The business key.
Expand Down
2 changes: 2 additions & 0 deletions nflow-engine/src/main/resources/scripts/db/db2.create.ddl.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ create table nflow_workflow (
id int primary key generated always as identity,
ttiurani marked this conversation as resolved.
Show resolved Hide resolved
status varchar(32) not null check (status in ('created', 'executing', 'inProgress', 'finished', 'manual')),
type varchar(64) not null,
priority smallint not null default 0,
root_workflow_id integer default null,
parent_workflow_id integer default null,
parent_action_id integer default null,
Expand Down Expand Up @@ -99,6 +100,7 @@ create table nflow_archive_workflow (
id int not null primary key,
status varchar(32) not null,
type varchar(64) not null,
priority smallint null,
root_workflow_id integer,
parent_workflow_id integer,
parent_action_id integer,
Expand Down
2 changes: 2 additions & 0 deletions nflow-engine/src/main/resources/scripts/db/h2.create.ddl.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ create table if not exists nflow_workflow (
id int not null auto_increment primary key,
status varchar(32) not null check status in ('created', 'executing', 'inProgress', 'finished', 'manual'),
type varchar(64) not null,
priority smallint not null default 0,
root_workflow_id integer default null,
parent_workflow_id integer default null,
parent_action_id integer default null,
Expand Down Expand Up @@ -87,6 +88,7 @@ create table if not exists nflow_archive_workflow (
id int not null primary key,
status varchar(32) not null check status in ('created', 'executing', 'inProgress', 'finished', 'manual'),
type varchar(64) not null,
priority smallint null,
root_workflow_id integer,
parent_workflow_id integer,
parent_action_id integer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ create table if not exists nflow_workflow (
id int not null auto_increment primary key,
status enum('created', 'executing', 'inProgress', 'finished', 'manual') not null,
type varchar(64) not null,
priority smallint not null default 0,
root_workflow_id integer default null,
parent_workflow_id integer default null,
parent_action_id integer default null,
Expand Down Expand Up @@ -85,6 +86,7 @@ create table if not exists nflow_archive_workflow (
id int not null primary key,
status enum('created', 'executing', 'inProgress', 'finished', 'manual') not null,
type varchar(64) not null,
priority smallint null,
root_workflow_id integer,
parent_workflow_id integer,
parent_action_id integer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ create table if not exists nflow_workflow (
id int not null auto_increment primary key,
status enum('created', 'executing', 'inProgress', 'finished', 'manual') not null,
type varchar(64) not null,
priority smallint not null default 0,
root_workflow_id integer default null,
parent_workflow_id integer default null,
parent_action_id integer default null,
Expand Down Expand Up @@ -95,6 +96,7 @@ create table if not exists nflow_archive_workflow (
id int not null primary key,
status enum('created', 'executing', 'inProgress', 'finished', 'manual') not null,
type varchar(64) not null,
priority smallint null,
root_workflow_id integer,
parent_workflow_id integer,
parent_action_id integer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ create table nflow_workflow (
id int not null primary key,
status varchar(32) not null,
type varchar(64) not null,
priority smallint not null default 0,
root_workflow_id int default null,
parent_workflow_id int default null,
parent_action_id int default null,
Expand Down Expand Up @@ -145,6 +146,7 @@ create table nflow_archive_workflow (
id int not null primary key,
status varchar(32) not null,
type varchar(64) not null,
priority smallint null,
root_workflow_id int,
parent_workflow_id int,
parent_action_id int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ create table if not exists nflow_workflow (
id serial primary key,
status workflow_status not null,
type varchar(64) not null,
priority smallint not null default 0,
root_workflow_id integer default null,
parent_workflow_id integer default null,
parent_action_id integer default null,
Expand Down Expand Up @@ -106,6 +107,7 @@ create table if not exists nflow_archive_workflow (
id integer primary key,
status workflow_status not null,
type varchar(64) not null,
priority smallint null,
root_workflow_id integer,
parent_workflow_id integer,
parent_action_id integer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ create table nflow_workflow (
id int not null identity(1,1) primary key,
status varchar(32) not null,
type varchar(64) not null,
priority smallint not null default 0,
root_workflow_id int default null,
parent_workflow_id int default null,
parent_action_id int default null,
Expand Down Expand Up @@ -125,6 +126,7 @@ create table nflow_archive_workflow (
id int not null primary key,
status varchar(32) not null,
type varchar(64) not null,
priority smallint null,
root_workflow_id int,
parent_workflow_id int,
parent_action_id int,
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,27 @@
-- For reasonably small nflow_workflow row count:
alter table nflow_workflow add priority smallint not null default 0;
-- For large nflow_workflow row count:
--
-- alter table nflow_workflow add priority smallint null;
-- drop trigger if exists nflow_workflow_update_modified;
--
-- followed by either:
--
-- update nflow_workflow set priority = 0, modified = modified where priority is null;
--
-- or in batches of 100k with this query, repeated until no rows are affected:
--
-- update nflow_workflow set priority = 0, modified = modified where priority is null limit 100000;
--
-- and finally:
--
-- alter table nflow_workflow alter column priority smallint not null default 0;
-- create trigger nflow_workflow_update_modified
-- before update on nflow_workflow
-- referencing new as n
-- for each row
-- set modified = current timestamp;

alter table nflow_archive_workflow add priority smallint null;

create index nflow_workflow_polling on nflow_workflow(next_activation, status, executor_id, executor_group);
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
alter table nflow_workflow add priority smallint not null default 0;
alter table nflow_archive_workflow add priority smallint null;

create index if not exists nflow_workflow_polling on nflow_workflow(next_activation, status, executor_id, executor_group);

drop index if exists nflow_workflow_next_activation;
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@
-- For reasonably small nflow_workflow row count:
alter table nflow_workflow add priority smallint not null default 0;
-- For large nflow_workflow row count:
--
-- alter table nflow_workflow add priority smallint null;
--
-- followed by either:
--
-- update nflow_workflow set priority = 0, modified = modified where priority is null;
--
-- or in batches of 100k with this query, repeated until no rows are affected:
--
-- update nflow_workflow set priority = 0, modified = modified where priority is null limit 100000;
--
-- and finally:
--
-- alter table nflow_workflow alter column priority smallint not null default 0;

alter table nflow_archive_workflow add priority smallint null;

create index nflow_workflow_polling on nflow_workflow(next_activation, status, executor_id, executor_group);

drop index nflow_workflow_activation on nflow_workflow;
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@
-- For reasonably small nflow_workflow row count:
alter table nflow_workflow add priority smallint not null default 0;
-- For large nflow_workflow row count:
--
-- alter table nflow_workflow add priority smallint null;
--
-- followed by either:
--
-- update nflow_workflow set priority = 0, modified = modified where priority is null;
--
-- or in batches of 100k with this query, repeated until no rows are affected:
--
-- update nflow_workflow set priority = 0, modified = modified where priority is null limit 100000;
--
-- and finally:
--
-- alter table nflow_workflow alter column priority smallint not null default 0;

alter table nflow_archive_workflow add priority smallint null;

create index nflow_workflow_polling on nflow_workflow(next_activation, status, executor_id, executor_group);

drop index nflow_workflow_activation on nflow_workflow;
Original file line number Diff line number Diff line change
@@ -1,2 +1,33 @@
-- For reasonably small nflow_workflow row count:
alter table nflow_workflow add priority smallint not null default 0
/
-- For large nflow_workflow row count:
--
-- alter table nflow_workflow add priority smallint null
-- /
-- alter trigger nflow_workflow_update disable
-- /
--
-- followed by either:
--
-- update nflow_workflow set priority = 0, modified = modified where priority is null
-- /
--
-- or in batches of 100k with this query, repeated until no rows are affected:
--
-- update nflow_workflow set priority = 0, modified = modified where priority is null limit 100000
-- /
--
-- and finally:
--
-- alter table nflow_workflow alter column priority smallint not null default 0
-- /
-- alter trigger nflow_workflow_update enable
-- /


alter table nflow_archive_workflow add priority smallint null
/

create index nflow_workflow_polling on nflow_workflow (next_activation, status, executor_id, executor_group)
/
Original file line number Diff line number Diff line change
@@ -1,3 +1,25 @@
-- For reasonably small nflow_workflow row count:
alter table nflow_workflow add priority smallint not null default 0;
-- For large nflow_workflow row count:
--
-- alter table nflow_workflow add priority smallint null;
-- alter table nflow_workflow disable trigger update_nflow_modified;
--
-- followed by either:
--
-- update nflow_workflow set priority = 0, modified = modified where priority is null;
--
-- or in batches of 100k with this query, repeated until no rows are affected:
--
-- update nflow_workflow set priority = 0, modified = modified where priority is null limit 100000;
--
-- and finally:
--
-- alter table nflow_workflow alter column priority smallint not null default 0;
-- alter table nflow_workflow enable trigger update_nflow_modified;

alter table nflow_archive_workflow add priority smallint null;

drop index if exists nflow_workflow_polling;
create index nflow_workflow_polling on nflow_workflow(next_activation, status, executor_id, executor_group) where next_activation is not null;

Expand Down
Loading