Skip to content

Commit

Permalink
Refactor workflow and workflow action id from int to long
Browse files Browse the repository at this point in the history
  • Loading branch information
gmokki committed Nov 22, 2019
1 parent d063270 commit 8bd21ac
Show file tree
Hide file tree
Showing 25 changed files with 291 additions and 293 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void ensureValidArchiveTablesExist() {
tableMetadataChecker.ensureCopyingPossible("nflow_workflow_state", "nflow_archive_workflow_state");
}

public List<Integer> listArchivableWorkflows(DateTime before, int maxRows) {
public List<Long> listArchivableWorkflows(DateTime before, int maxRows) {
return jdbc.query(
"select w.id id from nflow_workflow w, " +
"(" + sqlVariants.limit(
Expand All @@ -58,7 +58,7 @@ public List<Integer> listArchivableWorkflows(DateTime before, int maxRows) {
}

@Transactional
public int archiveWorkflows(Collection<Integer> workflowIds) {
public int archiveWorkflows(Collection<Long> workflowIds) {
String workflowIdParams = params(workflowIds);

int archivedWorkflows = archiveWorkflowTable(workflowIdParams);
Expand Down Expand Up @@ -99,14 +99,14 @@ private String columnsFromMetadata(String tableName) {
return join(columnNames, ",");
}

private String params(Collection<Integer> workflowIds) {
private String params(Collection<Long> workflowIds) {
return "(" + join(workflowIds, ",") + ")";
}

static class ArchivableWorkflowsRowMapper implements RowMapper<Integer> {
static class ArchivableWorkflowsRowMapper implements RowMapper<Long> {
@Override
public Integer mapRow(ResultSet rs, int rowNum) throws SQLException {
return rs.getInt("id");
public Long mapRow(ResultSet rs, int rowNum) throws SQLException {
return rs.getLong("id");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public static Integer getInt(ResultSet rs, String columnLabel) throws SQLExcepti
return rs.wasNull() ? null : value;
}

public static Long getLong(ResultSet rs, String columnLabel) throws SQLException {
long value = rs.getLong(columnLabel);
return rs.wasNull() ? null : value;
}

public static final class ColumnNamesExtractor implements ResultSetExtractor<List<String>> {
static final ColumnNamesExtractor columnNamesExtractor = new ColumnNamesExtractor();

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.nflow.engine.internal.executor;

public class InstanceInfo {
public int id;
public long id;
public String state;
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,19 +141,19 @@ private void shutdownPool() {
}
}

private void dispatch(List<Integer> nextInstanceIds) {
private void dispatch(List<Long> nextInstanceIds) {
if (nextInstanceIds.isEmpty()) {
logger.debug("Found no workflow instances, sleeping.");
sleep(false);
return;
}
logger.debug("Found {} workflow instances, dispatching executors.", nextInstanceIds.size());
for (Integer instanceId : nextInstanceIds) {
for (Long instanceId : nextInstanceIds) {
executor.execute(stateProcessorFactory.createProcessor(instanceId));
}
}

private List<Integer> getNextInstanceIds() {
private List<Long> getNextInstanceIds() {
int nextBatchSize = executor.getQueueRemainingCapacity();
logger.debug("Polling next {} workflow instances.", nextBatchSize);
return workflowInstances.pollNextWorkflowInstanceIds(nextBatchSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class WorkflowStateProcessor implements Runnable {
private static final PeriodicLogger threadStuckLogger = new PeriodicLogger(logger, 60);
private static final String MDC_KEY = "workflowInstanceId";

private final int instanceId;
private final long instanceId;
private final WorkflowDefinitionService workflowDefinitions;
private final WorkflowInstanceService workflowInstances;
private final WorkflowInstancePreProcessor workflowInstancePreProcessor;
Expand All @@ -75,14 +75,14 @@ class WorkflowStateProcessor implements Runnable {
private final int stateProcessingRetryDelay;
private final int stateSaveRetryDelay;
private boolean internalRetryEnabled = true;
private final Map<Integer, WorkflowStateProcessor> processingInstances;
private final Map<Long, WorkflowStateProcessor> processingInstances;
private long startTimeSeconds;
private Thread thread;

WorkflowStateProcessor(int instanceId, ObjectStringMapper objectMapper, WorkflowDefinitionService workflowDefinitions,
WorkflowStateProcessor(long instanceId, ObjectStringMapper objectMapper, WorkflowDefinitionService workflowDefinitions,
WorkflowInstanceService workflowInstances, WorkflowInstanceDao workflowInstanceDao,
WorkflowInstancePreProcessor workflowInstancePreProcessor, Environment env,
Map<Integer, WorkflowStateProcessor> processingInstances, WorkflowExecutorListener... executorListeners) {
Map<Long, WorkflowStateProcessor> processingInstances, WorkflowExecutorListener... executorListeners) {
this.instanceId = instanceId;
this.objectMapper = objectMapper;
this.workflowDefinitions = workflowDefinitions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class WorkflowStateProcessorFactory {
private final Environment env;
@Autowired(required = false)
protected WorkflowExecutorListener[] listeners = new WorkflowExecutorListener[0];
final Map<Integer, WorkflowStateProcessor> processingInstances = new ConcurrentHashMap<>();
final Map<Long, WorkflowStateProcessor> processingInstances = new ConcurrentHashMap<>();
private final int stuckThreadThresholdSeconds;

@Inject
Expand All @@ -44,7 +44,7 @@ public WorkflowStateProcessorFactory(WorkflowDefinitionService workflowDefinitio
this.env = env;
}

public WorkflowStateProcessor createProcessor(int instanceId) {
public WorkflowStateProcessor createProcessor(long instanceId) {
return new WorkflowStateProcessor(instanceId, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao,
workflowInstancePreProcessor, env, processingInstances, listeners);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public String getCurrentStateName() {
}

@Override
public int getWorkflowInstanceId() {
public long getWorkflowInstanceId() {
return instance.id;
}

Expand Down Expand Up @@ -254,7 +254,7 @@ public void setSignal(Optional<Integer> signal, String reason) {
}

@Override
public Optional<Integer> getParentId() {
public Optional<Long> getParentId() {
return Optional.ofNullable(instance.parentWorkflowId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public int archiveWorkflows(DateTime olderThan, int batchSize) {
log.info("Archiving starting. Archiving passive workflows older than {}, in batches of {}.", olderThan, batchSize);
StopWatch stopWatch = new StopWatch();
stopWatch.start();
List<Integer> workflowIds;
List<Long> workflowIds;
PeriodicLogger periodicLogger = new PeriodicLogger(log, 60);
int archivedWorkflowsTotal = 0;
do {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public WorkflowInstanceService(WorkflowInstanceDao workflowInstanceDao, Workflow
* @param maxActions Maximum number of actions to be loaded.
* @return The workflow instance, or null if not found.
*/
public WorkflowInstance getWorkflowInstance(int id, Set<WorkflowInstanceInclude> includes, Long maxActions) {
public WorkflowInstance getWorkflowInstance(long id, Set<WorkflowInstanceInclude> includes, Long maxActions) {
return workflowInstanceDao.getWorkflowInstance(id, includes, maxActions);
}

Expand All @@ -63,10 +63,10 @@ public WorkflowInstance getWorkflowInstance(int id, Set<WorkflowInstanceInclude>
* @return The id of the inserted or existing workflow instance.
*/
@SuppressFBWarnings(value = "BC_UNCONFIRMED_CAST_OF_RETURN_VALUE", justification = "getInitialState().toString() has no cast")
public int insertWorkflowInstance(WorkflowInstance instance) {
public long insertWorkflowInstance(WorkflowInstance instance) {
Assert.notNull(workflowInstancePreProcessor, "workflowInstancePreProcessor can not be null");
WorkflowInstance processedInstance = workflowInstancePreProcessor.process(instance);
int id = workflowInstanceDao.insertWorkflowInstance(processedInstance);
long id = workflowInstanceDao.insertWorkflowInstance(processedInstance);
if (id == -1 && !isEmpty(instance.externalId)) {
QueryWorkflowInstances query = new QueryWorkflowInstances.Builder().addTypes(instance.type).setExternalId(instance.externalId).build();
id = workflowInstanceDao.queryWorkflowInstances(query).get(0).id;
Expand Down Expand Up @@ -131,7 +131,7 @@ public Collection<WorkflowInstance> listWorkflowInstances(QueryWorkflowInstances
* @param workflowInstanceId Workflow instance id.
* @return Current signal value.
*/
public Optional<Integer> getSignal(Integer workflowInstanceId) {
public Optional<Integer> getSignal(long workflowInstanceId) {
return workflowInstanceDao.getSignal(workflowInstanceId);
}

Expand All @@ -143,7 +143,7 @@ public Optional<Integer> getSignal(Integer workflowInstanceId) {
* @param actionType The type of workflow action that is stored to instance actions.
* @return True when signal was set, false otherwise.
*/
public boolean setSignal(Integer workflowInstanceId, Optional<Integer> signal, String reason, WorkflowActionType actionType) {
public boolean setSignal(long workflowInstanceId, Optional<Integer> signal, String reason, WorkflowActionType actionType) {
Assert.notNull(workflowDefinitionService, "workflowDefinitionService cannot be null");
signal.ifPresent(signalValue -> {
AbstractWorkflowDefinition<?> definition = getDefinition(workflowInstanceId);
Expand All @@ -154,7 +154,7 @@ public boolean setSignal(Integer workflowInstanceId, Optional<Integer> signal, S
return workflowInstanceDao.setSignal(workflowInstanceId, signal, reason, actionType);
}

private AbstractWorkflowDefinition<?> getDefinition(Integer workflowInstanceId) {
private AbstractWorkflowDefinition<?> getDefinition(Long workflowInstanceId) {
return workflowDefinitionService.getWorkflowDefinition(workflowInstanceDao.getWorkflowInstanceType(workflowInstanceId));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public interface StateExecution {
*
* @return The workflow instance id.
*/
int getWorkflowInstanceId();
long getWorkflowInstanceId();

/**
* Return the business key associated to the workflow instance.
Expand Down Expand Up @@ -175,6 +175,6 @@ public interface StateExecution {
*
* @return The parent workflow instance id or empty.
*/
Optional<Integer> getParentId();
Optional<Long> getParentId();

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class QueryWorkflowInstances extends ModelObject {
/**
* Workflow instance identifiers.
*/
public final List<Integer> ids;
public final List<Long> ids;

/**
* Workflow instance definition type.
Expand All @@ -26,12 +26,12 @@ public class QueryWorkflowInstances extends ModelObject {
/**
* Parent workflow instance id.
*/
public Integer parentWorkflowId;
public Long parentWorkflowId;

/**
* Parent workflow action id.
*/
public Integer parentActionId;
public Long parentActionId;

/**
* Workflow instance states.
Expand Down Expand Up @@ -106,10 +106,10 @@ public class QueryWorkflowInstances extends ModelObject {
* Builder for workflow instance queries.
*/
public static class Builder {
List<Integer> ids = new ArrayList<>();
List<Long> ids = new ArrayList<>();
List<String> types = new ArrayList<>();
Integer parentWorkflowId;
Integer parentActionId;
Long parentWorkflowId;
Long parentActionId;
List<String> states = new ArrayList<>();
List<WorkflowInstanceStatus> statuses = new ArrayList<>();
String businessKey;
Expand Down Expand Up @@ -148,7 +148,7 @@ public Builder(QueryWorkflowInstances copy) {
* @param newIds The identifiers.
* @return this.
*/
public Builder addIds(Integer ... newIds) {
public Builder addIds(Long ... newIds) {
this.ids.addAll(asList(newIds));
return this;
}
Expand All @@ -168,7 +168,7 @@ public Builder addTypes(String ... newTypes) {
* @param parentWorkflowId The parent workflow instance id.
* @return this.
*/
public Builder setParentWorkflowId(Integer parentWorkflowId) {
public Builder setParentWorkflowId(Long parentWorkflowId) {
this.parentWorkflowId = parentWorkflowId;
return this;
}
Expand All @@ -178,7 +178,7 @@ public Builder setParentWorkflowId(Integer parentWorkflowId) {
* @param parentActionId The parent action id.
* @return this.
*/
public Builder setParentActionId(Integer parentActionId) {
public Builder setParentActionId(Long parentActionId) {
this.parentActionId = parentActionId;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public enum WorkflowInstanceStatus {
/**
* The workflow instance identifier.
*/
public final Integer id;
public final Long id;

/**
* The id of executor that is currently processing this workflow. May be null.
Expand All @@ -53,17 +53,17 @@ public enum WorkflowInstanceStatus {
* The id of the workflow that created the hierarchy of workflow where this sub workflow belongs to.
* Null for workflows that are the root of hierarchy.
*/
public final Integer rootWorkflowId;
public final Long rootWorkflowId;

/**
* The id of the workflow that created this sub workflow. Is null for root workflows.
*/
public final Integer parentWorkflowId;
public final Long parentWorkflowId;

/**
* The id of the workflow action that created this sub workflow. Is null for root workflows.
*/
public final Integer parentActionId;
public final Long parentActionId;

/**
* The current status of the workflow instance.
Expand Down Expand Up @@ -149,7 +149,7 @@ public enum WorkflowInstanceStatus {
/**
* Child workflow instance IDs created by this workflow instance, grouped by instance action ID.
*/
public Map<Integer, List<Integer>> childWorkflows;
public Map<Long, List<Long>> childWorkflows;

ObjectStringMapper mapper;

Expand Down Expand Up @@ -228,11 +228,11 @@ public String getStateVariable(String name, String defaultValue) {
*/
public static class Builder {

Integer id;
Long id;
Integer executorId;
Integer rootWorkflowId;
Integer parentWorkflowId;
Integer parentActionId;
Long rootWorkflowId;
Long parentWorkflowId;
Long parentActionId;
WorkflowInstanceStatus status;
String type;
String businessKey;
Expand All @@ -243,7 +243,7 @@ public static class Builder {
final Map<String, String> originalStateVariables = new LinkedHashMap<>();
final Map<String, String> stateVariables = new LinkedHashMap<>();
List<WorkflowInstanceAction> actions = new ArrayList<>();
final Map<Integer, List<Integer>> childWorkflows = new LinkedHashMap<>();
final Map<Long, List<Long>> childWorkflows = new LinkedHashMap<>();
int retries;
DateTime created;
DateTime started;
Expand Down Expand Up @@ -301,7 +301,7 @@ public Builder(WorkflowInstance copy) {
* @param id The identifier.
* @return this.
*/
public Builder setId(Integer id) {
public Builder setId(Long id) {
this.id = id;
return this;
}
Expand All @@ -321,7 +321,7 @@ public Builder setExecutorId(Integer executorId) {
* @param rootWorkflowId The identifier.
* @return this
*/
public Builder setRootWorkflowId(Integer rootWorkflowId) {
public Builder setRootWorkflowId(Long rootWorkflowId) {
this.rootWorkflowId = rootWorkflowId;
return this;
}
Expand All @@ -331,7 +331,7 @@ public Builder setRootWorkflowId(Integer rootWorkflowId) {
* @param parentWorkflowId The identifier.
* @return this.
*/
public Builder setParentWorkflowId(Integer parentWorkflowId) {
public Builder setParentWorkflowId(Long parentWorkflowId) {
this.parentWorkflowId = parentWorkflowId;
return this;
}
Expand All @@ -341,7 +341,7 @@ public Builder setParentWorkflowId(Integer parentWorkflowId) {
* @param parentActionId The identifier.
* @return this.
*/
public Builder setParentActionId(Integer parentActionId) {
public Builder setParentActionId(Long parentActionId) {
this.parentActionId = parentActionId;
return this;
}
Expand Down
Loading

0 comments on commit 8bd21ac

Please sign in to comment.