Skip to content

Commit

Permalink
Merge ce71de0 into 77d046c
Browse files Browse the repository at this point in the history
  • Loading branch information
Edvard Fonsell committed Jun 6, 2019
2 parents 77d046c + ce71de0 commit 47eea41
Show file tree
Hide file tree
Showing 26 changed files with 313 additions and 167 deletions.
34 changes: 19 additions & 15 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
## 5.7.1-SNAPSHOT (future release)
## 6.0.0-SNAPSHOT (future release)

**Highlights**
- Use constructor injection instead of field or setter injection in nFlow classes
- Separate workflow definition scanning from `WorkflowDefinitionService`

**Details**
- `nflow-engine`
- Separate workflow definition scanning from `WorkflowDefinitionService` by introducing `WorkflowDefinitionSpringBeanScanner` and `WorkflowDefinitionClassNameScanner`. This allows breaking the circular dependency when a workflow definition uses `WorkflowInstanceService` (which depends on `WorkflowDefinitionService`, which depended on all workflow definitions). This enabled using constructor injection in all nFlow classes.

## 5.7.0 (2019-06-06)

Expand All @@ -20,20 +24,20 @@
- Moved default implementations for `WorkflowExecutorListener` interface methods from the abstract class to the interface.
- Fixed bug with dropping non-existent index in PostgreSQL create script.
- Dependency updates:
- reactor.netty 0.8.8.RELEASE
- jetty 9.4.18.v20190429
- javassist 3.25.0-GA
- mysql-connector-java 8.0.16
- mssql-jdbc 7.2.2.jre8
- metrics 4.1.0
- spring 5.1.7.RELEASE
- hibernate.validator 6.0.15.Final
- cxf 3.3.2
- joda-time 2.10.2
- commons-lang3 3.9
- jackson 2.9.9
- junit 5.4.1
- mockito 2.27.0
- reactor.netty 0.8.8.RELEASE
- jetty 9.4.18.v20190429
- javassist 3.25.0-GA
- mysql-connector-java 8.0.16
- mssql-jdbc 7.2.2.jre8
- metrics 4.1.0
- spring 5.1.7.RELEASE
- hibernate.validator 6.0.15.Final
- cxf 3.3.2
- joda-time 2.10.2
- commons-lang3 3.9
- jackson 2.9.9
- junit 5.4.1
- mockito 2.27.0
- `nflow-explorer`
- Dependency updates

