Skip to content

Commit

Permalink
JAV-328 renamed classes according to the paper
Browse files Browse the repository at this point in the history
  • Loading branch information
seanyinx committed Sep 14, 2017
1 parent 0e0ae94 commit a34500a
Show file tree
Hide file tree
Showing 9 changed files with 51 additions and 87 deletions.
Expand Up @@ -34,7 +34,7 @@
import io.servicecomb.saga.core.SagaTask;
import io.servicecomb.saga.core.ToJsonFormat;
import io.servicecomb.saga.core.application.interpreter.FromJsonFormat;
import io.servicecomb.saga.core.application.interpreter.JsonRequestInterpreter;
import io.servicecomb.saga.core.application.interpreter.GraphBuilder;
import io.servicecomb.saga.core.dag.GraphCycleDetectorImpl;
import io.servicecomb.saga.infrastructure.EmbeddedEventStore;
import java.util.HashMap;
Expand All @@ -49,16 +49,16 @@
import kamon.annotation.Segment;

@EnableKamon
public class SagaCoordinator {
public class SagaExecutionComponent {

private final PersistentStore persistentStore;
private final JsonRequestInterpreter requestInterpreter;
private final FromJsonFormat fromJsonFormat;
private final ToJsonFormat toJsonFormat;
private final Executor executorService;
private final FallbackPolicy fallbackPolicy;
private final GraphBuilder graphBuilder;

public SagaCoordinator(
public SagaExecutionComponent(
PersistentStore persistentStore,
FromJsonFormat fromJsonFormat,
ToJsonFormat toJsonFormat) {
Expand All @@ -70,21 +70,21 @@ public SagaCoordinator(
Executors.newFixedThreadPool(5));
}

public SagaCoordinator(
public SagaExecutionComponent(
int retryDelay,
PersistentStore persistentStore,
FromJsonFormat fromJsonFormat,
ToJsonFormat toJsonFormat,
ExecutorService executorService) {
this.fallbackPolicy = new FallbackPolicy(retryDelay);
this.persistentStore = persistentStore;
this.requestInterpreter = new JsonRequestInterpreter(new GraphCycleDetectorImpl<>());
this.graphBuilder = new GraphBuilder(new GraphCycleDetectorImpl<>());
this.fromJsonFormat = fromJsonFormat;
this.toJsonFormat = toJsonFormat;
this.executorService = executorService;
}

@Segment(name = "runSagaCoordinator", category = "application", library = "kamon")
@Segment(name = "runSagaExecutionComponent", category = "application", library = "kamon")
public void run(String requestJson) {
String sagaId = UUID.randomUUID().toString();
EventStore sagaLog = new EmbeddedEventStore();
Expand All @@ -96,7 +96,7 @@ public void run(String requestJson) {
executorService,
definition.policy(),
sagaTasks(sagaId, requestJson, sagaLog),
requestInterpreter.interpret(definition.requests()));
graphBuilder.build(definition.requests()));
saga.run();
}

Expand All @@ -116,7 +116,7 @@ public void reanimate() {
executorService,
definition.policy(),
sagaTasks(event.sagaId, requestJson, eventStore),
requestInterpreter.interpret(definition.requests()));
graphBuilder.build(definition.requests()));

saga.play();
saga.run();
Expand Down
Expand Up @@ -19,21 +19,31 @@
import io.servicecomb.saga.core.NoOpSagaRequest;
import io.servicecomb.saga.core.SagaException;
import io.servicecomb.saga.core.SagaRequest;
import io.servicecomb.saga.core.dag.GraphCycleDetector;
import io.servicecomb.saga.core.dag.Node;
import io.servicecomb.saga.core.dag.SingleLeafDirectedAcyclicGraph;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import kamon.annotation.EnableKamon;
import kamon.annotation.Segment;

@EnableKamon
class GraphBuilder {
public class GraphBuilder {

private final GraphCycleDetector<SagaRequest> detector;

public GraphBuilder(GraphCycleDetector<SagaRequest> detector) {
this.detector = detector;
}

@Segment(name = "buildGraph", category = "application", library = "kamon")
SingleLeafDirectedAcyclicGraph<SagaRequest> build(SagaRequest[] sagaRequests) {
public SingleLeafDirectedAcyclicGraph<SagaRequest> build(SagaRequest[] sagaRequests) {
Map<String, Node<SagaRequest>> requestNodes = requestsToNodes(sagaRequests);

return linkNodesToGraph(sagaRequests, requestNodes);
SingleLeafDirectedAcyclicGraph<SagaRequest> graph = linkNodesToGraph(sagaRequests, requestNodes);
detectCycle(graph);
return graph;
}

private SingleLeafDirectedAcyclicGraph<SagaRequest> linkNodesToGraph(
Expand Down Expand Up @@ -88,4 +98,12 @@ private Map<String, Node<SagaRequest>> requestsToNodes(SagaRequest[] sagaRequest
}
return requestMap;
}

private void detectCycle(SingleLeafDirectedAcyclicGraph<SagaRequest> graph) {
Set<Node<SagaRequest>> jointNodes = detector.cycleJoints(graph);

if (!jointNodes.isEmpty()) {
throw new SagaException("Cycle detected in the request graph at nodes " + jointNodes);
}
}
}

This file was deleted.

Expand Up @@ -29,7 +29,7 @@
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.when;

import io.servicecomb.saga.core.application.SagaCoordinator;
import io.servicecomb.saga.core.application.SagaExecutionComponent;
import io.servicecomb.saga.core.application.interpreter.FromJsonFormat;
import io.servicecomb.saga.infrastructure.EmbeddedEventStore;
import java.io.IOException;
Expand All @@ -44,7 +44,7 @@
import org.mockito.Mockito;

@SuppressWarnings("unchecked")
public class SagaCoordinatorTest {
public class SagaExecutionComponentTest {

private static final String requestJson = "[\n"
+ " {\n"
Expand Down Expand Up @@ -129,7 +129,7 @@ public SagaRequest[] requests() {
private final FromJsonFormat fromJsonFormat = Mockito.mock(FromJsonFormat.class);
private final EmbeddedPersistentStore eventStore = new EmbeddedPersistentStore();

private final SagaCoordinator coordinator = new SagaCoordinator(
private final SagaExecutionComponent coordinator = new SagaExecutionComponent(
eventStore,
fromJsonFormat,
null
Expand Down
Expand Up @@ -50,7 +50,7 @@
import org.mockito.Mockito;

@SuppressWarnings("unchecked")
public class JsonRequestInterpreterTest {
public class GraphBuilderTest {

private final SagaRequest request1 = new SagaRequestImpl(
"request-aaa",
Expand Down Expand Up @@ -89,16 +89,16 @@ public class JsonRequestInterpreterTest {
private final SagaRequest[] duplicateRequests = {duplicateRequest, duplicateRequest};

private final GraphCycleDetector<SagaRequest> detector = Mockito.mock(GraphCycleDetector.class);
private final JsonRequestInterpreter interpreter = new JsonRequestInterpreter(detector);
private final GraphBuilder graphBuilder = new GraphBuilder(detector);

@Before
public void setUp() throws Exception {
when(detector.cycleJoints(any())).thenReturn(emptySet());
}

@Test
public void interpretsParallelRequests() {
SingleLeafDirectedAcyclicGraph<SagaRequest> tasks = interpreter.interpret(requests);
public void buildsGraphOfParallelRequests() {
SingleLeafDirectedAcyclicGraph<SagaRequest> tasks = graphBuilder.build(requests);

Traveller<SagaRequest> traveller = new ByLevelTraveller<>(tasks, new FromRootTraversalDirection<>());
Collection<Node<SagaRequest>> nodes = traveller.nodes();
Expand All @@ -122,7 +122,7 @@ public void interpretsParallelRequests() {
@Test
public void blowsUpWhenJsonContainsDuplicateRequestId() {
try {
interpreter.interpret(duplicateRequests);
graphBuilder.build(duplicateRequests);
fail(SagaException.class.getSimpleName() + " is expected, but none thrown");
} catch (SagaException e) {
assertThat(e.getMessage(),
Expand All @@ -136,7 +136,7 @@ public void blowsUpWhenGraphContainsCycle() {
when(detector.cycleJoints(any())).thenReturn(singleton(new Node<>(0L, null)));

try {
interpreter.interpret(requests);
graphBuilder.build(requests);
expectFailing(SagaException.class);
} catch (SagaException e) {
assertThat(e.getMessage(), startsWith("Cycle detected in the request graph at nodes "));
Expand Down
Expand Up @@ -24,7 +24,7 @@

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import io.servicecomb.provider.rest.common.RestSchema;
import io.servicecomb.saga.core.application.SagaCoordinator;
import io.servicecomb.saga.core.application.SagaExecutionComponent;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -43,19 +43,19 @@
@RestSchema(schemaId = "saga-endpoint")
public class SagaController {

private final SagaCoordinator sagaCoordinator;
private final SagaExecutionComponent sagaExecutionComponent;
private final SagaEventRepo repo;

@Autowired
public SagaController(SagaCoordinator sagaCoordinator, SagaEventRepo repo) {
this.sagaCoordinator = sagaCoordinator;
public SagaController(SagaExecutionComponent sagaExecutionComponent, SagaEventRepo repo) {
this.sagaExecutionComponent = sagaExecutionComponent;
this.repo = repo;
}

@Trace("processRequests")
@RequestMapping(value = "requests", method = POST, consumes = TEXT_PLAIN_VALUE, produces = TEXT_PLAIN_VALUE)
public ResponseEntity<String> processRequests(@RequestBody String request) {
sagaCoordinator.run(request);
sagaExecutionComponent.run(request);
return ResponseEntity.ok("success");
}

Expand Down
Expand Up @@ -16,7 +16,7 @@

package io.servicecomb.saga.spring;

import io.servicecomb.saga.core.application.SagaCoordinator;
import io.servicecomb.saga.core.application.SagaExecutionComponent;
import java.lang.invoke.MethodHandles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -29,7 +29,7 @@ class SagaRecoveryListener implements ApplicationListener<ApplicationReadyEvent>
@Override
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
log.info("Recovering pending sagas from saga log");
applicationReadyEvent.getApplicationContext().getBean(SagaCoordinator.class).reanimate();
applicationReadyEvent.getApplicationContext().getBean(SagaExecutionComponent.class).reanimate();
log.info("Recovered pending sagas from saga log successfully");
}
}
Expand Up @@ -19,7 +19,7 @@
import io.servicecomb.saga.core.JacksonToJsonFormat;
import io.servicecomb.saga.core.PersistentStore;
import io.servicecomb.saga.core.ToJsonFormat;
import io.servicecomb.saga.core.application.SagaCoordinator;
import io.servicecomb.saga.core.application.SagaExecutionComponent;
import io.servicecomb.saga.core.application.interpreter.FromJsonFormat;
import io.servicecomb.saga.format.JacksonFromJsonFormat;
import io.servicecomb.saga.format.JacksonSagaEventFormat;
Expand Down Expand Up @@ -62,14 +62,14 @@ PersistentStore persistentStore(SagaEventRepo repo, ToJsonFormat toJsonFormat, S
}

@Bean
SagaCoordinator sagaCoordinator(
SagaExecutionComponent sagaExecutionComponent(
@Value("${saga.thread.count:5}") int numberOfThreads,
@Value("${saga.retry.delay:3000}") int retryDelay,
PersistentStore persistentStore,
ToJsonFormat format,
FromJsonFormat fromJsonFormat) {

return new SagaCoordinator(
return new SagaExecutionComponent(
retryDelay,
persistentStore,
fromJsonFormat,
Expand Down
Expand Up @@ -25,7 +25,7 @@

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.servicecomb.saga.core.application.SagaCoordinator;
import io.servicecomb.saga.core.application.SagaExecutionComponent;
import java.util.Date;
import java.util.List;
import java.util.Map;
Expand All @@ -48,7 +48,7 @@ public class SagaControllerTest {
private final ObjectMapper objectMapper = new ObjectMapper();

@MockBean
private SagaCoordinator sagaCoordinator;
private SagaExecutionComponent sagaExecutionComponent;

@MockBean
private SagaEventRepo repo;
Expand Down

0 comments on commit a34500a

Please sign in to comment.