Skip to content

Commit

Permalink
优化精简流解析功能
Browse files Browse the repository at this point in the history
  • Loading branch information
Vinlic committed Apr 3, 2024
1 parent c1e6244 commit 6f22635
Showing 1 changed file with 30 additions and 65 deletions.
95 changes: 30 additions & 65 deletions src/api/controllers/chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -511,45 +511,23 @@ async function receiveStream(model: string, convId: string, stream: any) {
: "";
}
};
let chunk = Buffer.from([]);
let temp = Buffer.from([]);
// 将流数据传到转换器,每个buffer去除数据头5字节
// 将流数据传到转换器
stream.on("data", (buffer: Buffer) => {
const parts: Buffer[] = [];
let length = 0;
let sizeLength = 0;
let i = 0;
if (buffer[buffer.length - 1] != 125) {
temp = Buffer.concat([temp, buffer]);
// 接收数据头
chunk = Buffer.concat([temp, chunk, buffer]);
if(chunk.length < 5)
return;
}
else if (temp.length > 0) {
buffer = Buffer.concat([temp, buffer]);
temp = Buffer.from([]);
}
for (i = 0; i < buffer.byteLength; i++) {
const byte = buffer.readUInt8(i);
if (byte == 0x00 || byte == 0x02) {
if (length > 4) {
const subBuffer = Buffer.alloc(length - (5 - sizeLength));
const subStart = i - length + (5 - sizeLength);
buffer.copy(subBuffer, 0, subStart, subStart + length + sizeLength);
sizeLength = 0;
parts.push(subBuffer);
}
sizeLength++;
length = 0;
continue;
}
length++;
}
if (length > 4) {
const subBuffer = Buffer.alloc(length - (5 - sizeLength));
const subStart = i - length + (5 - sizeLength);
buffer.copy(subBuffer, 0, subStart, subStart + length + sizeLength);
sizeLength = 0;
parts.push(subBuffer);
}
parts.forEach((part) => parser(part));
// 读取当前数据块大小
const chunkSize = chunk.readUint32BE(1);
// 根据当前大小接收完整数据块
temp = chunk.subarray(chunkSize + 5);
chunk = chunk.subarray(0, chunkSize + 5);
if(chunk.length < chunkSize + 5)
return;
parser(chunk.subarray(5));
chunk = Buffer.from([]);
});
stream.once("error", (err) => reject(err));
stream.once("close", () => resolve(data));
Expand Down Expand Up @@ -679,36 +657,23 @@ function createTransStream(
endCallback && endCallback();
}
};
// 将流数据传到转换器,每个buffer去除数据头5字节
let chunk = Buffer.from([]);
let temp = Buffer.from([]);
// 将流数据传到转换器
stream.on("data", (buffer: Buffer) => {
const parts: Buffer[] = [];
let length = 0;
let sizeLength = 0;
let i = 0;
for (i = 0; i < buffer.byteLength; i++) {
const byte = buffer.readUInt8(i);
if (byte == 0x00 || byte == 0x02) {
if (length > 4) {
const subBuffer = Buffer.alloc(length - (5 - sizeLength));
const subStart = i - length + (5 - sizeLength);
buffer.copy(subBuffer, 0, subStart, subStart + length + sizeLength);
sizeLength = 0;
parts.push(subBuffer);
}
sizeLength++;
length = 0;
continue;
}
length++;
}
if (length > 4) {
const subBuffer = Buffer.alloc(length - (5 - sizeLength));
const subStart = i - length + (5 - sizeLength);
buffer.copy(subBuffer, 0, subStart, subStart + length + sizeLength);
sizeLength = 0;
parts.push(subBuffer);
}
parts.forEach((part) => parser(part));
// 接收数据头
chunk = Buffer.concat([temp, chunk, buffer]);
if(chunk.length < 5)
return;
// 读取当前数据块大小
const chunkSize = chunk.readUint32BE(1);
// 根据当前大小接收完整数据块
temp = chunk.subarray(chunkSize + 5);
chunk = chunk.subarray(0, chunkSize + 5);
if(chunk.length < chunkSize + 5)
return;
parser(chunk.subarray(5));
chunk = Buffer.from([]);
});
stream.once(
"error",
Expand Down

0 comments on commit 6f22635

Please sign in to comment.