Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement state locals #53

Merged
merged 2 commits into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion iwf-idl
2 changes: 1 addition & 1 deletion src/main/java/io/github/cadenceoss/iwf/core/Context.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

@Value.Immutable
public abstract class Context {
public abstract Integer getWorkflowStartTimestampSeconds();
public abstract Long getWorkflowStartTimestampSeconds();

public abstract String getStateExecutionId();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
package io.github.cadenceoss.iwf.core;

import io.github.cadenceoss.iwf.gen.models.KeyValue;
import org.immutables.value.Value;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

@Value.Immutable
public abstract class StateDecision {

public abstract Optional<List<StateMovement>> getNextStates();

public abstract Optional<Boolean> getWaitForMoreCommandResults();

public abstract Optional<List<KeyValue>> getUpsertQueryAttributes();
public abstract List<StateMovement> getNextStates();

public static final StateDecision DEAD_END = ImmutableStateDecision.builder().build();

Expand Down
47 changes: 35 additions & 12 deletions src/main/java/io/github/cadenceoss/iwf/core/WorkerService.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.github.cadenceoss.iwf.core;

import io.github.cadenceoss.iwf.core.attributes.QueryAttributesRWImpl;
import io.github.cadenceoss.iwf.core.attributes.StateLocalImpl;
import io.github.cadenceoss.iwf.core.command.CommandRequest;
import io.github.cadenceoss.iwf.core.mapper.CommandRequestMapper;
import io.github.cadenceoss.iwf.core.mapper.CommandResultsMapper;
Expand Down Expand Up @@ -32,18 +33,27 @@ public WorkflowStateStartResponse handleWorkflowStateStart(final WorkflowStateSt
final Object input = objectEncoder.decode(stateInput, state.getWorkflowState().getInputType());
final QueryAttributesRWImpl queryAttributesRW =
createQueryAttributesRW(req.getWorkflowType(), req.getQueryAttributes());
final Context context = ImmutableContext.builder()
.workflowId(req.getContext().getWorkflowId())
.workflowRunId(req.getContext().getWorkflowRunId())
.workflowStartTimestampSeconds(req.getContext().getWorkflowStartedTimestamp())
.stateExecutionId(req.getContext().getStateExecutionId())
.build();
final StateLocalImpl stateLocals = new StateLocalImpl(toMap(null), objectEncoder);

CommandRequest commandRequest = state.getWorkflowState().start(
null,
context,
input,
null,
stateLocals,
null,
queryAttributesRW,
null);

return new WorkflowStateStartResponse()
.commandRequest(CommandRequestMapper.toGenerated(commandRequest))
.upsertQueryAttributes(queryAttributesRW.getUpsertQueryAttributes());
.upsertQueryAttributes(queryAttributesRW.getUpsertQueryAttributes())
.upsertStateLocalAttributes(stateLocals.getUpsertStateLocalAttributes())
.recordEvents(stateLocals.getRecordEvents());
}

public WorkflowStateDecideResponse handleWorkflowStateDecide(final WorkflowStateDecideRequest req) {
Expand All @@ -54,33 +64,46 @@ public WorkflowStateDecideResponse handleWorkflowStateDecide(final WorkflowState
final QueryAttributesRWImpl queryAttributesRW =
createQueryAttributesRW(req.getWorkflowType(), req.getQueryAttributes());

final Context context = ImmutableContext.builder()
.workflowId(req.getContext().getWorkflowId())
.workflowRunId(req.getContext().getWorkflowRunId())
.workflowStartTimestampSeconds(req.getContext().getWorkflowStartedTimestamp())
.stateExecutionId(req.getContext().getStateExecutionId())
.build();
final StateLocalImpl stateLocals = new StateLocalImpl(toMap(req.getStateLocalAttributes()), objectEncoder);

StateDecision stateDecision = state.getWorkflowState().decide(
null,
context,
input,
CommandResultsMapper.fromGenerated(
req.getCommandResults(),
registry.getSignalChannelNameToSignalTypeMap(req.getWorkflowType()),
objectEncoder),
null,
stateLocals,
null,
queryAttributesRW,
null);
List<KeyValue> queryAttributesToUpsert = queryAttributesRW.getUpsertQueryAttributes();
stateDecision = ImmutableStateDecision.copyOf(stateDecision).withUpsertQueryAttributes(queryAttributesToUpsert);
return new WorkflowStateDecideResponse()
.stateDecision(StateDecisionMapper.toGenerated(stateDecision));
.stateDecision(StateDecisionMapper.toGenerated(stateDecision))
.upsertQueryAttributes(queryAttributesRW.getUpsertQueryAttributes())
.upsertStateLocalAttributes(stateLocals.getUpsertStateLocalAttributes())
.recordEvents(stateLocals.getRecordEvents());
}

private QueryAttributesRWImpl createQueryAttributesRW(String workflowType, List<KeyValue> keyValues) {
Map<String, EncodedObject> map;
final Map<String, EncodedObject> map = toMap(keyValues);
return new QueryAttributesRWImpl(registry.getQueryAttributeKeyToTypeMap(workflowType), map, objectEncoder);
}

private Map<String, EncodedObject> toMap(final List<KeyValue> keyValues) {
final Map<String, EncodedObject> map;
if (keyValues == null || keyValues.isEmpty()) {
map = new HashMap<>();
} else {
map = keyValues.stream()
.filter(keyValue -> keyValue.getValue() != null)
.collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue));
}

return new QueryAttributesRWImpl(registry.getQueryAttributeKeyToTypeMap(workflowType), map, objectEncoder);
return map;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import io.github.cadenceoss.iwf.core.attributes.AttributeLoadingPolicy;
import io.github.cadenceoss.iwf.core.attributes.QueryAttributesRW;
import io.github.cadenceoss.iwf.core.attributes.SearchAttributesRW;
import io.github.cadenceoss.iwf.core.attributes.StateLocalAttributesRW;
import io.github.cadenceoss.iwf.core.attributes.StateLocal;
import io.github.cadenceoss.iwf.core.command.CommandCarryOverPolicy;
import io.github.cadenceoss.iwf.core.command.CommandRequest;
import io.github.cadenceoss.iwf.core.command.CommandResults;
Expand Down Expand Up @@ -43,7 +43,7 @@ default StateOptions getStateOptions() {
*/
CommandRequest start(
final Context context, I input,
final StateLocalAttributesRW stateLocals,
final StateLocal stateLocals,
final SearchAttributesRW searchAttributes,
final QueryAttributesRW queryAttributes,
final InterStateChannel interStateChannel);
Expand All @@ -62,7 +62,7 @@ StateDecision decide(
final Context context,
final I input,
final CommandResults commandResults,
final StateLocalAttributesRW stateLocals,
final StateLocal stateLocals,
final SearchAttributesRW searchAttributes,
final QueryAttributesRW queryAttributes,
final InterStateChannel interStateChannel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
public class QueryAttributesRWImpl implements QueryAttributesRW{
private final Map<String, Class<?>> queryAttributeNameToTypeMap;
private final Map<String, EncodedObject> queryAttributeNameToEncodedObjectMap;
private final Map<String, EncodedObject> queryAttributesToUpsert;
private final Map<String, EncodedObject> upsertQueryAttributesToReturnToServer;
private final ObjectEncoder objectEncoder;

public QueryAttributesRWImpl(
Expand All @@ -21,7 +21,7 @@ public QueryAttributesRWImpl(
final ObjectEncoder objectEncoder) {
this.queryAttributeNameToTypeMap = queryAttributeNameToTypeMap;
this.queryAttributeNameToEncodedObjectMap = queryAttributeNameToValueMap;
this.queryAttributesToUpsert = new HashMap<>();
this.upsertQueryAttributesToReturnToServer = new HashMap<>();
this.objectEncoder = objectEncoder;
}

Expand Down Expand Up @@ -59,11 +59,11 @@ public void set(String key, Object value) {
}

this.queryAttributeNameToEncodedObjectMap.put(key, objectEncoder.encode(value));
this.queryAttributesToUpsert.put(key, objectEncoder.encode(value));
this.upsertQueryAttributesToReturnToServer.put(key, objectEncoder.encode(value));
}

public List<KeyValue> getUpsertQueryAttributes() {
return queryAttributesToUpsert.entrySet().stream()
return upsertQueryAttributesToReturnToServer.entrySet().stream()
.map(stringEncodedObjectEntry ->
new KeyValue()
.key(stringEncodedObjectEntry.getKey())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.github.cadenceoss.iwf.core.attributes;

public interface StateLocal {
/**
* set a local attribute. The scope of the attribute is only within the execution of this state.
* Usually it's for passing from State Start API to State Decide API
* User code must make sure using the same type for both get & set
*
* @param key
* @param value
*/
void setLocalAttribute(String key, Object value);

/**
* Retrieve a local state attribute
* User code must make sure using the same type for both get & set
*
* @param key
* @param type
* @param <T>
* @return
*/
<T> T getLocalAttribute(String key, Class<T> type);

/**
* Record an arbitrary event in State Start/Decide API for debugging/tracking purpose
*
* @param key the key of the event. Within a Start/Decide API, the same key cannot be used for more than once.
* @param eventData the data of the event.
*/
void recordEvent(String key, Object eventData);
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package io.github.cadenceoss.iwf.core.attributes;

import io.github.cadenceoss.iwf.core.ObjectEncoder;
import io.github.cadenceoss.iwf.core.WorkflowDefinitionException;
import io.github.cadenceoss.iwf.gen.models.EncodedObject;
import io.github.cadenceoss.iwf.gen.models.KeyValue;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class StateLocalImpl implements StateLocal {

private final Map<String, EncodedObject> recordEvents;
private final Map<String, EncodedObject> attributeNameToEncodedObjectMap;
private final Map<String, EncodedObject> upsertAttributesToReturnToServer;
private final ObjectEncoder objectEncoder;

public StateLocalImpl(final Map<String, EncodedObject> attributeNameToEncodedObjectMap,
final ObjectEncoder objectEncoder) {
this.objectEncoder = objectEncoder;
this.attributeNameToEncodedObjectMap = attributeNameToEncodedObjectMap;
upsertAttributesToReturnToServer = new HashMap<>();
recordEvents = new HashMap<>();
}

@Override
public void setLocalAttribute(final String key, final Object value) {
final EncodedObject encodedData = objectEncoder.encode(value);
attributeNameToEncodedObjectMap.put(key, encodedData);
upsertAttributesToReturnToServer.put(key, encodedData);
}

@Override
public <T> T getLocalAttribute(final String key, final Class<T> type) {
final EncodedObject encodedData = this.attributeNameToEncodedObjectMap.get(key);
if (encodedData == null) {
return null;
}
return objectEncoder.decode(encodedData, type);
}

@Override
public void recordEvent(final String key, final Object eventData) {
if (recordEvents.containsKey(key)) {
throw new WorkflowDefinitionException("cannot record the same event for more than once");
}
recordEvents.put(key, objectEncoder.encode(eventData));
}

public List<KeyValue> getUpsertStateLocalAttributes() {
return upsertAttributesToReturnToServer.entrySet().stream()
.map(stringEncodedObjectEntry ->
new KeyValue()
.key(stringEncodedObjectEntry.getKey())
.value(stringEncodedObjectEntry.getValue()))
.collect(Collectors.toList());
}

public List<KeyValue> getRecordEvents() {
return recordEvents.entrySet().stream()
.map(stringEncodedObjectEntry ->
new KeyValue()
.key(stringEncodedObjectEntry.getKey())
.value(stringEncodedObjectEntry.getValue()))
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@

public class StateDecisionMapper {
public static StateDecision toGenerated(io.github.cadenceoss.iwf.core.StateDecision stateDecision) {
if (!stateDecision.getNextStates().isPresent()) {
if (stateDecision.getNextStates() == null) {
return null;
}
return new StateDecision()
.nextStates(stateDecision.getNextStates().get()
.nextStates(stateDecision.getNextStates()
.stream()
.map(StateMovementMapper::toGenerated)
.collect(Collectors.toList()))
.upsertQueryAttributes(stateDecision.getUpsertQueryAttributes().get());
.collect(Collectors.toList()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public void testBasicQueryWorkflow() throws InterruptedException {
"query-start-query-decide", allQueryAttributes.get(BasicQueryWorkflow.ATTRIBUTE_KEY));
Assertions.assertEquals(
1, allQueryAttributes.size());
Assertions.assertEquals("test-value-2", output);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import io.github.cadenceoss.iwf.core.WorkflowState;
import io.github.cadenceoss.iwf.core.attributes.QueryAttributesRW;
import io.github.cadenceoss.iwf.core.attributes.SearchAttributesRW;
import io.github.cadenceoss.iwf.core.attributes.StateLocalAttributesRW;
import io.github.cadenceoss.iwf.core.attributes.StateLocal;
import io.github.cadenceoss.iwf.core.command.CommandRequest;
import io.github.cadenceoss.iwf.core.command.CommandResults;
import io.github.cadenceoss.iwf.core.command.InterStateChannel;
Expand All @@ -25,12 +25,12 @@ public Class<Integer> getInputType() {
}

@Override
public CommandRequest start(final Context context, final Integer input, final StateLocalAttributesRW stateLocals, final SearchAttributesRW searchAttributes, final QueryAttributesRW queryAttributes, final InterStateChannel interStateChannel) {
public CommandRequest start(final Context context, final Integer input, final StateLocal stateLocals, final SearchAttributesRW searchAttributes, final QueryAttributesRW queryAttributes, final InterStateChannel interStateChannel) {
return CommandRequest.empty;
}

@Override
public StateDecision decide(final Context context, final Integer input, final CommandResults commandResults, final StateLocalAttributesRW stateLocals, final SearchAttributesRW searchAttributes, final QueryAttributesRW queryAttributes, final InterStateChannel interStateChannel) {
public StateDecision decide(final Context context, final Integer input, final CommandResults commandResults, final StateLocal stateLocals, final SearchAttributesRW searchAttributes, final QueryAttributesRW queryAttributes, final InterStateChannel interStateChannel) {
final int output = input + 1;
return StateDecision.singleNextState(BasicWorkflowS2.StateId, output);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import io.github.cadenceoss.iwf.core.WorkflowState;
import io.github.cadenceoss.iwf.core.attributes.QueryAttributesRW;
import io.github.cadenceoss.iwf.core.attributes.SearchAttributesRW;
import io.github.cadenceoss.iwf.core.attributes.StateLocalAttributesRW;
import io.github.cadenceoss.iwf.core.attributes.StateLocal;
import io.github.cadenceoss.iwf.core.command.CommandRequest;
import io.github.cadenceoss.iwf.core.command.CommandResults;
import io.github.cadenceoss.iwf.core.command.InterStateChannel;
Expand All @@ -25,12 +25,12 @@ public Class<Integer> getInputType() {
}

@Override
public CommandRequest start(final Context context, final Integer input, final StateLocalAttributesRW stateLocals, final SearchAttributesRW searchAttributes, final QueryAttributesRW queryAttributes, final InterStateChannel interStateChannel) {
public CommandRequest start(final Context context, final Integer input, final StateLocal stateLocals, final SearchAttributesRW searchAttributes, final QueryAttributesRW queryAttributes, final InterStateChannel interStateChannel) {
return CommandRequest.empty;
}

@Override
public StateDecision decide(final Context context, final Integer input, final CommandResults commandResults, final StateLocalAttributesRW stateLocals, final SearchAttributesRW searchAttributes, final QueryAttributesRW queryAttributes, final InterStateChannel interStateChannel) {
public StateDecision decide(final Context context, final Integer input, final CommandResults commandResults, final StateLocal stateLocals, final SearchAttributesRW searchAttributes, final QueryAttributesRW queryAttributes, final InterStateChannel interStateChannel) {
final int output = input + 1;
return StateDecision.gracefulCompleteWorkflow(output);
}
Expand Down
Loading