Skip to content

Commit

Permalink
Materialized workflow definitions WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
eputtone authored and Edvard Fonsell committed Dec 12, 2014
1 parent fe7c71d commit 4c73c6d
Show file tree
Hide file tree
Showing 16 changed files with 463 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package com.nitorcreations.nflow.engine.internal.dao;

import static java.util.Collections.sort;
import static org.slf4j.LoggerFactory.getLogger;
import static org.springframework.util.CollectionUtils.isEmpty;

import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import javax.inject.Inject;

import org.slf4j.Logger;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.nitorcreations.nflow.engine.internal.config.NFlow;
import com.nitorcreations.nflow.engine.internal.workflow.StoredWorkflowDefinition;
import com.nitorcreations.nflow.engine.workflow.definition.WorkflowDefinition;
import com.nitorcreations.nflow.engine.workflow.definition.WorkflowState;

@Component
public class WorkflowDefinitionDao {

  private static final Logger logger = getLogger(WorkflowDefinitionDao.class);

  private ExecutorDao executorInfo;
  private NamedParameterJdbcTemplate namedJdbc;
  private ObjectMapper nflowObjectMapper;

  @Inject
  public void setExecutorDao(ExecutorDao executorDao) {
    this.executorInfo = executorDao;
  }

  @Inject
  public void setNamedParameterJdbcTemplate(@NFlow NamedParameterJdbcTemplate nflowNamedParameterJdbcTemplate) {
    this.namedJdbc = nflowNamedParameterJdbcTemplate;
  }

  @Inject
  public void setObjectMapper(@NFlow ObjectMapper nflowObjectMapper) {
    this.nflowObjectMapper = nflowObjectMapper;
  }

  /**
   * Store the serialized form of the given workflow definition for this executor group.
   * First tries an update that only fires when the stored definition differs from the
   * current one; if no row was touched, attempts an insert. A concurrent insert by
   * another executor of the same group is expected and silently ignored.
   * @param definition the workflow definition to persist.
   */
  public void storeWorkflowDefinition(WorkflowDefinition<? extends WorkflowState> definition) {
    StoredWorkflowDefinition storedDefinition = convert(definition);
    MapSqlParameterSource params = new MapSqlParameterSource();
    params.addValue("type", definition.getType());
    params.addValue("definition", serializeDefinition(storedDefinition));
    params.addValue("modified_by", executorInfo.getExecutorId());
    params.addValue("executor_group", executorInfo.getExecutorGroup());
    // the "definition <> :definition" guard avoids no-op updates (and pointless "modified" bumps)
    String sql = "update nflow_workflow_definition "
        + "set definition = :definition, modified_by = :modified_by "
        + "where type = :type and executor_group = :executor_group and definition <> :definition";
    int updatedRows = namedJdbc.update(sql, params);
    if (updatedRows == 0) {
      // zero updated rows means either the row does not exist yet or it is already up to date;
      // the insert below succeeds in the former case and violates the primary key in the latter
      sql = "insert into nflow_workflow_definition(type, definition, modified_by, executor_group) "
          + "values (:type, :definition, :modified_by, :executor_group)";
      try {
        namedJdbc.update(sql, params);
      } catch (DataIntegrityViolationException dex) {
        // benign race: another executor in the group stored the same definition first
        logger.debug("Another executor already stored the definition.", dex);
      }
    }
  }

  /**
   * Load stored workflow definitions for this executor group.
   * @param types when non-empty, restrict the result to these definition types;
   *          when empty or null, return all definitions of the group.
   * @return deserialized definitions, one per matching database row.
   */
  public List<StoredWorkflowDefinition> queryStoredWorkflowDefinitions(Collection<String> types) {
    String sql = "select definition from nflow_workflow_definition where " + executorInfo.getExecutorGroupCondition();
    MapSqlParameterSource params = new MapSqlParameterSource();
    if (!isEmpty(types)) {
      sql += " and type in (:types)";
      params.addValue("types", types);
    }
    return namedJdbc.query(sql, params, new RowMapper<StoredWorkflowDefinition>() {
      @Override
      public StoredWorkflowDefinition mapRow(ResultSet rs, int rowNum) throws SQLException {
        return deserializeDefinition(rs.getString("definition"));
      }
    });
  }

  /**
   * Convert a live workflow definition into its serializable stored form. Transition and
   * state lists are sorted so that the serialized form is deterministic.
   * @throws IllegalStateException when a transition references a state that is not
   *           declared by {@code definition.getStates()} (previously a bare NPE).
   */
  StoredWorkflowDefinition convert(WorkflowDefinition<? extends WorkflowState> definition) {
    StoredWorkflowDefinition resp = new StoredWorkflowDefinition();
    resp.type = definition.getType();
    resp.description = definition.getDescription();
    resp.onError = definition.getErrorState().name();
    Map<String, StoredWorkflowDefinition.State> states = new HashMap<>();
    for (WorkflowState state : definition.getStates()) {
      states.put(state.name(), new StoredWorkflowDefinition.State(state.name(), state.getType().name(), state.getDescription()));
    }
    for (Entry<String, List<String>> entry : definition.getAllowedTransitions().entrySet()) {
      StoredWorkflowDefinition.State state = getDeclaredState(states, entry.getKey(), definition.getType());
      for (String targetState : entry.getValue()) {
        state.transitions.add(targetState);
      }
      sort(state.transitions);
    }
    for (Entry<String, WorkflowState> entry : definition.getFailureTransitions().entrySet()) {
      StoredWorkflowDefinition.State state = getDeclaredState(states, entry.getKey(), definition.getType());
      state.onFailure = entry.getValue().name();
    }
    resp.states = new ArrayList<>(states.values());
    sort(resp.states);
    return resp;
  }

