Skip to content

Commit

Permalink
The WebSocketBase implementation reuses the frame handler to aggregat…
Browse files Browse the repository at this point in the history
…e binary/text frames, this overrides a frame handler that could be set to handle frames in addition of processing them.

The implementation has been modified so that the frame aggregator for binary/text frames has its own handler, the frame aggregator and the frame handler are both notified with incoming frames.

fixes #4794
  • Loading branch information
vietj committed Aug 7, 2023
1 parent 24b7b7f commit 9770518
Showing 1 changed file with 34 additions and 6 deletions.
40 changes: 34 additions & 6 deletions src/main/java/io/vertx/core/http/impl/WebSocketImplBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public abstract class WebSocketImplBase<S extends WebSocketBase> implements WebS
private Object metric;
private Handler<Buffer> handler;
private Handler<WebSocketFrameInternal> frameHandler;
private FrameAggregator frameAggregator;
private Handler<Buffer> pongHandler;
private Handler<Void> drainHandler;
private Handler<Throwable> exceptionHandler;
Expand Down Expand Up @@ -525,9 +526,14 @@ protected void handleClose(boolean graceful) {
}

private void receiveFrame(WebSocketFrameInternal frame) {
Handler<WebSocketFrameInternal> frameAggregator;
Handler<WebSocketFrameInternal> frameHandler;
synchronized (conn) {
frameHandler = this.frameHandler;
frameAggregator = this.frameAggregator;
}
if (frameAggregator != null) {
context.dispatch(frame, frameAggregator);
}
if (frameHandler != null) {
context.dispatch(frame, frameHandler);
Expand Down Expand Up @@ -670,10 +676,21 @@ public S frameHandler(Handler<WebSocketFrame> handler) {
public WebSocketBase textMessageHandler(Handler<String> handler) {
synchronized (conn) {
checkClosed();
if (frameHandler == null || frameHandler.getClass() != FrameAggregator.class) {
frameHandler = new FrameAggregator();
if (handler != null) {
if (frameAggregator == null) {
frameAggregator = new FrameAggregator();
}
frameAggregator.textMessageHandler = handler;
} else {
if (frameAggregator != null) {
if (frameAggregator.binaryMessageHandler == null) {
frameAggregator = null;
} else {
frameAggregator.textMessageHandler = null;
frameAggregator.textMessageBuffer = null;
}
}
}
((FrameAggregator) frameHandler).textMessageHandler = handler;
return this;
}
}
Expand All @@ -682,10 +699,21 @@ public WebSocketBase textMessageHandler(Handler<String> handler) {
public S binaryMessageHandler(Handler<Buffer> handler) {
synchronized (conn) {
checkClosed();
if (frameHandler == null || frameHandler.getClass() != FrameAggregator.class) {
frameHandler = new FrameAggregator();
if (handler != null) {
if (frameAggregator == null) {
frameAggregator = new FrameAggregator();
}
frameAggregator.binaryMessageHandler = handler;
} else {
if (frameAggregator != null) {
if (frameAggregator.textMessageHandler == null) {
frameAggregator = null;
} else {
frameAggregator.binaryMessageHandler = null;
frameAggregator.binaryMessageBuffer = null;
}
}
}
((FrameAggregator) frameHandler).binaryMessageHandler = handler;
return (S) this;
}
}
Expand Down

0 comments on commit 9770518

Please sign in to comment.