Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions packages/core/src/__tests__/limits.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
//
// Copyright © 2023 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//

import { TimeRateLimiter } from '../utils'

describe('TimeRateLimiter', () => {
it('should limit rate of executions', async () => {
jest.useFakeTimers()
const limiter = new TimeRateLimiter(2, 1000) // 2 executions per second
const mockFn = jest.fn().mockResolvedValue('result')
const operations: Promise<string>[] = []

// Try to execute 4 operations
for (let i = 0; i < 4; i++) {
operations.push(limiter.exec(mockFn))
}

// First 2 should execute immediately
expect(mockFn).toHaveBeenCalledTimes(2)

// Advance time by 1 second
jest.advanceTimersByTime(1001)
await Promise.resolve()

// Next 2 should execute after time advance
expect(mockFn).toHaveBeenCalledTimes(4)

await Promise.all(operations)
})

it('should cleanup old executions', async () => {
jest.useFakeTimers()
const limiter = new TimeRateLimiter(2, 1000)
const mockFn = jest.fn().mockResolvedValue('result')

// Execute first operation
await limiter.exec(mockFn)
expect(mockFn).toHaveBeenCalledTimes(1)
expect(limiter.executions.length).toBe(1)

// Advance time past period
jest.advanceTimersByTime(1001)

// Execute another operation
await limiter.exec(mockFn)
expect(mockFn).toHaveBeenCalledTimes(2)
expect(limiter.executions.length).toBe(1) // Old execution should be cleaned up
})

it('should handle concurrent operations', async () => {
jest.useFakeTimers()
const limiter = new TimeRateLimiter(2, 1000)
const mockFn = jest.fn().mockImplementation(async () => {
console.log('start#')
await new Promise((resolve) => setTimeout(resolve, 450))
console.log('finished#')
return 'result'
})

const operations = Promise.all([limiter.exec(mockFn), limiter.exec(mockFn), limiter.exec(mockFn)])

expect(mockFn).toHaveBeenCalledTimes(2)
expect(limiter.processingQueue.size).toBe(2)

jest.advanceTimersByTime(500)
await Promise.resolve()
await Promise.resolve()
jest.advanceTimersByTime(1000)
await Promise.resolve()

jest.advanceTimersByTime(2001)
await Promise.resolve()
await Promise.resolve()

expect(limiter.processingQueue.size).toBe(0)

expect(mockFn).toHaveBeenCalledTimes(3)

await operations
})

it('should wait for processing to complete', async () => {
jest.useFakeTimers()
const limiter = new TimeRateLimiter(2, 1000)
const mockFn = jest.fn().mockImplementation(async () => {
await new Promise((resolve) => setTimeout(resolve, 500))
return 'result'
})

const operation = limiter.exec(mockFn)
const waitPromise = limiter.waitProcessing().then(() => {
console.log('wait complete')
})

expect(limiter.processingQueue.size).toBe(1)

jest.advanceTimersByTime(1001)
await Promise.resolve()
await Promise.resolve()
await Promise.resolve()

await waitPromise
await operation
expect(limiter.processingQueue.size).toBe(0)
})
})
65 changes: 65 additions & 0 deletions packages/core/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -839,3 +839,68 @@ export function pluginFilterTx (
systemTx = systemTx.filter((t) => !totalExcluded.has(t._id))
return systemTx
}

/**
* @public
*/

