Skip to content

Commit

Permalink
Merge a2fb65c into 786d591
Browse files Browse the repository at this point in the history
  • Loading branch information
efonsell committed Nov 25, 2019
2 parents 786d591 + a2fb65c commit 03b602e
Show file tree
Hide file tree
Showing 11 changed files with 74 additions and 16 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

**Details**
- `nflow-engine`
- Add `priority` two byte integer to the `nflow_workflow` table. When an executor chooses from many available scheduled workflow instances it primarily (unfairly) schedules the workflow instance with the larger priority value, and for workflows with the same priority, the one scheduled first. Priority defaults to 0 and can also be negative. Requires database migration.
- Add `priority` two byte integer to the `nflow_workflow` table. When the dispatcher chooses from many available scheduled workflow instances it primarily (unfairly) picks the workflow instances with the largest priority values, and for workflows with the same priority, the ones with oldest `next_activation` timestamp. Priority defaults to 0 and can also be negative. Default priority value for the new workflow instances can be set per workflow definition (`WorkflowSettings.Builder.setDefaultPriority`), and overridden per workflow instance (`WorkflowInstance.Builder.setPriority`). Requires database migration, see database update scripts for details.
- Separate workflow definition scanning from `WorkflowDefinitionService` by introducing `WorkflowDefinitionSpringBeanScanner` and `WorkflowDefinitionClassNameScanner`. This allows breaking the circular dependency when a workflow definition uses `WorkflowInstanceService` (which depends on `WorkflowDefinitionService`, which depended on all workflow definitions). This enabled using constructor injection in all nFlow classes.
- Add `disableMariaDbDriver` to default MySQL JDBC URL so that in case there are both MySQL and MariaDB JDBC drivers in the classpath then MariaDB will not steal the MySQL URL.
- Add support for `nflow.db.mariadb` profile.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,11 @@ private long insertWorkflowInstanceWithCte(WorkflowInstance instance) {
try {
StringBuilder sqlb = new StringBuilder(256);
sqlb.append("with wf as (").append(insertWorkflowInstanceSql()).append(" returning id)");
Object[] instanceValues = new Object[] { instance.type, getInstancePriority(instance),
instance.rootWorkflowId, instance.parentWorkflowId,
instance.parentActionId, instance.businessKey, instance.externalId, executorInfo.getExecutorGroup(),
instance.status.name(), instance.state, abbreviate(instance.stateText, getInstanceStateTextLength()),
toTimestamp(instance.nextActivation), instance.signal.orElse(null) };
Object[] instanceValues = new Object[] { instance.type, instance.priority, instance.rootWorkflowId,
instance.parentWorkflowId, instance.parentActionId, instance.businessKey, instance.externalId,
executorInfo.getExecutorGroup(), instance.status.name(), instance.state,
abbreviate(instance.stateText, getInstanceStateTextLength()), toTimestamp(instance.nextActivation),
instance.signal.orElse(null) };
int pos = instanceValues.length;
Object[] args = Arrays.copyOf(instanceValues, pos + instance.stateVariables.size() * 2);
for (Entry<String, String> variable : instance.stateVariables.entrySet()) {
Expand All @@ -187,10 +187,6 @@ private long insertWorkflowInstanceWithCte(WorkflowInstance instance) {
}
}

private short getInstancePriority(WorkflowInstance instance) {
return instance.priority != null ? instance.priority.shortValue() : 0;
}

boolean useBatchUpdate() {
return !disableBatchUpdates && sqlVariants.useBatchUpdate();
}
Expand All @@ -215,7 +211,7 @@ private long insertWorkflowInstanceWithTransaction(final WorkflowInstance instan
int p = 1;
PreparedStatement ps = connection.prepareStatement(insertWorkflowInstanceSql(), new String[] { "id" });
ps.setString(p++, instance.type);
ps.setInt(p++, getInstancePriority(instance));
ps.setShort(p++, instance.priority);
ps.setObject(p++, instance.rootWorkflowId);
ps.setObject(p++, instance.parentWorkflowId);
ps.setObject(p++, instance.parentActionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ public WorkflowInstance process(WorkflowInstance instance) {
if (instance.status == null) {
builder.setStatus(created);
}
if (instance.priority == null) {
builder.setPriority(def.getSettings().getDefaultPriority());
}
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ public class WorkflowSettings extends ModelObject {
* By default, returns true roughly every tenth time.
*/
public final BooleanSupplier deleteHistoryCondition;
/**
* Default priority for new workflow instances.
*/
public final short defaultPriority;

WorkflowSettings(Builder builder) {
this.minErrorTransitionDelay = builder.minErrorTransitionDelay;
Expand All @@ -72,6 +76,7 @@ public class WorkflowSettings extends ModelObject {
this.maxSubsequentStateExecutionsPerState = new HashMap<>(builder.maxSubsequentStateExecutionsPerState);
this.historyDeletableAfterHours = builder.historyDeletableAfterHours;
this.deleteHistoryCondition = builder.deleteHistoryCondition;
this.defaultPriority = builder.defaultPriority;
}

/**
Expand All @@ -88,6 +93,7 @@ public static class Builder {
int maxSubsequentStateExecutions = 100;
Map<WorkflowState, Integer> maxSubsequentStateExecutionsPerState = new HashMap<>();
Integer historyDeletableAfterHours;
short defaultPriority = 0;
Random rnd = new Random();
BooleanSupplier deleteHistoryCondition = new BooleanSupplier() {

Expand Down Expand Up @@ -213,6 +219,18 @@ public Builder setDeleteHistoryCondition(BooleanSupplier deleteHistoryCondition)
return this;
}

/**
* Set the default priority for new workflow instances.
*
* @param defaultPriority
* Default priority.
* @return this.
*/
public Builder setDefaultPriority(short defaultPriority) {
this.defaultPriority = defaultPriority;
return this;
}

/**
* Create workflow settings object.
*
Expand Down Expand Up @@ -284,4 +302,13 @@ public boolean deleteWorkflowInstanceHistory() {
return deleteHistoryCondition.getAsBoolean();
}

/**
* Return default priority for new workflow instances.
*
* @return Default priority for new workflow instances.
*/
public Short getDefaultPriority() {
return defaultPriority;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,8 @@ public void recoverWorkflowInstancesFromDeadNodesSetsExecutorIdToNullAndStatusTo
int crashedExecutorId = 999;
insertCrashedExecutor(crashedExecutorId, executorDao.getExecutorGroup());
long id = dao.insertWorkflowInstance(new WorkflowInstance.Builder().setType("test").setExternalId("extId")
.setExecutorGroup(executorDao.getExecutorGroup()).setStatus(executing).setState("processing").build());
.setExecutorGroup(executorDao.getExecutorGroup()).setStatus(executing).setState("processing").setPriority((short) 0)
.build());
int updated = jdbc.update("update nflow_workflow set executor_id = ? where id = ?", crashedExecutorId, id);
assertThat(updated, is(1));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@

import org.joda.time.DateTime;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;

import io.nflow.engine.workflow.instance.WorkflowInstance;
import io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus;
import io.nflow.engine.workflow.instance.WorkflowInstanceAction;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
@SuppressWarnings("serial")
Expand All @@ -34,7 +34,8 @@ protected WorkflowInstance.Builder constructWorkflowInstanceBuilder() {
put("requestData", "{ \"parameter\": \"abc\" }");
}
}) //
.setSignal(Optional.of(42));
.setSignal(Optional.of(42)) //
.setPriority((short) 0);
}

protected WorkflowInstanceAction.Builder constructActionBuilder(long workflowInstanceID) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.nflow.engine.workflow.definition.NextAction;
import io.nflow.engine.workflow.definition.StateExecution;
import io.nflow.engine.workflow.definition.WorkflowDefinition;
import io.nflow.engine.workflow.definition.WorkflowSettings;
import io.nflow.engine.workflow.definition.WorkflowStateType;

public class DummyTestWorkflow extends WorkflowDefinition<DummyTestWorkflow.DummyTestState> {
Expand All @@ -34,7 +35,11 @@ public String getDescription() {
}

public DummyTestWorkflow() {
super("dummy", DummyTestState.start, DummyTestState.end);
this(new WorkflowSettings.Builder().build());
}

public DummyTestWorkflow(WorkflowSettings settings) {
super("dummy", DummyTestState.start, DummyTestState.end, settings);
permit(DummyTestState.start, DummyTestState.end, DummyTestState.end);
permit(DummyTestState.alternativeStart, DummyTestState.end);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.nflow.engine.service.DummyTestWorkflow;
import io.nflow.engine.service.WorkflowDefinitionService;
import io.nflow.engine.workflow.definition.WorkflowDefinition;
import io.nflow.engine.workflow.definition.WorkflowSettings;
import io.nflow.engine.workflow.instance.WorkflowInstance;

public class WorkflowInstancePreProcessorTest extends BaseNflowTest {
Expand All @@ -25,9 +26,13 @@ public class WorkflowInstancePreProcessorTest extends BaseNflowTest {

private WorkflowInstancePreProcessor preProcessor;

private WorkflowDefinition<?> dummyWorkflow;

private static final short DEFAULT_PRIORITY = 100;

@BeforeEach
public void setup() {
WorkflowDefinition<?> dummyWorkflow = new DummyTestWorkflow();
dummyWorkflow = new DummyTestWorkflow(new WorkflowSettings.Builder().setDefaultPriority(DEFAULT_PRIORITY).build());
lenient().doReturn(dummyWorkflow).when(workflowDefinitionService).getWorkflowDefinition("dummy");
preProcessor = new WorkflowInstancePreProcessor(workflowDefinitionService);
}
Expand Down Expand Up @@ -66,4 +71,19 @@ public void unsupportedTypeThrowsException() {
RuntimeException thrown = assertThrows(RuntimeException.class, () -> preProcessor.process(i));
assertThat(thrown.getMessage(), containsString("No workflow definition found for type [nonexistent]"));
}

@Test
public void setsPriorityToDefinitionDefaultIfMissing() {
WorkflowInstance i = constructWorkflowInstanceBuilder().setPriority(null).build();
WorkflowInstance processed = preProcessor.process(i);
assertThat(processed.priority, is(DEFAULT_PRIORITY));
}

@Test
public void doesNotOverrideInstancePriority() {
short priority = 10;
WorkflowInstance i = constructWorkflowInstanceBuilder().setPriority(priority).build();
WorkflowInstance processed = preProcessor.process(i);
assertThat(processed.priority, is(priority));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public void verifyConstantDefaultValues() {
assertThat(delta, greaterThanOrEqualTo(-1000L));
assertThat(delta, lessThanOrEqualTo(0L));
assertThat(s.historyDeletableAfterHours, is(nullValue()));
assertThat(s.defaultPriority, is((short) 0));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public ListWorkflowDefinitionResponse convert(AbstractWorkflowDefinition<? exten
settings.transitionDelaysInMilliseconds = transitionDelays;
settings.maxRetries = workflowSettings.maxRetries;
settings.historyDeletableAfterHours = workflowSettings.historyDeletableAfterHours;
settings.defaultPriority = workflowSettings.defaultPriority;
resp.settings = settings;

resp.supportedSignals = definition.getSupportedSignals().entrySet().stream().map(entry -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public static class Settings extends ModelObject {
@ApiModelProperty(value = "Delay after which workflow instance history (actions, states) can be deleted from database", required = false)
public Integer historyDeletableAfterHours;

@ApiModelProperty(value = "Default priority for new workflow instances", required = true)
public short defaultPriority;

}

public static class TransitionDelays extends ModelObject {
Expand Down

0 comments on commit 03b602e

Please sign in to comment.