Skip to content

Commit

Permalink
Merge ffabd2c into 01a01e6
Browse files Browse the repository at this point in the history
  • Loading branch information
Edvard Fonsell committed Jul 12, 2019
2 parents 01a01e6 + ffabd2c commit 802fcbd
Show file tree
Hide file tree
Showing 9 changed files with 51 additions and 83 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
**Highlights**
- Use constructor injection instead of field or setter injection in nFlow classes
- Separate workflow definition scanning from `WorkflowDefinitionService`
- Replace nflow.autoinit and nflow.definition.persist configuration options with nflow.definition.autopersist.

**Details**
- `nflow-engine`
- Separate workflow definition scanning from `WorkflowDefinitionService` by introducing `WorkflowDefinitionSpringBeanScanner` and `WorkflowDefinitionClassNameScanner`. This allows breaking the circular dependency when a workflow definition uses `WorkflowInstanceService` (which depends on `WorkflowDefinitionService`, which depended on all workflow definitions). This enabled using constructor injection in all nFlow classes.
- nflow.definition.autopersist=true works as having nflow.autoinit=true and nflow.definition.persist=true, e.g. definitions are persisted to database when they are registered to nFlow engine. When nflow.definition.autopersist is false, definitions are not automatically persisted when they are added, nor when nFlow engine starts. Instead, the definitions can be persisted by calling `WorkflowDefinitionService.persistWorkflowDefinitions` manually.

## 5.7.0 (2019-06-06)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.nflow.engine.internal.dao.PollingRaceConditionException;
import io.nflow.engine.internal.dao.WorkflowInstanceDao;
import io.nflow.engine.internal.util.PeriodicLogger;
import io.nflow.engine.service.WorkflowDefinitionService;