  /** Look up a declared state by name, failing with a descriptive message instead of an NPE. */
  private static StoredWorkflowDefinition.State getDeclaredState(Map<String, StoredWorkflowDefinition.State> states,
      String stateName, String workflowType) {
    StoredWorkflowDefinition.State state = states.get(stateName);
    if (state == null) {
      throw new IllegalStateException("Workflow definition " + workflowType + " has a transition from undeclared state " + stateName);
    }
    return state;
  }

  /** Serialize the stored form to JSON; wraps Jackson failures with the definition type for context. */
  private String serializeDefinition(StoredWorkflowDefinition storedDefinition) {
    try {
      return nflowObjectMapper.writeValueAsString(storedDefinition);
    } catch (JsonProcessingException e) {
      throw new RuntimeException("Failed to serialize workflow definition " + storedDefinition.type, e);
    }
  }

  /** Deserialize a JSON definition read from the database. */
  StoredWorkflowDefinition deserializeDefinition(String serializedDefinition) {
    try {
      return nflowObjectMapper.readValue(serializedDefinition, StoredWorkflowDefinition.class);
    } catch (IOException e) {
      throw new RuntimeException("Failed to deserialize workflow definition " + serializedDefinition, e);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.nitorcreations.nflow.engine.internal.workflow;

import java.util.ArrayList;
import java.util.List;

public class StoredWorkflowDefinition {
public String type;
public String description;
public String onError;
public List<State> states;

public static class State implements Comparable<State> {

public State() {
// default constructor is required by Jackson deserializer
}

public State(String id, String type, String description) {
this.id = id;
this.type = type;
this.description = description;
}

public String id;
public String type;
public String description;
public List<String> transitions = new ArrayList<>();
public String onFailure;

@Override
public int compareTo(State state) {
return type.compareTo(state.type);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.springframework.stereotype.Component;

import com.nitorcreations.nflow.engine.internal.config.NFlow;
import com.nitorcreations.nflow.engine.internal.dao.WorkflowDefinitionDao;
import com.nitorcreations.nflow.engine.internal.dao.WorkflowInstanceDao;
import com.nitorcreations.nflow.engine.workflow.definition.StateExecutionStatistics;
import com.nitorcreations.nflow.engine.workflow.definition.WorkflowDefinition;
Expand All @@ -38,11 +39,14 @@ public class WorkflowDefinitionService {
// listing resource of workflow class names registered outside Spring (may be null)
private final AbstractResource nonSpringWorkflowsListing;
// all known definitions keyed by type; NOTE(review): "workflowDefitions" is a typo for
// "workflowDefinitions" — kept as-is because code outside this view references it
private final Map<String, WorkflowDefinition<? extends WorkflowState>> workflowDefitions = new LinkedHashMap<>();
private final WorkflowInstanceDao workflowInstanceDao;
// persists the materialized (serialized) workflow definitions to the database
private final WorkflowDefinitionDao workflowDefinitionDao;

/**
 * Inject the non-Spring workflow listing resource and the DAOs.
 * (Reconstructed: the scraped diff left the removed single-DAO signature line
 * interleaved with the new one, which is not valid Java.)
 */
@Inject
public WorkflowDefinitionService(@NFlow AbstractResource nflowNonSpringWorkflowsListing,
    WorkflowInstanceDao workflowInstanceDao, WorkflowDefinitionDao workflowDefinitionDao) {
  this.nonSpringWorkflowsListing = nflowNonSpringWorkflowsListing;
  this.workflowInstanceDao = workflowInstanceDao;
  this.workflowDefinitionDao = workflowDefinitionDao;
}

/**
Expand Down Expand Up @@ -74,16 +78,24 @@ public List<WorkflowDefinition<? extends WorkflowState>> getWorkflowDefinitions(
}

/**
* Add workflow definitions from the nflowNonSpringWorkflowsListing resource.
* Add workflow definitions from the nflowNonSpringWorkflowsListing resource and persist
* all loaded workflow definitions.
* @throws IOException when workflow definitions can not be read from the resource.
* @throws ReflectiveOperationException when the workflow definition can not be instantiated.
*/
@PostConstruct
public void initNonSpringWorkflowDefinitions() throws IOException, ReflectiveOperationException {
public void postProcessWorkflowDefinitions() throws IOException, ReflectiveOperationException {
if (nonSpringWorkflowsListing == null) {
logger.info("No non-Spring workflow definitions");
return;
} else {
initNonSpringWorkflowDefinitions();
}
for (WorkflowDefinition<?> definition : workflowDefitions.values()) {
workflowDefinitionDao.storeWorkflowDefinition(definition);
}
}

private void initNonSpringWorkflowDefinitions() throws IOException, ReflectiveOperationException {
try (BufferedReader br = new BufferedReader(new InputStreamReader(nonSpringWorkflowsListing.getInputStream(), UTF_8))) {
String row;
while ((row = br.readLine()) != null) {
Expand Down
10 changes: 10 additions & 0 deletions nflow-engine/src/main/resources/scripts/db/h2.create.ddl.sql
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,13 @@ create table if not exists nflow_executor (
active timestamp,
expires timestamp
);

-- Materialized workflow definitions, one row per (type, executor group).
-- "definition" holds the JSON-serialized StoredWorkflowDefinition.
-- NOTE(review): unlike the MySQL DDL, "modified" has no auto-update here;
-- presumably the engine relies on the update statement/trigger semantics — confirm.
create table if not exists nflow_workflow_definition (
type varchar(64) not null,
definition text not null,
created timestamp not null default current_timestamp,
modified timestamp not null default current_timestamp,
modified_by int not null,
executor_group varchar(64) not null,
primary key (type, executor_group)
);
10 changes: 10 additions & 0 deletions nflow-engine/src/main/resources/scripts/db/mysql.create.ddl.sql
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,13 @@ create table if not exists nflow_executor (
active timestamp(3),
expires timestamp(3)
);

-- Materialized workflow definitions, one row per (type, executor group).
-- timestamp(3) gives millisecond precision; "modified" is auto-updated by MySQL
-- via "on update current_timestamp(3)".
create table if not exists nflow_workflow_definition (
type varchar(64) not null,
definition text not null,
created timestamp(3) not null default current_timestamp(3),
modified timestamp(3) not null default current_timestamp(3) on update current_timestamp(3),
modified_by int not null,
executor_group varchar(64) not null,
primary key (type, executor_group)
);
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,18 @@ create table if not exists nflow_executor (
active timestamp,
expires timestamp
);

-- Materialized workflow definitions, one row per (type, executor group).
-- Legacy MySQL variant: only one timestamp column per table may use
-- current_timestamp defaults, so "created" is populated by the insert trigger below
-- instead of a column default.
create table if not exists nflow_workflow_definition (
type varchar(64) not null,
definition text not null,
modified timestamp not null default current_timestamp on update current_timestamp,
modified_by int not null,
created timestamp not null,
executor_group varchar(64) not null,
primary key (type, executor_group)
);

drop trigger if exists nflow_workflow_definition_insert;

-- set "created" on insert (see note above about the single current_timestamp limit)
create trigger nflow_workflow_definition_insert before insert on `nflow_workflow_definition`
for each row set new.created = now();
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,16 @@ create table if not exists nflow_executor (
active timestamptz,
expires timestamptz
);

-- Materialized workflow definitions, one row per (type, executor group).
-- PostgreSQL has no "on update current_timestamp", so "modified" is maintained
-- by the update_modified() trigger procedure below (defined elsewhere in this script).
create table if not exists nflow_workflow_definition (
type varchar(64) not null,
definition text not null,
created timestamptz not null default current_timestamp,
modified timestamptz not null default current_timestamp,
modified_by int not null,
executor_group varchar(64) not null,
primary key (type, executor_group)
);

drop trigger if exists update_nflow_definition_modified on nflow_workflow_definition;
create trigger update_nflow_definition_modified before update on nflow_workflow_definition for each row execute procedure update_modified();
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.nitorcreations.nflow.engine.internal.dao;

import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_EMPTY;

import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
Expand All @@ -11,6 +13,9 @@
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.joda.JodaModule;
import com.nitorcreations.nflow.engine.internal.config.NFlow;
import com.nitorcreations.nflow.engine.internal.storage.db.H2DatabaseConfiguration;
import com.nitorcreations.nflow.engine.internal.storage.db.H2DatabaseConfiguration.H2SQLVariants;

Expand All @@ -25,6 +30,11 @@ public WorkflowInstanceDao workflowInstanceDao() {
return new WorkflowInstanceDao();
}

// bean for the new workflow definition DAO; dependencies are injected via its setters
@Bean
public WorkflowDefinitionDao workflowDefinitionDao() {
return new WorkflowDefinitionDao();
}

@Bean
public ExecutorDao executorDao(Environment env) {
ExecutorDao dao = new ExecutorDao();
Expand All @@ -43,5 +53,13 @@ public PlatformTransactionManager transactionManager(DataSource ds) {
return new DataSourceTransactionManager(ds);
}

/**
 * nFlow-specific Jackson mapper: registers Joda-Time support and omits
 * empty values from serialized output.
 */
@Bean
@NFlow
public ObjectMapper objectMapper() {
  ObjectMapper result = new ObjectMapper();
  result.registerModule(new JodaModule());
  result.setSerializationInclusion(NON_EMPTY);
  return result;
}
}

Loading

0 comments on commit 4c73c6d

Please sign in to comment.