Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
* ✨ New API: FormatterUtil.formatCurrency(BigDecimal amount, RoundingMode roundingMode)
* WebResourceController.requestWebResource with @Daemon to serve web resources with enhanced reliability.
* Refactoring: predefined URI constants inside BootURI
* Performance improvement: Agent_PDFBox - Serial graphics processing converted to parallel processing
* New API: LargeFileStreamHandler for streaming large file response with low memory usage, and support for WebSocket for partial content delivery.

## Version 2.6.9 (2026-04-24)

Expand Down
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,12 @@
<jwt.version>0.13.0</jwt.version>

<!-- NIO Netty -->
<netty.version>4.2.13.Final</netty.version>
<netty.version>4.2.14.Final</netty.version>
<netty-tcnative.version>2.0.77.Final</netty-tcnative.version>
<!-- gRPC and protobuf -->
<grpc.version>1.81.0</grpc.version>
<guava.version>33.6.0-jre</guava.version>
<protobuf.version>4.34.1</protobuf.version>
<protobuf.version>4.35.0</protobuf.version>
<!-- Web JAX-RS -->
<swagger.core.version>2.2.49</swagger.core.version>
<!--<elastic-apm.version>1.36.0</elastic-apm.version>-->
Expand All @@ -217,7 +217,7 @@
<guice.version>7.0.0</guice.version>

<!-- JPA -->
<hibernate.version>7.3.4.Final</hibernate.version>
<hibernate.version>7.3.5.Final</hibernate.version>
<hikari-cp.version>7.0.2</hikari-cp.version>

