diff --git a/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java index ccc37383c..705e74e79 100644 --- a/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java +++ b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/OmegaContext.java @@ -82,7 +82,7 @@ public void addCompensationContext(Method compensationMethod, Object target) { compensationContexts.put(compensationMethod.toString(), new CompensationContext(target, compensationMethod)); } - public void compensate(String globalTxId, String localTxId, String compensationMethod, Object[] payloads) { + public void compensate(String globalTxId, String localTxId, String compensationMethod, Object... payloads) { CompensationContext compensationContext = compensationContexts.get(compensationMethod); try { diff --git a/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java b/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java index 5982109d1..da9f3f8bf 100644 --- a/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java +++ b/omega/omega-spring-tx/src/main/java/io/servicecomb/saga/omega/transaction/spring/TransactionAspectConfig.java @@ -22,6 +22,8 @@ import org.springframework.context.annotation.EnableAspectJAutoProxy; import io.servicecomb.saga.omega.context.OmegaContext; +import io.servicecomb.saga.omega.transaction.CompensationMessageHandler; +import io.servicecomb.saga.omega.transaction.MessageHandler; import io.servicecomb.saga.omega.transaction.MessageSender; import io.servicecomb.saga.omega.transaction.TransactionAspect; @@ -29,6 +31,11 @@ @EnableAspectJAutoProxy public class TransactionAspectConfig { + @Bean + MessageHandler messageHandler(MessageSender sender, OmegaContext context) { + return new CompensationMessageHandler(sender, context); + } + @Bean TransactionAspect transactionAspect(MessageSender sender, OmegaContext context) { return new TransactionAspect(sender, context); diff --git a/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java index 4ffd546d5..7efa30444 100644 --- a/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java +++ b/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java @@ -20,16 +20,14 @@ import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing; import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; import static io.servicecomb.saga.omega.transaction.spring.TransactionalUserService.ILLEGAL_USER; -import static java.util.Arrays.asList; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertThat; import java.util.ArrayList; import java.util.List; import java.util.UUID; -import java.util.stream.Collectors; import org.junit.After; import org.junit.Before; @@ -46,7 +44,10 @@ import io.servicecomb.saga.omega.context.UniqueIdGenerator; import io.servicecomb.saga.omega.transaction.MessageHandler; import io.servicecomb.saga.omega.transaction.MessageSender; -import io.servicecomb.saga.omega.transaction.TxEvent; +import io.servicecomb.saga.omega.transaction.TxAbortedEvent; +import io.servicecomb.saga.omega.transaction.TxCompensatedEvent; +import io.servicecomb.saga.omega.transaction.TxEndedEvent; +import io.servicecomb.saga.omega.transaction.TxStartedEvent; import io.servicecomb.saga.omega.transaction.spring.TransactionInterceptionTest.MessageConfig; @RunWith(SpringRunner.class) @@ -62,7 +63,7 @@ public class TransactionInterceptionTest { private final String email = uniquify("email"); @Autowired - private List messages; + private List messages; @Autowired private TransactionalUserService userService; @@ -94,11 +95,11 @@ public void sendsUserToRemote_AroundTransaction() throws Exception { String compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString(); - assertEquals( - asList( - txStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, username, email), - txEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod)), - toString(messages) + assertArrayEquals( + new String[]{ + new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(), + new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString()}, + toArray(messages) ); User actual = userRepository.findOne(user.id()); @@ -107,19 +108,22 @@ public void sendsUserToRemote_AroundTransaction() throws Exception { @Test public void sendsAbortEvent_OnSubTransactionFailure() throws Exception { + Throwable throwable = null; + User user = new User(ILLEGAL_USER, email); try { - userService.add(new User(ILLEGAL_USER, email)); + userService.add(user); expectFailing(IllegalArgumentException.class); } catch (IllegalArgumentException ignored) { + throwable = ignored; } String compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString(); - assertEquals( - asList( - txStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, ILLEGAL_USER, email), - txAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod)), - toString(messages) + assertArrayEquals( + new String[]{ + new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, user).toString(), + new TxAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod, throwable).toString()}, + toArray(messages) ); } @@ -133,22 +137,32 @@ public void compensateOnTransactionException() throws Exception { String compensationMethod = TransactionalUserService.class.getDeclaredMethod("delete", User.class).toString(); - messageHandler.onReceive(globalTxId, this.localTxId, compensationMethod, user); - messageHandler.onReceive(globalTxId, localTxId, compensationMethod, anotherUser); + messageHandler.onReceive(globalTxId, this.localTxId, parentTxId, compensationMethod, user); + messageHandler.onReceive(globalTxId, localTxId, parentTxId, compensationMethod, anotherUser); assertThat(userRepository.findOne(user.id()), is(nullValue())); assertThat(userRepository.findOne(anotherUser.id()), is(nullValue())); + + assertArrayEquals( + new String[]{ + new TxStartedEvent(globalTxId, this.localTxId, parentTxId, compensationMethod, user).toString(), + new TxEndedEvent(globalTxId, this.localTxId, parentTxId, compensationMethod).toString(), + new TxStartedEvent(globalTxId, localTxId, parentTxId, compensationMethod, anotherUser).toString(), + new TxEndedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString(), + new TxCompensatedEvent(globalTxId, this.localTxId, parentTxId, compensationMethod).toString(), + new TxCompensatedEvent(globalTxId, localTxId, parentTxId, compensationMethod).toString() + }, + toArray(messages) + ); } - private List toString(List messages) { - return messages.stream() - .map(String::new) - .collect(Collectors.toList()); + private String[] toArray(List messages) { + return messages.toArray(new String[messages.size()]); } @Configuration static class MessageConfig { - private final List messages = new ArrayList<>(); + private final List messages = new ArrayList<>(); @Bean OmegaContext omegaContext() { @@ -156,51 +170,13 @@ OmegaContext omegaContext() { } @Bean - List messages() { + List messages() { return messages; } @Bean MessageSender sender() { - return (event) -> messages.add(serialize(event)); - } - - private byte[] serialize(TxEvent event) { - if (TX_STARTED_EVENT.equals(event.type())) { - User user = ((User) event.payloads()[0]); - return txStartedEvent(event.globalTxId(), - event.localTxId(), - event.parentTxId(), - event.compensationMethod(), - user.username(), - user.email()).getBytes(); - } - return txEndedEvent(event.globalTxId(), - event.localTxId(), - event.parentTxId(), - event.compensationMethod()).getBytes(); - } - - @Bean - MessageHandler handler(OmegaContext omegaContext) { - return omegaContext::compensate; + return (event) -> messages.add(event.toString()); } } - - private static String txStartedEvent(String globalTxId, - String localTxId, - String parentTxId, - String compensationMethod, - String username, - String email) { - return globalTxId + ":" + localTxId + ":" + parentTxId + ":" + compensationMethod + ":" + TX_STARTED_EVENT + ":" + username + ":" + email; - } - - private static String txEndedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) { - return globalTxId + ":" + localTxId + ":" + parentTxId + ":" + compensationMethod + ":" + TX_ENDED_EVENT; - } - - private static String txAbortedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) { - return globalTxId + ":" + localTxId + ":" + parentTxId + ":" + compensationMethod + ":" + TX_ENDED_EVENT; - } } diff --git a/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/User.java b/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/User.java index 6b2e55fa6..5af25e4bb 100644 --- a/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/User.java +++ b/omega/omega-spring-tx/src/test/java/io/servicecomb/saga/omega/transaction/spring/User.java @@ -70,4 +70,12 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(id, username, email); } + + @Override + public String toString() { + return "User{" + + "username='" + username + '\'' + + ", email='" + email + '\'' + + '}'; + } } diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/CompensationMessageHandler.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/CompensationMessageHandler.java new file mode 100644 index 000000000..edc3243e7 --- /dev/null +++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/CompensationMessageHandler.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.servicecomb.saga.omega.transaction; + +import io.servicecomb.saga.omega.context.OmegaContext; + +public class CompensationMessageHandler implements MessageHandler { + private final MessageSender sender; + private final OmegaContext omegaContext; + + public CompensationMessageHandler(MessageSender sender, OmegaContext omegaContext) { + this.sender = sender; + this.omegaContext = omegaContext; + } + + @Override + public void onReceive(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... payloads) { + omegaContext.compensate(globalTxId, localTxId, compensationMethod, payloads); + sender.send(new TxCompensatedEvent(globalTxId, localTxId, parentTxId, compensationMethod)); + } +} diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/FailedTransactionInterceptor.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/FailedTransactionInterceptor.java index 6cbd8af8f..9c164dda9 100644 --- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/FailedTransactionInterceptor.java +++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/FailedTransactionInterceptor.java @@ -17,9 +17,6 @@ package io.servicecomb.saga.omega.transaction; -import java.io.PrintWriter; -import java.io.StringWriter; - class FailedTransactionInterceptor { private final MessageSender sender; @@ -28,12 +25,6 @@ class FailedTransactionInterceptor { } void intercept(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable) { - sender.send(new TxAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod, stackTrace(throwable))); - } - - private String stackTrace(Throwable e) { - StringWriter writer = new StringWriter(); - e.printStackTrace(new PrintWriter(writer)); - return writer.toString(); + sender.send(new TxAbortedEvent(globalTxId, localTxId, parentTxId, compensationMethod, throwable)); } } diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageHandler.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageHandler.java index caf2da8bf..d867085e0 100644 --- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageHandler.java +++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageHandler.java @@ -18,5 +18,5 @@ package io.servicecomb.saga.omega.transaction; public interface MessageHandler { - void onReceive(String globalTxId, String localTxId, String compensationMethod, Object... payloads); + void onReceive(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... payloads); } diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxAbortedEvent.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxAbortedEvent.java index 2d04a7dd5..0d7e5bad6 100644 --- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxAbortedEvent.java +++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxAbortedEvent.java @@ -17,8 +17,17 @@ package io.servicecomb.saga.omega.transaction; -class TxAbortedEvent extends TxEvent { - TxAbortedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, String stackTrace) { - super(globalTxId, localTxId, parentTxId, compensationMethod, stackTrace); +import java.io.PrintWriter; +import java.io.StringWriter; + +public class TxAbortedEvent extends TxEvent { + public TxAbortedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable) { + super(globalTxId, localTxId, parentTxId, compensationMethod, stackTrace(throwable)); + } + + private static String stackTrace(Throwable e) { + StringWriter writer = new StringWriter(); + e.printStackTrace(new PrintWriter(writer)); + return writer.toString(); } } diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxCompensatedEvent.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxCompensatedEvent.java new file mode 100644 index 000000000..8d518d6db --- /dev/null +++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxCompensatedEvent.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.servicecomb.saga.omega.transaction; + +public class TxCompensatedEvent extends TxEvent { + public TxCompensatedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) { + super(globalTxId, localTxId, parentTxId, compensationMethod); + } +} diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEndedEvent.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEndedEvent.java index 2836948da..6922f2935 100644 --- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEndedEvent.java +++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEndedEvent.java @@ -17,8 +17,8 @@ package io.servicecomb.saga.omega.transaction; -class TxEndedEvent extends TxEvent { - TxEndedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) { +public class TxEndedEvent extends TxEvent { + public TxEndedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) { super(globalTxId, localTxId, parentTxId, compensationMethod); } } diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEvent.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEvent.java index 1616f6934..a11a9ada4 100644 --- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEvent.java +++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxEvent.java @@ -17,6 +17,8 @@ package io.servicecomb.saga.omega.transaction; +import java.util.Arrays; + public class TxEvent { private final long timestamp; private final String globalTxId; @@ -61,4 +63,15 @@ public String type() { public String compensationMethod() { return compensationMethod; } + + @Override + public String toString() { + return "TxEvent{" + + "globalTxId='" + globalTxId + '\'' + + ", localTxId='" + localTxId + '\'' + + ", parentTxId='" + parentTxId + '\'' + + ", compensationMethod='" + compensationMethod + '\'' + + ", payloads=" + Arrays.toString(payloads) + + '}'; + } } diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxStartedEvent.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxStartedEvent.java index 7ef3089f7..13534ac55 100644 --- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxStartedEvent.java +++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/TxStartedEvent.java @@ -17,9 +17,9 @@ package io.servicecomb.saga.omega.transaction; -class TxStartedEvent extends TxEvent { +public class TxStartedEvent extends TxEvent { - TxStartedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object[] payloads) { + public TxStartedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... payloads) { super(globalTxId, localTxId, parentTxId, compensationMethod, payloads); } } diff --git a/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java b/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java new file mode 100644 index 000000000..1070e5936 --- /dev/null +++ b/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/CompensationMessageHandlerTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.servicecomb.saga.omega.transaction; + +import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.Test; + +import io.servicecomb.saga.omega.context.OmegaContext; + +public class CompensationMessageHandlerTest { + + private final List events = new ArrayList<>(); + private final MessageSender sender = events::add; + + private final String globalTxId = uniquify("globalTxId"); + private final String localTxId = uniquify("localTxId"); + private final String parentTxId = uniquify("parentTxId"); + private final String compensationMethod = getClass().getCanonicalName(); + private final String payload = uniquify("blah"); + + private final OmegaContext omegaContext = mock(OmegaContext.class); + private final CompensationMessageHandler handler = new CompensationMessageHandler(sender, omegaContext); + + @Test + public void sendsEventOnCompensationCompleted() throws Exception { + handler.onReceive(globalTxId, localTxId, parentTxId, compensationMethod, payload); + + assertThat(events.size(), is(1)); + + TxEvent event = events.get(0); + assertThat(event.globalTxId(), is(globalTxId)); + assertThat(event.localTxId(), is(localTxId)); + assertThat(event.parentTxId(), is(parentTxId)); + assertThat(event.type(), is("TxCompensatedEvent")); + assertThat(event.compensationMethod(), is(getClass().getCanonicalName())); + assertThat(event.payloads().length, is(0)); + + verify(omegaContext).compensate(globalTxId, localTxId, compensationMethod, payload); + } +} diff --git a/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PostTransactionInterceptorTest.java b/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PostTransactionInterceptorTest.java index 86a671f73..b2e18ab7d 100644 --- a/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PostTransactionInterceptorTest.java +++ b/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PostTransactionInterceptorTest.java @@ -46,6 +46,7 @@ public void sendsSerializedMessage() throws Exception { assertThat(event.globalTxId(), is(globalTxId)); assertThat(event.localTxId(), is(localTxId)); assertThat(event.parentTxId(), is(parentTxId)); + assertThat(event.type(), is("TxEndedEvent")); assertThat(event.compensationMethod(), is(getClass().getCanonicalName())); assertThat(event.payloads().length, is(0)); } diff --git a/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java b/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java index b7b2ec038..5d5d832c0 100644 --- a/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java +++ b/omega/omega-transaction/src/test/java/io/servicecomb/saga/omega/transaction/PreTransactionInterceptorTest.java @@ -49,6 +49,7 @@ public void sendsTxStartedEvent() throws Exception { assertThat(event.globalTxId(), is(globalTxId)); assertThat(event.localTxId(), is(localTxId)); assertThat(event.parentTxId(), is(parentTxId)); + assertThat(event.type(), is("TxStartedEvent")); assertThat(event.compensationMethod(), is(getClass().getCanonicalName())); assertThat(asList(event.payloads()), contains(message)); }