Skip to content

Commit

Permalink
SCB-152 sent tx compensated event on compensation completed
Browse files Browse the repository at this point in the history
Signed-off-by: seanyinx <sean.yin@huawei.com>
  • Loading branch information
seanyinx authored and WillemJiang committed Jan 2, 2018
1 parent 49235b5 commit 9c178a0
Show file tree
Hide file tree
Showing 15 changed files with 211 additions and 82 deletions.
Expand Up @@ -82,7 +82,7 @@ public void addCompensationContext(Method compensationMethod, Object target) {
compensationContexts.put(compensationMethod.toString(), new CompensationContext(target, compensationMethod));
}

public void compensate(String globalTxId, String localTxId, String compensationMethod, Object[] payloads) {
public void compensate(String globalTxId, String localTxId, String compensationMethod, Object... payloads) {
CompensationContext compensationContext = compensationContexts.get(compensationMethod);

try {
Expand Down
Expand Up @@ -22,13 +22,20 @@
import org.springframework.context.annotation.EnableAspectJAutoProxy;

import io.servicecomb.saga.omega.context.OmegaContext;
import io.servicecomb.saga.omega.transaction.CompensationMessageHandler;
import io.servicecomb.saga.omega.transaction.MessageHandler;
import io.servicecomb.saga.omega.transaction.MessageSender;
import io.servicecomb.saga.omega.transaction.TransactionAspect;

@Configuration
@EnableAspectJAutoProxy
public class TransactionAspectConfig {

@Bean
MessageHandler messageHandler(MessageSender sender, OmegaContext context) {
return new CompensationMessageHandler(sender, context);
}

@Bean
TransactionAspect transactionAspect(MessageSender sender, OmegaContext context) {
return new TransactionAspect(sender, context);
Expand Down
Expand Up @@ -20,16 +20,14 @@
import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
import static io.servicecomb.saga.omega.transaction.spring.TransactionalUserService.ILLEGAL_USER;
import static java.util.Arrays.asList;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertThat;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

import org.junit.After;
import org.junit.Before;
Expand All @@ -46,7 +44,10 @@
import io.servicecomb.saga.omega.context.UniqueIdGenerator;
import io.servicecomb.saga.omega.transaction.MessageHandler;
import io.servicecomb.saga.omega.transaction.MessageSender;
import io.servicecomb.saga.omega.transaction.TxEvent;
import io.servicecomb.saga.omega.transaction.TxAbortedEvent;
import io.servicecomb.saga.omega.transaction.TxCompensatedEvent;
import io.servicecomb.saga.omega.transaction.TxEndedEvent;
import io.servicecomb.saga.omega.transaction.TxStartedEvent;
import io.servicecomb.saga.omega.transaction.spring.TransactionInterceptionTest.MessageConfig;

@RunWith(SpringRunner.class)
Expand All @@ -62,7 +63,7 @@ public class TransactionInterceptionTest {
private final String email = uniquify("email");

@Autowired
private List<byte[]> messages;
private List<String> messages;

@Autowired
private TransactionalUserService userService;
Expand Down Expand Up @@ -94,11 +95,11 @@ public void sendsUserToRemote_AroundTransaction() throws Exception {

String compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();

assertEquals(
asList(
txStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, username, email),
txEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod)),
toString(messages)
assertArrayEquals(
new String[]{
new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()},
toArray(messages)
);

User actual = userRepository.findOne(user.id());
Expand All @@ -107,19 +108,22 @@ public void sendsUserToRemote_AroundTransaction() throws Exception {

@Test
public void sendsAbortEvent_OnSubTransactionFailure() throws Exception {
Throwable throwable = null;
User user = new User(ILLEGAL_USER, email);
try {
userService.add(new User(ILLEGAL_USER, email));
userService.add(user);
expectFailing(IllegalArgumentException.class);
} catch (IllegalArgumentException ignored) {
throwable = ignored;
}

String compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();

assertEquals(
asList(
txStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, ILLEGAL_USER, email),
txAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod)),
toString(messages)
assertArrayEquals(
new String[]{
new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
new TxAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod, throwable).toString()},
toArray(messages)
);
}

Expand All @@ -133,74 +137,46 @@ public void compensateOnTransactionException() throws Exception {

String compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();

messageHandler.onReceive(globalTxId, this.localTxId, compensationMethod, user);
messageHandler.onReceive(globalTxId, localTxId, compensationMethod, anotherUser);
messageHandler.onReceive(globalTxId, this.localTxId, parentTxId, compensationMethod, user);
messageHandler.onReceive(globalTxId, localTxId, parentTxId, compensationMethod, anotherUser);

assertThat(userRepository.findOne(user.id()), is(nullValue()));
assertThat(userRepository.findOne(anotherUser.id()), is(nullValue()));

assertArrayEquals(
new String[]{
new TxStartedEvent(globalTxId, this.localTxId, parentTxId, compensationMethod, user).toString(),
new TxEndedEvent(globalTxId, this.localTxId, parentTxId, compensationMethod).toString(),
new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, anotherUser).toString(),
new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString(),
new TxCompensatedEvent(globalTxId, this.localTxId, parentTxId, compensationMethod).toString(),
new TxCompensatedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()
},
toArray(messages)
);
}

private List<String> toString(List<byte[]> messages) {
return messages.stream()
.map(String::new)
.collect(Collectors.toList());
private String[] toArray(List<String> messages) {
return messages.toArray(new String[messages.size()]);
}

@Configuration
static class MessageConfig {
private final List<byte[]> messages = new ArrayList<>();
private final List<String> messages = new ArrayList<>();

@Bean
OmegaContext omegaContext() {
return new OmegaContext(new UniqueIdGenerator());
}

@Bean
List<byte[]> messages() {
List<String> messages() {
return messages;
}

@Bean
MessageSender sender() {
return (event) -> messages.add(serialize(event));
}

private byte[] serialize(TxEvent event) {
if (TX_STARTED_EVENT.equals(event.type())) {
User user = ((User) event.payloads()[0]);
return txStartedEvent(event.globalTxId(),
event.localTxId(),
event.parentTxId(),
event.compensationMethod(),
user.username(),
user.email()).getBytes();
}
return txEndedEvent(event.globalTxId(),
event.localTxId(),
event.parentTxId(),
event.compensationMethod()).getBytes();
}

@Bean
MessageHandler handler(OmegaContext omegaContext) {
return omegaContext::compensate;
return (event) -> messages.add(event.toString());
}
}

private static String txStartedEvent(String globalTxId,
String localTxId,
String parentTxId,
String compensationMethod,
String username,
String email) {
return globalTxId + ":" + localTxId + ":" + parentTxId + ":" + compensationMethod + ":" + TX_STARTED_EVENT + ":" + username + ":" + email;
}

private static String txEndedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
return globalTxId + ":" + localTxId + ":" + parentTxId + ":" + compensationMethod + ":" + TX_ENDED_EVENT;
}

private static String txAbortedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
return globalTxId + ":" + localTxId + ":" + parentTxId + ":" + compensationMethod + ":" + TX_ENDED_EVENT;
}
}
Expand Up @@ -70,4 +70,12 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(id, username, email);
}

@Override
public String toString() {
return "User{" +
"username='" + username + '\'' +
", email='" + email + '\'' +
'}';
}
}
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.servicecomb.saga.omega.transaction;

import io.servicecomb.saga.omega.context.OmegaContext;

public class CompensationMessageHandler implements MessageHandler {
private final MessageSender sender;
private final OmegaContext omegaContext;

public CompensationMessageHandler(MessageSender sender, OmegaContext omegaContext) {
this.sender = sender;
this.omegaContext = omegaContext;
}

@Override
public void onReceive(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... payloads) {
omegaContext.compensate(globalTxId, localTxId, compensationMethod, payloads);
sender.send(new TxCompensatedEvent(globalTxId, localTxId, parentTxId, compensationMethod));
}
}
Expand Up @@ -17,9 +17,6 @@

package io.servicecomb.saga.omega.transaction;

import java.io.PrintWriter;
import java.io.StringWriter;

class FailedTransactionInterceptor {
private final MessageSender sender;

Expand All @@ -28,12 +25,6 @@ class FailedTransactionInterceptor {
}

void intercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable) {
sender.send(new TxAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod, stackTrace(throwable)));
}

