Skip to content

Commit

Permalink
Merge branch 'master' into action-description
Browse files Browse the repository at this point in the history
  • Loading branch information
Edvard Fonsell committed Jan 13, 2015
2 parents 8a9a446 + a75d076 commit a3d52bd
Show file tree
Hide file tree
Showing 19 changed files with 227 additions and 41 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

**Details**
- nflow-engine:
- Use more optimal SQL when polling workflows when database supports update returning syntax
- Only rollback poll operation when no workflows could be allocated for executing (when multiple pollers compete for same workflows)
- Allow configuring executor queue length with _nflow.dispatcher.executor.queue.size_
- nflow-rest:
- Add support for user-provided action description when updating a workflow instance

Expand Down
5 changes: 5 additions & 0 deletions DECISION_LOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

(newest first)

2015-01-10 gmokki, efonsell
---------------------------
When polling for next workflow instances in WorkflowInstanceDao, the modified field in OptimisticLockKey is handled as String instead of Timestamp to avoid problems caused by losing millisecond precision from timestamps in some cases (for example with some older versions of MySQL).


2014-12-10 eputtone
-------------------
Internal nFlow functionalities can access DAO layer (package com.nitorcreations.nflow.engine.internal.dao) directly instead of going through service layer (package com.nitorcreations.nflow.engine.service). Rationale: service layer is currently part of public API that we wish to keep as simple as possible. Example: WorkflowDefinitionResource in nflow-rest-api uses WorkflowDefinitionDao directly for retrieving StoredWorkflowDefinitions, because we don't want to confuse public API users with multiple workflow definition representations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ public class EngineConfiguration {
public WorkflowInstanceExecutor nflowExecutor(@NFlow ThreadFactory nflowThreadFactory, Environment env) {
int threadCount = env.getProperty("nflow.executor.thread.count", Integer.class, 2 * getRuntime().availableProcessors());
int awaitTerminationSeconds = env.getProperty("nflow.dispatcher.await.termination.seconds", Integer.class, 60);
int notifyThreshold = env.getProperty("nflow.dispatcher.executor.queue.wait_until_threshold", Integer.class, 0);
int queueSize = env.getProperty("nflow.dispatcher.executor.queue.size", Integer.class, 2 * threadCount);
int notifyThreshold = env.getProperty("nflow.dispatcher.executor.queue.wait_until_threshold", Integer.class, queueSize / 2);
int keepAliveSeconds = env.getProperty("nflow.dispatcher.executor.thread.keepalive.seconds", Integer.class, 0);
return new WorkflowInstanceExecutor(threadCount, notifyThreshold, awaitTerminationSeconds, keepAliveSeconds,
return new WorkflowInstanceExecutor(queueSize, threadCount, notifyThreshold, awaitTerminationSeconds, keepAliveSeconds,
nflowThreadFactory);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -43,6 +42,7 @@
import org.springframework.transaction.annotation.Transactional;

import com.nitorcreations.nflow.engine.internal.config.NFlow;
import com.nitorcreations.nflow.engine.internal.storage.db.SQLVariants;
import com.nitorcreations.nflow.engine.workflow.definition.StateExecutionStatistics;
import com.nitorcreations.nflow.engine.workflow.instance.QueryWorkflowInstances;
import com.nitorcreations.nflow.engine.workflow.instance.WorkflowInstance;
Expand All @@ -67,9 +67,15 @@ public class WorkflowInstanceDao {
private JdbcTemplate jdbc;
private NamedParameterJdbcTemplate namedJdbc;
ExecutorDao executorInfo;
private SQLVariants sqlVariants;
private long workflowInstanceQueryMaxResults;
private long workflowInstanceQueryMaxResultsDefault;

@Inject
public void setSQLVariants(SQLVariants sqlVariants) {
this.sqlVariants = sqlVariants;
}

@Inject
public void setJdbcTemplate(@NFlow JdbcTemplate nflowJdbcTemplate) {
this.jdbc = nflowJdbcTemplate;
Expand Down Expand Up @@ -209,12 +215,26 @@ public void processRow(ResultSet rs) throws SQLException {
@SuppressFBWarnings(value = "SIC_INNER_SHOULD_BE_STATIC_ANON", justification = "common jdbctemplate practice")
@Transactional
public List<Integer> pollNextWorkflowInstanceIds(int batchSize) {
if (sqlVariants.hasUpdateReturning()) {
return pollNextWorkflowInstanceIdsUsingUpdateReturning(batchSize);
}
return pollNextWorkflowInstanceIdsUsingSelectUpdate(batchSize);
}

private List<Integer> pollNextWorkflowInstanceIdsUsingUpdateReturning(int batchSize) {
return jdbc.queryForList("update nflow_workflow set executor_id = " + executorInfo.getExecutorId()
+ " where id in (select id from nflow_workflow where executor_id is null and next_activation < current_timestamp and "
+ executorInfo.getExecutorGroupCondition() + " order by next_activation asc limit " + batchSize
+ ") and executor_id is null returning id", Integer.class);
}

private List<Integer> pollNextWorkflowInstanceIdsUsingSelectUpdate(int batchSize) {
String sql = "select id, modified from nflow_workflow where executor_id is null and next_activation < current_timestamp and "
+ executorInfo.getExecutorGroupCondition() + " order by next_activation asc limit " + batchSize;
List<OptimisticLockKey> instances = jdbc.query(sql, new RowMapper<OptimisticLockKey>() {
@Override
public OptimisticLockKey mapRow(ResultSet rs, int rowNum) throws SQLException {
return new OptimisticLockKey(rs.getInt("id"), rs.getTimestamp("modified"));
return new OptimisticLockKey(rs.getInt("id"), rs.getString("modified"));
}
});
Collections.sort(instances);
Expand Down Expand Up @@ -247,11 +267,11 @@ public OptimisticLockKey mapRow(ResultSet rs, int rowNum) throws SQLException {

private static class OptimisticLockKey implements Comparable<OptimisticLockKey> {
public final int id;
public final Timestamp modified;
public final String modified;

public OptimisticLockKey(int id, Timestamp modified) {
public OptimisticLockKey(int id, String string) {
this.id = id;
this.modified = modified;
this.modified = string;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.nitorcreations.nflow.engine.internal.executor;

import static java.lang.Math.max;
import static org.slf4j.LoggerFactory.getLogger;

import java.util.List;
Expand Down Expand Up @@ -107,7 +106,7 @@ private void dispatch(List<Integer> nextInstanceIds) {
}

private List<Integer> getNextInstanceIds() {
int nextBatchSize = max(0, 2 * executor.getMaximumPoolSize() - executor.getActiveCount());
int nextBatchSize = executor.getQueueRemainingCapacity();
logger.debug("Polling next {} workflow instances.", nextBatchSize);
return workflowInstances.pollNextWorkflowInstanceIds(nextBatchSize);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.nitorcreations.nflow.engine.internal.executor;

import static java.lang.Integer.MAX_VALUE;
import static java.lang.Thread.currentThread;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.slf4j.LoggerFactory.getLogger;
Expand All @@ -18,9 +17,10 @@ public class WorkflowInstanceExecutor {
final ThreadPoolExecutor executor;
final ThresholdBlockingQueue<Runnable> queue;

public WorkflowInstanceExecutor(int threadCount, int notifyThreshold, int awaitTerminationSeconds, int keepAliveSeconds,
public WorkflowInstanceExecutor(int maxQueueSize, int threadCount, int notifyThreshold, int awaitTerminationSeconds,
int keepAliveSeconds,
ThreadFactory threadFactory) {
queue = new ThresholdBlockingQueue<>(MAX_VALUE, notifyThreshold);
queue = new ThresholdBlockingQueue<>(maxQueueSize, notifyThreshold);
executor = new ThreadPoolExecutor(threadCount, threadCount, keepAliveSeconds, SECONDS, queue, threadFactory);
executor.allowCoreThreadTimeOut(keepAliveSeconds > 0);
this.awaitTerminationSeconds = awaitTerminationSeconds;
Expand All @@ -34,12 +34,8 @@ public void execute(Runnable runnable) {
executor.execute(runnable);
}

public int getMaximumPoolSize() {
return executor.getMaximumPoolSize();
}

public int getActiveCount() {
return executor.getActiveCount();
public int getQueueRemainingCapacity() {
return queue.remainingCapacity();
}

public void shutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,10 @@ public static class H2SQLVariants implements SQLVariants {
public String currentTimePlusSeconds(int seconds) {
return "dateadd('second', " + seconds + ", current_timestamp)";
}

@Override
public boolean hasUpdateReturning() {
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,10 @@ public static class MySQLVariants implements SQLVariants {
public String currentTimePlusSeconds(int seconds) {
return "date_add(current_timestamp, interval " + seconds + " second)";
}

@Override
public boolean hasUpdateReturning() {
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,10 @@ public static class PostgreSQLVariants implements SQLVariants {
public String currentTimePlusSeconds(int seconds) {
return "current_timestamp + interval '" + seconds + " second'";
}

@Override
public boolean hasUpdateReturning() {
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@

public interface SQLVariants {
String currentTimePlusSeconds(int seconds);

boolean hasUpdateReturning();
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,17 @@ public class EngineConfigurationTest {
@InjectMocks
private final EngineConfiguration configuration = new EngineConfiguration();

public void dispatcherPoolExecutorInstantiation() {
@Test
public void dispatcherPoolExecutorInstantiationFromThreads() {
WorkflowInstanceExecutor executor = configuration.nflowExecutor(threadFactory, environment);
assertThat(executor.getQueueRemainingCapacity(), is(200));
}

@Test
public void dispatcherPoolExecutorInstantiationFromQueueSize() {
environment.setProperty("nflow.dispatcher.executor.queue.size", "10");
WorkflowInstanceExecutor executor = configuration.nflowExecutor(threadFactory, environment);
assertThat(executor.getMaximumPoolSize(), is(100));
assertThat(executor.getQueueRemainingCapacity(), is(10));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void setup() {
when(env.getProperty("nflow.dispatcher.sleep.ms", Long.class, 5000l)).thenReturn(0l);
when(env.getProperty("nflow.dispatcher.executor.queue.wait_until_threshold", Integer.class, 0)).thenReturn(0);
when(recovery.isTransactionSupportEnabled()).thenReturn(true);
executor = new WorkflowInstanceExecutor(2, 0, 10, 0, new CustomizableThreadFactory("nflow-executor-"));
executor = new WorkflowInstanceExecutor(3, 2, 0, 10, 0, new CustomizableThreadFactory("nflow-executor-"));
dispatcher = new WorkflowDispatcher(executor, workflowInstances, executorFactory, recovery, env);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class WorkflowInstanceExecutorTest {

@Test
public void testThreadPoolCreateWithCorrectParameters() {
WorkflowInstanceExecutor t = new WorkflowInstanceExecutor(2, 1, 3, 4, threadFactory);
WorkflowInstanceExecutor t = new WorkflowInstanceExecutor(3, 2, 1, 3, 4, threadFactory);
assertThat(t.executor.getCorePoolSize(), is(2));
assertThat(t.executor.getMaximumPoolSize(), is(2));
assertThat(t.executor.getKeepAliveTime(SECONDS), is(4L));
Expand All @@ -38,28 +38,27 @@ public void testThreadPoolCreateWithCorrectParameters() {

@Test
public void testDummyGetters() {
WorkflowInstanceExecutor t = new WorkflowInstanceExecutor(2, 1, 3, 4, threadFactory);
assertThat(t.getActiveCount(), is(0));
assertThat(t.getMaximumPoolSize(), is(2));
WorkflowInstanceExecutor t = new WorkflowInstanceExecutor(3, 2, 1, 3, 4, threadFactory);
assertThat(t.getQueueRemainingCapacity(), is(3));
}

@Test
public void testExecute() {
WorkflowInstanceExecutor t = new WorkflowInstanceExecutor(2, 1, 3, 4, new CustomizableThreadFactory("test"));
WorkflowInstanceExecutor t = new WorkflowInstanceExecutor(3, 2, 1, 3, 4, new CustomizableThreadFactory("test"));
t.execute(runnable);
verify(runnable, timeout(1000)).run();
}

@Test
public void testWait() throws InterruptedException {
WorkflowInstanceExecutor t = new WorkflowInstanceExecutor(2, 1, 3, 4, new CustomizableThreadFactory("test"));
WorkflowInstanceExecutor t = new WorkflowInstanceExecutor(3, 2, 1, 3, 4, new CustomizableThreadFactory("test"));
t.execute(runnable);
t.waitUntilQueueSizeLowerThanThreshold(new DateTime().plusSeconds(5));
}

@Test
public void testShutdown() {
WorkflowInstanceExecutor t = new WorkflowInstanceExecutor(2, 1, 3, 4, new CustomizableThreadFactory("test"));
WorkflowInstanceExecutor t = new WorkflowInstanceExecutor(3, 2, 1, 3, 4, new CustomizableThreadFactory("test"));
t.shutdown();
assertThat(t.executor.isShutdown(), is(true));
}
Expand Down
27 changes: 23 additions & 4 deletions nflow-jetty/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,10 @@
<artifactId>nflow-explorer</artifactId>
<version>${nflow.explorer.version}</version>
<type>tar.gz</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.outputDirectory}/static/explorer</outputDirectory>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}/explorer</outputDirectory>
</artifactItem>
</artifactItems>
<excludes>config.js</excludes>
</configuration>
</execution>
</executions>
Expand All @@ -65,7 +64,7 @@
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-resources</id>
<id>copy-swagger-resources</id>
<phase>process-resources</phase>
<goals>
<goal>copy-resources</goal>
Expand All @@ -84,6 +83,26 @@
</resources>
</configuration>
</execution>
<execution>
<id>copy-explorer-resources</id>
<phase>process-resources</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${project.build.outputDirectory}/static/explorer</outputDirectory>
<overwrite>false</overwrite>
<resources>
<resource>
<directory>${project.build.directory}/explorer</directory>
<filtering>false</filtering>
<excludes>
<exclude>config.js</exclude>
</excludes>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ public Response updateWorkflowInstance(
msg += "API changed nextActivationTime to " + req.nextActivationTime + ".";
}
}
if (msg.isEmpty()) {
return noContent().build();
}
WorkflowInstance instance = builder.build();
boolean updated = workflowInstances.updateWorkflowInstance(instance, new WorkflowInstanceAction.Builder(instance)
.setStateText(trimToNull(msg)).setExecutionEnd(now()).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.nitorcreations.nflow.engine.workflow.statistics.Statistics;
import com.nitorcreations.nflow.rest.v1.msg.Action;
import com.nitorcreations.nflow.rest.v1.msg.CreateWorkflowInstanceRequest;
import com.nitorcreations.nflow.rest.v1.msg.CreateWorkflowInstanceResponse;
import com.nitorcreations.nflow.rest.v1.msg.ListWorkflowDefinitionResponse;
import com.nitorcreations.nflow.rest.v1.msg.ListWorkflowInstanceResponse;
import com.nitorcreations.nflow.rest.v1.msg.StatisticsResponse;
import com.nitorcreations.nflow.rest.v1.msg.UpdateWorkflowInstanceRequest;
import com.nitorcreations.nflow.tests.config.PropertiesConfiguration;
import com.nitorcreations.nflow.tests.config.RestClientConfiguration;
import com.nitorcreations.nflow.tests.runner.NflowServerRule;
Expand Down Expand Up @@ -64,18 +65,22 @@ public void setStatisticsResource(@Named("statistics") WebClient client) {
}

protected ListWorkflowInstanceResponse getWorkflowInstance(int instanceId) {
return getInstanceResource(instanceId).get(ListWorkflowInstanceResponse.class);
}

private WebClient getInstanceResource(int instanceId) {
WebClient client = fromClient(workflowInstanceResource, true).path(Integer.toString(instanceId));
return client.get(ListWorkflowInstanceResponse.class);
return client;
}

protected ListWorkflowDefinitionResponse[] getWorkflowDefinitions() {
WebClient client = fromClient(workflowDefinitionResource, true);
return client.get(ListWorkflowDefinitionResponse[].class);
}

public Statistics getStatistics() {
public StatisticsResponse getStatistics() {
WebClient client = fromClient(statisticsResource, true);
return client.get(Statistics.class);
return client.get(StatisticsResponse.class);
}

protected ListWorkflowInstanceResponse getWorkflowInstance(int id, String expectedState) throws InterruptedException {
Expand Down Expand Up @@ -112,6 +117,10 @@ protected CreateWorkflowInstanceResponse createWorkflowInstance(CreateWorkflowIn
return makeWorkflowInstanceQuery(request, CreateWorkflowInstanceResponse.class);
}

protected String updateWorkflowInstance(int instanceId, UpdateWorkflowInstanceRequest request) {
return getInstanceResource(instanceId).put(request, String.class);
}

private <T> T makeWorkflowInstanceQuery(CreateWorkflowInstanceRequest request, Class<T> responseClass) {
return fromClient(workflowInstanceResource, true).put(request, responseClass);
}
Expand Down
Loading

0 comments on commit a3d52bd

Please sign in to comment.