Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions avaje-jex/src/main/java/io/avaje/jex/http/sse/SseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ static ExchangeHandler handler(Consumer<SseClient> consumer) {
*/
void keepAlive();

/**
* Add a callback that will be called either when connection is closed through {@link #close()},
* or when the {@link Emitter} is detected as closed.
*
* @param task task to run
*/
void onClose(Runnable task);

/**
* Attempt to send a comment. If the {@link Emitter} fails to emit (remote client has
* disconnected), the {@link #close()} function will be called instead.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,24 @@ final class SseClientImpl implements SseClient {
private final JsonService jsonService;
private final Context ctx;
private CompletableFuture<?> blockingFuture;
private Runnable closeCallback = () -> {};

SseClientImpl(Context ctx) {
this.emitter = new Emitter(ctx.exchange().getResponseBody());
jsonService = ctx.jsonService();
this.ctx = ctx;
}

@Override
public void onClose(Runnable task) {
this.closeCallback = task;
}

@Override
public void close() {
if (terminated.getAndSet(true) && blockingFuture != null) {
if (terminated.getAndSet(true)) return;
closeCallback.run();
if (blockingFuture != null) {
blockingFuture.complete(null);
}
}
Expand Down