Skip to content

Commit

Permalink
JAV-225 ensured saga stored from partially completed parallel transac…
Browse files Browse the repository at this point in the history
…tions
  • Loading branch information
seanyinx committed Jul 29, 2017
1 parent 0d2be4b commit 0842501
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

interface Compensation extends Operation {

Compensation NO_OP_COMPENSATION = () -> {
Compensation SAGA_START_COMPENSATION = () -> {
};

Compensation SAGA_END_COMPENSATION = () -> {
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public boolean replay(Collection<Node<SagaTask>> nodes, Map<Operation, Collectio
event.play(iterator);
log.info("Completed playing event {}", event);
}
} else {
} else if (!completedOperations.containsKey(task.transaction())) {
// this transaction never started
iterator.remove();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package io.servicecomb.saga.core;

import static io.servicecomb.saga.core.Compensation.NO_OP_COMPENSATION;
import static io.servicecomb.saga.core.Transaction.NO_OP_TRANSACTION;
import static io.servicecomb.saga.core.Compensation.SAGA_END_COMPENSATION;
import static io.servicecomb.saga.core.Transaction.SAGA_END_TRANSACTION;

class SagaEndTask implements SagaTask {

Expand All @@ -36,7 +36,7 @@ public long id() {

@Override
public Operation transaction() {
return NO_OP_TRANSACTION;
return SAGA_END_TRANSACTION;
}

@Override
Expand All @@ -55,6 +55,6 @@ public void abort() {

@Override
public Operation compensation() {
return NO_OP_COMPENSATION;
return SAGA_END_COMPENSATION;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package io.servicecomb.saga.core;

import static io.servicecomb.saga.core.Compensation.NO_OP_COMPENSATION;
import static io.servicecomb.saga.core.Transaction.NO_OP_TRANSACTION;
import static io.servicecomb.saga.core.Compensation.SAGA_START_COMPENSATION;
import static io.servicecomb.saga.core.Transaction.SAGA_START_TRANSACTION;

class SagaStartTask implements SagaTask {

Expand All @@ -37,7 +37,7 @@ public long id() {

@Override
public Operation transaction() {
return NO_OP_TRANSACTION;
return SAGA_START_TRANSACTION;
}

@Override
Expand All @@ -57,6 +57,6 @@ public void abort() {

@Override
public Operation compensation() {
return NO_OP_COMPENSATION;
return SAGA_START_COMPENSATION;
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package io.servicecomb.saga.core;

public interface Transaction extends Operation {
interface Transaction extends Operation {

Transaction NO_OP_TRANSACTION = () -> {
Transaction SAGA_START_TRANSACTION = () -> {
};

Transaction SAGA_END_TRANSACTION = () -> {
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package io.servicecomb.saga.core;

import static io.servicecomb.saga.core.Compensation.NO_OP_COMPENSATION;
import static io.servicecomb.saga.core.Transaction.NO_OP_TRANSACTION;
import static io.servicecomb.saga.core.Compensation.SAGA_START_COMPENSATION;
import static io.servicecomb.saga.core.Transaction.SAGA_START_TRANSACTION;

public class DummyTask implements SagaTask {

Expand All @@ -28,7 +28,7 @@ public long id() {

@Override
public Operation transaction() {
return NO_OP_TRANSACTION;
return SAGA_START_TRANSACTION;
}

@Override
Expand All @@ -48,6 +48,6 @@ public void abort() {

@Override
public Operation compensation() {
return NO_OP_COMPENSATION;
return SAGA_START_COMPENSATION;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package io.servicecomb.saga.core;

import static io.servicecomb.saga.core.Compensation.NO_OP_COMPENSATION;
import static io.servicecomb.saga.core.Transaction.NO_OP_TRANSACTION;
import static io.servicecomb.saga.core.SagaEventMatcher.eventWith;
import static io.servicecomb.saga.core.Transaction.SAGA_END_TRANSACTION;
import static io.servicecomb.saga.core.Transaction.SAGA_START_TRANSACTION;
import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -78,15 +78,15 @@ public void recoverSagaWithEventsFromEventStore() {
coordinator.run();

assertThat(eventStore, contains(
eventWith(1L, NO_OP_TRANSACTION, SagaStartedEvent.class),
eventWith(1L, SAGA_START_TRANSACTION, SagaStartedEvent.class),
eventWith(2L, transaction1, TransactionStartedEvent.class),
eventWith(3L, transaction1, TransactionEndedEvent.class),
eventWith(4L, transaction2, TransactionStartedEvent.class),
eventWith(5L, transaction2, TransactionStartedEvent.class),
eventWith(6L, transaction2, TransactionEndedEvent.class),
eventWith(7L, transaction3, TransactionStartedEvent.class),
eventWith(8L, transaction3, TransactionEndedEvent.class),
eventWith(9L, NO_OP_COMPENSATION, SagaEndedEvent.class)
eventWith(9L, SAGA_END_TRANSACTION, SagaEndedEvent.class)
));
}
}

0 comments on commit 0842501

Please sign in to comment.