Skip to content

Commit

Permalink
Storing tracking token in Saga instances where applicable
Browse files Browse the repository at this point in the history
  • Loading branch information
abuijze committed Jun 9, 2016
1 parent 07768c2 commit ead7c58
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
/*
* Copyright (c) 2010-2016. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -15,10 +18,12 @@

import org.axonframework.common.Assert;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.saga.metamodel.SagaModel;
import org.axonframework.eventsourcing.eventstore.TrackingToken;

import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;

Expand All @@ -38,7 +43,7 @@ public class AnnotatedSaga<T> extends SagaLifecycle implements Saga<T> {
private volatile boolean isActive = true;
private final String sagaId;
private final T sagaInstance;
private TrackingToken trackingToken;
private AtomicReference<TrackingToken> trackingToken;

/**
* Creates an AnnotatedSaga instance to wrap the given {@code annotatedSaga}, identifier with the given
Expand All @@ -48,15 +53,17 @@ public class AnnotatedSaga<T> extends SagaLifecycle implements Saga<T> {
* @param sagaId The identifier of this Saga instance
* @param associationValues The current associations of this Saga
* @param annotatedSaga The object instance representing the Saga
* @param trackingToken The token identifying the position in a stream the saga has last processed
* @param metaModel The model describing Saga structure
*/
public AnnotatedSaga(String sagaId, Set<AssociationValue> associationValues,
T annotatedSaga, SagaModel<T> metaModel) {
T annotatedSaga, TrackingToken trackingToken, SagaModel<T> metaModel) {
Assert.notNull(annotatedSaga, "SagaInstance may not be null");
this.sagaId = sagaId;
this.associationValues = new AssociationValuesImpl(associationValues);
this.sagaInstance = annotatedSaga;
this.metaModel = metaModel;
this.trackingToken = new AtomicReference<>(trackingToken);
}

@Override
Expand Down Expand Up @@ -92,8 +99,10 @@ public final boolean handle(EventMessage<?> event) {
.filter(h -> getAssociationValues().contains(h.getAssociationValue(event)))
.findFirst().map(h -> {
try {
// TODO: If events is tracked event, check for (and store) token
executeWithResult(() -> h.handle(event, sagaInstance));
if (event instanceof TrackedEventMessage) {
this.trackingToken.set(((TrackedEventMessage) event).trackingToken());
}
} catch (RuntimeException | Error e) {
throw e;
} catch (Exception e) {
Expand All @@ -117,7 +126,7 @@ public boolean isActive() {

@Override
public TrackingToken trackingToken() {
return trackingToken;
return trackingToken.get();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public AnnotatedSaga<T> newInstance(Callable<T> sagaFactory) {
T sagaRoot = sagaFactory.call();
injector.injectResources(sagaRoot);
AnnotatedSaga<T> saga = new AnnotatedSaga<>(IdentifierFactory.getInstance().generateIdentifier(),
Collections.emptySet(), sagaRoot, sagaModel);
Collections.emptySet(), sagaRoot, null, sagaModel);
unitOfWork.onPrepareCommit(u -> {
if (saga.isActive()) {
storeSaga(saga);
Expand Down Expand Up @@ -176,7 +176,7 @@ protected AnnotatedSaga<T> doLoadSaga(String sagaIdentifier) {
if (entry != null) {
T saga = entry.saga();
injector.injectResources(saga);
return new AnnotatedSaga<>(sagaIdentifier, entry.associationValues(), saga, sagaModel);
return new AnnotatedSaga<>(sagaIdentifier, entry.associationValues(), saga, entry.trackingToken(), sagaModel);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
/*
* Copyright (c) 2010-2016. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down Expand Up @@ -31,7 +34,7 @@ public class AnnotatedSagaTest {
@Test
public void testInvokeSaga() throws Exception {
StubAnnotatedSaga testSubject = new StubAnnotatedSaga();
AnnotatedSaga<StubAnnotatedSaga> s = new AnnotatedSaga<>("id", Collections.emptySet(), testSubject,
AnnotatedSaga<StubAnnotatedSaga> s = new AnnotatedSaga<>("id", Collections.emptySet(), testSubject, null,
new DefaultSagaMetaModelFactory().modelOf(StubAnnotatedSaga.class));
s.doAssociateWith(new AssociationValue("propertyName", "id"));
s.handle(new GenericEventMessage<>(new RegularEvent("id")));
Expand All @@ -43,7 +46,7 @@ public void testInvokeSaga() throws Exception {
@Test
public void testEndedAfterInvocation_BeanProperty() throws Exception {
StubAnnotatedSaga testSubject = new StubAnnotatedSaga();
AnnotatedSaga<StubAnnotatedSaga> s = new AnnotatedSaga<>("id", Collections.emptySet(), testSubject,
AnnotatedSaga<StubAnnotatedSaga> s = new AnnotatedSaga<>("id", Collections.emptySet(), testSubject, null,
new DefaultSagaMetaModelFactory().modelOf(StubAnnotatedSaga.class));
s.doAssociateWith(new AssociationValue("propertyName", "id"));
s.handle(new GenericEventMessage<>(new RegularEvent("id")));
Expand All @@ -56,7 +59,7 @@ public void testEndedAfterInvocation_BeanProperty() throws Exception {
@Test
public void testEndedAfterInvocation_WhenAssociationIsRemoved() throws Exception {
StubAnnotatedSaga testSubject = new StubAnnotatedSagaWithExplicitAssociationRemoval();
AnnotatedSaga<StubAnnotatedSaga> s = new AnnotatedSaga<>("id", Collections.emptySet(), testSubject,
AnnotatedSaga<StubAnnotatedSaga> s = new AnnotatedSaga<>("id", Collections.emptySet(), testSubject, null,
new DefaultSagaMetaModelFactory().modelOf(StubAnnotatedSaga.class));
s.doAssociateWith(new AssociationValue("propertyName", "id"));
s.handle(new GenericEventMessage<>(new RegularEvent("id")));
Expand All @@ -69,7 +72,7 @@ public void testEndedAfterInvocation_WhenAssociationIsRemoved() throws Exception
@Test
public void testEndedAfterInvocation_UniformAccessPrinciple() throws Exception {
StubAnnotatedSaga testSubject = new StubAnnotatedSaga();
AnnotatedSaga<StubAnnotatedSaga> s = new AnnotatedSaga<>("id", Collections.emptySet(), testSubject,
AnnotatedSaga<StubAnnotatedSaga> s = new AnnotatedSaga<>("id", Collections.emptySet(), testSubject, null,
new DefaultSagaMetaModelFactory().modelOf(StubAnnotatedSaga.class));
s.doAssociateWith(new AssociationValue("propertyName", "id"));
s.handle(new GenericEventMessage<>(new UniformAccessEvent("id")));
Expand Down

0 comments on commit ead7c58

Please sign in to comment.