Skip to content

Commit

Permalink
Ensure events are cleared from server memory on servers that are disc…
Browse files Browse the repository at this point in the history
…onnected from the session's client.
  • Loading branch information
kuujo committed Aug 22, 2015
1 parent f251182 commit 5b19f78
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 27 deletions.
Expand Up @@ -59,6 +59,7 @@ public static Builder builder(KeepAliveRequest request) {
}

private long commandSequence;
private long eventSequence;

public KeepAliveRequest(ReferenceManager<KeepAliveRequest> referenceManager) {
super(referenceManager);
Expand All @@ -78,6 +79,15 @@ public long commandSequence() {
return commandSequence;
}

/**
* Returns the event sequence number.
*
* @return The event sequence number.
*/
public long eventSequence() {
return eventSequence;
}

@Override
public void readObject(BufferInput buffer, Serializer serializer) {
super.readObject(buffer, serializer);
Expand Down Expand Up @@ -107,7 +117,7 @@ public boolean equals(Object object) {

@Override
public String toString() {
return String.format("%s[session=%d, commandSequence=%d]", getClass().getSimpleName(), session, commandSequence);
return String.format("%s[session=%d, commandSequence=%d, eventSequence=%d]", getClass().getSimpleName(), session, commandSequence, eventSequence);
}

/**
Expand Down Expand Up @@ -138,6 +148,19 @@ public Builder withCommandSequence(long commandSequence) {
return this;
}

/**
* Sets the event sequence number.
*
* @param eventSequence The command sequence number.
* @return The request builder.
*/
public Builder withEventSequence(long eventSequence) {
if (eventSequence < 0)
throw new IllegalArgumentException("eventSequence cannot be negative");
request.commandSequence = eventSequence;
return this;
}

@Override
public KeepAliveRequest build() {
super.build();
Expand Down
Expand Up @@ -527,6 +527,8 @@ protected CompletableFuture<KeepAliveResponse> keepAlive(KeepAliveRequest reques
try (KeepAliveEntry entry = context.getLog().create(KeepAliveEntry.class)) {
entry.setTerm(context.getTerm());
entry.setSession(request.session());
entry.setCommandSequence(request.commandSequence());
entry.setEventSequence(request.eventSequence());
entry.setTimestamp(timestamp);
index = context.getLog().append(entry);
LOGGER.debug("{} - Appended {}", context.getMember().id(), entry);
Expand Down
Expand Up @@ -546,7 +546,11 @@ CompletableFuture<Void> apply(KeepAliveEntry entry) {
LOGGER.warn("Expired session: " + entry.getSession());
future = expireSession(entry.getSession());
} else {
session.setIndex(entry.getIndex()).setTimestamp(entry.getTimestamp()).clearCommands(entry.getSequence());
session.setIndex(entry.getIndex())
.setTimestamp(entry.getTimestamp())
.clearCommands(entry.getCommandSequence())
.clearEvents(entry.getEventSequence());

future = new CompletableFuture<>();
stateExecutor.tick(Instant.ofEpochMilli(entry.getTimestamp()));
context.execute(() -> future.complete(null));
Expand Down
Expand Up @@ -223,9 +223,6 @@ ServerSession setConnection(Connection connection) {

@Override
public CompletableFuture<Void> publish(Object event) {
if (connection == null)
return Futures.exceptionalFuture(new UnknownSessionException("connection lost"));

long eventSequence = ++eventVersion;
events.put(eventSequence, event);
sendEvent(eventSequence, event);
Expand All @@ -238,7 +235,7 @@ public CompletableFuture<Void> publish(Object event) {
* @param version The version to clear.
* @return The server session.
*/
private ServerSession clearEvents(long version) {
ServerSession clearEvents(long version) {
if (version > eventLowWaterMark) {
for (long i = eventLowWaterMark + 1; i <= version; i++) {
events.remove(i);
Expand Down Expand Up @@ -270,20 +267,22 @@ private ServerSession resendEvents(long sequence) {
* @param event The event to send.
*/
private void sendEvent(long eventSequence, Object event) {
connection.<PublishRequest, PublishResponse>send(PublishRequest.builder()
.withSession(id())
.withEventSequence(eventSequence)
.withMessage(event)
.build()).whenComplete((response, error) -> {
if (isOpen() && error == null) {
if (response.status() == Response.Status.OK) {
clearEvents(response.eventSequence());
} else {
clearEvents(response.eventSequence());
resendEvents(response.eventSequence());
if (connection != null) {
connection.<PublishRequest, PublishResponse>send(PublishRequest.builder()
.withSession(id())
.withEventSequence(eventSequence)
.withMessage(event)
.build()).whenComplete((response, error) -> {
if (isOpen() && error == null) {
if (response.status() == Response.Status.OK) {
clearEvents(response.eventSequence());
} else {
clearEvents(response.eventSequence());
resendEvents(response.eventSequence());
}
}
}
});
});
}
}

/**
Expand Down
Expand Up @@ -29,7 +29,8 @@
*/
@SerializeWith(id=304)
public class KeepAliveEntry extends SessionEntry<KeepAliveEntry> {
private long sequence;
private long commandSequence;
private long eventSequence;

public KeepAliveEntry(ReferenceManager<Entry<?>> referenceManager) {
super(referenceManager);
Expand All @@ -40,8 +41,8 @@ public KeepAliveEntry(ReferenceManager<Entry<?>> referenceManager) {
*
* @return The command sequence number.
*/
public long getSequence() {
return sequence;
public long getCommandSequence() {
return commandSequence;
}

/**
Expand All @@ -50,26 +51,48 @@ public long getSequence() {
* @param commandSequence The command sequence number.
* @return The keep alive entry.
*/
public KeepAliveEntry setSequence(long commandSequence) {
this.sequence = commandSequence;
public KeepAliveEntry setCommandSequence(long commandSequence) {
this.commandSequence = commandSequence;
return this;
}

/**
* Returns the event sequence number.
*
* @return The event sequence number.
*/
public long getEventSequence() {
return eventSequence;
}

/**
* Sets the event sequence number.
*
* @param eventSequence The event sequence number.
* @return The keep alive entry.
*/
public KeepAliveEntry setEventSequence(long eventSequence) {
this.eventSequence = eventSequence;
return this;
}

@Override
public void readObject(BufferInput buffer, Serializer serializer) {
super.readObject(buffer, serializer);
sequence = buffer.readLong();
commandSequence = buffer.readLong();
eventSequence = buffer.readLong();
}

@Override
public void writeObject(BufferOutput buffer, Serializer serializer) {
super.writeObject(buffer, serializer);
buffer.writeLong(sequence);
buffer.writeLong(commandSequence);
buffer.writeLong(eventSequence);
}

@Override
public String toString() {
return String.format("%s[index=%d, term=%d, session=%d, sequence=%d, timestamp=%d]", getClass().getSimpleName(), getIndex(), getTerm(), getSession(), getSequence(), getTimestamp());
return String.format("%s[index=%d, term=%d, session=%d, commandSequence=%d, eventSequence=%d, timestamp=%d]", getClass().getSimpleName(), getIndex(), getTerm(), getSession(), getCommandSequence(), getEventSequence(), getTimestamp());
}

}

0 comments on commit 5b19f78

Please sign in to comment.