diff --git a/avaje-jex/src/main/java/io/avaje/jex/http/sse/SseClient.java b/avaje-jex/src/main/java/io/avaje/jex/http/sse/SseClient.java index 6c3c4adc..97f7577d 100644 --- a/avaje-jex/src/main/java/io/avaje/jex/http/sse/SseClient.java +++ b/avaje-jex/src/main/java/io/avaje/jex/http/sse/SseClient.java @@ -41,6 +41,14 @@ static ExchangeHandler handler(Consumer consumer) { */ void keepAlive(); + /** + * Add a callback that will be called either when connection is closed through {@link #close()}, + * or when the {@link Emitter} is detected as closed. + * + * @param task task to run + */ + void onClose(Runnable task); + /** * Attempt to send a comment. If the {@link Emitter} fails to emit (remote client has * disconnected), the {@link #close()} function will be called instead. diff --git a/avaje-jex/src/main/java/io/avaje/jex/http/sse/SseClientImpl.java b/avaje-jex/src/main/java/io/avaje/jex/http/sse/SseClientImpl.java index 4e5911a0..5db9c7a3 100644 --- a/avaje-jex/src/main/java/io/avaje/jex/http/sse/SseClientImpl.java +++ b/avaje-jex/src/main/java/io/avaje/jex/http/sse/SseClientImpl.java @@ -20,6 +20,7 @@ final class SseClientImpl implements SseClient { private final JsonService jsonService; private final Context ctx; private CompletableFuture blockingFuture; + private Runnable closeCallback = () -> {}; SseClientImpl(Context ctx) { this.emitter = new Emitter(ctx.exchange().getResponseBody()); @@ -27,9 +28,16 @@ final class SseClientImpl implements SseClient { this.ctx = ctx; } + @Override + public void onClose(Runnable task) { + this.closeCallback = task; + } + @Override public void close() { - if (terminated.getAndSet(true) && blockingFuture != null) { + if (terminated.getAndSet(true)) return; + closeCallback.run(); + if (blockingFuture != null) { blockingFuture.complete(null); } }