Skip to content

Commit

Permalink
Refactor for final release (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng committed Dec 11, 2022
1 parent 7a894e4 commit e7b4e70
Show file tree
Hide file tree
Showing 67 changed files with 577 additions and 577 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ Run the command `git submodule update --remote --merge` to update IDL to the lat
- [x] Timer command
- [x] Signal command
- [x] SearchAttributeRW
- [x] QueryAttributeRW
- [x] DataObjectRW
- [x] StateLocalAttribute
- [x] Signal workflow API
- [x] Query workflow API
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ signing {
}

group = "io.github.cadence-oss"
version = "1.0.0-rc4"
version = "1.0.0-rc5"

nexusPublishing {
repositories {
Expand Down
2 changes: 1 addition & 1 deletion iwf-idl
33 changes: 10 additions & 23 deletions src/main/java/io/github/cadenceoss/iwf/core/Client.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package io.github.cadenceoss.iwf.core;

import io.github.cadenceoss.iwf.core.attributes.SearchAttributeType;
import io.github.cadenceoss.iwf.core.persistence.SearchAttributeType;
import io.github.cadenceoss.iwf.gen.models.KeyValue;
import io.github.cadenceoss.iwf.gen.models.SearchAttribute;
import io.github.cadenceoss.iwf.gen.models.SearchAttributeKeyAndType;
import io.github.cadenceoss.iwf.gen.models.StateCompletionOutput;
import io.github.cadenceoss.iwf.gen.models.WorkflowGetQueryAttributesResponse;
import io.github.cadenceoss.iwf.gen.models.WorkflowGetDataObjectsResponse;
import io.github.cadenceoss.iwf.gen.models.WorkflowGetSearchAttributesResponse;
import io.github.cadenceoss.iwf.gen.models.WorkflowResetRequest;
import io.github.cadenceoss.iwf.gen.models.WorkflowSearchResponse;

import java.util.ArrayList;
Expand Down Expand Up @@ -120,30 +119,18 @@ public void SignalWorkflow(
}

/**
* @param workflowId required
* @param workflowRunId optional, default to current runId
* @param resetType required
* @param historyEventId required for resetType of HISTORY_EVENT_ID. The eventID of any event after DecisionTaskStarted you want to reset to (this event is exclusive in a new run. The new run history will fork and continue from the previous eventID of this). It can be DecisionTaskCompleted, DecisionTaskFailed or others
* @param reason reason to do the reset for tracking purpose
* @param resetBadBinaryChecksum required for resetType of BAD_BINARY. Binary checksum for resetType of BadBinary
* @param decisionOffset based on the reset point calculated by resetType, this offset will move/offset the point by decision. Currently only negative number is supported, and only works with LastDecisionCompleted
* @param earliestTime required for resetType of DECISION_COMPLETED_TIME. EarliestTime of decision start time, required for resetType of DecisionCompletedTime.Supported formats are '2006-01-02T15:04:05+07:00', raw UnixNano and time range
* @param skipSignalReapply
* @param workflowId
* @param workflowRunId
* @param resetWorkflowTypeAndOptions
* @return
*/
public String ResetWorkflow(
final String workflowId,
final String workflowRunId,
final WorkflowResetRequest.ResetTypeEnum resetType,
final int historyEventId,
final String reason,
final String resetBadBinaryChecksum,
final int decisionOffset,
final String earliestTime,
final boolean skipSignalReapply
final ResetWorkflowTypeAndOptions resetWorkflowTypeAndOptions
){

return untypedClient.ResetWorkflow(workflowId, workflowRunId, resetType, historyEventId, reason, resetBadBinaryChecksum, decisionOffset, earliestTime, skipSignalReapply);
return untypedClient.ResetWorkflow(workflowId, workflowRunId, resetWorkflowTypeAndOptions);
}

/**
Expand Down Expand Up @@ -205,13 +192,13 @@ private Map<String, Object> doGetWorkflowQueryAttributes(
}
}

WorkflowGetQueryAttributesResponse response = untypedClient.GetAnyWorkflowQueryAttributes(workflowId, workflowRunId, attributeKeys);
final WorkflowGetDataObjectsResponse response = untypedClient.GetAnyWorkflowDataObjects(workflowId, workflowRunId, attributeKeys);

if (response.getQueryAttributes() == null) {
if (response.getObjects() == null) {
throw new InternalServiceException("query attributes not returned");
}
Map<String, Object> result = new HashMap<>();
for (KeyValue keyValue : response.getQueryAttributes()) {
for (KeyValue keyValue : response.getObjects()) {
if (keyValue.getValue() != null) {
result.put(
keyValue.getKey(),
Expand Down
76 changes: 49 additions & 27 deletions src/main/java/io/github/cadenceoss/iwf/core/Registry.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package io.github.cadenceoss.iwf.core;

import io.github.cadenceoss.iwf.core.attributes.QueryAttributeDef;
import io.github.cadenceoss.iwf.core.attributes.SearchAttributeDef;
import io.github.cadenceoss.iwf.core.attributes.SearchAttributeType;
import io.github.cadenceoss.iwf.core.command.InterStateChannelDef;
import io.github.cadenceoss.iwf.core.command.SignalChannelDef;
import io.github.cadenceoss.iwf.core.communication.InterStateChannelDef;
import io.github.cadenceoss.iwf.core.communication.SignalChannelDef;
import io.github.cadenceoss.iwf.core.persistence.DataObjectFieldDef;
import io.github.cadenceoss.iwf.core.persistence.SearchAttributeFieldDef;
import io.github.cadenceoss.iwf.core.persistence.SearchAttributeType;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class Registry {
private final Map<String, Workflow> workflowStore = new HashMap<>();
Expand All @@ -16,7 +18,7 @@ public class Registry {
private final Map<String, Map<String, Class<?>>> signalTypeStore = new HashMap<>();

private final Map<String, Map<String, Class<?>>> interstateChannelTypeStore = new HashMap<>();
private final Map<String, Map<String, Class<?>>> queryAttributeTypeStore = new HashMap<>();
private final Map<String, Map<String, Class<?>>> dataObjectTypeStore = new HashMap<>();

private final Map<String, Map<String, SearchAttributeType>> searchAttributeTypeStore = new HashMap<>();

Expand All @@ -27,7 +29,7 @@ public void addWorkflow(final Workflow wf) {
registerWorkflowState(wf);
registerWorkflowSignal(wf);
registerWorkflowInterstateChannel(wf);
registerWorkflowQueryAttributes(wf);
registerWorkflowDataObjects(wf);
registerWorkflowSearchAttributes(wf);
}

Expand Down Expand Up @@ -64,12 +66,13 @@ private void registerWorkflowState(final Workflow wf) {

private void registerWorkflowSignal(final Workflow wf) {
String workflowType = getWorkflowType(wf);
if (wf.getSignalChannels() == null || wf.getSignalChannels().isEmpty()) {
final List<SignalChannelDef> channels = getSignalChannels(wf);
if (channels == null || channels.isEmpty()) {
signalTypeStore.put(workflowType, new HashMap<>());
return;
}

for (SignalChannelDef signalChannelDef : wf.getSignalChannels()) {
for (SignalChannelDef signalChannelDef : channels) {
Map<String, Class<?>> signalNameToTypeMap =
signalTypeStore.computeIfAbsent(workflowType, s -> new HashMap<>());
if (signalNameToTypeMap.containsKey(signalChannelDef.getSignalChannelName())) {
Expand All @@ -82,12 +85,13 @@ private void registerWorkflowSignal(final Workflow wf) {

private void registerWorkflowInterstateChannel(final Workflow wf) {
String workflowType = getWorkflowType(wf);
if (wf.getInterStateChannels() == null || wf.getInterStateChannels().isEmpty()) {
final List<InterStateChannelDef> channels = getInterStateChannels(wf);
if (channels == null || channels.isEmpty()) {
interstateChannelTypeStore.put(workflowType, new HashMap<>());
return;
}

for (InterStateChannelDef interstateChannelDef : wf.getInterStateChannels()) {
for (InterStateChannelDef interstateChannelDef : channels) {
Map<String, Class<?>> nameToTypeMap =
interstateChannelTypeStore.computeIfAbsent(workflowType, s -> new HashMap<>());
if (nameToTypeMap.containsKey(interstateChannelDef.getChannelName())) {
Expand All @@ -98,51 +102,69 @@ private void registerWorkflowInterstateChannel(final Workflow wf) {
}
}

private void registerWorkflowQueryAttributes(final Workflow wf) {
private void registerWorkflowDataObjects(final Workflow wf) {
String workflowType = getWorkflowType(wf);
if (wf.getQueryAttributes() == null || wf.getQueryAttributes().isEmpty()) {
queryAttributeTypeStore.put(workflowType, new HashMap<>());
final List<DataObjectFieldDef> fields = getDataObjectFields(wf);
if (fields == null || fields.isEmpty()) {
dataObjectTypeStore.put(workflowType, new HashMap<>());
return;
}

for (QueryAttributeDef queryAttributeDef : wf.getQueryAttributes()) {
for (DataObjectFieldDef dataObjectField : fields) {
Map<String, Class<?>> queryAttributeKeyToTypeMap =
queryAttributeTypeStore.computeIfAbsent(workflowType, s -> new HashMap<>());
if (queryAttributeKeyToTypeMap.containsKey(queryAttributeDef.getQueryAttributeKey())) {
dataObjectTypeStore.computeIfAbsent(workflowType, s -> new HashMap<>());
if (queryAttributeKeyToTypeMap.containsKey(dataObjectField.getKey())) {
throw new WorkflowDefinitionException(
String.format(
"Query attribute key %s already exists",
queryAttributeDef.getQueryAttributeKey())
dataObjectField.getDataObjectType())
);
}
queryAttributeKeyToTypeMap.put(
queryAttributeDef.getQueryAttributeKey(),
queryAttributeDef.getQueryAttributeType()
dataObjectField.getKey(),
dataObjectField.getDataObjectType()
);
}
}

private List<DataObjectFieldDef> getDataObjectFields(final Workflow wf) {
return wf.getPersistenceSchema().stream().filter((f) -> f instanceof DataObjectFieldDef).map(f -> (DataObjectFieldDef) f).collect(Collectors.toList());
}

private List<SearchAttributeFieldDef> getSearchAttributeFields(final Workflow wf) {
return wf.getPersistenceSchema().stream().filter((f) -> f instanceof SearchAttributeFieldDef).map(f -> (SearchAttributeFieldDef) f).collect(Collectors.toList());
}

private List<InterStateChannelDef> getInterStateChannels(final Workflow wf) {
return wf.getCommunicationSchema().stream().filter((f) -> f instanceof InterStateChannelDef).map(f -> (InterStateChannelDef) f).collect(Collectors.toList());
}

private List<SignalChannelDef> getSignalChannels(final Workflow wf) {
return wf.getCommunicationSchema().stream().filter((f) -> f instanceof SignalChannelDef).map(f -> (SignalChannelDef) f).collect(Collectors.toList());
}

private void registerWorkflowSearchAttributes(final Workflow wf) {
String workflowType = getWorkflowType(wf);
if (wf.getSearchAttributes() == null || wf.getSearchAttributes().isEmpty()) {
final List<SearchAttributeFieldDef> fields = getSearchAttributeFields(wf);
if (fields == null || fields.isEmpty()) {
searchAttributeTypeStore.put(workflowType, new HashMap<>());
return;
}

for (SearchAttributeDef searchAttributeDef : wf.getSearchAttributes()) {
for (SearchAttributeFieldDef searchAttributeField : fields) {
Map<String, SearchAttributeType> searchAttributeKeyToTypeMap =
searchAttributeTypeStore.computeIfAbsent(workflowType, s -> new HashMap<>());

if (searchAttributeKeyToTypeMap.containsKey(searchAttributeDef.getSearchAttributeKey())) {
if (searchAttributeKeyToTypeMap.containsKey(searchAttributeField.getKey())) {
throw new WorkflowDefinitionException(
String.format(
"Search attribute key %s already exists",
searchAttributeDef.getSearchAttributeKey())
searchAttributeField.getKey())
);
}
searchAttributeKeyToTypeMap.put(
searchAttributeDef.getSearchAttributeKey(),
searchAttributeDef.getSearchAttributeType()
searchAttributeField.getKey(),
searchAttributeField.getSearchAttributeType()
);
}
}
Expand All @@ -164,7 +186,7 @@ public Map<String, Class<?>> getInterStateChannelNameToTypeMap(final String work
}

public Map<String, Class<?>> getQueryAttributeKeyToTypeMap(final String workflowType) {
return queryAttributeTypeStore.get(workflowType);
return dataObjectTypeStore.get(workflowType);
}

public Map<String, SearchAttributeType> getSearchAttributeKeyToTypeMap(final String workflowType) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.github.cadenceoss.iwf.core;

import io.github.cadenceoss.iwf.gen.models.WorkflowResetRequest;
import org.immutables.value.Value;

import java.util.Optional;

@Value.Immutable
public abstract class ResetWorkflowTypeAndOptions {

public abstract WorkflowResetRequest.ResetTypeEnum getResetType();

public abstract Optional<Integer> getHistoryEventId();

public abstract String getReason();

public abstract Optional<String> getHistoryEventTime();

public abstract Optional<Boolean> getSkipSignalReapply();

public static ResetWorkflowTypeAndOptions resetToBeginning(final String reason) {
return builder()
.resetType(WorkflowResetRequest.ResetTypeEnum.BEGINNING)
.reason(reason)
.build();
}

public static ResetWorkflowTypeAndOptions resetToHistoryEventId(final int historyEventId, final String reason) {
return builder()
.resetType(WorkflowResetRequest.ResetTypeEnum.HISTORY_EVENT_ID)
.historyEventId(historyEventId)
.reason(reason)
.build();
}

public static ResetWorkflowTypeAndOptions resetToHistoryEventId(final String historyEventTime, final String reason) {
return builder()
.resetType(WorkflowResetRequest.ResetTypeEnum.HISTORY_EVENT_ID)
.historyEventTime(historyEventTime)
.reason(reason)
.build();
}

public static ImmutableResetWorkflowTypeAndOptions.Builder builder() {
return ImmutableResetWorkflowTypeAndOptions.builder();
}
}
6 changes: 3 additions & 3 deletions src/main/java/io/github/cadenceoss/iwf/core/StateOptions.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package io.github.cadenceoss.iwf.core;

import io.github.cadenceoss.iwf.core.attributes.AttributeLoadingPolicy;
import io.github.cadenceoss.iwf.core.command.CommandCarryOverPolicy;
import io.github.cadenceoss.iwf.core.command.CommandCarryOverType;
import io.github.cadenceoss.iwf.core.persistence.PersistenceLoadingPolicy;
import org.immutables.value.Value;

@Value.Immutable
Expand All @@ -19,11 +19,11 @@ public abstract class StateOptions {
* this decides whether to load all the query attributes into {@link WorkflowState#decide} and {@link WorkflowState#start} method
* default to true
*/
public abstract AttributeLoadingPolicy getQueryAttributesLoadingPolicy();
public abstract PersistenceLoadingPolicy getQueryAttributesLoadingPolicy();

/**
* this decides whether to load all the search attributes into {@link WorkflowState#decide} and {@link WorkflowState#start} method
* default to true
*/
public abstract AttributeLoadingPolicy getSearchAttributesLoadingPolicy();
public abstract PersistenceLoadingPolicy getSearchAttributesLoadingPolicy();
}
Loading

0 comments on commit e7b4e70

Please sign in to comment.