diff --git a/CHANGES.md b/CHANGES.md
index 50c8ecc0..3ab77386 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -14,6 +14,8 @@
* ✨ New API: FormatterUtil.formatCurrency(BigDecimal amount, RoundingMode roundingMode)
* WebResourceController.requestWebResource with @Daemon to serve web resources with enhanced reliability.
* Refactoring: predefined URI constants inside BootURI
+* Performance improvement: Agent_PDFBox - Serial graphics processing converted to parallel processing
+* New API: LargeFileStreamHandler for streaming large file response with low memory usage, and support for WebSocket for partial content delivery.
## Version 2.6.9 (2026-04-24)
diff --git a/pom.xml b/pom.xml
index 467fe9aa..e89eb5f6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -186,12 +186,12 @@
0.13.0
- 4.2.13.Final
+ 4.2.14.Final
2.0.77.Final
1.81.0
33.6.0-jre
- 4.34.1
+ 4.35.0
2.2.49
@@ -217,7 +217,7 @@
7.0.0
- 7.3.4.Final
+ 7.3.5.Final
7.0.2
diff --git a/src/main/java/org/summerboot/jexpress/boot/BootErrorCode.java b/src/main/java/org/summerboot/jexpress/boot/BootErrorCode.java
index 9360b0b5..350ef201 100644
--- a/src/main/java/org/summerboot/jexpress/boot/BootErrorCode.java
+++ b/src/main/java/org/summerboot/jexpress/boot/BootErrorCode.java
@@ -96,7 +96,7 @@ private static int applyOffset(int code, boolean wihtOffset) {
int AUTH_NO_PERMISSION = getErrorCode(AUTH_BASE + 6);
int AUTH_FORBIDDEN_IP = getErrorCode(AUTH_BASE + 7);
int AUTH_FORBIDDEN_JWT = getErrorCode(AUTH_BASE + 8);
- int AUTH_FORBIDDEN_REQUST = getErrorCode(AUTH_BASE + 9);
+ int AUTH_FORBIDDEN_REQUEST = getErrorCode(AUTH_BASE + 9);
//Integration
int ACCESS_BASE = getErrorCode(50, true);
diff --git a/src/main/java/org/summerboot/jexpress/boot/ScanedGuiceModule.java b/src/main/java/org/summerboot/jexpress/boot/ScanedGuiceModule.java
index e1290e3d..9cf46227 100644
--- a/src/main/java/org/summerboot/jexpress/boot/ScanedGuiceModule.java
+++ b/src/main/java/org/summerboot/jexpress/boot/ScanedGuiceModule.java
@@ -141,10 +141,12 @@ public void configure() {
} catch (RuntimeException ex) {
}
Set namedWebsocket = channelHandlerNames.get(Service.ChannelHandlerType.Websocket);
- for (String s : namedWebsocket) {
- if (s == null || !s.startsWith(WebSocketAuthHandler_OTT.WS_PATH_PREFIX + "/")) {
- String errorMessage = "@Service(binding = ChannelHandler.class, named = \"" + s + "\", type = Service.ChannelHandlerType.Websocket): named field value must start with " + WebSocketAuthHandler_OTT.WS_PATH_PREFIX + "/, but found: " + s;
- throw new IllegalArgumentException(errorMessage);
+ if (namedWebsocket != null) {
+ for (String s : namedWebsocket) {
+ if (s == null || !s.startsWith(WebSocketAuthHandler_OTT.WS_PATH_PREFIX)) {
+ String errorMessage = "@Service(binding = ChannelHandler.class, named = \"" + s + "\", type = Service.ChannelHandlerType.Websocket): named field value must start with " + WebSocketAuthHandler_OTT.WS_PATH_PREFIX + ", but found: " + s;
+ throw new IllegalArgumentException(errorMessage);
+ }
}
}
}
diff --git a/src/main/java/org/summerboot/jexpress/nio/server/BootHttpFileUploadHandler.java b/src/main/java/org/summerboot/jexpress/nio/server/BootHttpFileUploadHandler.java
index 44d0a1a6..51170090 100644
--- a/src/main/java/org/summerboot/jexpress/nio/server/BootHttpFileUploadHandler.java
+++ b/src/main/java/org/summerboot/jexpress/nio/server/BootHttpFileUploadHandler.java
@@ -279,8 +279,7 @@ protected long precheck(ChannelHandlerContext ctx, HttpRequest req) {
caller = authenticate(httpHeaders, context);
if (caller == null) {
- Err err = new Err(BootErrorCode.AUTH_NO_PERMISSION, null, "Unauthorized Caller", null);
- context.error(err).status(HttpResponseStatus.UNAUTHORIZED);
+ context.error(Err.UNAUTHORIZED_401).status(HttpResponseStatus.UNAUTHORIZED);
NioHttpUtil.sendResponse(ctx, true, context, null, null);
return 0;
}
diff --git a/src/main/java/org/summerboot/jexpress/nio/server/HttpNioChannelInitializer.java b/src/main/java/org/summerboot/jexpress/nio/server/HttpNioChannelInitializer.java
index 9663d5d2..37160663 100644
--- a/src/main/java/org/summerboot/jexpress/nio/server/HttpNioChannelInitializer.java
+++ b/src/main/java/org/summerboot/jexpress/nio/server/HttpNioChannelInitializer.java
@@ -126,7 +126,7 @@ protected void initChannelPipeline(ChannelPipeline channelPipeline, NioConfig ni
// 关键点:我们将路径设置为 null。设置为 null 意味着它不会主动去拦截并匹配固定 URL,
// 而是只要看到带有符合标准的 WebSocket Upgrade 请求头,它就会自动在原地执行握手升级!
// 这样无论我们前面把 URI 改成 /ws/chat 还是 /ws/game,它都能兼容升级。
- String webSocketURI = WebSocketAuthHandler_OTT.WS_PATH_PREFIX;
+ String webSocketURI = WebSocketAuthHandler_OTT.WS_PATH;
channelPipeline.addLast(WebSocketAuthHandler_OTT.CHANNEL_CHANNEL_NAME_NEXT, new WebSocketServerProtocolHandler(webSocketURI, null, allowExtensions, maxFrameSize, allowMaskMismatch, checkStartsWith, dropPongFrames, handshakeTimeoutMillis));
// 3. 注意:这里【不要】像之前一样 addLast(new BusinessHandler) 了。
diff --git a/src/main/java/org/summerboot/jexpress/nio/server/NioServerHttpRequestHandler.java b/src/main/java/org/summerboot/jexpress/nio/server/NioServerHttpRequestHandler.java
index 1ed5bbd2..bb407afd 100644
--- a/src/main/java/org/summerboot/jexpress/nio/server/NioServerHttpRequestHandler.java
+++ b/src/main/java/org/summerboot/jexpress/nio/server/NioServerHttpRequestHandler.java
@@ -171,13 +171,13 @@ public void channelRead0(final ChannelHandlerContext ctx, final FullHttpRequest
} else {
String error = GeoIpUtil.callerAddressFilter(context.remoteIP(), nioCfg.getCallerAddressFilterWhitelist(), nioCfg.getCallerAddressFilterBlacklist(), nioCfg.getCallerAddressFilterOption());
if (error != null) {
- Err err = new Err(BootErrorCode.AUTH_FORBIDDEN_IP, null, "Forbidden caller IP", null, "Forbidden caller IP: " + error);
+ Err err = new Err(BootErrorCode.AUTH_FORBIDDEN_IP, null, "Blocked caller IP", null, "Blocked caller IP: " + error);
context.error(err).status(HttpResponseStatus.FORBIDDEN);
} else {
String request = httpMethod + httpRequestUri;
error = SecurityUtil.whitelistbalcklistilter("request", request, nioCfg.getRequestFilterWhitelist(), nioCfg.getRequestFilterBlacklist());
if (error != null) {
- Err err = new Err(BootErrorCode.AUTH_FORBIDDEN_REQUST, null, "Forbidden caller request", null, "Forbidden caller request: " + error);
+ Err err = new Err(BootErrorCode.AUTH_FORBIDDEN_REQUEST, null, "Blocked URL", null, "Blocked URL: " + error);
context.error(err).status(HttpResponseStatus.FORBIDDEN);
} else {
processorSettings = service(ctx, requestHeaders, httpMethod, httpRequestUri, parameters, httpPostRequestBody, context);
diff --git a/src/main/java/org/summerboot/jexpress/nio/server/domain/Err.java b/src/main/java/org/summerboot/jexpress/nio/server/domain/Err.java
index bdf4f6f4..3d7451b2 100644
--- a/src/main/java/org/summerboot/jexpress/nio/server/domain/Err.java
+++ b/src/main/java/org/summerboot/jexpress/nio/server/domain/Err.java
@@ -21,6 +21,7 @@
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.summerboot.jexpress.boot.BootErrorCode;
import org.summerboot.jexpress.util.BeanUtil;
/**
@@ -46,6 +47,8 @@ public class Err extends AdditionalJsonFields {
@JsonIgnore
protected Object internalInfo;
+ public static final Err UNAUTHORIZED_401 = new Err(BootErrorCode.AUTH_LOGIN_FAILED, null, "Authentication Required - Unknown caller", null);
+
public Err() {
}
diff --git a/src/main/java/org/summerboot/jexpress/nio/server/websocket/BootWebSocketHandler.java b/src/main/java/org/summerboot/jexpress/nio/server/websocket/BootWebSocketHandler.java
index 7564e0cc..a2cdf6b7 100644
--- a/src/main/java/org/summerboot/jexpress/nio/server/websocket/BootWebSocketHandler.java
+++ b/src/main/java/org/summerboot/jexpress/nio/server/websocket/BootWebSocketHandler.java
@@ -27,13 +27,15 @@
import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
-import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.apache.tika.Tika;
+import org.apache.tika.mime.MimeTypes;
import org.summerboot.jexpress.nio.server.NioConfig;
import org.summerboot.jexpress.nio.server.NioHttpUtil;
import org.summerboot.jexpress.security.auth.Caller;
+import org.summerboot.jexpress.util.FileUtil;
/**
* usage example:
@@ -65,10 +67,13 @@ abstract public class BootWebSocketHandler extends SimpleChannelInboundHandler {
+ Caller caller = ctx.channel().attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get();
+ if (caller == null) {
+ clients.remove(ctx.channel());
+ ctx.writeAndFlush(MSG_AUTH_FAILED.retainedDuplicate());
+ ctx.close();
+ log.warn("OTT auth failed - " + ctx.channel().remoteAddress() + ": " + ctx);
+ ctx.close();
+ return;
+ }
+
+ clients.add(ctx.channel());
+ log.trace(() -> "handlerAdded: " + ctx.channel().remoteAddress());
+
+ String message = onCallerConnected(ctx, caller);
+ if (message != null) {
+ sendToAllChannels(message, true);
+ }
+ };
+ NioConfig.cfg.getBizExecutor().execute(asyncTask);
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ log.trace(() -> "channelActive: " + ctx.channel().remoteAddress());
+ }
+
+ @Override
+ public void handlerRemoved(ChannelHandlerContext ctx) {
+ clients.remove(ctx.channel());
+ log.trace(() -> "handlerRemoved: " + ctx.channel().remoteAddress());
+ }
+
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
clients.remove(ctx.channel());
@@ -104,56 +145,43 @@ protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throw
}
}
+ protected void onTextWebSocketFrame(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
+ String txt = msg.text();
+ processMessage(ctx, txt, null);
+ }
+
protected void onBinaryWebSocketFrame(ChannelHandlerContext ctx, BinaryWebSocketFrame msg) throws Exception {
ByteBuf bb = msg.content();
byte[] data = ByteBufUtil.getBytes(bb);
- Runnable asyncTask = () -> {
- Caller caller = (Caller) ctx.channel().attr(KEY_CALLER).get();
- if (caller == null) {
- clients.remove(ctx.channel());
- ctx.writeAndFlush(MSG_AUTH_FAILED.retainedDuplicate());
- ctx.close();
- log.warn("OTT auth failed " + ctx.channel().remoteAddress());
- return;
- }
- String responseText = onMessage(ctx, caller, data);
- if (responseText != null) {
- sendToChannel(ctx, responseText);
- }
- };
- NioConfig.cfg.getBizExecutor().execute(asyncTask);
+ processMessage(ctx, null, data);
}
- protected void onTextWebSocketFrame(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
- String txt = msg.text();
+ protected void onContinuationWebSocketFrame(ChannelHandlerContext ctx, ContinuationWebSocketFrame msg) throws Exception {
+
+ }
+
+ protected void processMessage(ChannelHandlerContext ctx, String text, byte[] data) {
Runnable asyncTask = () -> {
- Caller caller = (Caller) ctx.channel().attr(KEY_CALLER).get();
- if (caller == null) {
- /*caller = auth(ctx.channel().attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get());//use the first message as token to auth
- if (caller == null) {
- clients.remove(ctx.channel());
- ctx.writeAndFlush(MSG_AUTH_FAILED.retainedDuplicate());
- ctx.close();
- log.warn("OTT auth failed " + ctx.channel().remoteAddress() + ": " + txt);
- return;
+ Caller caller = (Caller) ctx.channel().attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get();
+ if (text != null) {
+ String responseText = onMessage(ctx, caller, text);
+ if (responseText != null) {
+ sendToAllChannels(responseText, true);
+ }
+ } else if (data != null) {
+ String[] mimeType = FileUtil.getMIMEShortExtension(data);
+ StringBuilder sb = new StringBuilder();
+ byte[] processedData = onMessage(ctx, caller, data, mimeType[0], mimeType[1], mimeType[2], sb);
+ if (processedData != null) {
+ sendToAllChannels(processedData, true);
+ }
+ if (!sb.isEmpty()) {
+ sendToAllChannels(sb.toString(), true);
}
- ctx.channel().attr(KEY_CALLER).set(caller);
- String message = onCallerConnected(ctx, caller);
- if (message != null) {
- sendToAllChannels(message, true);
- }*/
}
- String responseText = onMessage(ctx, caller, txt);
- if (responseText != null) {
- //sendToChannel(ctx, responseText);
- sendToAllChannels(responseText, true);
- }
};
NioConfig.cfg.getBizExecutor().execute(asyncTask);
- }
-
- protected void onContinuationWebSocketFrame(ChannelHandlerContext ctx, ContinuationWebSocketFrame msg) throws Exception {
}
@@ -171,7 +199,7 @@ protected Caller auth(Caller caller) {
*/
abstract protected String onMessage(ChannelHandlerContext ctx, Caller caller, String txt);
- abstract protected String onMessage(ChannelHandlerContext ctx, Caller caller, byte[] data);
+ abstract protected byte[] onMessage(ChannelHandlerContext ctx, Caller caller, byte[] data, String mimeType, String fileType, String fileExtension, StringBuilder builder);
public void sendToChannel(ChannelHandlerContext ctx, String message) {
ctx.writeAndFlush(new TextWebSocketFrame(message));
@@ -194,7 +222,11 @@ public void sendToAllChannels(byte[] data, boolean auth) {
public void sendToAllChannels(WebSocketFrame message, boolean auth) {
if (auth) {
clients.stream()
- .filter(channel -> channel.attr(KEY_CALLER).get() != null)
+ /*.filter(channel -> {
+ Caller caller = channel.attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get();
+ return caller != null;
+ })*/
+ .filter(channel -> channel.attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get() != null)
.forEach(channel -> channel.writeAndFlush(message.retainedDuplicate()));
} else {
clients.stream()
@@ -202,43 +234,4 @@ public void sendToAllChannels(WebSocketFrame message, boolean auth) {
}
}
- @Override
- public void handlerAdded(ChannelHandlerContext ctx) {
- Runnable asyncTask = () -> {
- Caller caller = (Caller) ctx.channel().attr(KEY_CALLER).get();
- if (caller == null) {
- caller = auth(ctx.channel().attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get());//use the first message as token to auth
- if (caller == null) {
- clients.remove(ctx.channel());
- ctx.writeAndFlush(MSG_AUTH_FAILED.retainedDuplicate());
- ctx.close();
- log.warn("OTT " + ctx.channel().remoteAddress() + ": " + ctx);
- ctx.close();
- return;
- }
-
- ctx.channel().attr(KEY_CALLER).set(caller);
- clients.add(ctx.channel());
- log.trace(() -> "handlerAdded: " + ctx.channel().remoteAddress());
-
- String message = onCallerConnected(ctx, caller);
- if (message != null) {
- sendToAllChannels(message, true);
- }
- }
- };
- NioConfig.cfg.getBizExecutor().execute(asyncTask);
- }
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- log.trace(() -> "channelActive: " + ctx.channel().remoteAddress());
- }
-
- @Override
- public void handlerRemoved(ChannelHandlerContext ctx) {
- clients.remove(ctx.channel());
- log.trace(() -> "handlerRemoved: " + ctx.channel().remoteAddress());
- }
-
}
diff --git a/src/main/java/org/summerboot/jexpress/nio/server/websocket/ChatModuleHandler.java b/src/main/java/org/summerboot/jexpress/nio/server/websocket/ChatModuleHandler.java
deleted file mode 100644
index 7e6ca6fb..00000000
--- a/src/main/java/org/summerboot/jexpress/nio/server/websocket/ChatModuleHandler.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright 2005-2026 Du Law Office - jExpress, The Summer Boot Framework Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://apache.org
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.summerboot.jexpress.nio.server.websocket;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.WebSocketFrame;
-import org.summerboot.jexpress.security.auth.Caller;
-
-public class ChatModuleHandler extends SimpleChannelInboundHandler {
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
- Caller caller = ctx.channel().attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get();
-
- if (frame instanceof TextWebSocketFrame) {
- System.out.println("[聊天模块] 收到用户 " + caller.getDisplayName() + " 的文本: " + ((TextWebSocketFrame) frame).text());
- } else if (frame instanceof BinaryWebSocketFrame) {
- System.out.println("[聊天模块] 收到用户 " + caller.getDisplayName() + " 的二进制数据流");
- }
- }
-}
\ No newline at end of file
diff --git a/src/main/java/org/summerboot/jexpress/nio/server/websocket/GameModuleHandler.java b/src/main/java/org/summerboot/jexpress/nio/server/websocket/GameModuleHandler.java
deleted file mode 100644
index efc0f4a8..00000000
--- a/src/main/java/org/summerboot/jexpress/nio/server/websocket/GameModuleHandler.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright 2005-2026 Du Law Office - jExpress, The Summer Boot Framework Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://apache.org
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.summerboot.jexpress.nio.server.websocket;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.WebSocketFrame;
-import org.summerboot.jexpress.security.auth.Caller;
-
-public class GameModuleHandler extends SimpleChannelInboundHandler {
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
- Caller caller = ctx.channel().attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get();
-
- if (frame instanceof TextWebSocketFrame) {
- System.out.println("[游戏模块] 收到用户 " + caller.getDisplayName() + " 的文本: " + ((TextWebSocketFrame) frame).text());
- } else if (frame instanceof BinaryWebSocketFrame) {
- System.out.println("[游戏模块] 收到用户 " + caller.getDisplayName() + " 的二进制数据流");
- }
- }
-}
diff --git a/src/main/java/org/summerboot/jexpress/nio/server/websocket/LargeFileStreamHandler.java b/src/main/java/org/summerboot/jexpress/nio/server/websocket/LargeFileStreamHandler.java
new file mode 100644
index 00000000..d029bd04
--- /dev/null
+++ b/src/main/java/org/summerboot/jexpress/nio/server/websocket/LargeFileStreamHandler.java
@@ -0,0 +1,181 @@
+/*
+ * Copyright 2005-2026 Du Law Office - jExpress, The Summer Boot Framework Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://apache.org
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+package org.summerboot.jexpress.nio.server.websocket;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.WebSocketFrame;
+import io.netty.util.AttributeKey;
+import org.summerboot.jexpress.security.auth.Caller;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author Changski Tie Zheng Zhang 张铁铮, 魏泽北, 杜旺财, 杜富贵
+ */
+public abstract class LargeFileStreamHandler extends SimpleChannelInboundHandler {
+
+ private static final AttributeKey FILE_CHANNEL_KEY = AttributeKey.valueOf("fileChannel");
+ private static final AttributeKey FILE_STREAM_KEY = AttributeKey.valueOf("fileStream");
+ private static final AttributeKey FILE_SIZE_KEY = AttributeKey.valueOf("fileSize");
+
+ // Used to remember the physical file object currently being written in the connection context.
+ private static final AttributeKey TARGET_FILE_KEY = AttributeKey.valueOf("targetFile");
+
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
+ Caller caller = ctx.channel().attr(WebSocketAuthHandler_OTT.USER_ID_KEY).get();
+ String userId = caller == null ? "anonymous" : caller.getUid();
+
+ // 1. 【起始帧】收到文件的第一个分片
+ if (frame instanceof BinaryWebSocketFrame) {
+ BinaryWebSocketFrame startFrame = (BinaryWebSocketFrame) frame;
+
+ FileChannel fileChannel = ctx.channel().attr(FILE_CHANNEL_KEY).get();
+
+ // Initialize on first frame only
+ if (fileChannel == null) {
+ Path targetPath = Paths.get("data", "uploads", "huge_file_" + userId + "_" + System.currentTimeMillis() + ".dat").toAbsolutePath();
+ Path parent = targetPath.getParent();
+ if (parent != null) {
+ Files.createDirectories(parent);
+ }
+ File targetFile = targetPath.toFile();
+ FileOutputStream fos = new FileOutputStream(targetFile, true);
+ fileChannel = fos.getChannel();
+
+ ctx.channel().attr(TARGET_FILE_KEY).set(targetFile);
+ ctx.channel().attr(FILE_STREAM_KEY).set(fos);
+ ctx.channel().attr(FILE_CHANNEL_KEY).set(fileChannel);
+ }
+
+ // Write and send ACK
+ long currentSize = writeAndGetNewSize(startFrame.content(), fileChannel);
+ ctx.channel().attr(FILE_SIZE_KEY).set(currentSize);
+
+ // IMPORTANT: Do NOT check isFinalFragment() here - each ws.send() arrives as a complete frame!
+ // Instead, the client must signal completion separately or the server must track expected size
+ sendAck(ctx, currentSize, false); // Always false for intermediate chunks
+ }
+
+ // 2. [Continuous Frames] Received numerous subsequent fragments
+ else if (frame instanceof ContinuationWebSocketFrame) {
+ ContinuationWebSocketFrame continuationFrame = (ContinuationWebSocketFrame) frame;
+ FileChannel fileChannel = ctx.channel().attr(FILE_CHANNEL_KEY).get();
+
+ if (fileChannel == null) {
+ ctx.close();
+ return;
+ }
+
+ long currentSize = writeAndGetNewSize(continuationFrame.content(), fileChannel);
+ ctx.channel().attr(FILE_SIZE_KEY).set(currentSize);
+
+ sendAck(ctx, currentSize, continuationFrame.isFinalFragment());
+
+ if (continuationFrame.isFinalFragment()) {
+ closeAndCleanUp(ctx, caller);
+ }
+ }
+
+ // 3. [Special Frame] The client sends a text message to indicate that the transmission is complete.
+ else if (frame instanceof TextWebSocketFrame) {
+ TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
+ String text = textFrame.text();
+
+ if ("UPLOAD_COMPLETE".equals(text)) {
+ closeAndCleanUp(ctx, caller);
+ }
+ }
+ }
+
+ private long writeAndGetNewSize(ByteBuf content, FileChannel fileChannel) throws IOException {
+ long position = fileChannel.size();
+ content.readBytes(fileChannel, position, content.readableBytes());
+ return fileChannel.size();
+ }
+
+ private void sendAck(ChannelHandlerContext ctx, long uploadedSize, boolean isFinished) throws IOException {
+ Map ackMap = new HashMap<>();
+ ackMap.put("status", isFinished ? "COMPLETE" : "PROGRESS");
+ ackMap.put("uploadedSize", uploadedSize);
+
+ String jsonAck = objectMapper.writeValueAsString(ackMap);
+ ctx.channel().writeAndFlush(new TextWebSocketFrame(jsonAck));
+ }
+
+ /**
+ * Shut down resources, perform disk cleanup, and finally trigger the lifecycle cleanup hook.
+ */
+ private void closeAndCleanUp(ChannelHandlerContext ctx, Caller caller) throws IOException {
+ FileChannel fileChannel = ctx.channel().attr(FILE_CHANNEL_KEY).getAndSet(null);
+ FileOutputStream fos = ctx.channel().attr(FILE_STREAM_KEY).getAndSet(null);
+ File targetFile = ctx.channel().attr(TARGET_FILE_KEY).getAndSet(null);
+
+ if (fileChannel != null) {
+ // 极其重要:强制将操作系统缓存区的数据刷入物理存储介质
+ fileChannel.force(true);
+ fileChannel.close();
+ }
+ if (fos != null) {
+ fos.close();
+ }
+
+ // ==========================================
+ // [Core Trigger Point] At this point, the file has been completely written to disk, allowing for a safe invocation of the business cleanup method.
+ // ==========================================
+ if (targetFile != null && targetFile.exists()) {
+ ctx.channel().writeAndFlush(new TextWebSocketFrame("{\"status\":\"ALL_TASKS_COMPLETE\"}"));
+ onUploadCompleted(ctx, targetFile, caller);
+ }
+ }
+
+ /**
+ * Business callback hook after successful lossless transfer and disk write of large files
+ */
+ abstract protected void onUploadCompleted(ChannelHandlerContext ctx, File targetFile, Caller caller);
+
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ FileChannel fileChannel = ctx.channel().attr(FILE_CHANNEL_KEY).getAndSet(null);
+ FileOutputStream fos = ctx.channel().attr(FILE_STREAM_KEY).getAndSet(null);
+
+ if (fileChannel != null) fileChannel.close();
+ if (fos != null) fos.close();
+
+ ctx.close();
+ }
+}
+
diff --git a/src/main/java/org/summerboot/jexpress/nio/server/websocket/WebSocketAuthHandler_OTT.java b/src/main/java/org/summerboot/jexpress/nio/server/websocket/WebSocketAuthHandler_OTT.java
index a42f5517..c6d09ec6 100644
--- a/src/main/java/org/summerboot/jexpress/nio/server/websocket/WebSocketAuthHandler_OTT.java
+++ b/src/main/java/org/summerboot/jexpress/nio/server/websocket/WebSocketAuthHandler_OTT.java
@@ -23,9 +23,12 @@
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
+import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.AttributeKey;
import org.summerboot.jexpress.security.auth.Authenticator;
import org.summerboot.jexpress.security.auth.Caller;
@@ -40,7 +43,8 @@ public class WebSocketAuthHandler_OTT extends ChannelInboundHandlerAdapter {
public static final AttributeKey USER_ID_KEY = AttributeKey.valueOf("userId");
public static final String CHANNEL_NAME = "WebSocketAuthHandler_OTT";
public static final String CHANNEL_CHANNEL_NAME_NEXT = "BootWebSocketServerProtocolHandler";
- public static final String WS_PATH_PREFIX = "/ws";
+ public static final String WS_PATH = "/ws";
+ public static final String WS_PATH_PREFIX = WS_PATH + "/";
protected final Injector injector;
@@ -76,6 +80,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
String oneTimeTicket = uriRequested.substring(uriPredefinedOTT.length());
Caller caller = verifyAndDestroyTicket(oneTimeTicket); // 校验并销毁 Ticket
if (caller == null) {
+ sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.UNAUTHORIZED));
break;
}
// save OTT result to channel attr
@@ -94,24 +99,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
ctx.fireChannelRead(msg);
return;
}
- /*if (uriRequested.startsWith(CHAT_PATH_PREFIX)) {
- String oneTimeTicket = uriRequested.substring(CHAT_PATH_PREFIX.length());
- Caller caller = verifyAndDestroyTicket(oneTimeTicket); // 校验并销毁 Ticket
-
- if (caller != null) {
- ctx.channel().attr(USER_ID_KEY).set(caller);
-
- // 【核心点】动态向管道末尾添加聊天专属业务 Handler
- //ctx.pipeline().addLast("businessHandler", new ChatModuleHandler());
- ChannelHandler ch = injector.getInstance(Key.get(ChannelHandler.class, Names.named("/ws/chat")));
- //ch = new ChatModuleHandler();
- ctx.pipeline().addAfter(CHANNEL_CHANNEL_NAME_NEXT, "wsChatModuleHandler", ch);
-
- // 重写 URI,让下游的 WebSocketServerProtocolHandler 能够精准匹配升级
- request.setUri("/ws/chat");
- ctx.fireChannelRead(msg);
- return;
- }*/
}
// 3. 认证失败或路径不匹配:直接拒绝连接
@@ -123,14 +110,14 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
ctx.fireChannelRead(msg);
}
- private Caller verifyAndDestroyTicket(String oneTimeTicket) {
+ protected Caller verifyAndDestroyTicket(String oneTimeTicket) {
if (authenticator == null) {
return null;
}
return authenticator.oneTimeTicketVerifyAndDestroy(oneTimeTicket);
}
- private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
+ protected void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
HttpUtil.setContentLength(res, 0);
ctx.channel().writeAndFlush(res).addListener(ChannelFutureListener.CLOSE);
}
diff --git a/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/BootController.java b/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/BootController.java
index 42398cd1..1516e58d 100644
--- a/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/BootController.java
+++ b/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/BootController.java
@@ -291,9 +291,10 @@ public void logout(@Parameter(hidden = true) final ServiceRequest request, @Para
@Path(BootURI.CURRENT_VERSION + BootURI.API_NF_OTT)
@Daemon
@RequiresHealthCheck("")
- public String oneTimeTicketAuthenticate(@HeaderParam(NioHttpUtil.HTTP_HEADER_AUTH_TOKEN) String authHeader, @Parameter(hidden = true) final SessionContext context) {
+ @Log(responseBody = false)
+ public String oneTimeTicketAuthenticate(@QueryParam("wsURI") String wsURI, @HeaderParam(NioHttpUtil.HTTP_HEADER_AUTH_TOKEN) String authHeader, @Parameter(hidden = true) final SessionContext context) {
String jwt = BootAuthenticator.getBearerToken(authHeader);
- return auth.oneTimeTicketAuthenticate(jwt, context);
+ return auth.oneTimeTicketAuthenticate(wsURI, jwt, context);
}
@Operation(
diff --git a/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/JaxRsRequestProcessor.java b/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/JaxRsRequestProcessor.java
index 765c3101..a0635274 100644
--- a/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/JaxRsRequestProcessor.java
+++ b/src/main/java/org/summerboot/jexpress/nio/server/ws/rs/JaxRsRequestProcessor.java
@@ -405,8 +405,7 @@ public boolean authorizationCheck(final ChannelHandlerContext channelHandlerCtx,
boolean isAuthorized = false;
Caller caller = context.caller();
if (caller == null) {
- context.status(HttpResponseStatus.UNAUTHORIZED)
- .error(new Err(BootErrorCode.AUTH_NO_PERMISSION, null, "Authentication Required - Unknown caller", null));
+ context.error(Err.UNAUTHORIZED_401).status(HttpResponseStatus.UNAUTHORIZED);
return false;
}
diff --git a/src/main/java/org/summerboot/jexpress/security/auth/Authenticator.java b/src/main/java/org/summerboot/jexpress/security/auth/Authenticator.java
index fc874159..8a9eda17 100644
--- a/src/main/java/org/summerboot/jexpress/security/auth/Authenticator.java
+++ b/src/main/java/org/summerboot/jexpress/security/auth/Authenticator.java
@@ -137,11 +137,12 @@ public interface Authenticator {
* in production, generate a random string as one-time ticket, store it in redis with key "ws:ticket:" + oneTimeTicket, value = caller (or json string),
* and set expire time to 10 seconds. return the one-time ticket string to caller.
*
- * @param jwt
+ * @param wsURI WebSocket URI
+ * @param jwt caller's JWT, can be used to verify caller's identity and generate one-time ticket for specific user
* @param context contains caller info, e.g. caller.getUid() can be used to generate one-time ticket for specific user
* @return (32 to 64 chars + prefix) random string as one-time ticket, e.g. t_f87yfs7shfash7kk7a877asdf
*/
- String oneTimeTicketAuthenticate(String jwt, SessionContext context);
+ String oneTimeTicketAuthenticate(String wsURI, String jwt, SessionContext context);
/**
* in production, call redis.getdel("ws:ticket:" + oneTimeTicket)
diff --git a/src/main/java/org/summerboot/jexpress/security/auth/BootAuthenticator.java b/src/main/java/org/summerboot/jexpress/security/auth/BootAuthenticator.java
index 12821fad..15e1ca53 100644
--- a/src/main/java/org/summerboot/jexpress/security/auth/BootAuthenticator.java
+++ b/src/main/java/org/summerboot/jexpress/security/auth/BootAuthenticator.java
@@ -100,7 +100,7 @@ public String signJWT(String username, String pwd, E metaData, int validForMinut
@Override
public String signJWT(Caller caller, int validForMinutes, final SessionContext context) {
if (caller == null) {
- context.status(HttpResponseStatus.UNAUTHORIZED);
+ context.error(Err.UNAUTHORIZED_401).status(HttpResponseStatus.UNAUTHORIZED);
return null;
}
@@ -363,7 +363,7 @@ public Caller verifyToken(String authToken, AuthTokenCache cache, Integer errorC
if (error == null) {
caller = fromJwt(claims);
} else {
- Err err = new Err(BootErrorCode.AUTH_FORBIDDEN_JWT, null, "Forbidden JWT", null, "Forbidden JWT: " + error);
+ Err err = new Err(BootErrorCode.AUTH_FORBIDDEN_JWT, null, "Blocked JWT", null, "Blocked JWT: " + error);
context.error(err).status(HttpResponseStatus.FORBIDDEN);
}
}
@@ -503,7 +503,7 @@ public ServerCall.Listener interceptCall(ServerCall= 1024.0d && unitIndex < units.length - 1) {
+ size /= 1024.0d;
+ unitIndex++;
+ }
+
+ // Manual two-decimal formatting avoids String.format(Locale, ...) overhead.
+ long scaled = Math.round(size * 100.0d);
+ long integerPart = scaled / 100;
+ int fractionalPart = (int) (scaled % 100);
+
+ StringBuilder sb = new StringBuilder(16);
+ sb.append(integerPart).append('.');
+ if (fractionalPart < 10) {
+ sb.append('0');
+ }
+ sb.append(fractionalPart).append(units[unitIndex]);
+ return sb.toString();
+ }
}
diff --git a/src/main/java/org/summerboot/jexpress/util/pdf/Agent_PDFBox.java b/src/main/java/org/summerboot/jexpress/util/pdf/Agent_PDFBox.java
index 9a9a2d76..bb65857f 100644
--- a/src/main/java/org/summerboot/jexpress/util/pdf/Agent_PDFBox.java
+++ b/src/main/java/org/summerboot/jexpress/util/pdf/Agent_PDFBox.java
@@ -30,6 +30,7 @@
import org.apache.pdfbox.rendering.ImageType;
import org.apache.pdfbox.rendering.PDFRenderer;
import org.apache.pdfbox.rendering.RenderDestination;
+import org.summerboot.jexpress.util.ApplicationUtil;
import javax.imageio.ImageIO;
import java.awt.image.BufferedImage;
@@ -41,6 +42,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
/**
* @author Changski Tie Zheng Zhang 张铁铮, 魏泽北, 杜旺财, 杜富贵
@@ -316,10 +319,24 @@ public static List pdf2Images(File pdfFile, String password, floa
public static List pdf2Images(PDDocument document, float dpi, ImageType imageType, RenderDestination destination) throws IOException {
PDFRenderer renderer = new PDFRenderer(document);
int totalPages = document.getNumberOfPages();
- List images = new ArrayList();
- for (int currentPage = 0; currentPage < totalPages; currentPage++) {
+ List images = new ArrayList(totalPages);
+ /*for (int currentPage = 0; currentPage < totalPages; currentPage++) {
BufferedImage image = renderer.renderImage(currentPage, dpi / 72f, imageType, destination);
images.add(image);
+ }*/
+ List> tasks = new ArrayList<>(totalPages);
+ for (int currentPage = 0; currentPage < totalPages; currentPage++) {
+ final int index = currentPage;
+ Callable task = () -> {
+ BufferedImage image = renderer.renderImage(index, dpi / 72f, imageType, destination);
+ return image;
+ };
+ tasks.add(task);
+ }
+ try {
+ ApplicationUtil.runAndWaitForAllResults(tasks, images);
+ } catch (ExecutionException e) {
+ e.printStackTrace();
}
return images;
}
@@ -332,13 +349,29 @@ public static List pdf2Images(PDDocument document, float dpi, Ima
* @throws IOException
*/
public static List images2Bytes(List images, String formatName) throws IOException {
- List imageDataList = new ArrayList(images.size());
+ int totalPages = images.size();
+ List imageDataList = new ArrayList(totalPages);
+ List> tasks = new ArrayList<>(totalPages);
+ /*for (BufferedImage image : images) {
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();) {
+ ImageIO.write(image, formatName, baos);
+ byte[] imageData = baos.toByteArray();
+ imageDataList.add(imageData);
+ }
+ }*/
for (BufferedImage image : images) {
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream();) {
- ImageIO.write(image, formatName, baos);
- byte[] imageData = baos.toByteArray();
- imageDataList.add(imageData);
- }
+ Callable task = () -> {
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();) {
+ ImageIO.write(image, formatName, baos);
+ return baos.toByteArray();
+ }
+ };
+ tasks.add(task);
+ }
+ try {
+ ApplicationUtil.runAndWaitForAllResults(tasks, imageDataList);
+ } catch (ExecutionException e) {
+ e.printStackTrace();
}
return imageDataList;
}