Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
322 changes: 226 additions & 96 deletions common/config/rush/pnpm-lock.yaml

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion common/scripts/esbuild.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const defaultConfig = {
keepNames: false,
sourcemap: false,
logLevel: 'error',
external: [],
external: ['snappy'],
define: {},
};

Expand Down
2 changes: 1 addition & 1 deletion dev/tool/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"_phase:bundle": "rushx bundle",
"_phase:docker-build": "rushx docker:build",
"_phase:docker-staging": "rushx docker:staging",
"bundle": "node ../../common/scripts/esbuild.js --entry=src/__start.ts --keep-names=true --sourcemap=external --define:MODEL_VERSION --define:GIT_REVISION",
"bundle": "node ../../common/scripts/esbuild.js --entry=src/__start.ts --keep-names=true --external=snappy --sourcemap=external --define:MODEL_VERSION --define:GIT_REVISION",
"docker:build": "../../common/scripts/docker_build.sh hardcoreeng/tool",
"docker:tbuild": "docker build -t hardcoreeng/tool . --platform=linux/amd64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/tool",
"docker:abuild": "docker build -t hardcoreeng/tool . --platform=linux/arm64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/tool",
Expand Down
4 changes: 4 additions & 0 deletions packages/importer/config/rig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"$schema": "https://developer.microsoft.com/json-schemas/rig-package/rig.schema.json",
"rigPackageName": "@hcengineering/platform-rig"
}
6 changes: 4 additions & 2 deletions plugins/client-resources/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@
"typescript": "^5.3.3",
"jest": "^29.7.0",
"ts-jest": "^29.1.1",
"@types/jest": "^29.5.5"
"@types/jest": "^29.5.5",
"@types/snappyjs": "^0.7.1"
},
"dependencies": {
"@hcengineering/analytics": "^0.6.0",
"@hcengineering/platform": "^0.6.11",
"@hcengineering/core": "^0.6.32",
"@hcengineering/client": "^0.6.18",
"@hcengineering/rpc": "^0.6.5"
"@hcengineering/rpc": "^0.6.5",
"snappyjs": "^0.7.0"
},
"repository": "https://github.com/hcengineering/platform",
"publishConfig": {
Expand Down
38 changes: 28 additions & 10 deletions plugins/client-resources/src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import platform, {
broadcastEvent,
getMetadata
} from '@hcengineering/platform'
import { uncompress } from 'snappyjs'

import { HelloRequest, HelloResponse, RPCHandler, ReqId, type Response } from '@hcengineering/rpc'

Expand Down Expand Up @@ -89,6 +90,7 @@ class RequestPromise {
class Connection implements ClientConnection {
private websocket: ClientSocket | null = null
binaryMode = false
compressionMode = false
private readonly requests = new Map<ReqId, RequestPromise>()
private lastId = 0
private interval: number | undefined
Expand All @@ -107,7 +109,7 @@ class Connection implements ClientConnection {

private pingResponse: number = Date.now()

private helloRecieved: boolean = false
private helloReceived: boolean = false

private account: Account | undefined

Expand Down Expand Up @@ -205,7 +207,7 @@ class Connection implements ClientConnection {
}

isConnected (): boolean {
return this.websocket != null && this.websocket.readyState === ClientSocketReadyState.OPEN && this.helloRecieved
return this.websocket != null && this.websocket.readyState === ClientSocketReadyState.OPEN && this.helloReceived
}

delay = 0
Expand Down Expand Up @@ -289,9 +291,8 @@ class Connection implements ClientConnection {
}
if (resp.result === 'hello') {
const helloResp = resp as HelloResponse
if (helloResp.binary) {
this.binaryMode = true
}
this.binaryMode = helloResp.binary
this.compressionMode = helloResp.useCompression ?? false

// We need to clear dial timer, since we recieve hello response.
clearTimeout(this.dialTimer)
Expand All @@ -307,7 +308,7 @@ class Connection implements ClientConnection {
return
}
this.account = helloResp.account
this.helloRecieved = true
this.helloReceived = true
if (this.upgrading) {
// We need to call upgrade since connection is upgraded
this.opt?.onUpgrade?.()
Expand Down Expand Up @@ -439,6 +440,7 @@ class Connection implements ClientConnection {

private openConnection (ctx: MeasureContext, socketId: number): void {
this.binaryMode = false
this.helloReceived = false
// Use defined factory or browser default one.
const clientSocketFactory =
this.opt?.socketFactory ??
Expand Down Expand Up @@ -503,11 +505,28 @@ class Connection implements ClientConnection {
}
if (event.data instanceof Blob) {
void event.data.arrayBuffer().then((data) => {
if (this.compressionMode && this.helloReceived) {
try {
data = uncompress(data)
} catch (err: any) {
// Ignore
console.error(err)
}
}
const resp = this.rpcHandler.readResponse<any>(data, this.binaryMode)
this.handleMsg(socketId, resp)
})
} else {
const resp = this.rpcHandler.readResponse<any>(event.data, this.binaryMode)
let data = event.data
if (this.compressionMode && this.helloReceived) {
try {
data = uncompress(data)
} catch (err: any) {
// Ignore
console.error(err)
}
}
const resp = this.rpcHandler.readResponse<any>(data, this.binaryMode)
this.handleMsg(socketId, resp)
}
}
Expand All @@ -525,15 +544,14 @@ class Connection implements ClientConnection {
return
}
const useBinary = this.opt?.useBinaryProtocol ?? getMetadata(client.metadata.UseBinaryProtocol) ?? true
const useCompression =
this.compressionMode =
this.opt?.useProtocolCompression ?? getMetadata(client.metadata.UseProtocolCompression) ?? false
this.helloRecieved = false
const helloRequest: HelloRequest = {
method: 'hello',
params: [],
id: -1,
binary: useBinary,
compression: useCompression
compression: this.compressionMode
}
ctx.withSync('send-hello', {}, () => this.websocket?.send(this.rpcHandler.serialize(helloRequest, false)))
}
Expand Down
2 changes: 1 addition & 1 deletion pods/fulltext/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"_phase:docker-build": "rushx docker:build",
"_phase:docker-staging": "rushx docker:staging",
"get-model": "node ../../common/scripts/esbuild.js --entry=src/get-model.ts --keep-names=true --bundle=true --define=MODEL_VERSION --define=VERSION --define=GIT_REVISION && node ./bundle/bundle.js > ./bundle/model.json",
"bundle": "rushx get-model && node ../../common/scripts/esbuild.js --keep-names=true --bundle=true --external=*.node --sourcemap=external",
"bundle": "rushx get-model && node ../../common/scripts/esbuild.js --keep-names=true --bundle=true --external=*.node --external=snappy --sourcemap=external",
"docker:build": "../../common/scripts/docker_build.sh hardcoreeng/fulltext",
"docker:tbuild": "docker build -t hardcoreeng/fulltext . --platform=linux/amd64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/fulltext",
"docker:abuild": "docker build -t hardcoreeng/fulltext . --platform=linux/arm64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/fulltext",
Expand Down
9 changes: 6 additions & 3 deletions pods/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"_phase:docker-build": "rushx docker:build",
"_phase:docker-staging": "rushx docker:staging",
"get-model": "node ../../common/scripts/esbuild.js --entry=src/get-model.ts -keep-names=true --define=MODEL_VERSION --define=VERSION --define=GIT_REVISION --bundle=true && node ./bundle/bundle.js > ./bundle/model.json",
"bundle": "rushx get-model && node ../../common/scripts/esbuild.js --entry=src/__start.ts --keep-names=true --bundle=true --sourcemap=external --external=*.node --external=*.node --external=bufferutil --external=snappy --define=MODEL_VERSION --define=VERSION --define=GIT_REVISION --external=utf-8-validate --external=msgpackr-extract",
"bundle": "rushx get-model && node ../../common/scripts/esbuild.js --entry=src/__start.ts --keep-names=true --bundle=true --sourcemap=external --external=*.node --external=bufferutil --external=snappy --define=MODEL_VERSION --define=VERSION --define=GIT_REVISION --external=utf-8-validate --external=msgpackr-extract",
"docker:build": "../../common/scripts/docker_build.sh hardcoreeng/transactor",
"docker:tbuild": "docker build -t hardcoreeng/transactor . --platform=linux/amd64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/transactor",
"docker:abuild": "docker build -t hardcoreeng/transactor . --platform=linux/arm64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/transactor",
Expand Down Expand Up @@ -51,7 +51,8 @@
"jest": "^29.7.0",
"ts-jest": "^29.1.1",
"@types/jest": "^29.5.5",
"@hcengineering/model-all": "^0.6.0"
"@hcengineering/model-all": "^0.6.0",
"snappyjs": "^0.7.0"
},
"dependencies": {
"@hcengineering/core": "^0.6.32",
Expand All @@ -77,6 +78,8 @@
"bufferutil": "^4.0.8",
"msgpackr": "^1.11.2",
"msgpackr-extract": "^3.0.3",
"@hcengineering/postgres": "^0.6.0"
"@hcengineering/postgres": "^0.6.0",
"snappy": "^7.2.2",
"@hcengineering/rpc": "^0.6.5"
}
}
23 changes: 23 additions & 0 deletions pods/server/src/__tests__/compression.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { readFile } from 'fs/promises'
import { compress } from 'snappy'
import { RPCHandler } from '@hcengineering/rpc'

describe('compression-tests', () => {
it('check-snappy', async () => {
const modelJSON = (await readFile('./bundle/model.json')).toString()

const txes = JSON.parse(modelJSON)

const compressed = await compress(modelJSON)
console.log(modelJSON.length, compressed.length, compressed.length / modelJSON.length)
expect(compressed.length).toBeLessThan(modelJSON.length)

const rpc = new RPCHandler()

const jsonData = rpc.serialize(txes, true)

const compressed2 = await compress(jsonData)
console.log(jsonData.length, compressed2.length, compressed2.length / jsonData.length)
expect(compressed2.length).toBeLessThan(jsonData.length)
})
})
1 change: 0 additions & 1 deletion server/core/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,6 @@ export type ServerFactory = (
ctx: MeasureContext,
pipelineFactory: PipelineFactory,
port: number,
enableCompression: boolean,
accountsUrl: string,
externalStorage: StorageAdapter
) => () => Promise<void>
Expand Down
1 change: 1 addition & 0 deletions server/rpc/src/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ export interface HelloResponse extends Response<any> {
lastTx?: string
lastHash?: string // Last model hash
account: Account
useCompression?: boolean
}

function replacer (key: string, value: any): any {
Expand Down
25 changes: 18 additions & 7 deletions server/server/src/sessionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ class TSessionManager implements SessionManager {
stop: () => Promise<string | undefined>
}
| undefined,
readonly accountsUrl: string
readonly accountsUrl: string,
readonly enableCompression: boolean
) {
this.checkInterval = setInterval(() => {
this.handleTick()
Expand Down Expand Up @@ -1082,7 +1083,7 @@ class TSessionManager implements SessionManager {
): Promise<void> {
const hello = request as HelloRequest
service.binaryMode = hello.binary ?? false
service.useCompression = hello.compression ?? false
service.useCompression = this.enableCompression ? hello.compression ?? false : false

if (LOGGING_ENABLED) {
ctx.info('hello happen', {
Expand All @@ -1109,7 +1110,8 @@ class TSessionManager implements SessionManager {
serverVersion: this.serverVersion,
lastTx: pipeline.context.lastTx,
lastHash: pipeline.context.lastHash,
account: service.getRawAccount(pipeline)
account: service.getRawAccount(pipeline),
useCompression: service.useCompression
}
ws.send(requestCtx, helloResponse, false, false)
}
Expand All @@ -1126,9 +1128,18 @@ export function createSessionManager (
stop: () => Promise<string | undefined>
}
| undefined,
accountsUrl: string
accountsUrl: string,
enableCompression: boolean
): SessionManager {
return new TSessionManager(ctx, sessionFactory, timeouts, brandingMap ?? null, profiling, accountsUrl)
return new TSessionManager(
ctx,
sessionFactory,
timeouts,
brandingMap ?? null,
profiling,
accountsUrl,
enableCompression
)
}

/**
Expand Down Expand Up @@ -1160,7 +1171,8 @@ export function startSessionManager (
reconnectTimeout: 500
},
opt.profiling,
opt.accountsUrl
opt.accountsUrl,
opt.enableCompression ?? false
)
return {
shutdown: opt.serverFactory(
Expand All @@ -1171,7 +1183,6 @@ export function startSessionManager (
ctx,
opt.pipelineFactory,
opt.port,
opt.enableCompression ?? false,
opt.accountsUrl,
opt.externalStorage
),
Expand Down
3 changes: 2 additions & 1 deletion server/ws/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
"express": "^4.21.2",
"utf-8-validate": "^6.0.4",
"ws": "^8.18.0",
"body-parser": "^1.20.2"
"body-parser": "^1.20.2",
"snappy": "^7.2.2"
}
}
43 changes: 16 additions & 27 deletions server/ws/src/server_http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ import os from 'os'
import { WebSocketServer, type RawData, type WebSocket } from 'ws'

import 'bufferutil'
import { compress } from 'snappy'
import 'utf-8-validate'

let profiling = false
const rpcHandler = new RPCHandler()
/**
Expand All @@ -60,14 +62,12 @@ export function startHttpServer (
ctx: MeasureContext,
pipelineFactory: PipelineFactory,
port: number,
enableCompression: boolean,
accountsUrl: string,
externalStorage: StorageAdapter
): () => Promise<void> {
if (LOGGING_ENABLED) {
ctx.info('starting server on', {
port,
enableCompression,
accountsUrl,
parallel: os.availableParallelism()
})
Expand Down Expand Up @@ -324,27 +324,7 @@ export function startHttpServer (
const httpServer = http.createServer(app)
const wss = new WebSocketServer({
noServer: true,
perMessageDeflate: enableCompression
? {
zlibDeflateOptions: {
// See zlib defaults.
chunkSize: 32 * 1024,
memLevel: 1,
level: 1
},
zlibInflateOptions: {
chunkSize: 32 * 1024,
level: 1,
memLevel: 1
},
serverNoContextTakeover: true,
clientNoContextTakeover: true,
// Below options specified as default values.
concurrencyLimit: Math.max(10, os.availableParallelism()), // Limits zlib concurrency for perf.
threshold: 1024 // Size (in bytes) below which messages
// should not be compressed if context takeover is disabled.
}
: false,
perMessageDeflate: false,
skipUTF8Validation: true,
maxPayload: 250 * 1024 * 1024,
clientTracking: false // We do not need to track clients inside clients.
Expand Down Expand Up @@ -566,23 +546,32 @@ function createWebsocketClientSocket (
}
ws.send(pongConst)
},
send: (ctx: MeasureContext, msg, binary, compression) => {
send: (ctx: MeasureContext, msg, binary, _compression) => {
const smsg = rpcHandler.serialize(msg, binary)

ctx.measure('send-data', smsg.length)
const st = Date.now()
if (ws.readyState !== ws.OPEN || cs.isClosed) {
return
}
ws.send(smsg, { binary: true, compress: compression }, (err) => {

const handleErr = (err?: Error): void => {
ctx.measure('msg-send-delta', Date.now() - st)
if (err != null) {
if (!`${err.message}`.includes('WebSocket is not open')) {
ctx.error('send error', { err })
Analytics.handleError(err)
}
}
ctx.measure('msg-send-delta', Date.now() - st)
})
}

if (_compression) {
void compress(smsg).then((msg: any) => {
ws.send(msg, { binary: true }, handleErr)
})
} else {
ws.send(smsg, { binary: true }, handleErr)
}
}
}
return cs
Expand Down
Loading
Loading