Skip to content

Commit

Permalink
Experimental saga participant proxy concept, misc refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
cer committed Feb 2, 2018
1 parent 3eaa0fc commit 82d42ff
Show file tree
Hide file tree
Showing 18 changed files with 312 additions and 18 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -3,3 +3,4 @@ build/
*.idea/
*.iml
*.log
out
@@ -0,0 +1,32 @@
package io.eventuate.tram.sagas.orchestration;

import io.eventuate.tram.commands.common.Command;
import io.eventuate.tram.commands.producer.CommandProducer;
import io.eventuate.tram.sagas.common.SagaCommandHeaders;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.HashMap;
import java.util.Map;

public class SagaCommandProducer {

@Autowired
private CommandProducer commandProducer;

public SagaCommandProducer() {
}

public SagaCommandProducer(CommandProducer commandProducer) {
this.commandProducer = commandProducer;
}



public void sendCommand(String sagaType, String sagaId, String destinationChannel, String resource, String requestId, Command command, String replyTo) {
Map<String, String> headers = new HashMap<>();
headers.put(SagaCommandHeaders.SAGA_TYPE, sagaType);
headers.put(SagaCommandHeaders.SAGA_ID, sagaId);
headers.put(SagaCommandHeaders.SAGA_REQUEST_ID, requestId);
commandProducer.send(destinationChannel, resource, command, replyTo, headers);
}
}
Expand Up @@ -5,11 +5,11 @@