export class TimeRateLimiter {
idCounter: number = 0
processingQueue = new Map<number, Promise<void>>()
last: number = 0
rate: number
period: number
executions: { time: number, running: boolean }[] = []

queue: (() => Promise<void>)[] = []
notify: (() => void)[] = []

constructor (rate: number, period: number = 1000) {
this.rate = rate
this.period = period
}

private cleanupExecutions (): void {
const now = Date.now()
this.executions = this.executions.filter((time) => time.running || now - time.time < this.period)
}

async exec<T, B extends Record<string, any> = any>(op: (args?: B) => Promise<T>, args?: B): Promise<T> {
const processingId = this.idCounter++

while (this.processingQueue.size >= this.rate || this.executions.length >= this.rate) {
this.cleanupExecutions()
if (this.executions.length < this.rate) {
break
}
await new Promise<void>((resolve) => {
setTimeout(resolve, this.period / this.rate)
})
}

const v = { time: Date.now(), running: true }
try {
this.executions.push(v)
const p = op(args)
this.processingQueue.set(processingId, p as Promise<void>)
return await p
} finally {
v.running = false
this.processingQueue.delete(processingId)
this.cleanupExecutions()
const n = this.notify.shift()
if (n !== undefined) {
n()
}
}
}

async waitProcessing (): Promise<void> {
while (this.processingQueue.size > 0) {
console.log('wait', this.processingQueue.size)
await new Promise<void>((resolve) => {
this.notify.push(resolve)
})
}
}
}
4 changes: 2 additions & 2 deletions services/github/pod-github/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export async function createPlatformClient (
workspace: string,
timeout: number,
reconnect?: (event: ClientConnectEvent, data: any) => Promise<void>
): Promise<Client> {
): Promise<{ client: Client, endpoint: string }> {
setMetadata(client.metadata.ClientSocketFactory, (url) => {
return new WebSocket(url, {
headers: {
Expand All @@ -45,5 +45,5 @@ export async function createPlatformClient (
onConnect: reconnect
})

return connection
return { client: connection, endpoint }
}
12 changes: 10 additions & 2 deletions services/github/pod-github/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ interface Config {
BrandingPath: string

WorkspaceInactivityInterval: number // Interval in days to stop workspace synchronization if not visited

// Limits
RateLimit: number
}

const envMap: { [key in keyof Config]: string } = {
Expand All @@ -55,7 +58,11 @@ const envMap: { [key in keyof Config]: string } = {
SentryDSN: 'SENTRY_DSN',
BrandingPath: 'BRANDING_PATH',

WorkspaceInactivityInterval: 'WORKSPACE_INACTIVITY_INTERVAL'
WorkspaceInactivityInterval: 'WORKSPACE_INACTIVITY_INTERVAL',

// Limits

RateLimit: 'RATE_LIMIT' // Operations per second for one transactor
}

const required: Array<keyof Config> = [
Expand Down Expand Up @@ -101,7 +108,8 @@ const config: Config = (() => {

SentryDSN: process.env[envMap.SentryDSN],
BrandingPath: process.env[envMap.BrandingPath] ?? '',
WorkspaceInactivityInterval: parseInt(process.env[envMap.WorkspaceInactivityInterval] ?? '5') // In days
WorkspaceInactivityInterval: parseInt(process.env[envMap.WorkspaceInactivityInterval] ?? '5'), // In days
RateLimit: parseInt(process.env[envMap.RateLimit] ?? '25')
}

const missingEnv = required.filter((key) => params[key] === undefined).map((key) => envMap[key])
Expand Down
16 changes: 14 additions & 2 deletions services/github/pod-github/src/platform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import core, {
RateLimiter,
Ref,
systemAccountEmail,
TimeRateLimiter,
TxOperations
} from '@hcengineering/core'
import github, { GithubAuthentication, makeQuery, type GithubIntegration } from '@hcengineering/github'
Expand Down Expand Up @@ -73,6 +74,8 @@ export class PlatformWorker {

userManager!: UserManager

rateLimits = new Map<string, TimeRateLimiter>()

private constructor (
readonly ctx: MeasureContext,
readonly app: App,
Expand All @@ -83,6 +86,15 @@ export class PlatformWorker {
registerLoaders()
}

getRateLimiter (endpoint: string): TimeRateLimiter {
let limiter = this.rateLimits.get(endpoint)
if (limiter === undefined) {
limiter = new TimeRateLimiter(config.RateLimit)
this.rateLimits.set(endpoint, limiter)
}
return limiter
}

public async initStorage (): Promise<void> {
this.mongoRef = getMongoClient(config.MongoURL)
const mongoClient = await this.mongoRef.getClient()
Expand Down Expand Up @@ -230,7 +242,7 @@ export class PlatformWorker {
} else {
let client: Client | undefined
try {
client = await createPlatformClient(oldWorkspace, 30000)
;({ client } = await createPlatformClient(oldWorkspace, 30000))
await this.removeInstallationFromWorkspace(oldWorker, installationId)
await client.close()
} catch (err: any) {
Expand Down Expand Up @@ -386,7 +398,7 @@ export class PlatformWorker {
platformClient = this.clients.get(payload.workspace)?.client
if (platformClient === undefined) {
shouldClose = true
platformClient = await createPlatformClient(payload.workspace, 30000)
;({ client: platformClient } = await createPlatformClient(payload.workspace, 30000))
}
const client = new TxOperations(platformClient, payload.accountId)

Expand Down
9 changes: 9 additions & 0 deletions services/github/pod-github/src/sync/comments.ts
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,10 @@ export class CommentSyncManager implements DocSyncManager {
}
}

isHulyLinkComment (message: string): boolean {
return message.includes('<p>Connected to') && message.includes('Huly&reg;')
}

private async createComment (
info: DocSyncInfo,
messageData: MessageData,
Expand All @@ -367,6 +371,11 @@ export class CommentSyncManager implements DocSyncManager {
...messageData,
attachments: 0
}
// Check if it is Connected message.
if ((comment as any).performed_via_github_app !== undefined && this.isHulyLinkComment(comment.body)) {
// No need to create comment on platform.
return
}
await this.client.addCollection(
chunter.class.ChatMessage,
info.space,
Expand Down
Loading
Loading