Skip to content

Commit

Permalink
Merge 5ff5f85 into 0a5bf18
Browse files Browse the repository at this point in the history
  • Loading branch information
efonsell authored Feb 18, 2020
2 parents 0a5bf18 + 5ff5f85 commit a685414
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- `nflow-engine`
- Improve workflow instance archiving performance. Archiving has been in practice unusable in some scenarios.
- Add support for deleting workflow instances, actions and state variables from production and archive tables.
- Add `CronWorkflow` abstract class to help implement long running workflows that execute scheduled tasks.
- Remove support for `nflow.executor.fetchChildWorkflowIds` configuration property.

**Details**
Expand All @@ -21,6 +22,7 @@
- Removed unnecessary indices and foreign keys and added missing indices to improve nFlow database performance. See database update scripts for details.
- Added name for all existing and new constraints in create scripts, if they did not have one yet. This is to make modify operations easier in future. All of these may not be covered in database update scripts.
- See `MaintenanceService` and `MaintenanceConfiguration` for details on how to archive and delete workflow instances. The maintenance operations can now be limited by workflow type as well.
- See `MaintenanceCronWorkflow` for an example on how to use `CronWorkflow` to schedule tasks.
- As ArchiveService is removed, the old functionality of `ArchiveService.archiveWorkflows(DateTime olderThan, int batchSize)` can now be achieved with
`MaintenanceService.cleanupWorkflows(new MaintenanceConfiguration.Builder()
.setArchiveWorkflows(new ConfigurationItem.Builder()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package io.nflow.engine.workflow.definition;

import static io.nflow.engine.workflow.definition.CronWorkflow.State.done;
import static io.nflow.engine.workflow.definition.CronWorkflow.State.error;
import static io.nflow.engine.workflow.definition.CronWorkflow.State.executeTask;
import static io.nflow.engine.workflow.definition.CronWorkflow.State.handleFailure;
import static io.nflow.engine.workflow.definition.NextAction.moveToState;
import static io.nflow.engine.workflow.definition.NextAction.moveToStateAfter;
import static io.nflow.engine.workflow.definition.WorkflowStateType.end;
import static io.nflow.engine.workflow.definition.WorkflowStateType.manual;
import static io.nflow.engine.workflow.definition.WorkflowStateType.normal;
import static io.nflow.engine.workflow.definition.WorkflowStateType.start;
import static org.slf4j.LoggerFactory.getLogger;

import org.joda.time.DateTime;
import org.slf4j.Logger;

/**
* A workflow that executes a task at given times. Similar to cron job or scheduled method but guarantees single execution in the
* cluster.
*/
public abstract class CronWorkflow extends WorkflowDefinition<CronWorkflow.State> {

private static final Logger logger = getLogger(CronWorkflow.class);

/**
* The workflow states.
*/
public enum State implements io.nflow.engine.workflow.definition.WorkflowState {
executeTask(start), handleFailure(normal), done(end), error(manual);

private WorkflowStateType type;

State(WorkflowStateType type) {
this.type = type;
}

@Override
public WorkflowStateType getType() {
return type;
}

@Override
public String getDescription() {
return name();
}
}

/**
* Call this from your cron workflow implementation constructor.
*
* @param type
* The type of your cron workflow
* @param settings
* The workflow settings
*/
protected CronWorkflow(String type, WorkflowSettings settings) {
super(type, executeTask, handleFailure, settings);
permit(executeTask, executeTask);
permit(executeTask, done);
permit(handleFailure, executeTask, error);
permit(handleFailure, done);
}

/**
* Executes the task and reschedules it or goes to the `done` state.
*
* @param execution
* State execution
* @return The next action
*/
public NextAction executeTask(StateExecution execution) {
if (executeTaskImpl(execution)) {
return moveToStateAfter(executeTask, getNextExecutionTime(), "Task executed successfully");
}
return moveToState(done, "Task executed successfully");
}

/**
* Handles the failure and reschedules the task or goes to the `done` state.
*
* @param execution
* State execution
* @return The next action
*/
public NextAction handleFailure(StateExecution execution) {
if (handleFailureImpl(execution)) {
return moveToStateAfter(executeTask, getNextExecutionTime(), "Failure handled successfully");
}
return moveToState(done, "Failure handled successfully");
}

/**
* Override this method to execute the task.
*
* @param execution
* State execution
* @return True if the task should be rescheduled, false otherwise.
*/
protected abstract boolean executeTaskImpl(StateExecution execution);

/**
* Override this method to implement scheduling.
*
* @return The next time when the task should be executed.
*/
protected abstract DateTime getNextExecutionTime();

/**
* Override this method to handle persistent failures. Default implementation logs an error and returns false to avoid
* rescheduling the task execution.
*
* @param execution
* State execution
* @return True if the task should be rescheduled, false otherwise.
*/
protected boolean handleFailureImpl(StateExecution execution) {
logger.error("Persistent failure in task execution of cron workflow {}. Task is not rescheduled.", getType());
return false;
}

}
3 changes: 3 additions & 0 deletions nflow-tests/src/main/java/io/nflow/tests/demo/DemoServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.nflow.metrics.NflowMetricsContext;
import io.nflow.tests.demo.workflow.DemoBulkWorkflow;
import io.nflow.tests.demo.workflow.DemoWorkflow;
import io.nflow.tests.demo.workflow.MaintenanceCronWorkflow;

public class DemoServer {

Expand Down Expand Up @@ -75,5 +76,7 @@ private static void insertDemoWorkflows() {
workflowInstanceService.insertWorkflowInstance(instance);
instance = new WorkflowInstance.Builder().setType(FOREVER_WAITING_WORKFLOW_TYPE).build();
workflowInstanceService.insertWorkflowInstance(instance);
instance = new WorkflowInstance.Builder().setType(MaintenanceCronWorkflow.TYPE).build();
workflowInstanceService.insertWorkflowInstance(instance);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.nflow.tests.demo.workflow;

import static org.joda.time.DateTime.now;
import static org.joda.time.Period.days;

import javax.inject.Inject;

import org.joda.time.DateTime;
import org.springframework.stereotype.Component;

import io.nflow.engine.service.MaintenanceConfiguration;
import io.nflow.engine.service.MaintenanceConfiguration.ConfigurationItem;
import io.nflow.engine.service.MaintenanceService;
import io.nflow.engine.workflow.definition.CronWorkflow;
import io.nflow.engine.workflow.definition.StateExecution;
import io.nflow.engine.workflow.definition.WorkflowSettings;

/**
* Example cron workflow that cleans up old workflow instances that are older than one day every hour.
*/
@Component
public class MaintenanceCronWorkflow extends CronWorkflow {

public static final String TYPE = "maintenanceCronWorkflow";
private final MaintenanceService maintenanceService;
private final MaintenanceConfiguration configuration;

@Inject
public MaintenanceCronWorkflow(MaintenanceService maintenanceService) {
super(TYPE, new WorkflowSettings.Builder().build());
setDescription("Example cron workflow that cleans up old workflow instances that are older than one day every hour.");
this.maintenanceService = maintenanceService;
configuration = new MaintenanceConfiguration.Builder()
.setDeleteWorkflows(new ConfigurationItem.Builder().setOlderThanPeriod(days(1)).build()).build();
}

@Override
protected boolean executeTaskImpl(StateExecution execution) {
maintenanceService.cleanupWorkflows(configuration);
return true;
}

@Override
protected DateTime getNextExecutionTime() {
return now().plusHours(1);
}

}

0 comments on commit a685414

Please sign in to comment.