Skip to content

Commit

Permalink
✨ websocket frame aggregator(#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
fzdwx committed Jun 8, 2022
1 parent f38cc90 commit d514193
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package sky.starter.domain;

import io.github.fzdwx.lambada.http.Router;
import io.github.fzdwx.lambada.lang.StringPool;
import io.github.fzdwx.lambada.lang.StrPool;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.RequestMethod;
Expand Down Expand Up @@ -215,8 +215,8 @@ private static SortedSet<PathPattern> parse(PathPatternParser parser, String...
}
SortedSet<PathPattern> result = new TreeSet<>();
for (String path : patterns) {
if (StringUtils.hasText(path) && !path.startsWith(StringPool.SLASH)) {
path = StringPool.SLASH + path;
if (StringUtils.hasText(path) && !path.startsWith(StrPool.SLASH)) {
path = StrPool.SLASH + path;
}
result.add(parser.parse(path));
}
Expand Down
19 changes: 18 additions & 1 deletion sky-infrastructure/src/main/java/core/http/ext/WebSocket.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package core.http.ext;

import core.http.inter.WebSocketImpl;
import core.socket.Listener;
import core.socket.Socket;
import core.http.inter.WebSocketImpl;
import io.github.fzdwx.lambada.anno.Nullable;
import io.github.fzdwx.lambada.fun.Hooks;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
Expand All @@ -11,6 +12,7 @@
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketScheme;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.timeout.IdleStateHandler;
Expand Down Expand Up @@ -132,6 +134,21 @@ default WebSocket enableCompression() {
*/
WebSocketServerCompressionHandler compressionHandler();

/**
* customer webSocketFrameAggregator
*
* @param webSocketFrameAggregator bodyAggregator
* @return {@link WebSocket }
* @apiNote maxContentLength is {@link Integer#MAX_VALUE}
*/
WebSocket webSocketFrameAggregator(@Nullable WebSocketFrameAggregator webSocketFrameAggregator);

/**
* get webSocketFrameAggregator
*/
@Nullable
WebSocketFrameAggregator webSocketFrameAggregator();

/**
* @since 0.07
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import io.github.fzdwx.lambada.Lang;
import io.github.fzdwx.lambada.http.ContentType;
import io.github.fzdwx.lambada.http.HttpMethod;
import io.github.fzdwx.lambada.lang.StringPool;
import io.github.fzdwx.lambada.lang.StrPool;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import lombok.SneakyThrows;
Expand Down Expand Up @@ -91,11 +91,11 @@ public void handle(final HttpServerRequest request, final HttpServerResponse res
}

if (file.isDirectory()) {
if (uri.endsWith(StringPool.SLASH)) {
if (uri.endsWith(StrPool.SLASH)) {
final String html = getFileListing(file, uri);
response.html(html);
} else {
response.redirect(uri + StringPool.SLASH_CHAR);
response.redirect(uri + StrPool.SLASH_CHAR);
}
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import core.socket.Listener;
import core.socket.Socket;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
Expand Down Expand Up @@ -56,16 +55,16 @@ protected void channelRead0(final ChannelHandlerContext ctx, final WebSocketFram
return;
}

// todo dispatch ping pong
if (msg instanceof PingWebSocketFrame) {
listener.onPing(msg.content());
}
if (msg instanceof PongWebSocketFrame) {
listener.onPong(msg.content());
}

// TODO 是否需要让服务端手动处理 close frame
if (msg instanceof CloseWebSocketFrame) {
ctx.writeAndFlush(msg.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE);
ctx.channel().close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

import core.http.Headers;
import core.http.ext.HttpServerRequest;
import core.serializer.JsonSerializer;
import core.socket.Socket;
import core.http.ext.WebSocket;
import core.http.handler.BodyHandler;
import core.http.handler.WebSocketHandler;
import core.serializer.JsonSerializer;
import core.socket.Socket;
import io.github.fzdwx.lambada.Collections;
import io.github.fzdwx.lambada.Lang;
import io.github.fzdwx.lambada.fun.Hooks;
Expand Down Expand Up @@ -204,6 +205,9 @@ public void upgradeToWebSocket(Hooks<WebSocket> h) {

// handshake
webSocket.beforeHandshake(session);
if (!webSocket.channel().isActive() || !webSocket.channel().isOpen()) {
return;
}

//region parse subProtocol
if (Lang.isNotBlank(webSocket.subProtocols())) {
Expand All @@ -228,6 +232,14 @@ public void upgradeToWebSocket(Hooks<WebSocket> h) {
pipeline.addLast(webSocket.compressionHandler());
}

// websocket frame body aggregator
if (webSocket.webSocketFrameAggregator() != null) {
pipeline.addLast(webSocket.webSocketFrameAggregator());
}

// remove Http request body aggregator
pipeline.remove(BodyHandler.class);

// add handler
pipeline.addLast(new WebSocketHandler(webSocket, session));

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package core.http.inter;

import core.http.ext.HttpServerRequest;
import core.socket.Socket;
import core.http.ext.WebSocket;
import core.socket.Socket;
import io.github.fzdwx.lambada.fun.Hooks;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
Expand All @@ -13,6 +13,7 @@
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketScheme;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.timeout.IdleStateHandler;
Expand Down Expand Up @@ -45,6 +46,7 @@ public class WebSocketImpl implements WebSocket {
private WebSocketServerCompressionHandler compressionHandler;
private IdleStateHandler idleStateHandler;
private WebSocketScheme scheme;
private WebSocketFrameAggregator webSocketFrameAggregator = new WebSocketFrameAggregator(Integer.MAX_VALUE);

public WebSocketImpl(Socket socket, final HttpServerRequest httpServerRequest) {
this.socket = socket;
Expand Down Expand Up @@ -136,6 +138,17 @@ public WebSocketServerCompressionHandler compressionHandler() {
return this.compressionHandler;
}

@Override
public WebSocket webSocketFrameAggregator(final WebSocketFrameAggregator webSocketFrameAggregator) {
this.webSocketFrameAggregator = webSocketFrameAggregator;
return this;
}

@Override
public WebSocketFrameAggregator webSocketFrameAggregator() {
return this.webSocketFrameAggregator;
}

@Override
public WebSocket send(final String text, final Hooks<ChannelFuture> h) {
h.call(send(text));
Expand Down
9 changes: 8 additions & 1 deletion sky-infrastructure/src/test/java/http/HttpServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,15 @@ void test22() {
void test_websocket() {
HttpServer.create()
.requestHandler(((request, response) -> {

request.upgradeToWebSocket(ws -> {

ws.mountBeforeHandshake(h -> {

ws.send("hello");
// ws.reject();
});

ws.mountOpen(h -> {
ws.send("tttttt");
});
Expand All @@ -54,7 +61,7 @@ void test_websocket() {
ws.send("hello").addListener(f -> {
System.out.println(f.cause());
});
ws.reject(WebSocketCloseStatus.ENDPOINT_UNAVAILABLE);
// ws.reject(WebSocketCloseStatus.ENDPOINT_UNAVAILABLE);
});

});
Expand Down

0 comments on commit d514193

Please sign in to comment.