Integration test facility to ensure and assert processed events #1426

Open
hatzlj opened this issue Apr 30, 2020 · 2 comments
Labels: Status: Under Discussion, Type: Feature

Comments

@hatzlj
Contributor

hatzlj commented Apr 30, 2020

When integration testing an Axon application that involves tracking event processors, one quickly reaches the point where you need
a) to make sure that all events have been processed by all event processors (so that the application is in a final state we can assert on), and
b) to determine which events exactly have been published.

To facilitate this, it would be nice to have a component in the axon-test module that allows you to
a) "wait" for all TEPs to catch up to a certain state before continuing, and
b) retrieve all events published within a certain period of time (e.g. between two specific "marker" events) to assert on them in an integration test.

We're currently doing this by implementing a TestEventRecorder that can be used from within an integration test. At a high level, it works as follows:

  1. Put the TestEventRecorder into the "record" state at the beginning of an integration test case and assign a testCorrelationId that identifies which events belong to the current test case.
  2. Execute the action that triggers the desired events in the integration test.
  3. Let the TestEventRecorder record all events carrying this testCorrelationId until "stop" is called for that testCorrelationId.
  4. "Catch up" all other event processors when "stopping" the TestEventRecorder: send a RecordingCatchupEvent directly to the event bus and determine the tracking token at which the TestEventRecorder processes this event. We can then use the Axon configuration to get all other tracking event processors and block the stop method until all production event processors have processed up to this TrackingToken (see example code below).
  5. After stop returns, we retrieve all events recorded for the given testCorrelationId and use them to assert the expected events in the integration test.
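The bookkeeping behind steps 1, 3 and 5 can be sketched without Axon at all. This is a minimal sketch under stated assumptions, not the author's TestEventRecorder: `TestEventRecorderSketch`, `RecordedEvent` and the metadata key `testCorrelationId` are hypothetical names, `RecordedEvent` stands in for an Axon `EventMessage`, and the catch-up of step 4 is injected as a plain `Runnable` (in a real setup, something like the `catchUpTrackingProcessors` method further below).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical, framework-free sketch of the recorder bookkeeping in steps 1, 3 and 5.
// Step 4 (catching up the other processors) is injected as a Runnable.
class TestEventRecorderSketch {
    static final String CORRELATION_KEY = "testCorrelationId"; // hypothetical metadata key

    private final Map<String, List<RecordedEvent>> recorded = new ConcurrentHashMap<>();
    private final Runnable catchUp;

    TestEventRecorderSketch(Runnable catchUp) {
        this.catchUp = catchUp;
    }

    // Step 1: start recording for one test case.
    void record(String testCorrelationId) {
        recorded.putIfAbsent(testCorrelationId, new CopyOnWriteArrayList<>());
    }

    // Step 3: invoked by the recorder's event handler for every event it processes.
    void onEvent(RecordedEvent event) {
        String id = event.metaData.get(CORRELATION_KEY);
        List<RecordedEvent> events = id == null ? null : recorded.get(id);
        if (events != null) {
            events.add(event); // only events of test cases that are currently recording
        }
    }

    // Steps 4 and 5: block until the other processors caught up, then return the recording.
    List<RecordedEvent> stop(String testCorrelationId) {
        catchUp.run();
        List<RecordedEvent> events = recorded.remove(testCorrelationId);
        if (events == null) {
            return List.of();
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        TestEventRecorderSketch recorder = new TestEventRecorderSketch(() -> { /* no other processors */ });
        recorder.record("test-1");
        recorder.onEvent(new RecordedEvent("OrderPlaced", Map.of(CORRELATION_KEY, "test-1")));
        recorder.onEvent(new RecordedEvent("SomethingElse", Map.of(CORRELATION_KEY, "test-2")));
        System.out.println(recorder.stop("test-1").size()); // prints 1
    }
}

// Hypothetical stand-in for an Axon EventMessage: a payload plus its metadata.
final class RecordedEvent {
    final Object payload;
    final Map<String, String> metaData;

    RecordedEvent(Object payload, Map<String, String> metaData) {
        this.payload = payload;
        this.metaData = metaData;
    }
}
```

Removing the recording on `stop` keeps test cases isolated: a second `stop` for the same id returns an empty list instead of stale events.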

Since events are published and processed at different times, we additionally implemented a TestEventCorrelationInterceptor (a regular Axon MessageDispatchInterceptor) that adds the testCorrelationId to the event metadata when an event is published. This ensures that the TestEventRecorder separates the recorded events per correlation id, even if the TestEventRecorder processes them only after the integration test has already signaled it to stop recording.
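Why the id has to be stamped at dispatch time rather than looked up at processing time can be shown with a small framework-free model. This is a sketch, not Axon code: the queue stands in for the event store that a tracking processor reads asynchronously, and `DispatchTimeCorrelationDemo`, `publish`, `processAll` and the key `testCorrelationId` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical model: events are stamped when published and processed later.
class DispatchTimeCorrelationDemo {
    static final String KEY = "testCorrelationId"; // hypothetical metadata key

    // Plays the role of the dispatch interceptor's state: read when an event is published.
    static volatile String currentTestCorrelationId = "unset";

    // Stands in for the event store that a tracking processor reads from later on.
    static final Queue<Map<String, String>> eventStore = new ArrayDeque<>();

    static void publish(String payload) {
        Map<String, String> event = new HashMap<>();
        event.put("payload", payload);
        event.put(KEY, currentTestCorrelationId); // stamped at dispatch time
        eventStore.add(event);
    }

    // Plays the role of the recorder's processor: it groups whatever it gets to, whenever it runs.
    static Map<String, List<String>> processAll() {
        Map<String, List<String>> byTest = new HashMap<>();
        Map<String, String> event;
        while ((event = eventStore.poll()) != null) {
            byTest.computeIfAbsent(event.get(KEY), k -> new ArrayList<>()).add(event.get("payload"));
        }
        return byTest;
    }

    public static void main(String[] args) {
        currentTestCorrelationId = "test-1";
        publish("OrderPlaced");
        currentTestCorrelationId = "test-2"; // the next test starts before anything was processed
        publish("OrderCancelled");
        // each event is still attributed to the test that published it
        System.out.println(processAll());
    }
}
```

Had the recorder used its own "current" id at processing time instead, both events would have ended up under `test-2`.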

Example for catching up other processors:

```java
// Method of the recorder class; relies on its members log, INITIAL_BACKOFF_MS, MAX_BACKOFF_MS and
// getOtherTrackingProcessors(). Uses java.util.{Collection, HashSet, Objects, Set} and
// org.springframework.util.backoff.{ExponentialBackOff, BackOffExecution}.
private void catchUpTrackingProcessors(TrackingEventProcessor recordingProcessor) {
    // the furthest position the recorder has reached across its segments (the RecordingCatchupEvent)
    final TrackingToken recordingToken =
        recordingProcessor
            .processingStatus()
            .values()
            .stream()
            .map(EventTrackerStatus::getTrackingToken)
            .filter(Objects::nonNull)
            .reduce((tokenA, tokenB) -> tokenA.covers(tokenB) ? tokenA : tokenB)
            .orElse(null);

    if (recordingToken == null) {
        log.debug("trackingToken for event recorder not found");
        throw new IllegalStateException("failed to catch up: event recorder has no tracking token");
    }

    log.debug("processors need to catch up to {}", recordingToken);

    // wait for all other processors to catch up
    final Collection<TrackingEventProcessor> otherProcessors = getOtherTrackingProcessors();
    log.debug("iterating {} processors to catch up", otherProcessors.size());
    for (TrackingEventProcessor processor : otherProcessors) {
        if (!processor.isRunning()) {
            log.debug("skipping processor {} since it is not running", processor);
            continue;
        }

        // copy the segment ids, since segments may be claimed or released while we iterate
        final Set<Integer> currentSegments = new HashSet<>(processor.processingStatus().keySet());
        for (Integer segmentId : currentSegments) {
            EventTrackerStatus currentStatus = processor.processingStatus().get(segmentId);
            if (currentStatus == null) {
                log.debug("segment {} of processor {} vanished meanwhile.", segmentId, processor);
                continue;
            }

            TrackingToken currentTrackingToken = currentStatus.getTrackingToken();
            log.debug("segment {} of processor {} is {}caught up at token {}; recording token is {}",
                segmentId, processor, currentStatus.isCaughtUp() ? "" : "not ", currentTrackingToken, recordingToken);
            if (covers(currentTrackingToken, recordingToken)) {
                continue; // this segment has already processed the catch-up event
            }

            // poll with an exponential backoff whose intervals add up to at most MAX_BACKOFF_MS
            final ExponentialBackOff exponentialBackOff = new ExponentialBackOff(INITIAL_BACKOFF_MS, 1.5);
            exponentialBackOff.setMaxElapsedTime(MAX_BACKOFF_MS);
            final BackOffExecution backOff = exponentialBackOff.start();
            // "caught up" alone is not enough: the segment may have reached the head before the
            // catch-up event was published, so wait until its token covers the recording token
            while (!covers(currentTrackingToken, recordingToken)) {
                final long currentInterval = backOff.nextBackOff();
                if (currentInterval == BackOffExecution.STOP) {
                    log.debug("backoff expired waiting for segment {} of processor {} to catch up to {}.",
                        segmentId, processor, recordingToken);
                    throw new IllegalStateException("failed to catch up");
                }
                log.debug("segment {} of processor {} needs to catch up to {} from {}. Waiting for {}ms ...",
                    segmentId, processor, recordingToken, currentTrackingToken, currentInterval);
                try {
                    Thread.sleep(currentInterval);
                } catch (InterruptedException e) {
                    // restore the interrupt flag and abort instead of retrying until the backoff expires
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting for processors to catch up", e);
                }
                // live refresh of the current status and tracking token
                currentStatus = processor.processingStatus().get(segmentId);
                currentTrackingToken = currentStatus == null ? null : currentStatus.getTrackingToken();
            }
            log.debug("segment {} of processor {} is caught up to the catch-up token {}.",
                segmentId, processor, recordingToken);
        }
    }
}

// A segment has caught up once its token covers (is at or beyond) the recording token.
private static boolean covers(TrackingToken current, TrackingToken target) {
    return current != null && current.covers(target);
}
```
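The core of the catch-up is a bounded polling loop: keep waiting, with growing intervals, until every segment's position is at or beyond the target position, and give up after a total time budget. The following is a framework-free sketch of just that loop, assuming positions are plain `long`s (a stand-in for a global-sequence tracking token, where "covers" reduces to `position >= target`); `CatchUpWaiter` and `awaitCatchUp` are hypothetical names, not Axon API.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Framework-free sketch of the catch-up wait. Positions are plain longs standing in for a
// global-sequence tracking token, so "covers" simply means position >= target.
class CatchUpWaiter {

    static void awaitCatchUp(Supplier<Map<Integer, Long>> segmentPositions, long target,
                             long initialBackoffMs, double multiplier, long maxElapsedMs) {
        final long deadline = System.nanoTime() + maxElapsedMs * 1_000_000L;
        double backoffMs = initialBackoffMs;
        while (!allSegmentsCover(segmentPositions.get(), target)) {
            long remainingMs = (deadline - System.nanoTime()) / 1_000_000L;
            if (remainingMs <= 0) {
                throw new IllegalStateException("segments did not catch up to position " + target);
            }
            try {
                // never sleep past the deadline
                Thread.sleep(Math.min((long) backoffMs, remainingMs));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                throw new IllegalStateException("interrupted while waiting for catch-up", e);
            }
            backoffMs *= multiplier;
        }
    }

    // A missing (null) position counts as "not caught up"; no segments at all counts as caught up.
    private static boolean allSegmentsCover(Map<Integer, Long> positions, long target) {
        return positions.values().stream().allMatch(p -> p != null && p >= target);
    }

    public static void main(String[] args) {
        AtomicLong position = new AtomicLong();
        // every poll advances the simulated segment by 3 events
        awaitCatchUp(() -> Map.of(0, position.addAndGet(3)), 10, 5, 1.5, 1_000);
        System.out.println("caught up at " + position.get()); // prints "caught up at 12"
    }
}
```

Unlike the method above, which starts a fresh backoff budget per segment, this sketch applies one deadline to all segments of a processor, which bounds the total time a test can block in `stop`.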

Example for the TestEventCorrelationInterceptor:

```java
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

import org.axonframework.eventhandling.EventMessage;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageDispatchInterceptor;

/**
 * Simple message dispatch interceptor that adds the current test correlation id to the
 * message's metadata (via {@link Message#andMetaData(Map)}) before the message is dispatched.
 */
public class TestEventCorrelationInterceptor implements MessageDispatchInterceptor<Message<?>> {

    public static final String EVENT_RECORDER_METADATA_KEY =
        "ai.ikosa.portal.commander.util.TestEventsRecorderInterceptor";
    public static final String UNSET_CORRELATION_ID = "unset";

    // Written by the test thread and read by dispatching threads, hence guarded by "this".
    private String currentTestCorrelationId = UNSET_CORRELATION_ID;
    private boolean unsetOnNextCatchup = false;

    public synchronized void setCurrentTestCorrelationId(String currentTestCorrelationId) {
        this.currentTestCorrelationId = currentTestCorrelationId;
        this.unsetOnNextCatchup = false;
    }

    /** Resets the correlation id right after the next RecordingCatchupEvent has been stamped. */
    public synchronized void unsetOnNextCatchup() {
        this.unsetOnNextCatchup = true;
    }

    /**
     * Adds the current test correlation id to each message's metadata before dispatching,
     * so that recorded messages can be grouped per test case.
     * See {@link MessageDispatchInterceptor#handle(List)}.
     *
     * @param messages the messages about to be dispatched
     * @return a function that returns each message enriched with the correlation metadata
     */
    @Override
    public BiFunction<Integer, Message<?>, Message<?>> handle(List<? extends Message<?>> messages) {
        return (index, message) -> {
            synchronized (this) {
                // read the id first: the catch-up event itself still carries it, later messages do not
                Map<String, String> metaData = Map.of(EVENT_RECORDER_METADATA_KEY, currentTestCorrelationId);
                if (unsetOnNextCatchup
                    && message instanceof EventMessage
                    && TestEventRecorder.RecordingCatchupEvent.class.equals(message.getPayloadType())) {
                    currentTestCorrelationId = UNSET_CORRELATION_ID;
                    unsetOnNextCatchup = false;
                }
                return message.andMetaData(metaData);
            }
        };
    }
}
```
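The order of operations in `handle` determines which messages still belong to the test: the RecordingCatchupEvent itself is stamped with the test's id (presumably so the recorder can attribute it to the test being stopped), and only messages dispatched afterwards get `unset`. A framework-free model of just that state machine, with hypothetical names (`UnsetOnCatchUpModel`, `stamp`) and a boolean flag standing in for the payload-type check:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, framework-free model of the interceptor's "unset on next catch-up" logic.
class UnsetOnCatchUpModel {
    static final String KEY = "testCorrelationId"; // hypothetical metadata key
    static final String UNSET = "unset";

    private String currentId = UNSET;
    private boolean pendingUnset = false;

    synchronized void setCurrentTestCorrelationId(String id) {
        currentId = id;
        pendingUnset = false;
    }

    synchronized void unsetOnNextCatchup() {
        pendingUnset = true;
    }

    // Same order as handle(): read the id first, then reset it if this is the catch-up event.
    synchronized Map<String, String> stamp(String payloadType, boolean isCatchUpEvent) {
        Map<String, String> metaData = new HashMap<>();
        metaData.put("payloadType", payloadType);
        metaData.put(KEY, currentId);
        if (isCatchUpEvent && pendingUnset) {
            currentId = UNSET;
            pendingUnset = false;
        }
        return metaData;
    }

    public static void main(String[] args) {
        UnsetOnCatchUpModel interceptor = new UnsetOnCatchUpModel();
        interceptor.setCurrentTestCorrelationId("test-1");
        interceptor.unsetOnNextCatchup(); // the test calls stop()
        System.out.println(interceptor.stamp("OrderPlaced", false).get(KEY));           // test-1
        System.out.println(interceptor.stamp("RecordingCatchupEvent", true).get(KEY)); // test-1
        System.out.println(interceptor.stamp("LateEvent", false).get(KEY));             // unset
    }
}
```

Keeping the read and the reset inside one synchronized section prevents a new test's `setCurrentTestCorrelationId` from being overwritten between the two steps.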

There has also been a recent discussion on this topic in the Axon Google Group: https://groups.google.com/d/msg/axonframework/AIIKVlYFEcs/Oi2UEsvQBgAJ

@smcvb
Member

smcvb commented Apr 30, 2020

Thanks for filing this issue with us @hatzlj.
At first, I thought this was a duplicate of the Projection Testing issue #729.
However, I'd say it goes one step further.

When time allows, we will discuss how to proceed with this.
We will update this ticket accordingly as soon as we have made decisions around implementation and prioritization.

@smcvb added the Status: Under Discussion and Type: Feature labels on Apr 30, 2020
@smcvb
Member

smcvb commented Aug 7, 2024

As there are ideas to expand and potentially reimagine the test fixtures within Axon Framework for 5.0.0, there is a chance this issue will be picked up in that flow.
However, we are not yet ready to add this issue to milestone 5.0.0, as a lot of work has to be done before we can begin on additional features like this.

Nonetheless, an update was due, given that the last time I replied was over 4 years ago.
With that said, we'll keep everybody posted on the progress of this issue, right here.
