Skip to content

Commit

Permalink
Add executor_id to table nflow_workflow_action.
Browse files Browse the repository at this point in the history
WorkflowInstanceDao fetches nflow_workflow.executor_id to WorkflowInstance and
nflow_workflow_action.executor_id to WorkflowInstanceAction.
  • Loading branch information
jsyrjala committed Sep 22, 2014
1 parent 0c0c0d2 commit fe72f82
Show file tree
Hide file tree
Showing 13 changed files with 60 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,14 +241,15 @@ public void insertWorkflowInstanceAction(final WorkflowInstance instance, final
public PreparedStatement createPreparedStatement(Connection con)
throws SQLException {
PreparedStatement p = con.prepareStatement(
"insert into nflow_workflow_action(workflow_id, state, state_text, retry_no, execution_start, execution_end) values (?,?,?,?,?,?)",
"insert into nflow_workflow_action(workflow_id, executor_id, state, state_text, retry_no, execution_start, execution_end) values (?,?,?,?,?,?,?)",
new String[] { "id" });
p.setInt(1, action.workflowId);
p.setString(2, action.state);
p.setString(3, limitLength(action.stateText, STATE_TEXT_LENGTH));
p.setInt(4, action.retryNo);
p.setTimestamp(5, toTimestamp(action.executionStart));
p.setTimestamp(6, toTimestamp(action.executionEnd));
p.setInt(2, executorInfo.getExecutorId());
p.setString(3, action.state);
p.setString(4, limitLength(action.stateText, STATE_TEXT_LENGTH));
p.setInt(5, action.retryNo);
p.setTimestamp(6, toTimestamp(action.executionStart));
p.setTimestamp(7, toTimestamp(action.executionEnd));
return p;
}
}, keyHolder);
Expand Down Expand Up @@ -308,16 +309,18 @@ public PreparedStatement createPreparedStatement(Connection connection) throws S
static class WorkflowInstanceRowMapper implements RowMapper<WorkflowInstance> {
@Override
public WorkflowInstance mapRow(ResultSet rs, int rowNum) throws SQLException {
Integer executorId = (Integer) rs.getObject("executor_id");
return new WorkflowInstance.Builder()
.setId(rs.getInt("id"))
.setExecutorId(executorId)
.setType(rs.getString("type"))
.setBusinessKey(rs.getString("business_key"))
.setExternalId(rs.getString("external_id"))
.setState(rs.getString("state"))
.setStateText(rs.getString("state_text"))
.setActions(new ArrayList<WorkflowInstanceAction>())
.setNextActivation(toDateTime(rs.getTimestamp("next_activation")))
.setProcessing(rs.getObject("executor_id") != null)
.setProcessing(executorId != null)
.setRetries(rs.getInt("retries"))
.setCreated(toDateTime(rs.getTimestamp("created")))
.setModified(toDateTime(rs.getTimestamp("modified")))
Expand All @@ -332,6 +335,7 @@ static class WorkflowInstanceActionRowMapper implements RowMapper<WorkflowInstan
public WorkflowInstanceAction mapRow(ResultSet rs, int rowNum) throws SQLException {
return new WorkflowInstanceAction.Builder()
.setWorkflowId(rs.getInt("workflow_id"))
.setExecutorId(rs.getInt("executor_id"))
.setState(rs.getString("state"))
.setStateText(rs.getString("state_text"))
.setRetryNo(rs.getInt("retry_no"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ public class WorkflowInstance {
*/
public final Integer id;

/**
* The id of executor that is currently processing this workflow. May be null.
*/
public final Integer executorId;

/**
* The type of the workflow definition.
*/
Expand Down Expand Up @@ -97,6 +102,7 @@ public class WorkflowInstance {

WorkflowInstance(Builder builder) {
this.id = builder.id;
this.executorId = builder.executorId;
this.type = builder.type;
this.businessKey = builder.businessKey;
this.externalId = builder.externalId;
Expand All @@ -120,6 +126,7 @@ public class WorkflowInstance {
public static class Builder {

Integer id;
Integer executorId;
String type;
String businessKey;
String externalId;
Expand Down Expand Up @@ -157,6 +164,7 @@ public Builder(ObjectStringMapper objectMapper) {
*/
public Builder(WorkflowInstance copy) {
this.id = copy.id;
this.executorId = copy.executorId;
this.type = copy.type;
this.businessKey = copy.businessKey;
this.externalId = copy.externalId;
Expand All @@ -182,6 +190,16 @@ public Builder setId(Integer id) {
return this;
}

/**
* Set the executor instance identifier.
* @param executorId The identifier.
* @return this.
*/
public Builder setExecutorId(Integer executorId) {
this.executorId = executorId;
return this;
}

/**
* Set the type of the workflow definition.
* @param type The type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ public class WorkflowInstanceAction {
*/
public final int workflowId;

/**
* The id for executor that processed this state.
*/
public final int executorId;

/**
* The workflow state before the execution.
*/
Expand Down Expand Up @@ -41,6 +46,7 @@ public class WorkflowInstanceAction {

WorkflowInstanceAction(Builder builder) {
this.workflowId = builder.workflowId;
this.executorId = builder.executorId;
this.state = builder.state;
this.stateText = builder.stateText;
this.retryNo = builder.retryNo;
Expand All @@ -54,6 +60,7 @@ public class WorkflowInstanceAction {
public static class Builder {

int workflowId;
int executorId;
String state;
String stateText;
int retryNo;
Expand Down Expand Up @@ -87,6 +94,15 @@ public Builder setWorkflowId(int workflowId) {
return this;
}

/**
* Set the executor id.
* @param executorId The executor id.
* @return this
*/
public Builder setExecutorId(Integer executorId) {
this.executorId = executorId;
return this;
}
/**
* Set the state.
* @param state The name of the state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ create index if not exists nflow_workflow_next_activation on nflow_workflow(next
create table if not exists nflow_workflow_action (
id int not null auto_increment primary key,
workflow_id int not null,
executor_id int not null default -1,
state varchar(64) not null,
state_text varchar(128),
retry_no int not null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ create table if not exists nflow_workflow (
create table if not exists nflow_workflow_action (
id int not null auto_increment primary key,
workflow_id int not null,
executor_id int not null default -1,
state varchar(64) not null,
state_text varchar(128),
retry_no int not null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ create trigger nflow_workflow_insert before insert on `nflow_workflow`
create table if not exists nflow_workflow_action (
id int not null auto_increment primary key,
workflow_id int not null,
executor_id int not null default -1,
state varchar(64) not null,
state_text varchar(128),
retry_no int not null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ create index nflow_workflow_activation on nflow_workflow(next_activation) where
create table if not exists nflow_workflow_action (
id serial primary key,
workflow_id int not null,
executor_id int not null default -1,
state varchar(64) not null,
state_text varchar(128),
retry_no int not null,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
alter table nflow_workflow_state executor_id int not null default -1;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
alter table nflow_workflow_state executor_id int not null default -1;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
alter table nflow_workflow_state executor_id int not null default -1;
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void insertWorkflowInstanceActionWorks() {
i1.stateVariables.put("a", "1");
int id = dao.insertWorkflowInstance(i1);
WorkflowInstanceAction a1 = new WorkflowInstanceAction.Builder().setExecutionStart(DateTime.now()).
setExecutionEnd(DateTime.now().plusMillis(100)).setRetryNo(1).setState("test").setStateText("state text").
setExecutorId(42).setExecutionEnd(DateTime.now().plusMillis(100)).setRetryNo(1).setState("test").setStateText("state text").
setWorkflowId(id).build();
i1.stateVariables.put("b", "2");
dao.insertWorkflowInstanceAction(i1, a1);
Expand Down Expand Up @@ -164,6 +164,7 @@ public void wakesUpWorkflowInMatchingState() {

private static void checkSameWorkflowInfo(WorkflowInstance i1, WorkflowInstance i2) {
assertThat(i1.type, equalTo(i2.type));
assertThat(i1.executorId,equalTo(i2.executorId));
assertThat(i1.state, equalTo(i2.state));
assertThat(i1.stateText, equalTo(i2.stateText));
assertThat(i1.nextActivation, equalTo(i2.nextActivation));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
@ApiModel(value = "State change attempt. A new instance for every retry attempt.")
@SuppressFBWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD", justification="jackson reads dto fields")
public class Action {

@ApiModelProperty(value = "Name of state")
public String state;
@ApiModelProperty(value = "Description of state")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
import org.junit.Test;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.nitorcreations.nflow.rest.v1.msg.*;
import com.nitorcreations.nflow.rest.v1.msg.Action;
import com.nitorcreations.nflow.rest.v1.msg.CreateWorkflowInstanceRequest;
import com.nitorcreations.nflow.rest.v1.msg.CreateWorkflowInstanceResponse;
import com.nitorcreations.nflow.rest.v1.msg.ListWorkflowInstanceResponse;
import com.nitorcreations.nflow.rest.v1.msg.UpdateWorkflowInstanceRequest;
import com.nitorcreations.nflow.tests.demo.CreditApplicationWorkflow;
import com.nitorcreations.nflow.tests.runner.NflowServerRule;

Expand Down

0 comments on commit fe72f82

Please sign in to comment.