Expand Down
2 changes: 1 addition & 1 deletion nflow-engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<parent>
<artifactId>nflow-root</artifactId>
<groupId>io.nflow</groupId>
<version>5.7.1-SNAPSHOT</version>
<version>6.0.0-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public NflowEngine(DataSource dataSource, SQLVariants sqlVariants,
workflowLifecycle = ctx.getBean(WorkflowLifecycle.class);

workflowDefinitionService = ctx.getBean(WorkflowDefinitionService.class);
workflowDefinitionService.setWorkflowDefinitions(workflowDefinitions);
workflowDefinitions.forEach(workflowDefinitionService::addWorkflowDefinition);

archiveService = ctx.getBean(ArchiveService.class);
healthCheckService = ctx.getBean(HealthCheckService.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import static org.slf4j.LoggerFactory.getLogger;

import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -41,7 +40,6 @@ public class WorkflowDispatcher implements Runnable {
private final long sleepTimeMillis;
private final int stuckThreadThresholdSeconds;
private final Random rand = new Random();
private final boolean autoInit;

@Inject
@SuppressFBWarnings(value = "WEM_WEAK_EXCEPTION_MESSAGING", justification = "Transaction support exception message is fine")
Expand All @@ -55,7 +53,6 @@ public WorkflowDispatcher(WorkflowInstanceExecutor executor, WorkflowInstanceDao
this.executorDao = executorDao;
this.sleepTimeMillis = env.getRequiredProperty("nflow.dispatcher.sleep.ms", Long.class);
this.stuckThreadThresholdSeconds = env.getRequiredProperty("nflow.executor.stuckThreadThreshold.seconds", Integer.class);
this.autoInit = env.getRequiredProperty("nflow.autoinit", Boolean.class);

if (!executorDao.isTransactionSupportEnabled()) {
throw new BeanCreationException("Transaction support must be enabled");
Expand All @@ -66,17 +63,14 @@ public WorkflowDispatcher(WorkflowInstanceExecutor executor, WorkflowInstanceDao
public void run() {
logger.info("Starting.");
try {
if (!autoInit) {
workflowDefinitions.postProcessWorkflowDefinitions();
}
workflowDefinitions.postProcessWorkflowDefinitions();
running = true;
while (!shutdownRequested) {
if (paused) {
sleep(false);
} else {
try {
executor.waitUntilQueueSizeLowerThanThreshold(executorDao.getMaxWaitUntil());

if (!shutdownRequested) {
if (executorDao.tick()) {
workflowInstances.recoverWorkflowInstancesFromDeadNodes();
Expand All @@ -98,9 +92,6 @@ public void run() {
}
}
}

} catch (IOException | ReflectiveOperationException e) {
logger.error("Fetching workflow definitions failed", e);
} finally {
running = false;
shutdownPool();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import static org.slf4j.LoggerFactory.getLogger;

import java.io.IOException;
import java.util.concurrent.ThreadFactory;

import javax.inject.Inject;
Expand All @@ -13,27 +12,18 @@
import org.springframework.stereotype.Component;

import io.nflow.engine.config.NFlow;
import io.nflow.engine.service.WorkflowDefinitionService;

@Component
public class WorkflowLifecycle implements SmartLifecycle {
private static final Logger logger = getLogger(WorkflowLifecycle.class);

private final WorkflowDefinitionService workflowDefinitions;
private final WorkflowDispatcher dispatcher;
private final boolean autoStart;
private final Thread dispatcherThread;

@Inject
public WorkflowLifecycle(WorkflowDefinitionService workflowDefinitions, WorkflowDispatcher dispatcher,
@NFlow ThreadFactory nflowThreadFactory, Environment env) throws IOException, ReflectiveOperationException {
public WorkflowLifecycle(WorkflowDispatcher dispatcher, @NFlow ThreadFactory nflowThreadFactory, Environment env) {
this.dispatcher = dispatcher;
this.workflowDefinitions = workflowDefinitions;
if (env.getRequiredProperty("nflow.autoinit", Boolean.class)) {
this.workflowDefinitions.postProcessWorkflowDefinitions();
} else {
logger.info("nFlow engine autoinit disabled (system property nflow.autoinit=false)");
}
autoStart = env.getRequiredProperty("nflow.autostart", Boolean.class);
dispatcherThread = nflowThreadFactory.newThread(dispatcher);
dispatcherThread.setName("nflow-dispatcher");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.nflow.engine.service;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.slf4j.LoggerFactory.getLogger;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import javax.annotation.Nullable;
import javax.inject.Inject;

import org.slf4j.Logger;
import org.springframework.core.io.AbstractResource;
import org.springframework.stereotype.Component;

import io.nflow.engine.config.NFlow;
import io.nflow.engine.workflow.definition.AbstractWorkflowDefinition;
import io.nflow.engine.workflow.definition.WorkflowState;

/**
* Register workflow definitions defined in the class name listing resource.
*/
@Component
public class WorkflowDefinitionClassNameScanner {

private static final Logger logger = getLogger(WorkflowDefinitionClassNameScanner.class);

@Inject
public WorkflowDefinitionClassNameScanner(WorkflowDefinitionService workflowDefinitionService,
@Nullable @NFlow AbstractResource classNameListing) throws IOException, ReflectiveOperationException {
if (classNameListing == null) {
logger.info("No non-Spring workflow definitions");
} else {
try (BufferedReader br = new BufferedReader(new InputStreamReader(classNameListing.getInputStream(), UTF_8))) {
String row;
while ((row = br.readLine()) != null) {
logger.info("Preparing workflow {}", row);
@SuppressWarnings("unchecked")
Class<AbstractWorkflowDefinition<? extends WorkflowState>> clazz = (Class<AbstractWorkflowDefinition<? extends WorkflowState>>) Class
.forName(row);
workflowDefinitionService.addWorkflowDefinition(clazz.getDeclaredConstructor().newInstance());
}
}
}
}

}
Original file line number Diff line number Diff line change
@@ -1,26 +1,18 @@
package io.nflow.engine.service;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.slf4j.LoggerFactory.getLogger;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import javax.inject.Inject;

import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.core.io.AbstractResource;
import org.springframework.stereotype.Component;

import io.nflow.engine.config.NFlow;
import io.nflow.engine.internal.dao.WorkflowDefinitionDao;
import io.nflow.engine.workflow.definition.AbstractWorkflowDefinition;
import io.nflow.engine.workflow.definition.WorkflowState;
Expand All @@ -33,36 +25,23 @@ public class WorkflowDefinitionService {

private static final Logger logger = getLogger(WorkflowDefinitionService.class);

private AbstractResource nonSpringWorkflowsListing;
private final Map<String, AbstractWorkflowDefinition<? extends WorkflowState>> workflowDefinitions = new LinkedHashMap<>();
private volatile Map<String, AbstractWorkflowDefinition<? extends WorkflowState>> workflowDefinitions = new LinkedHashMap<>();
private final WorkflowDefinitionDao workflowDefinitionDao;
private final boolean persistWorkflowDefinitions;
private final boolean autoInit;

@Inject
public WorkflowDefinitionService(WorkflowDefinitionDao workflowDefinitionDao, Environment env) {
this.workflowDefinitionDao = workflowDefinitionDao;
this.persistWorkflowDefinitions = env.getRequiredProperty("nflow.definition.persist", Boolean.class);
}

/**
* Add given workflow definitions to the managed definitions.
* @param workflowDefinitions The workflow definitions to be added.
*/
@Autowired(required = false)
public void setWorkflowDefinitions(Collection<AbstractWorkflowDefinition<? extends WorkflowState>> workflowDefinitions) {
for (AbstractWorkflowDefinition<? extends WorkflowState> wd : workflowDefinitions) {
addWorkflowDefinition(wd);
}
}

@Autowired(required = false)
public void setWorkflowDefinitions(@NFlow AbstractResource nflowNonSpringWorkflowsListing) {
this.nonSpringWorkflowsListing = nflowNonSpringWorkflowsListing;
this.autoInit = env.getRequiredProperty("nflow.autoinit", Boolean.class);
}

/**
* Return the workflow definition that matches the give workflow type name.
* @param type Workflow definition type.
*
* @param type
* Workflow definition type.
* @return The workflow definition or null if not found.
*/
public AbstractWorkflowDefinition<?> getWorkflowDefinition(String type) {
Expand All @@ -71,49 +50,45 @@ public AbstractWorkflowDefinition<?> getWorkflowDefinition(String type) {

/**
* Return all managed workflow definitions.
*
* @return List of workflow definitions.
*/
public List<AbstractWorkflowDefinition<? extends WorkflowState>> getWorkflowDefinitions() {
return new ArrayList<>(workflowDefinitions.values());
}

/**
* Add workflow definitions from the nflowNonSpringWorkflowsListing resource and persist
* all loaded workflow definitions.
* @throws IOException when workflow definitions can not be read from the resource.
* @throws ReflectiveOperationException when the workflow definition can not be instantiated.
* Persist all loaded workflow definitions if nflow.autoinit is false and nflow.definition.persist is true. If nflow.autoinit is
* true, definitions are persisted when they are added to managed definitions.
*/
public void postProcessWorkflowDefinitions() throws IOException, ReflectiveOperationException {
if (nonSpringWorkflowsListing == null) {
logger.info("No non-Spring workflow definitions");
} else {
initNonSpringWorkflowDefinitions();
}
if (persistWorkflowDefinitions) {
for (AbstractWorkflowDefinition<?> definition : workflowDefinitions.values()) {
workflowDefinitionDao.storeWorkflowDefinition(definition);
}
public void postProcessWorkflowDefinitions() {
if (!autoInit && persistWorkflowDefinitions) {
workflowDefinitions.values().forEach(workflowDefinitionDao::storeWorkflowDefinition);
}
}

private void initNonSpringWorkflowDefinitions() throws IOException, ReflectiveOperationException {
try (BufferedReader br = new BufferedReader(new InputStreamReader(nonSpringWorkflowsListing.getInputStream(), UTF_8))) {
String row;
while ((row = br.readLine()) != null) {
logger.info("Preparing workflow {}", row);
@SuppressWarnings("unchecked")
Class<AbstractWorkflowDefinition<? extends WorkflowState>> clazz = (Class<AbstractWorkflowDefinition<? extends WorkflowState>>) Class.forName(row);
addWorkflowDefinition(clazz.getDeclaredConstructor().newInstance());
/**
* Add given workflow definition to managed definitions. Persist given definition if nflow.autoinit and nflow.definition.persist
* are true.
*
* @param wd
* The workflow definition to be added.
* @throws IllegalStateException
* When a definition with the same type has already been added.
*/
public void addWorkflowDefinition(AbstractWorkflowDefinition<? extends WorkflowState> wd) {
synchronized (this) {
Map<String, AbstractWorkflowDefinition<? extends WorkflowState>> newDefinitions = new LinkedHashMap<>(workflowDefinitions);
AbstractWorkflowDefinition<? extends WorkflowState> conflict = newDefinitions.put(wd.getType(), wd);
if (conflict != null) {
throw new IllegalStateException("Both " + wd.getClass().getName() + " and " + conflict.getClass().getName()
+ " define same workflow type: " + wd.getType());
}
workflowDefinitions = newDefinitions;
}
}

public void addWorkflowDefinition(AbstractWorkflowDefinition<? extends WorkflowState> wd) {
AbstractWorkflowDefinition<? extends WorkflowState> conflict = workflowDefinitions.put(wd.getType(), wd);
if (conflict != null) {
throw new IllegalStateException("Both " + wd.getClass().getName() + " and " + conflict.getClass().getName() +
" define same workflow type: " + wd.getType());
if (autoInit && persistWorkflowDefinitions) {
workflowDefinitionDao.storeWorkflowDefinition(wd);
}
logger.info("Added workflow type: {} ({})", wd.getType(), wd.getClass().getName());
logger.info("Added workflow type: {} ({})", wd.getType(), wd.getClass().getName());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.nflow.engine.service;

import java.util.Collection;

import javax.inject.Inject;

import org.springframework.stereotype.Component;

import io.nflow.engine.workflow.definition.AbstractWorkflowDefinition;
import io.nflow.engine.workflow.definition.WorkflowState;

/**
* Register workflow definitions defined as Spring beans.
*/
@Component
public class WorkflowDefinitionSpringBeanScanner {

@Inject
public WorkflowDefinitionSpringBeanScanner(WorkflowDefinitionService workflowDefinitionService,
Collection<AbstractWorkflowDefinition<? extends WorkflowState>> workflowDefinitions) {
workflowDefinitions.forEach(workflowDefinitionService::addWorkflowDefinition);
}

}
Loading

0 comments on commit 47eea41

Please sign in to comment.