@Component
@SuppressFBWarnings(value = "MDM_RANDOM_SEED", justification = "rand does not need to be secure")
Expand All @@ -35,7 +34,6 @@ public class WorkflowDispatcher implements Runnable {
private final WorkflowInstanceExecutor executor;
private final WorkflowInstanceDao workflowInstances;
private final WorkflowStateProcessorFactory stateProcessorFactory;
private final WorkflowDefinitionService workflowDefinitions;
private final ExecutorDao executorDao;
private final long sleepTimeMillis;
private final int stuckThreadThresholdSeconds;
Expand All @@ -44,12 +42,10 @@ public class WorkflowDispatcher implements Runnable {
@Inject
@SuppressFBWarnings(value = "WEM_WEAK_EXCEPTION_MESSAGING", justification = "Transaction support exception message is fine")
public WorkflowDispatcher(WorkflowInstanceExecutor executor, WorkflowInstanceDao workflowInstances,
WorkflowStateProcessorFactory stateProcessorFactory, WorkflowDefinitionService workflowDefinitions, ExecutorDao executorDao,
Environment env) {
WorkflowStateProcessorFactory stateProcessorFactory, ExecutorDao executorDao, Environment env) {
this.executor = executor;
this.workflowInstances = workflowInstances;
this.stateProcessorFactory = stateProcessorFactory;
this.workflowDefinitions = workflowDefinitions;
this.executorDao = executorDao;
this.sleepTimeMillis = env.getRequiredProperty("nflow.dispatcher.sleep.ms", Long.class);
this.stuckThreadThresholdSeconds = env.getRequiredProperty("nflow.executor.stuckThreadThreshold.seconds", Integer.class);
Expand All @@ -63,7 +59,6 @@ public WorkflowDispatcher(WorkflowInstanceExecutor executor, WorkflowInstanceDao
public void run() {
logger.info("Starting.");
try {
workflowDefinitions.postProcessWorkflowDefinitions();
running = true;
while (!shutdownRequested) {
if (paused) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,12 @@ public class WorkflowDefinitionService {

private volatile Map<String, AbstractWorkflowDefinition<? extends WorkflowState>> workflowDefinitions = new LinkedHashMap<>();
private final WorkflowDefinitionDao workflowDefinitionDao;
private final boolean persistWorkflowDefinitions;
private final boolean autoInit;
private final boolean autoPersistDefinitions;

@Inject
public WorkflowDefinitionService(WorkflowDefinitionDao workflowDefinitionDao, Environment env) {
this.workflowDefinitionDao = workflowDefinitionDao;
this.persistWorkflowDefinitions = env.getRequiredProperty("nflow.definition.persist", Boolean.class);
this.autoInit = env.getRequiredProperty("nflow.autoinit", Boolean.class);
this.autoPersistDefinitions = env.getRequiredProperty("nflow.definition.autopersist", Boolean.class);
}

/**
Expand All @@ -58,18 +56,19 @@ public List<AbstractWorkflowDefinition<? extends WorkflowState>> getWorkflowDefi
}

/**
* Persist all loaded workflow definitions if nflow.autoinit is false and nflow.definition.persist is true. If nflow.autoinit is
* true, definitions are persisted when they are added to managed definitions.
* Persist all loaded workflow definitions to database if nflow.definition.autopersist is false. If nflow.definition.autopersist
* is true, definitions are persisted automatically when they are added to managed definitions.
*/
public void postProcessWorkflowDefinitions() {
if (!autoInit && persistWorkflowDefinitions) {
public void persistWorkflowDefinitions() {
if (!autoPersistDefinitions) {
workflowDefinitions.values().forEach(workflowDefinitionDao::storeWorkflowDefinition);
}
}

/**
* Add given workflow definition to managed definitions. Persist given definition if nflow.autoinit and nflow.definition.persist
* are true.
* Add given workflow definition to managed definitions. Persist given definition to database if nflow.definition.autopersist is
* true. If nflow.definition.autopersist is false, call persistWorkflowDefinitions manually if needed to persist the
* definitions.
*
* @param wd
* The workflow definition to be added.
Expand All @@ -86,7 +85,7 @@ public void addWorkflowDefinition(AbstractWorkflowDefinition<? extends WorkflowS
}
workflowDefinitions = newDefinitions;
}
if (autoInit && persistWorkflowDefinitions) {
if (autoPersistDefinitions) {
workflowDefinitionDao.storeWorkflowDefinition(wd);
}
logger.info("Added workflow type: {} ({})", wd.getType(), wd.getClass().getName());
Expand Down
2 changes: 1 addition & 1 deletion nflow-engine/src/main/resources/nflow-engine.properties
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,4 @@ nflow.db.max_pool_size=4
nflow.db.idle_timeout_seconds=600
nflow.db.create_on_startup=true

nflow.definition.persist=true
nflow.definition.autopersist=true
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import io.nflow.engine.internal.dao.ExecutorDao;
import io.nflow.engine.internal.dao.WorkflowInstanceDao;
import io.nflow.engine.listener.WorkflowExecutorListener;
import io.nflow.engine.service.WorkflowDefinitionService;

@ExtendWith(MockitoExtension.class)
public class WorkflowDispatcherTest {
Expand All @@ -59,8 +58,6 @@ public class WorkflowDispatcherTest {
@Mock
WorkflowInstanceDao workflowInstances;
@Mock
WorkflowDefinitionService workflowDefinitions;
@Mock
ExecutorDao executorDao;
@Mock
WorkflowStateProcessorFactory executorFactory;
Expand All @@ -71,7 +68,6 @@ public class WorkflowDispatcherTest {

@BeforeEach
public void setup() {
env.setProperty("nflow.autoinit", "true");
env.setProperty("nflow.dispatcher.sleep.ms", "0");
env.setProperty("nflow.dispatcher.executor.queue.wait_until_threshold", "0");
env.setProperty("nflow.illegal.state.change.action", "ignore");
Expand All @@ -82,7 +78,7 @@ public void setup() {
env.setProperty("nflow.executor.stateSaveRetryDelay.seconds", "60");
when(executorDao.isTransactionSupportEnabled()).thenReturn(true);
executor = new WorkflowInstanceExecutor(3, 2, 0, 10, 0, new CustomizableThreadFactory("nflow-executor-"));
dispatcher = new WorkflowDispatcher(executor, workflowInstances, executorFactory, workflowDefinitions, executorDao, env);
dispatcher = new WorkflowDispatcher(executor, workflowInstances, executorFactory, executorDao, env);
Logger logger = (Logger) getLogger(ROOT_LOGGER_NAME);
logger.addAppender(mockAppender);
}
Expand All @@ -96,7 +92,8 @@ public void teardown() {
@Test
public void workflowDispatcherCreationFailsWithoutTransactionSupport() {
when(executorDao.isTransactionSupportEnabled()).thenReturn(false);
assertThrows(BeanCreationException.class, () -> new WorkflowDispatcher(executor, workflowInstances, executorFactory, workflowDefinitions, executorDao, env));
assertThrows(BeanCreationException.class,
() -> new WorkflowDispatcher(executor, workflowInstances, executorFactory, executorDao, env));
}

@Test
Expand Down Expand Up @@ -252,7 +249,7 @@ class ExceptionOnPoolShutdownIsNotPropagated extends MultithreadedTestCase {
@Override
public void initialize() {
poolSpy = Mockito.spy(executor);
dispatcher = new WorkflowDispatcher(poolSpy, workflowInstances, executorFactory, workflowDefinitions, executorDao, env);
dispatcher = new WorkflowDispatcher(poolSpy, workflowInstances, executorFactory, executorDao, env);
}

public void threadDispatcher() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,25 @@ public class WorkflowDefinitionServiceTest extends BaseNflowTest {
@BeforeEach
public void setup() {
lenient().when(workflowDefinition.getType()).thenReturn("dummy");
initializeService(true);
}

private void initializeService(boolean definitionPersist, boolean autoInit) {
when(env.getRequiredProperty("nflow.definition.persist", Boolean.class)).thenReturn(definitionPersist);
when(env.getRequiredProperty("nflow.autoinit", Boolean.class)).thenReturn(autoInit);
private void initializeService(boolean autoPersistDefinition) {
when(env.getRequiredProperty("nflow.definition.autopersist", Boolean.class)).thenReturn(autoPersistDefinition);
service = new WorkflowDefinitionService(workflowDefinitionDao, env);
}

@Test
public void addedDefinitionIsStoredWhenAutoInitIsTrue() {
initializeService(true, true);

public void addedDefinitionIsStoredWhenAutoPersistDefinitionIsTrue() {
service.addWorkflowDefinition(workflowDefinition);

verify(workflowDefinitionDao).storeWorkflowDefinition(workflowDefinition);
assertThat(service.getWorkflowDefinitions().size(), is(equalTo(1)));
}

@Test
public void addedDefinitionIsNotStoredWhenAutoInitIsFalse() {
initializeService(true, false);
public void addedDefinitionIsNotStoredWhenAutoPersistDefinitionIsFalse() {
initializeService(false);

service.addWorkflowDefinition(workflowDefinition);

Expand All @@ -63,53 +61,31 @@ public void addedDefinitionIsNotStoredWhenAutoInitIsFalse() {
}

@Test
public void addedDefinitionIsNotStoredWhenDefinitionPersistIsFalse() {
initializeService(false, true);

public void persistWorkflowDefinitionStoresDefinitionsWhenAutoPersistDefinitionIsFalse() {
initializeService(false);
service.addWorkflowDefinition(workflowDefinition);

verifyZeroInteractions(workflowDefinitionDao);
assertThat(service.getWorkflowDefinitions().size(), is(equalTo(1)));
}

@Test
public void definitionsAreStoredDuringPostProcessingWhenAutoInitIsFalse() {
initializeService(true, false);
service.addWorkflowDefinition(workflowDefinition);

service.postProcessWorkflowDefinitions();
service.persistWorkflowDefinitions();

verify(workflowDefinitionDao).storeWorkflowDefinition(workflowDefinition);
assertThat(service.getWorkflowDefinitions().size(), is(equalTo(1)));
}

@Test
public void definitionsAreNotStoredDuringPostProcessingWhenAutoInitIsTrue() {
initializeService(true, true);
public void persistWorkflowDefinitionDoesNotStoreDefinitionsWhenAutoPersistDefinitionIsTrue() {
service.addWorkflowDefinition(workflowDefinition);
verify(workflowDefinitionDao).storeWorkflowDefinition(workflowDefinition);

service.postProcessWorkflowDefinitions();
service.persistWorkflowDefinitions();

verifyNoMoreInteractions(workflowDefinitionDao);
assertThat(service.getWorkflowDefinitions().size(), is(equalTo(1)));
}

@Test
public void definitionsAreNotStoredDuringPostProcessingWhenDefinitionPersistIsFalse() {
initializeService(false, false);
service.addWorkflowDefinition(workflowDefinition);

service.postProcessWorkflowDefinitions();

verifyZeroInteractions(workflowDefinitionDao);
assertThat(service.getWorkflowDefinitions().size(), is(equalTo(1)));
}

@Test
public void addingDuplicatDefinitionThrowsException() {
initializeService(true, true);
service.addWorkflowDefinition(workflowDefinition);
verify(workflowDefinitionDao).storeWorkflowDefinition(workflowDefinition);

IllegalStateException thrown = assertThrows(IllegalStateException.class,
() -> service.addWorkflowDefinition(workflowDefinition));
Expand All @@ -118,22 +94,19 @@ public void addingDuplicatDefinitionThrowsException() {
assertThat(thrown.getMessage(),
containsString("Both " + className + " and " + className + " define same workflow type: dummy"));
assertThat(service.getWorkflowDefinitions().size(), is(equalTo(1)));
}

@Test
public void getWorkflowDefinitionReturnsNullWhenTypeIsNotFound() {
initializeService(true, true);

assertThat(service.getWorkflowDefinition("notFound"), is(nullValue()));
verifyNoMoreInteractions(workflowDefinitionDao);
}

@Test
public void getWorkflowDefinitionReturnsDefinitionWhenTypeIsFound() {
initializeService(true, true);

service.addWorkflowDefinition(workflowDefinition);

assertThat(service.getWorkflowDefinition("dummy"), is(instanceOf(DummyTestWorkflow.class)));
}

@Test
public void getWorkflowDefinitionReturnsNullWhenTypeIsNotFound() {
assertThat(service.getWorkflowDefinition("dummy"), is(nullValue()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ static class ContextConfiguration {
@Bean
@Primary
public Environment env() {
return new MockEnvironment().withProperty("nflow.definition.persist", "true").withProperty("nflow.autoinit", "true");
return new MockEnvironment().withProperty("nflow.definition.autopersist", "true");
}

@Bean
Expand Down
4 changes: 2 additions & 2 deletions nflow-netty/src/test/java/io/nflow/netty/StartNflowTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public void startNflowNetty() throws Exception {
Map<String, Object> properties = new HashMap<>();
properties.put("nflow.db.create_on_startup", false);
properties.put("nflow.autostart", false);
properties.put("nflow.autoinit", false);
properties.put("nflow.definition.autopersist", false);
ApplicationContext ctx = startNflow.startNetty(7500, "local", "", properties);

assertNotNull(testListener.applicationContextEvent);
Expand All @@ -39,7 +39,7 @@ public void startNflowNetty() throws Exception {
assertEquals("externallyDefinedExecutorGroup", ctx.getEnvironment().getProperty("nflow.executor.group"));
assertEquals("false", ctx.getEnvironment().getProperty("nflow.db.create_on_startup"));
assertEquals("false", ctx.getEnvironment().getProperty("nflow.autostart"));
assertEquals("false", ctx.getEnvironment().getProperty("nflow.autoinit"));
assertEquals("false", ctx.getEnvironment().getProperty("nflow.definition.autopersist"));
}

}
22 changes: 12 additions & 10 deletions nflow-tests/src/test/java/io/nflow/tests/SkipAutoStartTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,28 @@

import static org.junit.jupiter.api.Assertions.assertNotNull;

import io.nflow.tests.extension.NflowServerConfig;
import io.nflow.tests.extension.NflowServerExtension;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.extension.ExtendWith;

import io.nflow.tests.extension.NflowServerConfig;
import io.nflow.tests.extension.NflowServerExtension;

@ExtendWith(NflowServerExtension.class)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class SkipAutoStartTest extends AbstractNflowTest {

// When nflow.autoinit, nflow.autostart and nflow.db.create_on_startup are false
// no database access should happen. This test fails if SQL statements are
// issued during bean initialization.
public static NflowServerConfig server = new NflowServerConfig.Builder()
.prop("nflow.autoinit", "false")
.prop("nflow.autostart", "false")
.prop("nflow.db.create_on_startup", "false")
.build();
/**
* When nflow.definition.autopersist, nflow.autostart and nflow.db.create_on_startup are false no database access should happen.
* This test fails if SQL statements are issued during bean initialization.
*/
public static NflowServerConfig server = new NflowServerConfig.Builder() //
.prop("nflow.definition.autopersist", "false") //
.prop("nflow.autostart", "false") //
.prop("nflow.db.create_on_startup", "false") //
.build();

public SkipAutoStartTest() {
super(server);
Expand Down

0 comments on commit 802fcbd

Please sign in to comment.