Skip to content

Commit

Permalink
Merge 606f028 into 49235b5
Browse files Browse the repository at this point in the history
  • Loading branch information
seanyinx committed Jan 2, 2018
2 parents 49235b5 + 606f028 commit 1636fc9
Show file tree
Hide file tree
Showing 15 changed files with 211 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void addCompensationContext(Method compensationMethod, Object target) {
compensationContexts.put(compensationMethod.toString(), new CompensationContext(target, compensationMethod));
}

public void compensate(String globalTxId, String localTxId, String compensationMethod, Object[] payloads) {
public void compensate(String globalTxId, String localTxId, String compensationMethod, Object... payloads) {
CompensationContext compensationContext = compensationContexts.get(compensationMethod);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,20 @@
import org.springframework.context.annotation.EnableAspectJAutoProxy;

import io.servicecomb.saga.omega.context.OmegaContext;
import io.servicecomb.saga.omega.transaction.CompensationMessageHandler;
import io.servicecomb.saga.omega.transaction.MessageHandler;
import io.servicecomb.saga.omega.transaction.MessageSender;
import io.servicecomb.saga.omega.transaction.TransactionAspect;

@Configuration
@EnableAspectJAutoProxy
public class TransactionAspectConfig {

@Bean
MessageHandler messageHandler(MessageSender sender, OmegaContext context) {
return new CompensationMessageHandler(sender, context);
}

@Bean
TransactionAspect transactionAspect(MessageSender sender, OmegaContext context) {
return new TransactionAspect(sender, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,14 @@
import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
import static io.servicecomb.saga.omega.transaction.spring.TransactionalUserService.ILLEGAL_USER;
import static java.util.Arrays.asList;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertThat;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

import org.junit.After;
import org.junit.Before;
Expand All @@ -46,7 +44,10 @@
import io.servicecomb.saga.omega.context.UniqueIdGenerator;
import io.servicecomb.saga.omega.transaction.MessageHandler;
import io.servicecomb.saga.omega.transaction.MessageSender;
import io.servicecomb.saga.omega.transaction.TxEvent;
import io.servicecomb.saga.omega.transaction.TxAbortedEvent;
import io.servicecomb.saga.omega.transaction.TxCompensatedEvent;
import io.servicecomb.saga.omega.transaction.TxEndedEvent;
import io.servicecomb.saga.omega.transaction.TxStartedEvent;
import io.servicecomb.saga.omega.transaction.spring.TransactionInterceptionTest.MessageConfig;

@RunWith(SpringRunner.class)
Expand All @@ -62,7 +63,7 @@ public class TransactionInterceptionTest {
private final String email = uniquify("email");

@Autowired
private List<byte[]> messages;
private List<String> messages;

@Autowired
private TransactionalUserService userService;
Expand Down Expand Up @@ -94,11 +95,11 @@ public void sendsUserToRemote_AroundTransaction() throws Exception {

String compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();

assertEquals(
asList(
txStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, username, email),
txEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod)),
toString(messages)
assertArrayEquals(
new String[]{
new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()},
toArray(messages)
);

User actual = userRepository.findOne(user.id());
Expand All @@ -107,19 +108,22 @@ public void sendsUserToRemote_AroundTransaction() throws Exception {

@Test
public void sendsAbortEvent_OnSubTransactionFailure() throws Exception {
Throwable throwable = null;
User user = new User(ILLEGAL_USER, email);
try {
userService.add(new User(ILLEGAL_USER, email));
userService.add(user);
expectFailing(IllegalArgumentException.class);
} catch (IllegalArgumentException ignored) {
throwable = ignored;
}

String compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();

assertEquals(
asList(
txStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, ILLEGAL_USER, email),
txAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod)),
toString(messages)
assertArrayEquals(
new String[]{
new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(),
new TxAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod, throwable).toString()},
toArray(messages)
);
}

Expand All @@ -133,74 +137,46 @@ public void compensateOnTransactionException() throws Exception {

String compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString();

messageHandler.onReceive(globalTxId, this.localTxId, compensationMethod, user);
messageHandler.onReceive(globalTxId, localTxId, compensationMethod, anotherUser);
messageHandler.onReceive(globalTxId, this.localTxId, parentTxId, compensationMethod, user);
messageHandler.onReceive(globalTxId, localTxId, parentTxId, compensationMethod, anotherUser);

assertThat(userRepository.findOne(user.id()), is(nullValue()));
assertThat(userRepository.findOne(anotherUser.id()), is(nullValue()));

assertArrayEquals(
new String[]{
new TxStartedEvent(globalTxId, this.localTxId, parentTxId, compensationMethod, user).toString(),
new TxEndedEvent(globalTxId, this.localTxId, parentTxId, compensationMethod).toString(),
new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, anotherUser).toString(),
new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString(),
new TxCompensatedEvent(globalTxId, this.localTxId, parentTxId, compensationMethod).toString(),
new TxCompensatedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()
},
toArray(messages)
);
}

private List<String> toString(List<byte[]> messages) {
return messages.stream()
.map(String::new)
.collect(Collectors.toList());
private String[] toArray(List<String> messages) {
return messages.toArray(new String[messages.size()]);
}

@Configuration
static class MessageConfig {
private final List<byte[]> messages = new ArrayList<>();
private final List<String> messages = new ArrayList<>();

@Bean
OmegaContext omegaContext() {
return new OmegaContext(new UniqueIdGenerator());
}

@Bean
List<byte[]> messages() {
List<String> messages() {
return messages;
}

@Bean
MessageSender sender() {
return (event) -> messages.add(serialize(event));
}

private byte[] serialize(TxEvent event) {
if (TX_STARTED_EVENT.equals(event.type())) {
User user = ((User) event.payloads()[0]);
return txStartedEvent(event.globalTxId(),
event.localTxId(),
event.parentTxId(),
event.compensationMethod(),
user.username(),
user.email()).getBytes();
}
return txEndedEvent(event.globalTxId(),
event.localTxId(),
event.parentTxId(),
event.compensationMethod()).getBytes();
}

@Bean
MessageHandler handler(OmegaContext omegaContext) {
return omegaContext::compensate;
return (event) -> messages.add(event.toString());
}
}

private static String txStartedEvent(String globalTxId,
String localTxId,
String parentTxId,
String compensationMethod,
String username,
String email) {
return globalTxId + ":" + localTxId + ":" + parentTxId + ":" + compensationMethod + ":" + TX_STARTED_EVENT + ":" + username + ":" + email;
}

private static String txEndedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
return globalTxId + ":" + localTxId + ":" + parentTxId + ":" + compensationMethod + ":" + TX_ENDED_EVENT;
}

private static String txAbortedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
return globalTxId + ":" + localTxId + ":" + parentTxId + ":" + compensationMethod + ":" + TX_ENDED_EVENT;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,12 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(id, username, email);
}

@Override
public String toString() {
return "User{" +
"username='" + username + '\'' +
", email='" + email + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.servicecomb.saga.omega.transaction;

import io.servicecomb.saga.omega.context.OmegaContext;

public class CompensationMessageHandler implements MessageHandler {
private final MessageSender sender;
private final OmegaContext omegaContext;

public CompensationMessageHandler(MessageSender sender, OmegaContext omegaContext) {
this.sender = sender;
this.omegaContext = omegaContext;
}

@Override
public void onReceive(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... payloads) {
omegaContext.compensate(globalTxId, localTxId, compensationMethod, payloads);
sender.send(new TxCompensatedEvent(globalTxId, localTxId, parentTxId, compensationMethod));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

package io.servicecomb.saga.omega.transaction;

import java.io.PrintWriter;
import java.io.StringWriter;

class FailedTransactionInterceptor {
private final MessageSender sender;

Expand All @@ -28,12 +25,6 @@ class FailedTransactionInterceptor {
}

void intercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable) {
sender.send(new TxAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod, stackTrace(throwable)));
}

private String stackTrace(Throwable e) {
StringWriter writer = new StringWriter();
e.printStackTrace(new PrintWriter(writer));
return writer.toString();
sender.send(new TxAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod, throwable));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@
package io.servicecomb.saga.omega.transaction;

public interface MessageHandler {
void onReceive(String globalTxId, String localTxId, String compensationMethod, Object... payloads);
void onReceive(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... payloads);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,17 @@

package io.servicecomb.saga.omega.transaction;

class TxAbortedEvent extends TxEvent {
TxAbortedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, String stackTrace) {
super(globalTxId, localTxId, parentTxId, compensationMethod, stackTrace);
import java.io.PrintWriter;
import java.io.StringWriter;

public class TxAbortedEvent extends TxEvent {
public TxAbortedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable) {
super(globalTxId, localTxId, parentTxId, compensationMethod, stackTrace(throwable));
}

private static String stackTrace(Throwable e) {
StringWriter writer = new StringWriter();
e.printStackTrace(new PrintWriter(writer));
return writer.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.servicecomb.saga.omega.transaction;

public class TxCompensatedEvent extends TxEvent {
public TxCompensatedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
super(globalTxId, localTxId, parentTxId, compensationMethod);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package io.servicecomb.saga.omega.transaction;

class TxEndedEvent extends TxEvent {
TxEndedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
public class TxEndedEvent extends TxEvent {
public TxEndedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) {
super(globalTxId, localTxId, parentTxId, compensationMethod);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package io.servicecomb.saga.omega.transaction;

import java.util.Arrays;

public class TxEvent {
private final long timestamp;
private final String globalTxId;
Expand Down Expand Up @@ -61,4 +63,15 @@ public String type() {
public String compensationMethod() {
return compensationMethod;
}

@Override
public String toString() {
return "TxEvent{" +
"globalTxId='" + globalTxId + '\'' +
", localTxId='" + localTxId + '\'' +
", parentTxId='" + parentTxId + '\'' +
", compensationMethod='" + compensationMethod + '\'' +
", payloads=" + Arrays.toString(payloads) +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

package io.servicecomb.saga.omega.transaction;

class TxStartedEvent extends TxEvent {
public class TxStartedEvent extends TxEvent {

TxStartedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object[] payloads) {
public TxStartedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... payloads) {
super(globalTxId, localTxId, parentTxId, compensationMethod, payloads);
}
}

0 comments on commit 1636fc9

Please sign in to comment.