Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rpc interface without persistence #219

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
46 changes: 45 additions & 1 deletion src/main/java/io/iworkflow/core/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
final ClientOptions clientOptions;

/**
* return a full featured client. If you don't have the workflow Registry, you should use {@link UnregisteredClient} instead
* return a full-featured client. If you don't have the workflow Registry, you should use {@link UnregisteredClient} instead
*
* @param registry registry is required so that this client can perform some validation checks (workflow types, channel names)
* @param clientOptions is for configuring the client
Expand Down Expand Up @@ -686,6 +686,19 @@
return rpcStubMethod.execute(null, input, null, null);
}

/**
* invoking the RPC through RPC stub
*
* @param rpcStubMethod the RPC method from stub created by {@link #newRpcStub(Class, String, String)}
* @param input the input of the RPC method
* @param <I> the input type
* @param <O> the output type
* @return output
*/
public <I, O> O invokeRPC(RpcDefinitions.RpcFunc1NoPersistence<I, O> rpcStubMethod, I input) {
return rpcStubMethod.execute(null, input, null);
}

/**
* invoking the RPC through RPC stub
*
Expand All @@ -697,6 +710,17 @@
return rpcStubMethod.execute(null, null, null);
}

/**
* invoking the RPC through RPC stub
*
* @param rpcStubMethod the RPC method from stub created by {@link #newRpcStub(Class, String, String)}
* @param <O> the output type

Check warning on line 717 in src/main/java/io/iworkflow/core/Client.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/iworkflow/core/Client.java#L717

Added line #L717 was not covered by tests
* @return output
*/
public <O> O invokeRPC(RpcDefinitions.RpcFunc0NoPersistence<O> rpcStubMethod) {
return rpcStubMethod.execute(null, null);
}

/**
* invoking the RPC through RPC stub
*
Expand All @@ -708,6 +732,17 @@
rpcStubMethod.execute(null, input, null, null);
}

/**
* invoking the RPC through RPC stub
*
* @param rpcStubMethod the RPC method from stub created by {@link #newRpcStub(Class, String, String)}
* @param input the input of the RPC method

Check warning on line 739 in src/main/java/io/iworkflow/core/Client.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/iworkflow/core/Client.java#L739

Added line #L739 was not covered by tests
* @param <I> the input type
*/
public <I> void invokeRPC(RpcDefinitions.RpcProc1NoPersistence<I> rpcStubMethod, I input) {
rpcStubMethod.execute(null, input, null);
}

/**
* invoking the RPC through RPC stub
*
Expand All @@ -717,6 +752,15 @@
rpcStubMethod.execute(null, null, null);
}

/**
* invoking the RPC through RPC stub
*
* @param rpcStubMethod the RPC method from stub created by {@link #newRpcStub(Class, String, String)}
*/
public void invokeRPC(RpcDefinitions.RpcProc0NoPersistence rpcStubMethod) {
rpcStubMethod.execute(null, null);
}

Check warning on line 762 in src/main/java/io/iworkflow/core/Client.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/iworkflow/core/Client.java#L761-L762

Added lines #L761 - L762 were not covered by tests

/**
* Get specified search attributes (by attributeKeys) of a workflow
*
Expand Down
88 changes: 62 additions & 26 deletions src/main/java/io/iworkflow/core/RpcDefinitions.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
package io.iworkflow.core;

import com.google.common.collect.ImmutableMap;
import io.iworkflow.core.communication.Communication;
import io.iworkflow.core.persistence.Persistence;

import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.Map;

public final class RpcDefinitions {
private RpcDefinitions() {
}

/**
* RPC with input and output
*
* RPC definition
* with: input, output, persistence, communication
* without: NA
* @param <I> input type
* @param <O> output type
*/
Expand All @@ -22,8 +25,21 @@
}

/**
* RPC with output only
*
* RPC definition
* with: input, output, communication
* without: persistence
* @param <I> input type
* @param <O> output type
*/
@FunctionalInterface
public interface RpcFunc1NoPersistence<I, O> extends Serializable {
O execute(Context context, I input, Communication communication);
}

/**
* RPC definition
* with: output, persistence, communication
* without: input
* @param <O> output type
*/
@FunctionalInterface
Expand All @@ -32,8 +48,20 @@
}

/**
* RPC with input only
*
* RPC definition
* with: output, communication
* without: input, persistence
* @param <O> output type
*/
@FunctionalInterface
public interface RpcFunc0NoPersistence<O> extends Serializable {
O execute(Context context, Communication communication);
}

/**
* RPC definition
* with: input, persistence, communication
* without: output
* @param <I> input type
*/
@FunctionalInterface
Expand All @@ -42,34 +70,42 @@
}

/**
* RPC without input or output
* RPC definition
* with: input, communication
* without: output, persistence
* @param <I> input type
*/
@FunctionalInterface
public interface RpcProc1NoPersistence<I> extends Serializable {
void execute(Context context, I input, Communication communication);
}

/**
* RPC definition
* with: persistence, communication
* without: input, output
*/
@FunctionalInterface
public interface RpcProc0 extends Serializable {
void execute(Context context, Persistence persistence, Communication communication);
}

public static final int PARAMETERS_WITH_INPUT = 4;
public static final int PARAMETERS_NO_INPUT = 3;
/**
* RPC definition
* with: communication
* without: input, output, persistence
*/
@FunctionalInterface
public interface RpcProc0NoPersistence extends Serializable {
void execute(Context context, Communication communication);
}

public static final int INDEX_OF_INPUT_PARAMETER = 1;
public static final String ERROR_MESSAGE = "An RPC method must be in the form of one of {@link RpcDefinitions}";

