Skip to content

Commit

Permalink
get rid of nflow.autoinit, rename nflow.definition.persist to nflow.d…
Browse files Browse the repository at this point in the history
…efinition.autopersist
  • Loading branch information
Edvard Fonsell committed Jun 4, 2019
1 parent 7cb01a3 commit 347a79a
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.nflow.engine.internal.dao.PollingRaceConditionException;
import io.nflow.engine.internal.dao.WorkflowInstanceDao;
import io.nflow.engine.internal.util.PeriodicLogger;
import io.nflow.engine.service.WorkflowDefinitionService;

@Component
@SuppressFBWarnings(value = "MDM_RANDOM_SEED", justification = "rand does not need to be secure")
Expand All @@ -35,7 +34,6 @@ public class WorkflowDispatcher implements Runnable {
private final WorkflowInstanceExecutor executor;
private final WorkflowInstanceDao workflowInstances;
private final WorkflowStateProcessorFactory stateProcessorFactory;
private final WorkflowDefinitionService workflowDefinitions;
private final ExecutorDao executorDao;
private final long sleepTimeMillis;
private final int stuckThreadThresholdSeconds;
Expand All @@ -44,12 +42,10 @@ public class WorkflowDispatcher implements Runnable {
@Inject
@SuppressFBWarnings(value = "WEM_WEAK_EXCEPTION_MESSAGING", justification = "Transaction support exception message is fine")
public WorkflowDispatcher(WorkflowInstanceExecutor executor, WorkflowInstanceDao workflowInstances,
WorkflowStateProcessorFactory stateProcessorFactory, WorkflowDefinitionService workflowDefinitions, ExecutorDao executorDao,
Environment env) {
WorkflowStateProcessorFactory stateProcessorFactory, ExecutorDao executorDao, Environment env) {
this.executor = executor;
this.workflowInstances = workflowInstances;
this.stateProcessorFactory = stateProcessorFactory;
this.workflowDefinitions = workflowDefinitions;
this.executorDao = executorDao;
this.sleepTimeMillis = env.getRequiredProperty("nflow.dispatcher.sleep.ms", Long.class);
this.stuckThreadThresholdSeconds = env.getRequiredProperty("nflow.executor.stuckThreadThreshold.seconds", Integer.class);
Expand All @@ -63,7 +59,6 @@ public WorkflowDispatcher(WorkflowInstanceExecutor executor, WorkflowInstanceDao
public void run() {
logger.info("Starting.");
try {
workflowDefinitions.postProcessWorkflowDefinitions();
running = true;
while (!shutdownRequested) {
if (paused) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,12 @@ public class WorkflowDefinitionService {

private volatile Map<String, AbstractWorkflowDefinition<? extends WorkflowState>> workflowDefinitions = new LinkedHashMap<>();
private final WorkflowDefinitionDao workflowDefinitionDao;
private final boolean persistWorkflowDefinitions;
private final boolean autoInit;
private final boolean autoPersistDefinitions;

@Inject
public WorkflowDefinitionService(WorkflowDefinitionDao workflowDefinitionDao, Environment env) {
this.workflowDefinitionDao = workflowDefinitionDao;
this.persistWorkflowDefinitions = env.getRequiredProperty("nflow.definition.persist", Boolean.class);
this.autoInit = env.getRequiredProperty("nflow.autoinit", Boolean.class);
this.autoPersistDefinitions = env.getRequiredProperty("nflow.definition.autopersist", Boolean.class);
}

/**
Expand All @@ -58,18 +56,19 @@ public List<AbstractWorkflowDefinition<? extends WorkflowState>> getWorkflowDefi
}

/**
* Persist all loaded workflow definitions if nflow.autoinit is false and nflow.definition.persist is true. If nflow.autoinit is
* true, definitions are persisted when they are added to managed definitions.
* Persist all loaded workflow definitions to database if nflow.definition.autopersist is false. If nflow.definition.autopersist
* is true, definitions are persisted automatically when they are added to managed definitions.
*/
public void postProcessWorkflowDefinitions() {
if (!autoInit && persistWorkflowDefinitions) {
public void persistWorkflowDefinitions() {
if (!autoPersistDefinitions) {
workflowDefinitions.values().forEach(workflowDefinitionDao::storeWorkflowDefinition);
}
}

/**
* Add given workflow definition to managed definitions. Persist given definition if nflow.autoinit and nflow.definition.persist
* are true.
* Add given workflow definition to managed definitions. Persist given definition to database if nflow.definition.autopersist is
* true. If nflow.definition.autopersist is false, call persistWorkflowDefinitions manually if needed to persist the
* definitions.
*
* @param wd
* The workflow definition to be added.
Expand All @@ -86,7 +85,7 @@ public void addWorkflowDefinition(AbstractWorkflowDefinition<? extends WorkflowS
}
workflowDefinitions = newDefinitions;
}
if (autoInit && persistWorkflowDefinitions) {
if (autoPersistDefinitions) {
workflowDefinitionDao.storeWorkflowDefinition(wd);
}
logger.info("Added workflow type: {} ({})", wd.getType(), wd.getClass().getName());
Expand Down
2 changes: 1 addition & 1 deletion nflow-engine/src/main/resources/nflow-engine.properties
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,4 @@ nflow.db.max_pool_size=4
nflow.db.idle_timeout_seconds=600
nflow.db.create_on_startup=true

nflow.definition.persist=true
nflow.definition.autopersist=true
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import io.nflow.engine.internal.dao.ExecutorDao;
import io.nflow.engine.internal.dao.WorkflowInstanceDao;
import io.nflow.engine.listener.WorkflowExecutorListener;
import io.nflow.engine.service.WorkflowDefinitionService;

@ExtendWith(MockitoExtension.class)
public class WorkflowDispatcherTest {
Expand All @@ -59,8 +58,6 @@ public class WorkflowDispatcherTest {
@Mock
WorkflowInstanceDao workflowInstances;
@Mock
WorkflowDefinitionService workflowDefinitions;
@Mock
ExecutorDao executorDao;
@Mock
WorkflowStateProcessorFactory executorFactory;
Expand All @@ -71,7 +68,6 @@ public class WorkflowDispatcherTest {

@BeforeEach
public void setup() {
env.setProperty("nflow.autoinit", "true");
env.setProperty("nflow.dispatcher.sleep.ms", "0");
env.setProperty("nflow.dispatcher.executor.queue.wait_until_threshold", "0");
env.setProperty("nflow.illegal.state.change.action", "ignore");
Expand All @@ -82,7 +78,7 @@ public void setup() {
env.setProperty("nflow.executor.stateSaveRetryDelay.seconds", "60");
when(executorDao.isTransactionSupportEnabled()).thenReturn(true);
executor = new WorkflowInstanceExecutor(3, 2, 0, 10, 0, new CustomizableThreadFactory("nflow-executor-"));
dispatcher = new WorkflowDispatcher(executor, workflowInstances, executorFactory, workflowDefinitions, executorDao, env);
dispatcher = new WorkflowDispatcher(executor, workflowInstances, executorFactory, executorDao, env);
Logger logger = (Logger) getLogger(ROOT_LOGGER_NAME);
logger.addAppender(mockAppender);
}
Expand All @@ -96,7 +92,8 @@ public void teardown() {
@Test
public void workflowDispatcherCreationFailsWithoutTransactionSupport() {
when(executorDao.isTransactionSupportEnabled()).thenReturn(false);
assertThrows(BeanCreationException.class, () -> new WorkflowDispatcher(executor, workflowInstances, executorFactory, workflowDefinitions, executorDao, env));
assertThrows(BeanCreationException.class,
() -> new WorkflowDispatcher(executor, workflowInstances, executorFactory, executorDao, env));
}

@Test
Expand Down Expand Up @@ -252,7 +249,7 @@ class ExceptionOnPoolShutdownIsNotPropagated extends MultithreadedTestCase {
@Override
public void initialize() {
poolSpy = Mockito.spy(executor);
dispatcher = new WorkflowDispatcher(poolSpy, workflowInstances, executorFactory, workflowDefinitions, executorDao, env);
dispatcher = new WorkflowDispatcher(poolSpy, workflowInstances, executorFactory, executorDao, env);
}

public void threadDispatcher() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,25 @@ public class WorkflowDefinitionServiceTest extends BaseNflowTest {
@BeforeEach
public void setup() {
lenient().when(workflowDefinition.getType()).thenReturn("dummy");
initializeService(true);
}

private void initializeService(boolean definitionPersist, boolean autoInit) {
when(env.getRequiredProperty("nflow.definition.persist", Boolean.class)).thenReturn(definitionPersist);
when(env.getRequiredProperty("nflow.autoinit", Boolean.class)).thenReturn(autoInit);
private void initializeService(boolean autoPersistDefinition) {
when(env.getRequiredProperty("nflow.definition.autopersist", Boolean.class)).thenReturn(autoPersistDefinition);
service = new WorkflowDefinitionService(workflowDefinitionDao, env);
}

@Test
public void addedDefinitionIsStoredWhenAutoInitIsTrue() {
initializeService(true, true);

public void addedDefinitionIsStoredWhenAutoPersistDefinitionIsTrue() {
service.addWorkflowDefinition(workflowDefinition);

verify(workflowDefinitionDao).storeWorkflowDefinition(workflowDefinition);
assertThat(service.getWorkflowDefinitions().size(), is(equalTo(1)));
}

@Test
public void addedDefinitionIsNotStoredWhenAutoInitIsFalse() {
initializeService(true, false);
public void addedDefinitionIsNotStoredWhenAutoPersistDefinitionIsFalse() {
initializeService(false);

service.addWorkflowDefinition(workflowDefinition);

Expand All @@ -63,53 +61,31 @@ public void addedDefinitionIsNotStoredWhenAutoInitIsFalse() {
}

@Test
public void addedDefinitionIsNotStoredWhenDefinitionPersistIsFalse() {
initializeService(false, true);

public void persistWorkflowDefinitionStoresDefinitionsWhenAutoPersistDefinitionIsFalse() {
initializeService(false);
service.addWorkflowDefinition(workflowDefinition);

verifyZeroInteractions(workflowDefinitionDao);
assertThat(service.getWorkflowDefinitions().size(), is(equalTo(1)));
}

@Test
public void definitionsAreStoredDuringPostProcessingWhenAutoInitIsFalse() {
initializeService(true, false);
service.addWorkflowDefinition(workflowDefinition);

service.postProcessWorkflowDefinitions();
service.persistWorkflowDefinitions();

verify(workflowDefinitionDao).storeWorkflowDefinition(workflowDefinition);
assertThat(service.getWorkflowDefinitions().size(), is(equalTo(1)));
}

@Test
public void definitionsAreNotStoredDuringPostProcessingWhenAutoInitIsTrue() {
initializeService(true, true);
public void persistWorkflowDefinitionDoesNotStoreDefinitionsWhenAutoPersistDefinitionIsTrue() {
service.addWorkflowDefinition(workflowDefinition);
verify(workflowDefinitionDao).storeWorkflowDefinition(workflowDefinition);

service.postProcessWorkflowDefinitions();
service.persistWorkflowDefinitions();

verifyNoMoreInteractions(workflowDefinitionDao);
assertThat(service.getWorkflowDefinitions().size(), is(equalTo(1)));
}

@Test
public void definitionsAreNotStoredDuringPostProcessingWhenDefinitionPersistIsFalse() {
initializeService(false, false);
service.addWorkflowDefinition(workflowDefinition);

service.postProcessWorkflowDefinitions();

verifyZeroInteractions(workflowDefinitionDao);
assertThat(service.getWorkflowDefinitions().size(), is(equalTo(1)));
}

@Test
public void addingDuplicatDefinitionThrowsException() {
initializeService(true, true);
service.addWorkflowDefinition(workflowDefinition);
verify(workflowDefinitionDao).storeWorkflowDefinition(workflowDefinition);

IllegalStateException thrown = assertThrows(IllegalStateException.class,
() -> service.addWorkflowDefinition(workflowDefinition));
Expand All @@ -118,22 +94,19 @@ public void addingDuplicatDefinitionThrowsException() {
assertThat(thrown.getMessage(),
containsString("Both " + className + " and " + className + " define same workflow type: dummy"));
assertThat(service.getWorkflowDefinitions().size(), is(equalTo(1)));
}

@Test
public void getWorkflowDefinitionReturnsNullWhenTypeIsNotFound() {
initializeService(true, true);

assertThat(service.getWorkflowDefinition("notFound"), is(nullValue()));
verifyNoMoreInteractions(workflowDefinitionDao);
}

@Test
public void getWorkflowDefinitionReturnsDefinitionWhenTypeIsFound() {
initializeService(true, true);

service.addWorkflowDefinition(workflowDefinition);

assertThat(service.getWorkflowDefinition("dummy"), is(instanceOf(DummyTestWorkflow.class)));
}

@Test
public void getWorkflowDefinitionReturnsNullWhenTypeIsNotFound() {
assertThat(service.getWorkflowDefinition("dummy"), is(nullValue()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ static class ContextConfiguration {
@Bean
@Primary
public Environment env() {
return new MockEnvironment().withProperty("nflow.definition.persist", "true").withProperty("nflow.autoinit", "true");
return new MockEnvironment().withProperty("nflow.definition.autopersist", "true");
}

@Bean
Expand Down
4 changes: 2 additions & 2 deletions nflow-netty/src/test/java/io/nflow/netty/StartNflowTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public void startNflowNetty() throws Exception {
Map<String, Object> properties = new HashMap<>();
properties.put("nflow.db.create_on_startup", false);
properties.put("nflow.autostart", false);
properties.put("nflow.autoinit", false);
properties.put("nflow.definition.autopersist", false);
ApplicationContext ctx = startNflow.startNetty(7500, "local", "", properties);

assertNotNull(testListener.applicationContextEvent);
Expand All @@ -39,7 +39,7 @@ public void startNflowNetty() throws Exception {
assertEquals("externallyDefinedExecutorGroup", ctx.getEnvironment().getProperty("nflow.executor.group"));
assertEquals("false", ctx.getEnvironment().getProperty("nflow.db.create_on_startup"));
assertEquals("false", ctx.getEnvironment().getProperty("nflow.autostart"));
assertEquals("false", ctx.getEnvironment().getProperty("nflow.autoinit"));
assertEquals("false", ctx.getEnvironment().getProperty("nflow.definition.autopersist"));
}

}
22 changes: 12 additions & 10 deletions nflow-tests/src/test/java/io/nflow/tests/SkipAutoStartTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,28 @@

import static org.junit.jupiter.api.Assertions.assertNotNull;

import io.nflow.tests.extension.NflowServerConfig;
import io.nflow.tests.extension.NflowServerExtension;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.extension.ExtendWith;

import io.nflow.tests.extension.NflowServerConfig;
import io.nflow.tests.extension.NflowServerExtension;

@ExtendWith(NflowServerExtension.class)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class SkipAutoStartTest extends AbstractNflowTest {

// When nflow.autoinit, nflow.autostart and nflow.db.create_on_startup are false
// no database access should happen. This test fails if SQL statements are
// issued during bean initialization.
public static NflowServerConfig server = new NflowServerConfig.Builder()
.prop("nflow.autoinit", "false")
.prop("nflow.autostart", "false")
.prop("nflow.db.create_on_startup", "false")
.build();
/**
* When nflow.definition.autopersist, nflow.autostart and nflow.db.create_on_startup are false no database access should happen.
* This test fails if SQL statements are issued during bean initialization.
*/
public static NflowServerConfig server = new NflowServerConfig.Builder() //
.prop("nflow.definition.autopersist", "false") //
.prop("nflow.autostart", "false") //
.prop("nflow.db.create_on_startup", "false") //
.build();

public SkipAutoStartTest() {
super(server);
Expand Down

0 comments on commit 347a79a

Please sign in to comment.