Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,13 @@
<project.encoding>UTF-8</project.encoding>
<caffeine.version>3.1.8</caffeine.version>
<junit.version>4.13.2</junit.version>
<lombok.version>1.18.26</lombok.version>
<mysql-connector.version>8.0.33</mysql-connector.version>
<okhttp.version>4.10.0</okhttp.version>
<protobuf.version>3.22.2</protobuf.version>
<testcontainers.version>1.17.6</testcontainers.version>
</properties>

<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down
77 changes: 60 additions & 17 deletions src/main/java/io/eigr/spawn/api/InvocationOpts.java
Original file line number Diff line number Diff line change
@@ -1,32 +1,43 @@
package io.eigr.spawn.api;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Optional;

@Builder
@Getter
@AllArgsConstructor
@NoArgsConstructor
public class InvocationOpts {

@Builder.Default
private boolean async = false;
private final boolean async;
private final Duration timeoutSeconds;
private final Optional<Long> delaySeconds;
private final Optional<LocalDateTime> scheduledTo;

private InvocationOpts(InvocationOptsBuilder invocationOptsBuilder) {
this.async = invocationOptsBuilder.async;
this.timeoutSeconds = invocationOptsBuilder.timeoutSeconds;
this.delaySeconds = invocationOptsBuilder.delaySeconds;
this.scheduledTo = invocationOptsBuilder.scheduledTo;
}

public static InvocationOptsBuilder builder() {
return new InvocationOptsBuilder();
}

public boolean isAsync() {
return async;
}

@Builder.Default
private Duration timeoutSeconds = Duration.ofSeconds(10);
public Duration getTimeoutSeconds() {
return timeoutSeconds;
}

@Builder.Default
private Optional<Long> delaySeconds = Optional.empty();
public Optional<Long> getDelaySeconds() {
return delaySeconds;
}

@Builder.Default
private Optional<LocalDateTime> scheduledTo = Optional.empty();
public Optional<LocalDateTime> getScheduledTo() {
return scheduledTo;
}

public long getScheduleTimeInLong() {
if (scheduledTo.isPresent()) {
Expand All @@ -40,4 +51,36 @@ public long getScheduleTimeInLong() {
public long getTimeout() {
return this.timeoutSeconds.toMillis();
}

public static final class InvocationOptsBuilder {

private boolean async = false;
private Duration timeoutSeconds = Duration.ofSeconds(10);
private Optional<Long> delaySeconds = Optional.empty();
private Optional<LocalDateTime> scheduledTo = Optional.empty();

public InvocationOpts build() {
return new InvocationOpts(this);
}

public InvocationOptsBuilder async(boolean async) {
this.async = async;
return this;
}

public InvocationOptsBuilder timeoutSeconds(Duration timeoutSeconds) {
this.timeoutSeconds = timeoutSeconds;
return this;
}

public InvocationOptsBuilder delaySeconds(Optional<Long> delaySeconds) {
this.delaySeconds = delaySeconds;
return this;
}

public InvocationOptsBuilder scheduledTo(Optional<LocalDateTime> scheduledTo) {
this.scheduledTo = scheduledTo;
return this;
}
}
}
92 changes: 75 additions & 17 deletions src/main/java/io/eigr/spawn/api/TransportOpts.java
Original file line number Diff line number Diff line change
@@ -1,25 +1,83 @@
package io.eigr.spawn.api;

import lombok.*;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class TransportOpts {

@Builder.Default
private String host = "127.0.0.1";
@Builder.Default
private int port = 8091;
@Builder.Default
private String proxyHost = "127.0.0.1";
@Builder.Default
private int proxyPort = 9001;
@Builder.Default
private Executor executor = Executors.newCachedThreadPool();
private String host;
private int port;
private String proxyHost;
private int proxyPort;
private Executor executor;

private TransportOpts(TransportOptsBuilder builder) {
this.host = builder.host;
this.port = builder.port;
this.proxyHost = builder.proxyHost;
this.proxyPort = builder.proxyPort;
this.executor = builder.executor;
}

public static TransportOptsBuilder builder() {
return new TransportOptsBuilder();
}

public String getHost() {
return host;
}

public int getPort() {
return port;
}

public String getProxyHost() {
return proxyHost;
}

public int getProxyPort() {
return proxyPort;
}

public Executor getExecutor() {
return executor;
}

public static final class TransportOptsBuilder {

private String host = "127.0.0.1";
private int port = 8091;
private String proxyHost = "127.0.0.1";
private int proxyPort = 9001;
private Executor executor = Executors.newCachedThreadPool();

public TransportOpts build() {
return new TransportOpts(this);
}

public TransportOptsBuilder host(String host) {
this.host = host;
return this;
}

public TransportOptsBuilder port(int port) {
this.port = port;
return this;
}

public TransportOptsBuilder proxyHost(String proxyHost) {
this.proxyHost = proxyHost;
return this;
}

public TransportOptsBuilder proxyPort(int proxyPort) {
this.proxyPort = proxyPort;
return this;
}

public TransportOptsBuilder executor(Executor executor) {
this.executor = executor;
return this;
}
}
}
46 changes: 22 additions & 24 deletions src/main/java/io/eigr/spawn/api/actors/Value.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,16 @@
import java.util.Objects;
import java.util.Optional;

public final class Value<S extends GeneratedMessageV3, R extends GeneratedMessageV3> {

private S state;
private R response;
public final class Value {

private Object state;
private Object response;
private boolean checkpoint;
private Optional<Broadcast<?>> broadcast;
private Optional<Forward> forward;
private Optional<Pipe> pipe;
private Optional<List<SideEffect>> effects;

private ResponseType type;
private Optional<List<SideEffect<?>>> effects;
private final ResponseType type;

private Value() {
this.state = null;
Expand All @@ -36,13 +34,13 @@ private Value() {
}

private Value(
R response,
S state,
Object response,
Object state,
boolean checkpoint,
Optional<Broadcast<?>> broadcast,
Optional<Forward> forward,
Optional<Pipe> pipe,
Optional<List<SideEffect>> effects,
Optional<List<SideEffect<?>>> effects,
ResponseType type) {
this.response = response;
this.state = state;
Expand All @@ -54,16 +52,16 @@ private Value(
this.type = type;
}

public static <S, V> Value at() {
public static Value at() {
return new Value();
}

public R getResponse() {
return response;
public <R extends GeneratedMessageV3> R getResponse() {
return (R) response;
}

public S getState() {
return state;
public <S extends GeneratedMessageV3> S getState() {
return (S) state;
}

public boolean getCheckpoint() {
Expand All @@ -82,31 +80,31 @@ public Optional<Pipe> getPipe() {
return pipe;
}

public Optional<List<SideEffect>> getEffects() {
public Optional<List<SideEffect<?>>> getEffects() {
return effects;
}

public ResponseType getType() {
return type;
}

public Value response(R value) {
public <R extends GeneratedMessageV3> Value response(R value) {
this.response = value;
return this;
}

public Value state(S state) {
public <S extends GeneratedMessageV3> Value state(S state) {
this.state = state;
return this;
}

public Value state(S state, boolean checkpoint) {
public <S extends GeneratedMessageV3> Value state(S state, boolean checkpoint) {
this.state = state;
this.checkpoint = checkpoint;
return this;
}

public Value flow(Broadcast broadcast) {
public Value flow(Broadcast<?> broadcast) {
this.broadcast = Optional.of(broadcast);
return this;
}
Expand All @@ -121,8 +119,8 @@ public Value flow(Pipe pipe) {
return this;
}

public Value flow(SideEffect effect) {
List<SideEffect> ef;
public Value flow(SideEffect<?> effect) {
List<SideEffect<?>> ef;
if (this.effects.isPresent()) {
ef = this.effects.get();
ef.add(effect);
Expand All @@ -135,7 +133,7 @@ public Value flow(SideEffect effect) {
return this;
}

public Value flow(List<SideEffect> effects) {
public Value flow(List<SideEffect<?>> effects) {
this.effects = Optional.of(effects);
return this;
}
Expand Down Expand Up @@ -171,7 +169,7 @@ public String toString() {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Value<?, ?> value = (Value<?, ?>) o;
Value value = (Value) o;
return Objects.equals(state, value.state) &&
Objects.equals(response, value.response) &&
Objects.equals(checkpoint, value.checkpoint) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@
import io.eigr.spawn.api.actors.Value;
import io.eigr.spawn.api.actors.ActorContext;
import io.eigr.spawn.api.actors.ActorFactory;
import io.eigr.spawn.api.actors.workflows.Broadcast;
import io.eigr.spawn.api.actors.workflows.Forward;
import io.eigr.spawn.api.actors.workflows.Pipe;
import io.eigr.spawn.api.actors.workflows.SideEffect;
import io.eigr.spawn.api.exceptions.ActorInvocationException;
import io.eigr.spawn.internal.Entity;
Expand Down Expand Up @@ -209,30 +206,15 @@ private Optional<Entity> getEntityByActor(String actor, String parent) {
private Protocol.Workflow buildWorkflow(Value valueResponse) {
Protocol.Workflow.Builder workflowBuilder = Protocol.Workflow.newBuilder();

if (valueResponse.getBroadcast().isPresent()) {
Protocol.Broadcast b = ((Broadcast) valueResponse.getBroadcast().get()).build();
workflowBuilder.setBroadcast(b);
}

if (valueResponse.getForward().isPresent()) {
Protocol.Forward f = ((Forward) valueResponse.getForward().get()).build();
workflowBuilder.setForward(f);
}

if (valueResponse.getPipe().isPresent()) {
Protocol.Pipe p = ((Pipe) valueResponse.getPipe().get()).build();
workflowBuilder.setPipe(p);
}

if (valueResponse.getEffects().isPresent()) {
List<SideEffect> efs = ((List<SideEffect>) valueResponse.getEffects().get());
workflowBuilder.addAllEffects(getProtocolEffects(efs));
}
valueResponse.getBroadcast().ifPresent(b -> workflowBuilder.setBroadcast(b.build()));
valueResponse.getForward().ifPresent(f -> workflowBuilder.setForward(f.build()));
valueResponse.getPipe().ifPresent(p -> workflowBuilder.setPipe(p.build()));
valueResponse.getEffects().ifPresent(e -> workflowBuilder.addAllEffects(getProtocolEffects(e)));

return workflowBuilder.build();
}

private List<Protocol.SideEffect> getProtocolEffects(List<SideEffect> effects) {
private List<Protocol.SideEffect> getProtocolEffects(List<SideEffect<?>> effects) {
return effects.stream()
.map(SideEffect::build)
.collect(Collectors.toList());
Expand Down