Skip to content

Commit

Permalink
JAV-225 able to process tasks in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
seanyinx committed Jul 28, 2017
1 parent 75ef6a4 commit 9fe9291
Show file tree
Hide file tree
Showing 36 changed files with 725 additions and 387 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2017 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.servicecomb.saga.core;

import io.servicecomb.saga.core.dag.Node;
import io.servicecomb.saga.core.dag.Traveller;
import java.util.Collection;
import java.util.Map;

abstract class AbstractSagaState implements SagaState {

private final Traveller<SagaTask> traveller;

AbstractSagaState(Traveller<SagaTask> traveller) {
this.traveller = traveller;
}

@Override
public boolean hasNext() {
return traveller.hasNext();
}

@Override
public void run() {
Collection<Node<SagaTask>> nodes = traveller.nodes();

// finish pending tasks from saga log at startup
invoke(nodes);
nodes.clear();

while (traveller.hasNext()) {
traveller.next();
invoke(nodes);
nodes.clear();
}
}

abstract void invoke(Collection<Node<SagaTask>> nodes);

@Override
public void replay(Map<Operation, Collection<SagaEvent>> completedOperationsCopy) {
boolean played = false;
Collection<Node<SagaTask>> nodes = traveller.nodes();
while (traveller.hasNext() && !played) {
traveller.next();
played = replay(nodes, completedOperationsCopy);
}
}

abstract boolean replay(Collection<Node<SagaTask>> nodes,
Map<Operation, Collection<SagaEvent>> completedOperationsCopy);

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,21 @@

package io.servicecomb.saga.core;

import java.lang.invoke.MethodHandles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class BackwardRecovery implements RecoveryPolicy {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

@Override
public SagaState apply(SagaState sagaState) {
return CompensationState.INSTANCE;
public void apply(SagaTask task) {
try {
task.commit();
} catch (Exception e) {
log.info("Applying {} policy", description());
task.abort();
throw e;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,28 @@

package io.servicecomb.saga.core;

import java.util.Deque;
import java.util.Queue;
import io.servicecomb.saga.core.dag.Node;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

class CompensationEndedEvent extends SagaEvent {

CompensationEndedEvent(long id, Compensation compensation) {
CompensationEndedEvent(long id, SagaTask compensation) {
super(id, compensation);
}

@Override
public SagaState play(SagaState currentState, Queue<SagaTask> pendingTasks, Deque<SagaTask> executedTasks,
IdGenerator<Long> eventIdGenerator) {
public void gatherTo(Map<Operation, Collection<SagaEvent>> completedOperations, Set<SagaTask> orphanOperations) {
completedOperations.get(payload().compensation()).add(this);
orphanOperations.remove(payload());
}

eventIdGenerator.nextId();
eventIdGenerator.nextId();
executedTasks.pop();
return currentState;
@Override
public void play(IdGenerator<Long> idGenerator, Iterator<Node<SagaTask>> iterator) {
idGenerator.nextId();
idGenerator.nextId();
iterator.remove();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,27 @@

package io.servicecomb.saga.core;

import java.util.Deque;
import java.util.Queue;
import io.servicecomb.saga.core.dag.Node;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;

class CompensationStartedEvent extends SagaEvent {

CompensationStartedEvent(long id, Compensation compensation) {
CompensationStartedEvent(long id, SagaTask compensation) {
super(id, compensation);
}

@Override
public SagaState play(SagaState currentState, Queue<SagaTask> pendingTasks, Deque<SagaTask> executedTasks,
IdGenerator<Long> eventIdGenerator) {
return CompensationState.INSTANCE;
public void gatherTo(Map<Operation, Collection<SagaEvent>> completedOperations, Set<SagaTask> orphanOperations) {
completedOperations.put(payload().compensation(), new LinkedList<>());
completedOperations.get(payload().compensation()).add(this);
orphanOperations.add(payload());
}

@Override
public void play(IdGenerator<Long> idGenerator, Iterator<Node<SagaTask>> iterator) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,67 @@

package io.servicecomb.saga.core;

import io.servicecomb.saga.core.dag.Node;
import io.servicecomb.saga.core.dag.Traveller;
import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

enum CompensationState implements SagaState {
INSTANCE;
class CompensationState extends AbstractSagaState {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final IdGenerator<Long> idGenerator;
private final Map<Operation, Collection<SagaEvent>> completedOperations;

CompensationState(IdGenerator<Long> idGenerator,
Map<Operation, Collection<SagaEvent>> completedOperations, Traveller<SagaTask> traveller) {
super(traveller);
this.idGenerator = idGenerator;
this.completedOperations = completedOperations;
}

@Override
public void invoke(Deque<SagaTask> executedTasks, Queue<SagaTask> pendingTasks) {
SagaTask task = executedTasks.peek();
log.info("Starting task {} id={}", task.description(), task.id());
task.abort();
task.compensate();

log.info("Completed task {} id={}", task.description(), task.id());
executedTasks.pop();
}

@Override
void invoke(Collection<Node<SagaTask>> nodes) {
for (Node<SagaTask> node : nodes) {
SagaTask task = node.value();

if (completedOperations.containsKey(task.transaction())) {
log.info("Starting task {} id={}", task.description(), task.id());
task.compensate();
log.info("Completed task {} id={}", task.description(), task.id());
}
}
}

boolean replay(Collection<Node<SagaTask>> nodes,
Map<Operation, Collection<SagaEvent>> completedOperations) {

for (Iterator<Node<SagaTask>> iterator = nodes.iterator(); iterator.hasNext(); ) {
SagaTask task = iterator.next().value();
if (completedOperations.containsKey(task.compensation())) {
for (SagaEvent event : completedOperations.get(task.compensation())) {
log.info("Start playing event {} id={}", event.description(), event.id());
event.play(idGenerator, iterator);
log.info("Completed playing event {} id={}", event.description(), event.id());
}
} else {
iterator.remove();
}
}
return !nodes.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,23 @@

package io.servicecomb.saga.core;

import java.lang.invoke.MethodHandles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ForwardRecovery implements RecoveryPolicy {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

@Override
public SagaState apply(SagaState sagaState) {
return sagaState;
public void apply(SagaTask task) {
boolean success = false;
do {
try {
task.commit();
success = true;
} catch (Exception ignored) {
log.info("Applying {} policy", description());
}
} while (!success);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ class LoggingRecoveryPolicy implements RecoveryPolicy {
}

@Override
public SagaState apply(SagaState sagaState) {
log.info("Applying {} policy", recoveryPolicy.description());
return recoveryPolicy.apply(sagaState);
public void apply(SagaTask task) {
log.info("Starting task {} id={}", task.description(), task.id());
recoveryPolicy.apply(task);
log.info("Completed task {} id={}", task.description(), task.id());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,8 @@ interface Operation {
Operation NO_OP = () -> {
};

Operation END_OP = () -> {
};

void run();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@

interface RecoveryPolicy extends Descriptive {

SagaState apply(SagaState sagaState);
void apply(SagaTask task);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,32 @@ public long id() {
return id;
}

@Override
public Operation transaction() {
return request.transaction();
}

@Override
public void commit() {
eventStore.offer(new TransactionStartedEvent(idGenerator.nextId(), request.transaction()));
eventStore.offer(new TransactionStartedEvent(idGenerator.nextId(), this));
request.commit();
eventStore.offer(new TransactionEndedEvent(idGenerator.nextId(), request.transaction()));
eventStore.offer(new TransactionEndedEvent(idGenerator.nextId(), this));
}

@Override
public void abort() {
eventStore.offer(new CompensationStartedEvent(idGenerator.nextId(), request.compensation()));
public void compensate() {
eventStore.offer(new CompensationStartedEvent(idGenerator.nextId(), this));
request.abort();
eventStore.offer(new CompensationEndedEvent(idGenerator.nextId(), request.compensation()));
eventStore.offer(new CompensationEndedEvent(idGenerator.nextId(), this));
}

@Override
public void abort() {
eventStore.offer(new TransactionAbortedEvent(idGenerator.nextId(), this));
}

@Override
public Operation compensation() {
return request.compensation();
}
}

0 comments on commit 9fe9291

Please sign in to comment.