Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SCB-1707] Forward compensation only sends once a failure event if multiple retries fail #630

Merged
merged 3 commits into from Jan 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -24,7 +24,7 @@ public class TxStartedEvent extends TxEvent {
private String compensationMethod;
private byte[] payloads;
private String retryMethod;
private int retries;
private int forwardRetries;

public String getCompensationMethod() {
return compensationMethod;
Expand All @@ -50,12 +50,12 @@ public void setRetryMethod(String retryMethod) {
this.retryMethod = retryMethod;
}

public int getRetries() {
return retries;
public int getForwardRetries() {
return forwardRetries;
}

public void setRetries(int retries) {
this.retries = retries;
public void setForwardRetries(int forwardRetries) {
this.forwardRetries = forwardRetries;
}

public static Builder builder() {
Expand Down Expand Up @@ -110,8 +110,8 @@ public Builder retryMethod(String retryMethod) {
return this;
}

public Builder retries(int retries) {
txStartedEvent.setRetries(retries);
public Builder forwardRetries(int forwardRetries) {
txStartedEvent.setForwardRetries(forwardRetries);
return this;
}

Expand Down
Expand Up @@ -32,7 +32,7 @@ public AddTxEventDomain(TxStartedEvent event) {
this.event = event;
this.compensationMethod = event.getCompensationMethod();
this.payloads = event.getPayloads();
this.retries = event.getRetries();
this.retries = event.getForwardRetries();
}

public TxState getState() {
Expand Down
Expand Up @@ -106,9 +106,9 @@ public void onTxEvent(GrpcTxEvent message, StreamObserver<GrpcAck> responseObser
message.getParentTxId().isEmpty() ? null : message.getParentTxId(),
message.getType(),
message.getCompensationMethod(),
message.getTimeout(),
message.getForwardTimeout(),
message.getRetryMethod(),
message.getRetries(),
message.getForwardRetries(),
message.getPayloads().toByteArray()
));

Expand Down
Expand Up @@ -108,7 +108,7 @@ public void onTxEvent(GrpcTxEvent message, StreamObserver<GrpcAck> responseObser
.instanceId(message.getInstanceId())
.globalTxId(message.getGlobalTxId())
.createTime(new Date())
.timeout(message.getTimeout()).build();
.timeout(message.getForwardTimeout()).build();
} else if (message.getType().equals(EventType.SagaEndedEvent.name())) {
event = org.apache.servicecomb.pack.alpha.core.fsm.event.SagaEndedEvent.builder()
.serviceName(message.getServiceName())
Expand Down Expand Up @@ -137,7 +137,7 @@ public void onTxEvent(GrpcTxEvent message, StreamObserver<GrpcAck> responseObser
.parentTxId(message.getParentTxId().isEmpty() ? null : message.getParentTxId())
.compensationMethod(message.getCompensationMethod())
.retryMethod(message.getRetryMethod())
.retries(message.getRetries())
.forwardRetries(message.getForwardRetries())
.createTime(new Date())
.payloads(message.getPayloads().toByteArray()).build();
} else if (message.getType().equals(EventType.TxEndedEvent.name())) {
Expand Down
Expand Up @@ -553,7 +553,7 @@ private GrpcTxEvent eventOf(EventType eventType,
String compensationMethod,
int timeout,
String retryMethod,
int retries) {
int forwardRetries) {

return GrpcTxEvent.newBuilder()
.setServiceName(serviceName)
Expand All @@ -564,9 +564,9 @@ private GrpcTxEvent eventOf(EventType eventType,
.setParentTxId(parentTxId == null ? "" : parentTxId)
.setType(eventType.name())
.setCompensationMethod(compensationMethod)
.setTimeout(timeout)
.setForwardTimeout(timeout)
.setRetryMethod(retryMethod)
.setRetries(retries)
.setForwardRetries(forwardRetries)
.setPayloads(ByteString.copyFrom(payloads))
.build();
}
Expand Down
Expand Up @@ -535,7 +535,7 @@ private GrpcTxEvent eventOf(EventType eventType,
String compensationMethod,
int timeout,
String retryMethod,
int retries) {
int forwardRetries) {

return GrpcTxEvent.newBuilder()
.setServiceName(serviceName)
Expand All @@ -546,9 +546,9 @@ private GrpcTxEvent eventOf(EventType eventType,
.setParentTxId(parentTxId == null ? "" : parentTxId)
.setType(eventType.name())
.setCompensationMethod(compensationMethod)
.setTimeout(timeout)
.setForwardTimeout(timeout)
.setRetryMethod(retryMethod)
.setRetries(retries)
.setForwardRetries(forwardRetries)
.setPayloads(ByteString.copyFrom(payloads))
.build();
}
Expand Down
Expand Up @@ -255,7 +255,7 @@ private GrpcTxEvent eventOf(EventType eventType,
String compensationMethod,
int timeout,
String retryMethod,
int retries) {
int forwardRetries) {

return GrpcTxEvent.newBuilder()
.setServiceName(serviceName)
Expand All @@ -266,9 +266,9 @@ private GrpcTxEvent eventOf(EventType eventType,
.setParentTxId(parentTxId == null ? "" : parentTxId)
.setType(eventType.name())
.setCompensationMethod(compensationMethod)
.setTimeout(timeout)
.setForwardTimeout(timeout)
.setRetryMethod(retryMethod)
.setRetries(retries)
.setForwardRetries(forwardRetries)
.setPayloads(ByteString.copyFrom(payloads))
.build();
}
Expand Down
Expand Up @@ -255,23 +255,22 @@ public void retrySubTransactionSuccess() {
assertThat(entity.getStatusCode(), is(OK));
assertThat(entity.getBody(), is("Greetings, eric; Welcome to visit the zoo, eric"));

await().atMost(10, SECONDS).until(() -> eventRepo.count() == 8);
await().atMost(10, SECONDS).until(() -> eventRepo.count() == 7);

List<String> distinctGlobalTxIds = eventRepo.findDistinctGlobalTxId();
assertThat(distinctGlobalTxIds.size(), greaterThanOrEqualTo(1));

String globalTxId = distinctGlobalTxIds.get(0);
List<TxEvent> events = eventRepo.findByGlobalTxIdOrderByCreationTime(globalTxId);
assertThat(events.size(), is(8));
assertThat(events.size(), is(7));

assertThat(events.get(0).type(), is("SagaStartedEvent"));
assertThat(events.get(1).type(), is("TxStartedEvent"));
assertThat(events.get(2).type(), is("TxEndedEvent"));
assertThat(events.get(3).type(), is("TxStartedEvent"));
assertThat(events.get(4).type(), is("TxAbortedEvent"));
assertThat(events.get(5).type(), is("TxStartedEvent"));
assertThat(events.get(6).type(), is("TxEndedEvent"));
assertThat(events.get(7).type(), is("SagaEndedEvent"));
assertThat(events.get(4).type(), is("TxStartedEvent"));
assertThat(events.get(5).type(), is("TxEndedEvent"));
assertThat(events.get(6).type(), is("SagaEndedEvent"));

assertThat(compensatedMessages.isEmpty(), is(true));
}
Expand All @@ -286,27 +285,25 @@ public void compensateWhenRetryReachesMaximum() throws InterruptedException {

assertThat(entity.getStatusCode(), is(INTERNAL_SERVER_ERROR));

await().atMost(10, SECONDS).until(() -> eventRepo.count() == 12);
await().atMost(10, SECONDS).until(() -> eventRepo.count() == 10);

List<String> distinctGlobalTxIds = eventRepo.findDistinctGlobalTxId();
assertThat(distinctGlobalTxIds.size(), greaterThanOrEqualTo(1));

String globalTxId = distinctGlobalTxIds.get(0);
List<TxEvent> events = eventRepo.findByGlobalTxIdOrderByCreationTime(globalTxId);
assertThat(events.size(), is(12));
assertThat(events.size(), is(10));

assertThat(events.get(0).type(), is("SagaStartedEvent"));
assertThat(events.get(1).type(), is("TxStartedEvent"));
assertThat(events.get(2).type(), is("TxEndedEvent"));
assertThat(events.get(3).type(), is("TxStartedEvent"));
assertThat(events.get(4).type(), is("TxAbortedEvent"));
assertThat(events.get(4).type(), is("TxStartedEvent"));
assertThat(events.get(5).type(), is("TxStartedEvent"));
assertThat(events.get(6).type(), is("TxAbortedEvent"));
assertThat(events.get(7).type(), is("TxStartedEvent"));
assertThat(events.get(8).type(), is("TxAbortedEvent"));
// This event is for the whole saga event
assertThat(events.get(9).type(), is("TxAbortedEvent"));
assertThat(events.get(10).type(), is("TxCompensatedEvent"));
assertThat(events.get(7).type(), is("TxAbortedEvent"));
assertThat(events.get(8).type(), is("TxCompensatedEvent"));
assertThat(events.get(9).type(), is("SagaEndedEvent"));

assertThat(compensatedMessages, Matchers.contains("Goodbye, " + GreetingController.TRESPASSER));
}
Expand Down
Expand Up @@ -259,23 +259,22 @@ public void retrySubTransactionSuccess() {
assertThat(entity.getStatusCode(), is(OK));
assertThat(entity.getBody(), is("Greetings, eric; Welcome to visit the zoo, eric"));

await().atMost(10, SECONDS).until(() -> eventRepo.count() == 8);
await().atMost(10, SECONDS).until(() -> eventRepo.count() == 7);

List<String> distinctGlobalTxIds = eventRepo.findDistinctGlobalTxId();
assertThat(distinctGlobalTxIds.size(), greaterThanOrEqualTo(1));

String globalTxId = distinctGlobalTxIds.get(0);
List<TxEvent> events = eventRepo.findByGlobalTxIdOrderByCreationTime(globalTxId);
assertThat(events.size(), is(8));
assertThat(events.size(), is(7));

assertThat(events.get(0).type(), is("SagaStartedEvent"));
assertThat(events.get(1).type(), is("TxStartedEvent"));
assertThat(events.get(2).type(), is("TxEndedEvent"));
assertThat(events.get(3).type(), is("TxStartedEvent"));
assertThat(events.get(4).type(), is("TxAbortedEvent"));
assertThat(events.get(5).type(), is("TxStartedEvent"));
assertThat(events.get(6).type(), is("TxEndedEvent"));
assertThat(events.get(7).type(), is("SagaEndedEvent"));
assertThat(events.get(4).type(), is("TxStartedEvent"));
assertThat(events.get(5).type(), is("TxEndedEvent"));
assertThat(events.get(6).type(), is("SagaEndedEvent"));

assertThat(compensatedMessages.isEmpty(), is(true));
}
Expand All @@ -290,27 +289,24 @@ public void compensateWhenRetryReachesMaximum() throws InterruptedException {

assertThat(entity.getStatusCode(), is(INTERNAL_SERVER_ERROR));

await().atMost(10, SECONDS).until(() -> eventRepo.count() == 12);
await().atMost(10, SECONDS).until(() -> eventRepo.count() == 10);

List<String> distinctGlobalTxIds = eventRepo.findDistinctGlobalTxId();
assertThat(distinctGlobalTxIds.size(), greaterThanOrEqualTo(1));

String globalTxId = distinctGlobalTxIds.get(0);
List<TxEvent> events = eventRepo.findByGlobalTxIdOrderByCreationTime(globalTxId);
assertThat(events.size(), is(12));
assertThat(events.size(), is(10));

assertThat(events.get(0).type(), is("SagaStartedEvent"));
assertThat(events.get(1).type(), is("TxStartedEvent"));
assertThat(events.get(2).type(), is("TxEndedEvent"));
assertThat(events.get(3).type(), is("TxStartedEvent"));
assertThat(events.get(4).type(), is("TxAbortedEvent"));
assertThat(events.get(4).type(), is("TxStartedEvent"));
assertThat(events.get(5).type(), is("TxStartedEvent"));
assertThat(events.get(6).type(), is("TxAbortedEvent"));
assertThat(events.get(7).type(), is("TxStartedEvent"));
assertThat(events.get(8).type(), is("TxAbortedEvent"));
// This event is for the whole saga event
assertThat(events.get(9).type(), is("TxAbortedEvent"));
assertThat(events.get(10).type(), is("TxCompensatedEvent"));
assertThat(events.get(7).type(), is("TxAbortedEvent"));
assertThat(events.get(8).type(), is("TxCompensatedEvent"));
assertThat(events.get(9).type(), is("SagaEndedEvent"));

assertThat(compensatedMessages, Matchers.contains("Goodbye, " + GreetingController.TRESPASSER));
}
Expand Down
Expand Up @@ -19,8 +19,6 @@

import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import java.lang.invoke.MethodHandles;
import org.apache.servicecomb.pack.contract.grpc.ServerMeta;
import org.apache.servicecomb.pack.omega.connector.grpc.core.LoadBalanceContext;
import org.apache.servicecomb.pack.omega.context.ServiceConfig;
Expand All @@ -37,8 +35,6 @@
import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc;
import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub;
import org.apache.servicecomb.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceStub;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GrpcSagaClientMessageSender implements SagaMessageSender {

Expand Down Expand Up @@ -113,10 +109,10 @@ private GrpcTxEvent convertEvent(TxEvent event) {
.setLocalTxId(event.localTxId())
.setParentTxId(event.parentTxId() == null ? "" : event.parentTxId())
.setType(event.type().name())
.setTimeout(event.timeout())
.setForwardTimeout(event.timeout())
.setCompensationMethod(event.compensationMethod())
.setRetryMethod(event.retryMethod() == null ? "" : event.retryMethod())
.setRetries(event.retries())
.setForwardRetries(event.forwardRetries())
.setPayloads(payloads);

return builder.build();
Expand Down
Expand Up @@ -171,9 +171,9 @@ public void onTxEvent(GrpcTxEvent request, StreamObserver<GrpcAck> responseObser
request.getLocalTxId(),
request.getParentTxId(),
request.getCompensationMethod(),
request.getTimeout(),
request.getForwardTimeout(),
request.getRetryMethod(),
request.getRetries(),
request.getForwardRetries(),
new String(request.getPayloads().toByteArray())));

sleep();
Expand Down
Expand Up @@ -203,19 +203,16 @@ public void retryTillSuccess() {
fail("unexpected exception throw: " + e);
}

assertThat(messages.size(), is(4));
assertThat(messages.size(), is(3));

assertThat(messages.get(0),
is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2, 0, retryMethod, 2, user, 1)
.toString()));

String abortedEvent = messages.get(1);
assertThat(abortedEvent, allOf(containsString("TxAbortedEvent"), containsString("Retry harder")));

assertThat(messages.get(2),
assertThat(messages.get(1),
is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2, 0, retryMethod, 1, user, 1)
.toString()));
assertThat(messages.get(3),
assertThat(messages.get(2),
is(new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2).toString()));

assertThat(userRepository.count(), is(1L));
Expand All @@ -234,19 +231,16 @@ public void retryReachesMaximumThenThrowsException() {
assertThat(e.getMessage(), is("Retry harder"));
}

assertThat(messages.size(), is(4));
assertThat(messages.size(), is(3));
assertThat(messages.get(0),
is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2, 0, retryMethod, 2, user, 3)
.toString()));

String abortedEvent1 = messages.get(1);
assertThat(abortedEvent1, allOf(containsString("TxAbortedEvent"), containsString("Retry harder")));

assertThat(messages.get(2),
assertThat(messages.get(1),
is(new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod2, 0, retryMethod, 1, user, 3)
.toString()));

String abortedEvent2 = messages.get(3);
String abortedEvent2 = messages.get(2);
assertThat(abortedEvent2, allOf(containsString("TxAbortedEvent"), containsString("Retry harder")));

assertThat(userRepository.count(), is(0L));
Expand Down
Expand Up @@ -25,19 +25,19 @@
public abstract class AbstractRecoveryPolicy implements RecoveryPolicy {

public abstract Object applyTo(ProceedingJoinPoint joinPoint, Compensable compensable,
CompensableInterceptor interceptor, OmegaContext context, String parentTxId, int retries)
CompensableInterceptor interceptor, OmegaContext context, String parentTxId, int forwardRetries)
throws Throwable;

@Override
public Object apply(ProceedingJoinPoint joinPoint, Compensable compensable,
CompensableInterceptor interceptor, OmegaContext context, String parentTxId, int retries)
CompensableInterceptor interceptor, OmegaContext context, String parentTxId, int forwardRetries)
throws Throwable {
Object result;
if(compensable.forwardTimeout()>0){
RecoveryPolicyTimeoutWrapper wrapper = new RecoveryPolicyTimeoutWrapper(this);
result = wrapper.applyTo(joinPoint, compensable, interceptor, context, parentTxId, retries);
result = wrapper.applyTo(joinPoint, compensable, interceptor, context, parentTxId, forwardRetries);
} else {
result = this.applyTo(joinPoint, compensable, interceptor, context, parentTxId, retries);
result = this.applyTo(joinPoint, compensable, interceptor, context, parentTxId, forwardRetries);
}
return result;
}
Expand Down
Expand Up @@ -30,9 +30,9 @@ public class CompensableInterceptor implements EventAwareInterceptor {

@Override
public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod,
int retries, Object... message) {
int forwardRetries, Object... message) {
return sender.send(new TxStartedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod,
timeout, retriesMethod, retries, message));
timeout, retriesMethod, forwardRetries, message));
}

@Override
Expand Down