Skip to content
This repository has been archived by the owner on Dec 19, 2017. It is now read-only.

Handle inconsistencies in command response event indexes #293

Merged
merged 3 commits into from Mar 3, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -58,9 +58,9 @@ final class ClientSequencer {
private static final Logger LOGGER = LoggerFactory.getLogger(ClientSequencer.class);

private final ClientSessionState state;
private long requestSequence;
private long responseSequence;
private long eventIndex;
long requestSequence;
long responseSequence;
long eventIndex;
private final Queue<EventCallback> eventCallbacks = new ArrayDeque<>();
private final Map<Long, ResponseCallback> responseCallbacks = new HashMap<>();

Expand Down Expand Up @@ -91,7 +91,7 @@ public long nextRequest() {
*/
public void sequenceEvent(PublishRequest request, Runnable callback) {
if (requestSequence == responseSequence) {
LOGGER.debug("{} - Completing event {}", state.getSessionId(), request);
LOGGER.debug("{} - Completing {}", state.getSessionId(), request);
callback.run();
eventIndex = request.eventIndex();
} else {
Expand Down Expand Up @@ -150,7 +150,7 @@ private void completeResponses() {
if (requestSequence == responseSequence) {
EventCallback eventCallback = eventCallbacks.poll();
while (eventCallback != null) {
LOGGER.debug("{} - Completing event {}", state.getSessionId(), eventCallback.request);
LOGGER.debug("{} - Completing {}", state.getSessionId(), eventCallback.request);
eventCallback.run();
eventIndex = eventCallback.request.eventIndex();
eventCallback = eventCallbacks.poll();
Expand All @@ -172,23 +172,39 @@ private boolean completeResponse(OperationResponse response, Runnable callback)

// If the response's event index is greater than the current event index, that indicates that events that were
// published prior to the response have not yet been completed. Attempt to complete pending events.
if (response.eventIndex() > eventIndex) {
long responseEventIndex = response.eventIndex();
if (responseEventIndex > eventIndex) {
// For each pending event with an eventIndex less than or equal to the response eventIndex, complete the event.
// This is safe since we know that sequenced responses should see sequential order of events.
EventCallback eventCallback = eventCallbacks.peek();
while (eventCallback != null && eventCallback.request.eventIndex() <= response.eventIndex()) {
while (eventCallback != null && eventCallback.request.eventIndex() <= responseEventIndex) {
eventCallbacks.remove();
LOGGER.debug("{} - Completing event {}", state.getSessionId(), eventCallback.request);
LOGGER.debug("{} - Completing {}", state.getSessionId(), eventCallback.request);
eventCallback.run();
eventIndex = eventCallback.request.eventIndex();
eventCallback = eventCallbacks.peek();
}

// If the response event index is still greater than the last sequenced event index, check
// enqueued events to determine whether any events can be skipped. This is necessary to
// ensure that a response with a missing event can still trigger prior events.
if (responseEventIndex > eventIndex) {
for (EventCallback event : eventCallbacks) {
// If the event's previous index is consistent with the current event index and the event
// index is greater than the response event index, set the response event index to the
// event's previous index.
if (event.request.previousIndex() <= eventIndex && event.request.eventIndex() >= response.eventIndex()) {
responseEventIndex = event.request.previousIndex();
break;
}
}
}
}

// If after completing pending events the eventIndex is greater than or equal to the response's eventIndex, complete the response.
// Note that the event protocol initializes the eventIndex to the session ID.
if (response.eventIndex() <= eventIndex || (eventIndex == 0 && response.eventIndex() == state.getSessionId())) {
LOGGER.debug("{} - Completing response {}", state.getSessionId(), response);
if (responseEventIndex <= eventIndex || (eventIndex == 0 && responseEventIndex == state.getSessionId())) {
LOGGER.debug("{} - Completing {}", state.getSessionId(), response);
callback.run();
return true;
} else {
Expand Down
Expand Up @@ -193,4 +193,86 @@ public void testSequenceResponses() throws Throwable {
assertTrue(run.get());
}

/**
* Tests sequencing responses with a missing PublishRequest.
*/
public void testSequenceMissingEvent() throws Throwable {
ClientSessionState state = new ClientSessionState(UUID.randomUUID().toString());
state.setSessionId(1)
.setCommandRequest(2)
.setResponseIndex(15)
.setEventIndex(5);

AtomicInteger run = new AtomicInteger();

ClientSequencer sequencer = new ClientSequencer(state);
sequencer.requestSequence = 2;
sequencer.responseSequence = 1;
sequencer.eventIndex = 5;

CommandResponse commandResponse = CommandResponse.builder()
.withStatus(Response.Status.OK)
.withIndex(20)
.withEventIndex(10)
.build();
sequencer.sequenceResponse(2, commandResponse, () -> assertEquals(run.getAndIncrement(), 0));

PublishRequest publishRequest = PublishRequest.builder()
.withSession(1)
.withEventIndex(25)
.withPreviousIndex(5)
.build();
sequencer.sequenceEvent(publishRequest, () -> assertEquals(run.getAndIncrement(), 1));

assertEquals(run.get(), 2);
}

/**
* Tests sequencing multiple responses that indicate missing events.
*/
public void testSequenceMultipleMissingEvents() throws Throwable {
ClientSessionState state = new ClientSessionState(UUID.randomUUID().toString());
state.setSessionId(1)
.setCommandRequest(2)
.setResponseIndex(15)
.setEventIndex(5);

AtomicInteger run = new AtomicInteger();

ClientSequencer sequencer = new ClientSequencer(state);
sequencer.requestSequence = 3;
sequencer.responseSequence = 1;
sequencer.eventIndex = 5;

CommandResponse commandResponse2 = CommandResponse.builder()
.withStatus(Response.Status.OK)
.withIndex(20)
.withEventIndex(10)
.build();
sequencer.sequenceResponse(3, commandResponse2, () -> assertEquals(run.getAndIncrement(), 1));

CommandResponse commandResponse1 = CommandResponse.builder()
.withStatus(Response.Status.OK)
.withIndex(18)
.withEventIndex(8)
.build();
sequencer.sequenceResponse(2, commandResponse1, () -> assertEquals(run.getAndIncrement(), 0));

PublishRequest publishRequest1 = PublishRequest.builder()
.withSession(1)
.withEventIndex(25)
.withPreviousIndex(5)
.build();
sequencer.sequenceEvent(publishRequest1, () -> assertEquals(run.getAndIncrement(), 2));

PublishRequest publishRequest2 = PublishRequest.builder()
.withSession(1)
.withEventIndex(28)
.withPreviousIndex(8)
.build();
sequencer.sequenceEvent(publishRequest2, () -> assertEquals(run.getAndIncrement(), 3));

assertEquals(run.get(), 4);
}

}