public class SagaDataSerde {

static <Data> SerializedSagaData serializeSagaData(Data sagaData) {
public static <Data> SerializedSagaData serializeSagaData(Data sagaData) {
return new SerializedSagaData(sagaData.getClass().getName(), JSonMapper.toJson(sagaData));
}

static <Data> Data deserializeSagaData(SerializedSagaData serializedSagaData) {
public static <Data> Data deserializeSagaData(SerializedSagaData serializedSagaData) {
Class<?> clasz = null;
try {
clasz = SagaDataSerde.class.getClassLoader().loadClass(serializedSagaData.getSagaDataType());
Expand Down
Expand Up @@ -72,6 +72,45 @@ public SagaManagerImpl(Saga<Data> saga) {
private SagaLockManager sagaLockManager;


public void setSagaCommandProducer(SagaCommandProducer sagaCommandProducer) {
this.sagaCommandProducer = sagaCommandProducer;
}

@Autowired
private SagaCommandProducer sagaCommandProducer;

public void setSagaInstanceRepository(SagaInstanceRepository sagaInstanceRepository) {
this.sagaInstanceRepository = sagaInstanceRepository;
}

public void setCommandProducer(CommandProducer commandProducer) {
this.commandProducer = commandProducer;
}

public void setMessageConsumer(MessageConsumer messageConsumer) {
this.messageConsumer = messageConsumer;
}

public void setIdGenerator(IdGenerator idGenerator) {
this.idGenerator = idGenerator;
}

public void setAggregateInstanceSubscriptionsDAO(AggregateInstanceSubscriptionsDAO aggregateInstanceSubscriptionsDAO) {
this.aggregateInstanceSubscriptionsDAO = aggregateInstanceSubscriptionsDAO;
}

public void setChannelMapping(ChannelMapping channelMapping) {
this.channelMapping = channelMapping;
}

public void setSagaLockManager(SagaLockManager sagaLockManager) {
this.sagaLockManager = sagaLockManager;
}

public void setDomainEventPublisher(DomainEventPublisher domainEventPublisher) {
this.domainEventPublisher = domainEventPublisher;
}

@Override
public SagaInstance create(Data sagaData) {
return create(sagaData, Optional.empty());
Expand Down Expand Up @@ -190,11 +229,7 @@ private String sendCommands(String sagaId, List<CommandWithDestination> commands

for (CommandWithDestination command : commands) {
lastRequestId = idGenerator.genId().asString();
Map<String, String> headers = new HashMap<>();
headers.put(SagaCommandHeaders.SAGA_TYPE, getSagaType());
headers.put(SagaCommandHeaders.SAGA_ID, sagaId);
headers.put(SagaCommandHeaders.SAGA_REQUEST_ID, lastRequestId);
commandProducer.send(command.getDestinationChannel(), command.getResource(), command.getCommand(), makeSagaReplyChannel(), headers);
sagaCommandProducer.sendCommand(getSagaType(), sagaId, command.getDestinationChannel(), command.getResource(), lastRequestId, command.getCommand(), makeSagaReplyChannel());
}

return lastRequestId;
Expand Down
@@ -1,5 +1,6 @@
package io.eventuate.tram.sagas.orchestration;

import io.eventuate.tram.commands.producer.CommandProducer;
import io.eventuate.tram.commands.producer.TramCommandProducerConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -19,4 +20,8 @@ public SagaInstanceRepository sagaInstanceRepository() {
return new SagaInstanceRepositoryJdbc();
}

@Bean
public SagaCommandProducer sagaCommandProducer(CommandProducer commandProducer) {
return new SagaCommandProducer(commandProducer);
}
}
@@ -1,7 +1,7 @@
CREATE SCHEMA IF NOT EXISTS eventuate AUTHORIZATION SA;
SET SCHEMA eventuate;

CREATE TABLE aggregate_instance_subscriptions(
CREATE TABLE IF NOT EXISTS aggregate_instance_subscriptions(
aggregate_type VARCHAR(200) DEFAULT NULL,
aggregate_id VARCHAR(1000) NOT NULL,
event_type VARCHAR(200) NOT NULL,
Expand All @@ -10,7 +10,7 @@ CREATE TABLE aggregate_instance_subscriptions(
PRIMARY KEY(aggregate_id, event_type, saga_id, saga_type)
);

CREATE TABLE saga_instance(
CREATE TABLE IF NOT EXISTS saga_instance(
saga_type VARCHAR(100) NOT NULL,
saga_id VARCHAR(100) NOT NULL,
state_name VARCHAR(100) NOT NULL,
Expand All @@ -21,7 +21,7 @@ CREATE TABLE saga_instance(
);


CREATE TABLE saga_instance_participants (
CREATE TABLE IF NOT EXISTS saga_instance_participants (
saga_type VARCHAR(100) NOT NULL,
saga_id VARCHAR(100) NOT NULL,
destination VARCHAR(100) NOT NULL,
Expand All @@ -30,13 +30,13 @@ CREATE TABLE saga_instance_participants (
);


create table saga_lock_table(
create table IF NOT EXISTS saga_lock_table(
target VARCHAR(100) PRIMARY KEY,
saga_type VARCHAR(100) NOT NULL,
saga_Id VARCHAR(100) NOT NULL
);

create table saga_stash_table(
create table IF NOT EXISTS saga_stash_table(
message_id VARCHAR(100) PRIMARY KEY,
target VARCHAR(100) NOT NULL,
saga_type VARCHAR(100) NOT NULL,
Expand Down
@@ -0,0 +1,22 @@
package io.eventuate.tram.sagas.simpledsl;

import io.eventuate.tram.commands.common.Command;

import java.util.Set;

public class CommandEndpoint<C extends Command> {

private String commandChannel;
private Class<C> commandClass;
private Set<Class> replyClasses;

public CommandEndpoint(String commandChannel, Class<C> commandClass, Set<Class> replyClasses) {
this.commandChannel = commandChannel;
this.commandClass = commandClass;
this.replyClasses = replyClasses;
}

public String getCommandChannel() {
return commandChannel;
}
}
@@ -0,0 +1,36 @@
package io.eventuate.tram.sagas.simpledsl;

import io.eventuate.tram.commands.common.Command;

import java.util.HashSet;
import java.util.Set;

public class CommandEndpointBuilder<C extends Command> {

private String channel;
private Class<C> commandClass;
private Set<Class> replyClasses = new HashSet<>();

public CommandEndpointBuilder(Class<C> commandClass) {
this.commandClass = commandClass;
}

public static <C extends Command> CommandEndpointBuilder<C> forCommand(Class<C> commandClass) {
return new CommandEndpointBuilder<>(commandClass);
}

public CommandEndpointBuilder<C> withChannel(String channel) {
this.channel = channel;
return this;
}


public <T> CommandEndpointBuilder<C> withReply(Class<T> replyClass) {
this.replyClasses.add(replyClass);
return this;
}

public CommandEndpoint<C> build() {
return new CommandEndpoint<>(channel, commandClass, replyClasses);
}
}
@@ -1,5 +1,6 @@
package io.eventuate.tram.sagas.simpledsl;

import io.eventuate.tram.commands.common.Command;
import io.eventuate.tram.commands.consumer.CommandWithDestination;
import io.eventuate.tram.sagas.orchestration.SagaDefinition;

Expand All @@ -25,12 +26,21 @@ public InvokeParticipantStepBuilder<Data> withAction(Function<Data, CommandWithD
this.action = Optional.of(new ParticipantInvocationImpl<>(action));
return this;
}
public <C extends Command> InvokeParticipantStepBuilder<Data> withAction(CommandEndpoint<C> commandEndpoint, Function<Data, C> commandProvider) {
this.action = Optional.of(new ParticipantEndpointInvocationImpl<>(commandEndpoint, commandProvider));
return this;
}

public InvokeParticipantStepBuilder<Data> withCompensation(Function<Data, CommandWithDestination> compensation) {
this.compensation = Optional.of(new ParticipantInvocationImpl<>(compensation));
return this;
}

public <C extends Command> InvokeParticipantStepBuilder<Data> withCompensation(CommandEndpoint<C> commandEndpoint, Function<Data, C> commandProvider) {
this.compensation = Optional.of(new ParticipantEndpointInvocationImpl<>(commandEndpoint, commandProvider));
return this;
}

public <T> InvokeParticipantStepBuilder<Data> onReply(Class<T> replyClass, BiConsumer<Data, T> replyHandler) {
if (compensation.isPresent())
compensationReplyHandlers.put(replyClass.getName(), (data, rawReply) -> replyHandler.accept(data, (T) rawReply));
Expand Down
@@ -0,0 +1,31 @@
package io.eventuate.tram.sagas.simpledsl;

import io.eventuate.tram.commands.common.Command;
import io.eventuate.tram.commands.common.CommandReplyOutcome;
import io.eventuate.tram.commands.common.ReplyMessageHeaders;
import io.eventuate.tram.commands.consumer.CommandWithDestination;
import io.eventuate.tram.messaging.common.Message;

import java.util.function.Function;

public class ParticipantEndpointInvocationImpl<Data, C extends Command> implements ParticipantInvocation<Data> {


private final CommandEndpoint<C> commandEndpoint;
private final Function<Data, C> commandProvider;

public ParticipantEndpointInvocationImpl(CommandEndpoint<C> commandEndpoint, Function<Data, C> commandProvider) {
this.commandEndpoint = commandEndpoint;
this.commandProvider = commandProvider;
}

@Override
public boolean isSuccessfulReply(Message message) {
return CommandReplyOutcome.SUCCESS.name().equals(message.getRequiredHeader(ReplyMessageHeaders.REPLY_OUTCOME));
}

@Override
public CommandWithDestination makeCommandToSend(Data data) {
return new CommandWithDestination(commandEndpoint.getCommandChannel(), null, commandProvider.apply(data));
}
}
Expand Up @@ -15,7 +15,7 @@ public StepBuilder(SimpleSagaDefinitionBuilder<Data> builder) {
}

public LocalStepBuilder<Data> invokeLocal(Consumer<Data> localFunction) {
return new LocalStepBuilder<Data>(parent, localFunction);
return new LocalStepBuilder<>(parent, localFunction);
}


Expand All @@ -27,4 +27,13 @@ public <C extends Command> InvokeParticipantStepBuilder<Data> invokeParticipant(
public <C extends Command> InvokeParticipantStepBuilder<Data> withCompensation(Function<Data, CommandWithDestination> compensation) {
return new InvokeParticipantStepBuilder<>(parent).withCompensation(compensation);
}

public <C extends Command> InvokeParticipantStepBuilder<Data> invokeParticipant(CommandEndpoint<C> commandEndpoint, Function<Data, C> commandProvider) {
return new InvokeParticipantStepBuilder<>(parent).withAction(commandEndpoint, commandProvider);
}


public <C extends Command> InvokeParticipantStepBuilder<Data> withCompensation(CommandEndpoint<C> commandEndpoint, Function<Data, C> commandProvider) {
return new InvokeParticipantStepBuilder<>(parent).withCompensation(commandEndpoint, commandProvider);
}
}
4 changes: 2 additions & 2 deletions gradle.properties
Expand Up @@ -6,5 +6,5 @@ eventuateMavenRepoUrl=https://dl.bintray.com/eventuateio-oss/eventuate-maven-rel

springBootVersion=1.4.5.RELEASE
eventuateClientVersion=0.20.1.RELEASE
eventuateTramVersion=0.4.0.RELEASE
version=0.1.0-SNAPSHOT
eventuateTramVersion=0.5.0.RELEASE
version=0.5.0-SNAPSHOT
2 changes: 1 addition & 1 deletion mysql/Dockerfile
@@ -1,2 +1,2 @@
FROM eventuateio/eventuate-tram-mysql:0.2.0
FROM eventuateio/eventuate-tram-mysql:0.5.0.RELEASE
COPY tram-saga-schema.sql /docker-entrypoint-initdb.d
Expand Up @@ -5,8 +5,8 @@
import io.eventuate.examples.tram.sagas.ordersandcustomers.orders.sagas.participants.ReserveCreditCommand;
import io.eventuate.examples.tram.sagas.ordersandcustomers.orders.service.RejectOrderCommand;
import io.eventuate.tram.commands.consumer.CommandWithDestination;
import io.eventuate.tram.sagas.simpledsl.SimpleSaga;
import io.eventuate.tram.sagas.orchestration.SagaDefinition;
import io.eventuate.tram.sagas.simpledsl.SimpleSaga;

import static io.eventuate.tram.commands.consumer.CommandWithDestinationBuilder.send;

Expand All @@ -29,6 +29,7 @@ public SagaDefinition<CreateOrderSagaData> getSagaDefinition() {


private CommandWithDestination reserveCredit(CreateOrderSagaData data) {

long orderId = data.getOrderId();
Long customerId = data.getOrderDetails().getCustomerId();
Money orderTotal = data.getOrderDetails().getOrderTotal();
Expand Down
@@ -0,0 +1,54 @@
package io.eventuate.examples.tram.sagas.ordersandcustomers.orders.sagas.createorder;

import io.eventuate.examples.tram.sagas.ordersandcustomers.commondomain.Money;
import io.eventuate.examples.tram.sagas.ordersandcustomers.orders.sagas.participants.ApproveOrderCommand;
import io.eventuate.examples.tram.sagas.ordersandcustomers.orders.sagas.participants.ReserveCreditCommand;
import io.eventuate.examples.tram.sagas.ordersandcustomers.orders.sagas.participants.proxy.CustomerServiceProxy;
import io.eventuate.examples.tram.sagas.ordersandcustomers.orders.sagas.participants.proxy.OrderServiceProxy;
import io.eventuate.examples.tram.sagas.ordersandcustomers.orders.service.RejectOrderCommand;
import io.eventuate.tram.sagas.orchestration.SagaDefinition;
import io.eventuate.tram.sagas.simpledsl.SimpleSaga;

public class CreateOrderSagaV2 implements SimpleSaga<CreateOrderSagaData> {

private CustomerServiceProxy customerService;
private OrderServiceProxy orderService;

public CreateOrderSagaV2(CustomerServiceProxy customerService, OrderServiceProxy orderService) {
this.customerService = customerService;
this.orderService = orderService;
}

private SagaDefinition<CreateOrderSagaData> sagaDefinition =
step()
.withCompensation(orderService.reject, this::makeRejectCommand)
.step()
.invokeParticipant(customerService.reserveCredit, this::makeReserveCreditCommand)
.step()
.invokeParticipant(orderService.approve, this::makeApproveCommand)
.build();


@Override
public SagaDefinition<CreateOrderSagaData> getSagaDefinition() {
return this.sagaDefinition;
}


private ReserveCreditCommand makeReserveCreditCommand(CreateOrderSagaData data) {
long orderId = data.getOrderId();
Long customerId = data.getOrderDetails().getCustomerId();
Money orderTotal = data.getOrderDetails().getOrderTotal();
return new ReserveCreditCommand(customerId, orderId, orderTotal);
}

private RejectOrderCommand makeRejectCommand(CreateOrderSagaData data) {
return new RejectOrderCommand(data.getOrderId());
}

private ApproveOrderCommand makeApproveCommand(CreateOrderSagaData data) {
return new ApproveOrderCommand(data.getOrderId());
}


}

0 comments on commit 82d42ff

Please sign in to comment.