<!-- Cache -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ private static int applyOffset(int code, boolean wihtOffset) {
int AUTH_NO_PERMISSION = getErrorCode(AUTH_BASE + 6);
int AUTH_FORBIDDEN_IP = getErrorCode(AUTH_BASE + 7);
int AUTH_FORBIDDEN_JWT = getErrorCode(AUTH_BASE + 8);
int AUTH_FORBIDDEN_REQUST = getErrorCode(AUTH_BASE + 9);
int AUTH_FORBIDDEN_REQUEST = getErrorCode(AUTH_BASE + 9);

//Integration
int ACCESS_BASE = getErrorCode(50, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,12 @@ public void configure() {
} catch (RuntimeException ex) {
}
Set<String> namedWebsocket = channelHandlerNames.get(Service.ChannelHandlerType.Websocket);
for (String s : namedWebsocket) {
if (s == null || !s.startsWith(WebSocketAuthHandler_OTT.WS_PATH_PREFIX + "/")) {
String errorMessage = "@Service(binding = ChannelHandler.class, named = \"" + s + "\", type = Service.ChannelHandlerType.Websocket): named field value must start with " + WebSocketAuthHandler_OTT.WS_PATH_PREFIX + "/, but found: " + s;
throw new IllegalArgumentException(errorMessage);
if (namedWebsocket != null) {
for (String s : namedWebsocket) {
if (s == null || !s.startsWith(WebSocketAuthHandler_OTT.WS_PATH_PREFIX)) {
String errorMessage = "@Service(binding = ChannelHandler.class, named = \"" + s + "\", type = Service.ChannelHandlerType.Websocket): named field value must start with " + WebSocketAuthHandler_OTT.WS_PATH_PREFIX + ", but found: " + s;
throw new IllegalArgumentException(errorMessage);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,7 @@ protected long precheck(ChannelHandlerContext ctx, HttpRequest req) {

caller = authenticate(httpHeaders, context);
if (caller == null) {
Err err = new Err(BootErrorCode.AUTH_NO_PERMISSION, null, "Unauthorized Caller", null);
context.error(err).status(HttpResponseStatus.UNAUTHORIZED);
context.error(Err.UNAUTHORIZED_401).status(HttpResponseStatus.UNAUTHORIZED);
NioHttpUtil.sendResponse(ctx, true, context, null, null);
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ protected void initChannelPipeline(ChannelPipeline channelPipeline, NioConfig ni
// 关键点:我们将路径设置为 null。设置为 null 意味着它不会主动去拦截并匹配固定 URL,
// 而是只要看到带有符合标准的 WebSocket Upgrade 请求头,它就会自动在原地执行握手升级!
// 这样无论我们前面把 URI 改成 /ws/chat 还是 /ws/game,它都能兼容升级。
String webSocketURI = WebSocketAuthHandler_OTT.WS_PATH_PREFIX;
String webSocketURI = WebSocketAuthHandler_OTT.WS_PATH;
channelPipeline.addLast(WebSocketAuthHandler_OTT.CHANNEL_CHANNEL_NAME_NEXT, new WebSocketServerProtocolHandler(webSocketURI, null, allowExtensions, maxFrameSize, allowMaskMismatch, checkStartsWith, dropPongFrames, handshakeTimeoutMillis));

// 3. 注意:这里【不要】像之前一样 addLast(new BusinessHandler) 了。
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,13 @@ public void channelRead0(final ChannelHandlerContext ctx, final FullHttpRequest
} else {
String error = GeoIpUtil.callerAddressFilter(context.remoteIP(), nioCfg.getCallerAddressFilterWhitelist(), nioCfg.getCallerAddressFilterBlacklist(), nioCfg.getCallerAddressFilterOption());
if (error != null) {
Err err = new Err(BootErrorCode.AUTH_FORBIDDEN_IP, null, "Forbidden caller IP", null, "Forbidden caller IP: " + error);
Err err = new Err(BootErrorCode.AUTH_FORBIDDEN_IP, null, "Blocked caller IP", null, "Blocked caller IP: " + error);
context.error(err).status(HttpResponseStatus.FORBIDDEN);
} else {
String request = httpMethod + httpRequestUri;
error = SecurityUtil.whitelistbalcklistilter("request", request, nioCfg.getRequestFilterWhitelist(), nioCfg.getRequestFilterBlacklist());
if (error != null) {
Err err = new Err(BootErrorCode.AUTH_FORBIDDEN_REQUST, null, "Forbidden caller request", null, "Forbidden caller request: " + error);
Err err = new Err(BootErrorCode.AUTH_FORBIDDEN_REQUEST, null, "Blocked URL", null, "Blocked URL: " + error);
context.error(err).status(HttpResponseStatus.FORBIDDEN);
} else {
processorSettings = service(ctx, requestHeaders, httpMethod, httpRequestUri, parameters, httpPostRequestBody, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.summerboot.jexpress.boot.BootErrorCode;
import org.summerboot.jexpress.util.BeanUtil;

/**
Expand All @@ -46,6 +47,8 @@ public class Err extends AdditionalJsonFields {
@JsonIgnore
protected Object internalInfo;

public static final Err UNAUTHORIZED_401 = new Err(BootErrorCode.AUTH_LOGIN_FAILED, null, "Authentication Required - Unknown caller", null);

public Err() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@
import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tika.Tika;
import org.apache.tika.mime.MimeTypes;
import org.summerboot.jexpress.nio.server.NioConfig;
import org.summerboot.jexpress.nio.server.NioHttpUtil;
import org.summerboot.jexpress.security.auth.Caller;
import org.summerboot.jexpress.util.FileUtil;

/**
* usage example:
Expand Down Expand Up @@ -65,10 +67,13 @@ abstract public class BootWebSocketHandler extends SimpleChannelInboundHandler<W

protected Logger log = LogManager.getLogger(this.getClass());
protected static final TextWebSocketFrame MSG_AUTH_FAILED = new TextWebSocketFrame("401 Unauthorized");
protected static final AttributeKey KEY_CALLER = AttributeKey.valueOf("caller");

protected final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);


protected static final Tika TIKA = new Tika();
protected static final MimeTypes REGISTRY = MimeTypes.getDefaultMimeTypes();

/*@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
Expand All @@ -86,6 +91,42 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
}
}*/


@Override
public void handlerAdded(ChannelHandlerContext ctx) {
Runnable asyncTask = () -> {
Caller caller = ctx.channel().attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get();
if (caller == null) {
clients.remove(ctx.channel());
ctx.writeAndFlush(MSG_AUTH_FAILED.retainedDuplicate());
ctx.close();
log.warn("OTT auth failed - " + ctx.channel().remoteAddress() + ": " + ctx);
ctx.close();
return;
}

clients.add(ctx.channel());
log.trace(() -> "handlerAdded: " + ctx.channel().remoteAddress());

String message = onCallerConnected(ctx, caller);
if (message != null) {
sendToAllChannels(message, true);
}
};
NioConfig.cfg.getBizExecutor().execute(asyncTask);
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.trace(() -> "channelActive: " + ctx.channel().remoteAddress());
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
clients.remove(ctx.channel());
log.trace(() -> "handlerRemoved: " + ctx.channel().remoteAddress());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
clients.remove(ctx.channel());
Expand All @@ -104,56 +145,43 @@ protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throw
}
}

protected void onTextWebSocketFrame(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
String txt = msg.text();
processMessage(ctx, txt, null);
}

protected void onBinaryWebSocketFrame(ChannelHandlerContext ctx, BinaryWebSocketFrame msg) throws Exception {
ByteBuf bb = msg.content();
byte[] data = ByteBufUtil.getBytes(bb);
Runnable asyncTask = () -> {
Caller caller = (Caller) ctx.channel().attr(KEY_CALLER).get();
if (caller == null) {
clients.remove(ctx.channel());
ctx.writeAndFlush(MSG_AUTH_FAILED.retainedDuplicate());
ctx.close();
log.warn("OTT auth failed " + ctx.channel().remoteAddress());
return;
}
String responseText = onMessage(ctx, caller, data);
if (responseText != null) {
sendToChannel(ctx, responseText);
}
};
NioConfig.cfg.getBizExecutor().execute(asyncTask);
processMessage(ctx, null, data);
}

protected void onTextWebSocketFrame(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
String txt = msg.text();
protected void onContinuationWebSocketFrame(ChannelHandlerContext ctx, ContinuationWebSocketFrame msg) throws Exception {

}

protected void processMessage(ChannelHandlerContext ctx, String text, byte[] data) {
Runnable asyncTask = () -> {
Caller caller = (Caller) ctx.channel().attr(KEY_CALLER).get();
if (caller == null) {
/*caller = auth(ctx.channel().attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get());//use the first message as token to auth
if (caller == null) {
clients.remove(ctx.channel());
ctx.writeAndFlush(MSG_AUTH_FAILED.retainedDuplicate());
ctx.close();
log.warn("OTT auth failed " + ctx.channel().remoteAddress() + ": " + txt);
return;
Caller caller = (Caller) ctx.channel().attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get();
if (text != null) {
String responseText = onMessage(ctx, caller, text);
if (responseText != null) {
sendToAllChannels(responseText, true);
}
} else if (data != null) {
String[] mimeType = FileUtil.getMIMEShortExtension(data);
StringBuilder sb = new StringBuilder();
byte[] processedData = onMessage(ctx, caller, data, mimeType[0], mimeType[1], mimeType[2], sb);
if (processedData != null) {
sendToAllChannels(processedData, true);
}
if (!sb.isEmpty()) {
sendToAllChannels(sb.toString(), true);
}
ctx.channel().attr(KEY_CALLER).set(caller);
String message = onCallerConnected(ctx, caller);
if (message != null) {
sendToAllChannels(message, true);
}*/
}

String responseText = onMessage(ctx, caller, txt);
if (responseText != null) {
//sendToChannel(ctx, responseText);
sendToAllChannels(responseText, true);
}
};
NioConfig.cfg.getBizExecutor().execute(asyncTask);
}

protected void onContinuationWebSocketFrame(ChannelHandlerContext ctx, ContinuationWebSocketFrame msg) throws Exception {

}

Expand All @@ -171,7 +199,7 @@ protected Caller auth(Caller caller) {
*/
abstract protected String onMessage(ChannelHandlerContext ctx, Caller caller, String txt);

abstract protected String onMessage(ChannelHandlerContext ctx, Caller caller, byte[] data);
abstract protected byte[] onMessage(ChannelHandlerContext ctx, Caller caller, byte[] data, String mimeType, String fileType, String fileExtension, StringBuilder builder);

public void sendToChannel(ChannelHandlerContext ctx, String message) {
ctx.writeAndFlush(new TextWebSocketFrame(message));
Expand All @@ -194,51 +222,16 @@ public void sendToAllChannels(byte[] data, boolean auth) {
public void sendToAllChannels(WebSocketFrame message, boolean auth) {
if (auth) {
clients.stream()
.filter(channel -> channel.attr(KEY_CALLER).get() != null)
/*.filter(channel -> {
Caller caller = channel.attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get();
return caller != null;
})*/
.filter(channel -> channel.attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get() != null)
.forEach(channel -> channel.writeAndFlush(message.retainedDuplicate()));
} else {
clients.stream()
.forEach(channel -> channel.writeAndFlush(message.retainedDuplicate()));
}
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) {
Runnable asyncTask = () -> {
Caller caller = (Caller) ctx.channel().attr(KEY_CALLER).get();
if (caller == null) {
caller = auth(ctx.channel().attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get());//use the first message as token to auth
if (caller == null) {
clients.remove(ctx.channel());
ctx.writeAndFlush(MSG_AUTH_FAILED.retainedDuplicate());
ctx.close();
log.warn("OTT " + ctx.channel().remoteAddress() + ": " + ctx);
ctx.close();
return;
}

ctx.channel().attr(KEY_CALLER).set(caller);
clients.add(ctx.channel());
log.trace(() -> "handlerAdded: " + ctx.channel().remoteAddress());

String message = onCallerConnected(ctx, caller);
if (message != null) {
sendToAllChannels(message, true);
}
}
};
NioConfig.cfg.getBizExecutor().execute(asyncTask);
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.trace(() -> "channelActive: " + ctx.channel().remoteAddress());
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
clients.remove(ctx.channel());
log.trace(() -> "handlerRemoved: " + ctx.channel().remoteAddress());
}

}

This file was deleted.

Loading
Loading