private String stackTrace(Throwable e) {
StringWriter writer = new StringWriter();
e.printStackTrace(new PrintWriter(writer));
return writer.toString();
sender.send(new TxAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod, throwable));
}
}
Expand Up @@ -18,5 +18,5 @@
package io.servicecomb.saga.omega.transaction;

public interface MessageHandler {
void onReceive(String globalTxId, String localTxId, String compensationMethod, Object... payloads);
void onReceive(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... payloads);
}
Expand Up @@ -17,8 +17,17 @@

package io.servicecomb.saga.omega.transaction;

class TxAbortedEvent extends TxEvent {
TxAbortedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, String stackTrace) {
super(globalTxId, localTxId, parentTxId, compensationMethod, stackTrace);
import java.io.PrintWriter;
import java.io.StringWriter;

public class TxAbortedEvent extends TxEvent {
public TxAbortedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable) {
super(globalTxId, localTxId, parentTxId, compensationMethod, stackTrace(throwable));
}

private static String stackTrace(Throwable e) {
StringWriter writer = new StringWriter();
e.printStackTrace(new PrintWriter(writer));
return writer.toString();
}
}
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.servicecomb.saga.omega.transaction;

public class TxCompensatedEvent extends TxEvent {
public TxCompensatedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
super(globalTxId, localTxId, parentTxId, compensationMethod);
}
}
Expand Up @@ -17,8 +17,8 @@

package io.servicecomb.saga.omega.transaction;

class TxEndedEvent extends TxEvent {
TxEndedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
public class TxEndedEvent extends TxEvent {
public TxEndedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
super(globalTxId, localTxId, parentTxId, compensationMethod);
}
}
Expand Up @@ -17,6 +17,8 @@

package io.servicecomb.saga.omega.transaction;

import java.util.Arrays;

public class TxEvent {
private final long timestamp;
private final String globalTxId;
Expand Down Expand Up @@ -61,4 +63,15 @@ public String type() {
public String compensationMethod() {
return compensationMethod;
}

@Override
public String toString() {
return "TxEvent{" +
"globalTxId='" + globalTxId + '\'' +
", localTxId='" + localTxId + '\'' +
", parentTxId='" + parentTxId + '\'' +
", compensationMethod='" + compensationMethod + '\'' +
", payloads=" + Arrays.toString(payloads) +
'}';
}
}
Expand Up @@ -17,9 +17,9 @@

package io.servicecomb.saga.omega.transaction;

class TxStartedEvent extends TxEvent {
public class TxStartedEvent extends TxEvent {

TxStartedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object[] payloads) {
public TxStartedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... payloads) {
super(globalTxId, localTxId, parentTxId, compensationMethod, payloads);
}
}

0 comments on commit 9c178a0

Please sign in to comment.