public static void validateRpcMethod(final Method method) {
longquanzheng marked this conversation as resolved.
Show resolved Hide resolved
final Class<?>[] paramTypes = method.getParameterTypes();
final Class<?> persistenceType, communicationType, contextType;
if (paramTypes.length == PARAMETERS_NO_INPUT) {
contextType = paramTypes[0];
persistenceType = paramTypes[1];
communicationType = paramTypes[2];
} else if (paramTypes.length == PARAMETERS_WITH_INPUT) {
contextType = paramTypes[0];
persistenceType = paramTypes[2];
communicationType = paramTypes[3];
} else {
throw new WorkflowDefinitionException("An RPC method must be in the form of one of {@link RpcDefinitions}");
}
if (!persistenceType.equals(Persistence.class) || !communicationType.equals(Communication.class)) {
throw new WorkflowDefinitionException("An RPC method must be in the form of one of {@link RpcDefinitions}");
RpcMethodMetadata methodMetadata = RpcMethodMatcher.match(method);
if (methodMetadata == null) {
throw new WorkflowDefinitionException(ERROR_MESSAGE);

Check warning on line 108 in src/main/java/io/iworkflow/core/RpcDefinitions.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/iworkflow/core/RpcDefinitions.java#L108

Added line #L108 was not covered by tests
}
}
}
}
63 changes: 43 additions & 20 deletions src/main/java/io/iworkflow/core/RpcInvocationHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.iworkflow.core.persistence.PersistenceOptions;
import io.iworkflow.gen.models.PersistenceLoadingPolicy;
import io.iworkflow.gen.models.PersistenceLoadingType;
import io.iworkflow.gen.models.SearchAttributeKeyAndType;
import net.bytebuddy.implementation.bind.annotation.AllArguments;
import net.bytebuddy.implementation.bind.annotation.Origin;
Expand All @@ -11,9 +12,7 @@
import java.util.Arrays;
import java.util.List;

import static io.iworkflow.core.RpcDefinitions.INDEX_OF_INPUT_PARAMETER;
import static io.iworkflow.core.RpcDefinitions.PARAMETERS_WITH_INPUT;
import static io.iworkflow.core.RpcDefinitions.validateRpcMethod;
import static io.iworkflow.core.RpcDefinitions.*;

public class RpcInvocationHandler {

Expand Down Expand Up @@ -41,30 +40,54 @@
if (rpcAnno == null) {
throw new WorkflowDefinitionException("An RPC method must be annotated by RPC annotation");
}
validateRpcMethod(method);
Object input = null;
if (method.getParameterTypes().length == PARAMETERS_WITH_INPUT) {
input = allArguments[INDEX_OF_INPUT_PARAMETER];

RpcMethodMetadata metadata = RpcMethodMatcher.match(method);
if (metadata == null) {
throw new WorkflowDefinitionException("An RPC method must be annotated by RPC annotation");

Check warning on line 46 in src/main/java/io/iworkflow/core/RpcInvocationHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/iworkflow/core/RpcInvocationHandler.java#L46

Added line #L46 was not covered by tests
}
Object input = metadata.hasInput() ? allArguments[metadata.getInputIndex()] : null;

final Class<?> outputType = method.getReturnType();

boolean useMemo = schemaOptions.getEnableCaching();
if (rpcAnno.bypassCachingForStrongConsistency()) {
useMemo = false;
}
final Object output = unregisteredClient.invokeRpc(outputType, input, workflowId, workflowRunId, method.getName(), rpcAnno.timeoutSeconds(),
new PersistenceLoadingPolicy()
.persistenceLoadingType(rpcAnno.dataAttributesLoadingType())
.partialLoadingKeys(Arrays.asList(rpcAnno.dataAttributesPartialLoadingKeys()))
.lockingKeys(Arrays.asList(rpcAnno.dataAttributesLockingKeys())),
new PersistenceLoadingPolicy()
.persistenceLoadingType(rpcAnno.searchAttributesLoadingType())
.lockingKeys(Arrays.asList(rpcAnno.searchAttributesLockingKeys()))
.partialLoadingKeys(Arrays.asList(rpcAnno.searchAttributesPartialLoadingKeys())),
useMemo,
searchAttributeKeyAndTypes
);
return output;

if (metadata.usesPersistence()) {
return unregisteredClient.invokeRpc(
outputType,
input,
workflowId,
workflowRunId,
method.getName(),
rpcAnno.timeoutSeconds(),
new PersistenceLoadingPolicy()
.persistenceLoadingType(rpcAnno.dataAttributesLoadingType())
.partialLoadingKeys(Arrays.asList(rpcAnno.dataAttributesPartialLoadingKeys()))
.lockingKeys(Arrays.asList(rpcAnno.dataAttributesLockingKeys())),
new PersistenceLoadingPolicy()
.persistenceLoadingType(rpcAnno.searchAttributesLoadingType())
.lockingKeys(Arrays.asList(rpcAnno.searchAttributesLockingKeys()))
.partialLoadingKeys(Arrays.asList(rpcAnno.searchAttributesPartialLoadingKeys())),
useMemo,
searchAttributeKeyAndTypes
);
} else {
return unregisteredClient.invokeRpc(
outputType,
input,
workflowId,
workflowRunId,
method.getName(),
rpcAnno.timeoutSeconds(),
new PersistenceLoadingPolicy()
.persistenceLoadingType(PersistenceLoadingType.NONE),
new PersistenceLoadingPolicy()
.persistenceLoadingType(PersistenceLoadingType.NONE),
useMemo,
null);
}

}
}