diff --git a/.github/workflows/docker-build.yml b/.github/workflows/docker-build.yml index 966f79f65..7db238c49 100644 --- a/.github/workflows/docker-build.yml +++ b/.github/workflows/docker-build.yml @@ -1,19 +1,20 @@ name: Docker Build and Push on: + workflow_dispatch: push: # branches: [ "main" ] paths: - - 'apps/api/**' - - 'apps/worker/**' - - 'apps/public/**' - - 'packages/**' - - '!packages/sdks/**' - - '**Dockerfile' - - '.github/workflows/**' + - "apps/api/**" + - "apps/worker/**" + - "apps/public/**" + - "packages/**" + - "!packages/sdks/**" + - "**Dockerfile" + - ".github/workflows/**" env: - repo_owner: 'openpanel-dev' + repo_owner: "openpanel-dev" jobs: changes: @@ -27,7 +28,7 @@ jobs: - uses: dorny/paths-filter@v2 id: filter with: - base: 'main' + base: "main" filters: | api: - 'apps/api/**' @@ -46,17 +47,27 @@ jobs: needs: changes if: ${{ needs.changes.outputs.api == 'true' || needs.changes.outputs.worker == 'true' || needs.changes.outputs.public == 'true' }} runs-on: ubuntu-latest + services: + redis: + image: redis:7-alpine + ports: + - 6379:6379 + options: >- + --health-cmd "redis-cli ping || exit 1" + --health-interval 5s + --health-timeout 3s + --health-retries 20 steps: - uses: actions/checkout@v4 - + - name: Setup Node.js uses: actions/setup-node@v4 with: - node-version: '20' - + node-version: "20" + - name: Install pnpm uses: pnpm/action-setup@v4 - + - name: Get pnpm store directory shell: bash run: | @@ -69,21 +80,21 @@ jobs: key: ${{ runner.os }}-pnpm-store-${{ hashFiles('**/pnpm-lock.yaml') }} restore-keys: | ${{ runner.os }}-pnpm-store- - + - name: Install dependencies run: pnpm install - + - name: Codegen run: pnpm codegen - + # - name: Run Biome # run: pnpm lint - + - name: Run TypeScript checks run: pnpm typecheck - - # - name: Run tests - # run: pnpm test + + - name: Run tests + run: pnpm test build-and-push-api: permissions: @@ -91,7 +102,7 @@ jobs: needs: [changes, lint-and-test] if: ${{ needs.changes.outputs.api == 'true' }} runs-on: ubuntu-latest - steps: + steps: - name: Checkout repository uses: actions/checkout@v4 @@ -118,14 +129,14 @@ jobs: ghcr.io/${{ env.repo_owner }}/api:${{ github.sha }} build-args: | DATABASE_URL=postgresql://dummy:dummy@localhost:5432/dummy - + build-and-push-worker: permissions: packages: write needs: [changes, lint-and-test] if: ${{ needs.changes.outputs.worker == 'true' }} runs-on: ubuntu-latest - steps: + steps: - name: Checkout repository uses: actions/checkout@v4 @@ -151,4 +162,4 @@ jobs: ghcr.io/${{ env.repo_owner }}/worker:latest ghcr.io/${{ env.repo_owner }}/worker:${{ github.sha }} build-args: | - DATABASE_URL=postgresql://dummy:dummy@localhost:5432/dummy \ No newline at end of file + DATABASE_URL=postgresql://dummy:dummy@localhost:5432/dummy diff --git a/apps/api/package.json b/apps/api/package.json index d1b431baf..3f605803f 100644 --- a/apps/api/package.json +++ b/apps/api/package.json @@ -38,6 +38,7 @@ "fastify": "^5.2.1", "fastify-metrics": "^12.1.0", "fastify-raw-body": "^5.0.0", + "groupmq": "1.0.0-next.13", "ico-to-png": "^0.2.2", "jsonwebtoken": "^9.0.2", "ramda": "^0.29.1", diff --git a/apps/api/scripts/mock.ts b/apps/api/scripts/mock.ts index d6f35373c..0e61a55ef 100644 --- a/apps/api/scripts/mock.ts +++ b/apps/api/scripts/mock.ts @@ -3,6 +3,7 @@ import * as faker from '@faker-js/faker'; import { generateId } from '@openpanel/common'; import { hashPassword } from '@openpanel/common/server'; import { ClientType, db } from '@openpanel/db'; +import { getRedisCache } from '@openpanel/redis'; import { v4 as uuidv4 } from 'uuid'; const DOMAIN_COUNT = 5; @@ -260,6 +261,8 @@ function insertFakeEvents(events: Event[]) { } async function simultaneousRequests() { + await getRedisCache().flushdb(); + await new Promise((resolve) => setTimeout(resolve, 1000)); const sessions: { ip: string; referrer: string; @@ -272,9 +275,11 @@ async function simultaneousRequests() { userAgent: 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', track: [ - { name: 'screen_view', path: '/home' }, - { name: 'button_click', element: 'signup' }, - { name: 'screen_view', path: '/pricing' }, + { name: 'screen_view', path: '/home', parallel: '1' }, + { name: 'button_click', element: 'signup', parallel: '1' }, + { name: 'article_viewed', articleId: '123', parallel: '1' }, + { name: 'screen_view', path: '/pricing', parallel: '1' }, + { name: 'screen_view', path: '/blog', parallel: '1' }, ], }, { @@ -361,8 +366,9 @@ async function simultaneousRequests() { { name: 'screen_view', path: '/landing' }, { name: 'screen_view', path: '/pricing' }, { name: 'screen_view', path: '/blog' }, - { name: 'screen_view', path: '/blog/post-1' }, - { name: 'screen_view', path: '/blog/post-2' }, + { name: 'screen_view', path: '/blog/post-1', parallel: '1' }, + { name: 'screen_view', path: '/blog/post-2', parallel: '1' }, + { name: 'button_click', element: 'learn_more', parallel: '1' }, { name: 'screen_view', path: '/blog/post-3' }, { name: 'screen_view', path: '/blog/post-4' }, ], @@ -396,21 +402,85 @@ async function simultaneousRequests() { }; for (const session of sessions) { + // Group tracks by parallel flag + const trackGroups: { parallel?: string; tracks: any[] }[] = []; + let currentGroup: { parallel?: string; tracks: any[] } = { tracks: [] }; + for (const track of session.track) { - const { name, ...properties } = track; - screenView.track.payload.name = name ?? ''; - screenView.track.payload.properties.__referrer = session.referrer ?? ''; - if (name === 'screen_view') { - screenView.track.payload.properties.__path = - (screenView.headers.origin ?? '') + (properties.path ?? ''); + if (track.parallel) { + // If this track has a parallel flag + if (currentGroup.parallel === track.parallel) { + // Same parallel group, add to current group + currentGroup.tracks.push(track); + } else { + // Different parallel group, finish current group and start new one + if (currentGroup.tracks.length > 0) { + trackGroups.push(currentGroup); + } + currentGroup = { parallel: track.parallel, tracks: [track] }; + } } else { - screenView.track.payload.name = track.name ?? ''; - screenView.track.payload.properties = properties; + // No parallel flag, finish any parallel group and start individual track + if (currentGroup.tracks.length > 0) { + trackGroups.push(currentGroup); + } + currentGroup = { tracks: [track] }; } - screenView.headers['x-client-ip'] = session.ip; - screenView.headers['user-agent'] = session.userAgent; - await trackit(screenView); - await new Promise((resolve) => setTimeout(resolve, Math.random() * 5000)); + } + + // Add the last group + if (currentGroup.tracks.length > 0) { + trackGroups.push(currentGroup); + } + + // Process each group + for (const group of trackGroups) { + if (group.parallel && group.tracks.length > 1) { + // Parallel execution for same-flagged tracks + console.log( + `Firing ${group.tracks.length} parallel requests with flag '${group.parallel}'`, + ); + const promises = group.tracks.map(async (track) => { + const { name, parallel, ...properties } = track; + const event = JSON.parse(JSON.stringify(screenView)); + event.track.payload.name = name ?? ''; + event.track.payload.properties.__referrer = session.referrer ?? ''; + if (name === 'screen_view') { + event.track.payload.properties.__path = + (event.headers.origin ?? '') + (properties.path ?? ''); + } else { + event.track.payload.name = track.name ?? ''; + event.track.payload.properties = properties; + } + event.headers['x-client-ip'] = session.ip; + event.headers['user-agent'] = session.userAgent; + return trackit(event); + }); + + await Promise.all(promises); + console.log(`Completed ${group.tracks.length} parallel requests`); + } else { + // Sequential execution for individual tracks + for (const track of group.tracks) { + const { name, parallel, ...properties } = track; + screenView.track.payload.name = name ?? ''; + screenView.track.payload.properties.__referrer = + session.referrer ?? ''; + if (name === 'screen_view') { + screenView.track.payload.properties.__path = + (screenView.headers.origin ?? '') + (properties.path ?? ''); + } else { + screenView.track.payload.name = track.name ?? ''; + screenView.track.payload.properties = properties; + } + screenView.headers['x-client-ip'] = session.ip; + screenView.headers['user-agent'] = session.userAgent; + await trackit(screenView); + } + } + + // Add delay between groups (not within parallel groups) + // await new Promise((resolve) => setTimeout(resolve, Math.random() * 100)); } } } diff --git a/apps/api/src/controllers/event.controller.ts b/apps/api/src/controllers/event.controller.ts index bb911908f..dcecedbb1 100644 --- a/apps/api/src/controllers/event.controller.ts +++ b/apps/api/src/controllers/event.controller.ts @@ -3,8 +3,8 @@ import type { FastifyReply, FastifyRequest } from 'fastify'; import { generateDeviceId } from '@openpanel/common/server'; import { getSalts } from '@openpanel/db'; -import { eventsQueue } from '@openpanel/queue'; -import { getLock } from '@openpanel/redis'; +import { eventsGroupQueue, eventsQueue } from '@openpanel/queue'; +import { getLock, getRedisCache } from '@openpanel/redis'; import type { PostEventPayload } from '@openpanel/sdk'; import { checkDuplicatedEvent } from '@/utils/deduplicate'; @@ -17,10 +17,14 @@ export async function postEvent( }>, reply: FastifyReply, ) { - const timestamp = getTimestamp(request.timestamp, request.body); + const { timestamp, isTimestampFromThePast } = getTimestamp( + request.timestamp, + request.body, + ); const ip = getClientIp(request)!; const ua = request.headers['user-agent']!; const projectId = request.client?.projectId; + const headers = getStringHeaders(request.headers); if (!projectId) { reply.status(400).send('missing origin'); @@ -56,31 +60,54 @@ export async function postEvent( return; } - await eventsQueue.add( - 'event', - { - type: 'incomingEvent', - payload: { + const isGroupQueue = await getRedisCache().exists('group_queue'); + if (isGroupQueue) { + const groupId = request.body?.profileId + ? `${projectId}:${request.body?.profileId}` + : currentDeviceId; + await eventsGroupQueue.add({ + orderMs: new Date(timestamp).getTime(), + data: { projectId, - headers: getStringHeaders(request.headers), + headers, event: { ...request.body, - timestamp: timestamp.timestamp, - isTimestampFromThePast: timestamp.isTimestampFromThePast, + timestamp, + isTimestampFromThePast, }, geo, currentDeviceId, previousDeviceId, }, - }, - { - attempts: 3, - backoff: { - type: 'exponential', - delay: 200, + groupId, + }); + } else { + await eventsQueue.add( + 'event', + { + type: 'incomingEvent', + payload: { + projectId, + headers, + event: { + ...request.body, + timestamp, + isTimestampFromThePast, + }, + geo, + currentDeviceId, + previousDeviceId, + }, }, - }, - ); + { + attempts: 3, + backoff: { + type: 'exponential', + delay: 200, + }, + }, + ); + } reply.status(202).send('ok'); } diff --git a/apps/api/src/controllers/track.controller.ts b/apps/api/src/controllers/track.controller.ts index ff618f4c2..e9760f97e 100644 --- a/apps/api/src/controllers/track.controller.ts +++ b/apps/api/src/controllers/track.controller.ts @@ -6,13 +6,14 @@ import { checkDuplicatedEvent } from '@/utils/deduplicate'; import { generateDeviceId, parseUserAgent } from '@openpanel/common/server'; import { getProfileById, getSalts, upsertProfile } from '@openpanel/db'; import { type GeoLocation, getGeoLocation } from '@openpanel/geo'; -import { eventsQueue } from '@openpanel/queue'; -import { getLock } from '@openpanel/redis'; +import { eventsGroupQueue, eventsQueue } from '@openpanel/queue'; +import { getLock, getRedisCache } from '@openpanel/redis'; import type { DecrementPayload, IdentifyPayload, IncrementPayload, TrackHandlerPayload, + TrackPayload, } from '@openpanel/sdk'; export function getStringHeaders(headers: FastifyRequest['headers']) { @@ -260,11 +261,6 @@ export async function handler( reply.status(200).send(); } -type TrackPayload = { - name: string; - properties?: Record; -}; - async function track({ payload, currentDeviceId, @@ -284,11 +280,14 @@ async function track({ timestamp: string; isTimestampFromThePast: boolean; }) { - await eventsQueue.add( - 'event', - { - type: 'incomingEvent', - payload: { + const isGroupQueue = await getRedisCache().exists('group_queue'); + if (isGroupQueue) { + const groupId = payload.profileId + ? `${projectId}:${payload.profileId}` + : currentDeviceId; + await eventsGroupQueue.add({ + orderMs: new Date(timestamp).getTime(), + data: { projectId, headers, event: { @@ -300,15 +299,35 @@ async function track({ currentDeviceId, previousDeviceId, }, - }, - { - attempts: 3, - backoff: { - type: 'exponential', - delay: 200, + groupId, + }); + } else { + await eventsQueue.add( + 'event', + { + type: 'incomingEvent', + payload: { + projectId, + headers, + event: { + ...payload, + timestamp, + isTimestampFromThePast, + }, + geo, + currentDeviceId, + previousDeviceId, + }, }, - }, - ); + { + attempts: 3, + backoff: { + type: 'exponential', + delay: 200, + }, + }, + ); + } } async function identify({ diff --git a/apps/worker/package.json b/apps/worker/package.json index 5bc0551b6..abd4193f4 100644 --- a/apps/worker/package.json +++ b/apps/worker/package.json @@ -15,14 +15,15 @@ "@bull-board/express": "5.21.0", "@openpanel/common": "workspace:*", "@openpanel/db": "workspace:*", + "@openpanel/email": "workspace:*", "@openpanel/integrations": "workspace:^", "@openpanel/json": "workspace:*", "@openpanel/logger": "workspace:*", "@openpanel/queue": "workspace:*", "@openpanel/redis": "workspace:*", - "@openpanel/email": "workspace:*", "bullmq": "^5.8.7", "express": "^4.18.2", + "groupmq": "1.0.0-next.13", "prom-client": "^15.1.3", "ramda": "^0.29.1", "source-map-support": "^0.5.21", diff --git a/apps/worker/src/boot-workers.ts b/apps/worker/src/boot-workers.ts index 1532a9ff1..c33d52f7b 100644 --- a/apps/worker/src/boot-workers.ts +++ b/apps/worker/src/boot-workers.ts @@ -2,18 +2,24 @@ import type { Queue, WorkerOptions } from 'bullmq'; import { Worker } from 'bullmq'; import { + type EventsQueuePayloadIncomingEvent, cronQueue, + eventsGroupQueue, eventsQueue, miscQueue, notificationQueue, + queueLogger, sessionsQueue, } from '@openpanel/queue'; import { getRedisQueue } from '@openpanel/redis'; import { performance } from 'node:perf_hooks'; import { setTimeout as sleep } from 'node:timers/promises'; +import { Worker as GroupWorker } from 'groupmq'; + import { cronJob } from './jobs/cron'; import { eventsJob } from './jobs/events'; +import { incomingEventPure } from './jobs/events.incoming-event'; import { miscJob } from './jobs/misc'; import { notificationJob } from './jobs/notification'; import { sessionsJob } from './jobs/sessions'; @@ -21,10 +27,24 @@ import { logger } from './utils/logger'; const workerOptions: WorkerOptions = { connection: getRedisQueue(), - concurrency: Number.parseInt(process.env.CONCURRENCY || '1', 10), }; export async function bootWorkers() { + const eventsGroupWorker = new GroupWorker< + EventsQueuePayloadIncomingEvent['payload'] + >({ + concurrency: Number.parseInt(process.env.EVENT_JOB_CONCURRENCY || '1', 10), + logger: queueLogger, + queue: eventsGroupQueue, + handler: async (job) => { + logger.info('processing event (group queue)', { + groupId: job.groupId, + timestamp: job.data.event.timestamp, + }); + await incomingEventPure(job.data); + }, + }); + eventsGroupWorker.run(); const eventsWorker = new Worker(eventsQueue.name, eventsJob, workerOptions); const sessionsWorker = new Worker( sessionsQueue.name, @@ -45,29 +65,30 @@ export async function bootWorkers() { cronWorker, notificationWorker, miscWorker, + eventsGroupWorker, ]; workers.forEach((worker) => { - worker.on('error', (error) => { + (worker as Worker).on('error', (error) => { logger.error('worker error', { worker: worker.name, error, }); }); - worker.on('closed', () => { + (worker as Worker).on('closed', () => { logger.info('worker closed', { worker: worker.name, }); }); - worker.on('ready', () => { + (worker as Worker).on('ready', () => { logger.info('worker ready', { worker: worker.name, }); }); - worker.on('failed', (job) => { + (worker as Worker).on('failed', (job) => { if (job) { logger.error('job failed', { worker: worker.name, @@ -78,7 +99,7 @@ export async function bootWorkers() { } }); - worker.on('completed', (job) => { + (worker as Worker).on('completed', (job) => { if (job) { logger.info('job completed', { worker: worker.name, @@ -91,7 +112,7 @@ export async function bootWorkers() { } }); - worker.on('ioredis:close', () => { + (worker as Worker).on('ioredis:close', () => { logger.error('worker closed due to ioredis:close', { worker: worker.name, }); diff --git a/apps/worker/src/index.ts b/apps/worker/src/index.ts index a3d960a5d..5f5129677 100644 --- a/apps/worker/src/index.ts +++ b/apps/worker/src/index.ts @@ -6,6 +6,7 @@ import express from 'express'; import { createInitialSalts } from '@openpanel/db'; import { cronQueue, + eventsGroupQueue, eventsQueue, miscQueue, notificationQueue, @@ -13,6 +14,7 @@ import { } from '@openpanel/queue'; import client from 'prom-client'; +import { BullBoardGroupMQAdapter } from 'groupmq'; import sourceMapSupport from 'source-map-support'; import { bootCron } from './boot-cron'; import { bootWorkers } from './boot-workers'; @@ -33,6 +35,7 @@ async function start() { serverAdapter.setBasePath('/'); createBullBoard({ queues: [ + new BullBoardGroupMQAdapter(eventsGroupQueue) as any, new BullMQAdapter(eventsQueue), new BullMQAdapter(sessionsQueue), new BullMQAdapter(cronQueue), diff --git a/apps/worker/src/jobs/events.incoming-event.ts b/apps/worker/src/jobs/events.incoming-event.ts index 693e4129b..cae6aee93 100644 --- a/apps/worker/src/jobs/events.incoming-event.ts +++ b/apps/worker/src/jobs/events.incoming-event.ts @@ -45,6 +45,14 @@ async function createEventAndNotify( export async function incomingEvent( job: Job, token?: string, +) { + return incomingEventPure(job.data.payload, job, token); +} + +export async function incomingEventPure( + jobPayload: EventsQueuePayloadIncomingEvent['payload'], + job?: Job, + token?: string, ) { const { geo, @@ -53,7 +61,7 @@ export async function incomingEvent( projectId, currentDeviceId, previousDeviceId, - } = job.data.payload; + } = jobPayload; const properties = body.properties ?? {}; const reqId = headers['request-id'] ?? 'unknown'; const logger = baseLogger.child({ @@ -151,11 +159,7 @@ export async function incomingEvent( origin: screenView?.origin ?? baseEvent.origin, }; - return createEventAndNotify( - payload as IServiceEvent, - job.data.payload, - logger, - ); + return createEventAndNotify(payload as IServiceEvent, jobPayload, logger); } const sessionEnd = await getSessionEnd({ @@ -186,21 +190,22 @@ export async function incomingEvent( if (!sessionEnd) { // Too avoid several created sessions we just throw if a lock exists // This will than retry the job - const lock = await getLock( - `create-session-end:${currentDeviceId}`, - 'locked', - 1000, - ); - - if (!lock) { - logger.warn('Move incoming event to delayed'); - await job.moveToDelayed(Date.now() + 50, token); - throw new DelayedError(); + if (job) { + const lock = await getLock( + `create-session-end:${currentDeviceId}`, + 'locked', + 1000, + ); + + if (!lock) { + await job.moveToDelayed(Date.now() + 50, token); + throw new DelayedError(); + } } await createSessionStart({ payload }); } - const event = await createEventAndNotify(payload, job.data.payload, logger); + const event = await createEventAndNotify(payload, jobPayload, logger); if (!sessionEnd) { await createSessionEndJob({ payload }); diff --git a/apps/worker/src/metrics.ts b/apps/worker/src/metrics.ts index 1bce23335..ef7cae39c 100644 --- a/apps/worker/src/metrics.ts +++ b/apps/worker/src/metrics.ts @@ -7,13 +7,18 @@ import { profileBuffer, sessionBuffer, } from '@openpanel/db'; -import { cronQueue, eventsQueue, sessionsQueue } from '@openpanel/queue'; +import { + cronQueue, + eventsGroupQueue, + eventsQueue, + sessionsQueue, +} from '@openpanel/queue'; const Registry = client.Registry; export const register = new Registry(); -const queues = [eventsQueue, sessionsQueue, cronQueue]; +const queues = [eventsQueue, sessionsQueue, cronQueue, eventsGroupQueue]; queues.forEach((queue) => { register.registerMetric( diff --git a/package.json b/package.json index 8ce7dade7..c0efdb269 100644 --- a/package.json +++ b/package.json @@ -6,7 +6,7 @@ "author": "Carl-Gerhard Lindesvärd", "packageManager": "pnpm@9.15.0", "scripts": { - "test": "vitest", + "test": "vitest run", "gen:bots": "pnpm -r --filter api gen:bots", "gen:referrers": "pnpm -r --filter worker gen:referrers", "dock:up": "docker compose up -d", diff --git a/packages/db/src/buffers/base-buffer.ts b/packages/db/src/buffers/base-buffer.ts index 83aaca596..21f66e9cf 100644 --- a/packages/db/src/buffers/base-buffer.ts +++ b/packages/db/src/buffers/base-buffer.ts @@ -1,6 +1,6 @@ import { generateSecureId } from '@openpanel/common/server/id'; import { type ILogger, createLogger } from '@openpanel/logger'; -import { getRedisCache } from '@openpanel/redis'; +import { getRedisCache, runEvery } from '@openpanel/redis'; export class BaseBuffer { name: string; @@ -9,6 +9,8 @@ export class BaseBuffer { lockTimeout = 60; onFlush: () => void; + protected bufferCounterKey: string; + constructor(options: { name: string; onFlush: () => Promise; @@ -17,6 +19,7 @@ export class BaseBuffer { this.name = options.name; this.lockKey = `lock:${this.name}`; this.onFlush = options.onFlush; + this.bufferCounterKey = `${this.name}:buffer:count`; } protected chunks(items: T[], size: number) { @@ -27,6 +30,53 @@ export class BaseBuffer { return chunks; } + /** + * Utility method to safely get buffer size with counter fallback + */ + protected async getBufferSizeWithCounter( + fallbackFn: () => Promise, + ): Promise { + const key = this.bufferCounterKey; + try { + await runEvery({ + interval: 60 * 15, + key: `${this.name}-buffer:resync`, + fn: async () => { + try { + const actual = await fallbackFn(); + await getRedisCache().set(this.bufferCounterKey, actual.toString()); + } catch (error) { + this.logger.warn('Failed to resync buffer counter', { error }); + } + }, + }).catch(() => {}); + + const counterValue = await getRedisCache().get(key); + if (counterValue !== null) { + const parsed = Number.parseInt(counterValue, 10); + if (!Number.isNaN(parsed)) { + return Math.max(0, parsed); + } + // Corrupted value → treat as missing + this.logger.warn('Invalid buffer counter value, reinitializing', { + key, + counterValue, + }); + } + + // Initialize counter with current size + const count = await fallbackFn(); + await getRedisCache().set(key, count.toString()); + return count; + } catch (error) { + this.logger.warn( + 'Failed to get buffer size from counter, using fallback', + { error }, + ); + return fallbackFn(); + } + } + private async releaseLock(lockId: string): Promise { this.logger.debug('Releasing lock...'); const script = ` @@ -60,6 +110,11 @@ export class BaseBuffer { error, lockId, }); + // On error, we might want to reset counter to avoid drift + if (this.bufferCounterKey) { + this.logger.warn('Resetting buffer counter due to flush error'); + await getRedisCache().del(this.bufferCounterKey); + } } finally { await this.releaseLock(lockId); this.logger.info('Flush completed', { diff --git a/packages/db/src/buffers/bot-buffer-redis.ts b/packages/db/src/buffers/bot-buffer.ts similarity index 72% rename from packages/db/src/buffers/bot-buffer-redis.ts rename to packages/db/src/buffers/bot-buffer.ts index 723bcddf2..b98f68b7d 100644 --- a/packages/db/src/buffers/bot-buffer-redis.ts +++ b/packages/db/src/buffers/bot-buffer.ts @@ -24,11 +24,15 @@ export class BotBuffer extends BaseBuffer { async add(event: IClickhouseBotEvent) { try { - // Add event to Redis list - await this.redis.rpush(this.redisKey, JSON.stringify(event)); + // Add event and increment counter atomically + await this.redis + .multi() + .rpush(this.redisKey, JSON.stringify(event)) + .incr(this.bufferCounterKey) + .exec(); - // Check buffer length - const bufferLength = await this.redis.llen(this.redisKey); + // Check buffer length using counter (fallback to LLEN if missing) + const bufferLength = await this.getBufferSize(); if (bufferLength >= this.batchSize) { await this.tryFlush(); @@ -60,8 +64,12 @@ export class BotBuffer extends BaseBuffer { format: 'JSONEachRow', }); - // Only remove events after successful insert - await this.redis.ltrim(this.redisKey, events.length, -1); + // Only remove events after successful insert and update counter + await this.redis + .multi() + .ltrim(this.redisKey, events.length, -1) + .decrby(this.bufferCounterKey, events.length) + .exec(); this.logger.info('Processed bot events', { count: events.length, @@ -72,6 +80,6 @@ export class BotBuffer extends BaseBuffer { } async getBufferSize() { - return getRedisCache().llen(this.redisKey); + return this.getBufferSizeWithCounter(() => this.redis.llen(this.redisKey)); } } diff --git a/packages/db/src/buffers/event-buffer.test.ts b/packages/db/src/buffers/event-buffer.test.ts new file mode 100644 index 000000000..ddbbe3b11 --- /dev/null +++ b/packages/db/src/buffers/event-buffer.test.ts @@ -0,0 +1,503 @@ +import { getRedisCache } from '@openpanel/redis'; +import { + afterAll, + beforeAll, + beforeEach, + describe, + expect, + it, + vi, +} from 'vitest'; +import { ch } from '../clickhouse/client'; + +// Mock transformEvent to avoid circular dependency with buffers -> services -> buffers +vi.mock('../services/event.service', () => ({ + transformEvent: (event: any) => ({ + id: event.id ?? 'id', + name: event.name, + deviceId: event.device_id, + profileId: event.profile_id, + projectId: event.project_id, + sessionId: event.session_id, + properties: event.properties ?? {}, + createdAt: new Date(event.created_at ?? Date.now()), + country: event.country, + city: event.city, + region: event.region, + longitude: event.longitude, + latitude: event.latitude, + os: event.os, + osVersion: event.os_version, + browser: event.browser, + browserVersion: event.browser_version, + device: event.device, + brand: event.brand, + model: event.model, + duration: event.duration ?? 0, + path: event.path ?? '', + origin: event.origin ?? '', + referrer: event.referrer, + referrerName: event.referrer_name, + referrerType: event.referrer_type, + meta: event.meta, + importedAt: undefined, + sdkName: event.sdk_name, + sdkVersion: event.sdk_version, + profile: event.profile, + }), +})); + +import { EventBuffer } from './event-buffer'; + +const redis = getRedisCache(); + +beforeEach(async () => { + await redis.flushdb(); +}); + +afterAll(async () => { + try { + await redis.quit(); + } catch {} +}); + +describe('EventBuffer with real Redis', () => { + let eventBuffer: EventBuffer; + + beforeEach(() => { + eventBuffer = new EventBuffer(); + }); + + it('keeps a single screen_view pending until a subsequent event arrives', async () => { + const screenView = { + project_id: 'p1', + profile_id: 'u1', + session_id: 'session_a', + name: 'screen_view', + created_at: new Date().toISOString(), + } as any; + + await eventBuffer.add(screenView); + + // Not eligible for processing yet (only 1 event in session) + await eventBuffer.processBuffer(); + + const sessionKey = `event_buffer:session:${screenView.session_id}`; + const events = await redis.lrange(sessionKey, 0, -1); + expect(events.length).toBe(1); + expect(JSON.parse(events[0]!)).toMatchObject({ + session_id: 'session_a', + name: 'screen_view', + }); + }); + + it('processes two screen_view events and leaves only the last one pending', async () => { + const t0 = Date.now(); + const first = { + project_id: 'p1', + profile_id: 'u1', + session_id: 'session_b', + name: 'screen_view', + created_at: new Date(t0).toISOString(), + } as any; + const second = { + project_id: 'p1', + profile_id: 'u1', + session_id: 'session_b', + name: 'screen_view', + created_at: new Date(t0 + 1000).toISOString(), + } as any; + + await eventBuffer.add(first); + await eventBuffer.add(second); + + const insertSpy = vi + .spyOn(ch, 'insert') + .mockResolvedValueOnce(undefined as any); + + await eventBuffer.processBuffer(); + + // First screen_view should be flushed to ClickHouse, second should remain pending in Redis + expect(insertSpy).toHaveBeenCalledWith({ + format: 'JSONEachRow', + table: 'events', + values: [ + { + ...first, + duration: 1000, + }, + ], + }); + + const sessionKey = `event_buffer:session:${first.session_id}`; + const storedEvents = await redis.lrange(sessionKey, 0, -1); + expect(storedEvents.length).toBe(1); + const remaining = JSON.parse(storedEvents[0]!); + expect(remaining).toMatchObject({ + session_id: 'session_b', + name: 'screen_view', + created_at: second.created_at, + }); + }); + + it('clears session when a session_end event arrives', async () => { + const t0 = Date.now(); + const first = { + project_id: 'p1', + profile_id: 'u1', + session_id: 'session_c', + name: 'screen_view', + created_at: new Date(t0).toISOString(), + } as any; + const end = { + project_id: 'p1', + profile_id: 'u1', + session_id: 'session_c', + name: 'session_end', + created_at: new Date(t0 + 1000).toISOString(), + } as any; + + await eventBuffer.add(first); + await eventBuffer.add(end); + + const insertSpy = vi + .spyOn(ch, 'insert') + .mockResolvedValue(undefined as any); + + await eventBuffer.processBuffer(); + + // Both events should be flushed, leaving no pending session events + expect(insertSpy).toHaveBeenCalledWith({ + format: 'JSONEachRow', + table: 'events', + values: [first, end], + }); + const sessionKey = `event_buffer:session:${first.session_id}`; + const storedEvents = await redis.lrange(sessionKey, 0, -1); + expect(storedEvents.length).toBe(0); + }); + + it('queues and processes non-session events in regular queue', async () => { + const event = { + project_id: 'p2', + name: 'custom_event', + created_at: new Date().toISOString(), + } as any; + + await eventBuffer.add(event); + + // Should be in regular queue + const regularQueueKey = 'event_buffer:regular_queue'; + expect(await redis.llen(regularQueueKey)).toBe(1); + + // Buffer counter should reflect outstanding = 1 + expect(await eventBuffer.getBufferSize()).toBe(1); + + const insertSpy = vi + .spyOn(ch, 'insert') + .mockResolvedValueOnce(undefined as any); + await eventBuffer.processBuffer(); + + // Regular queue should be trimmed + expect(await redis.llen(regularQueueKey)).toBe(0); + expect(insertSpy).toHaveBeenCalled(); + + // Buffer counter back to 0 + expect(await eventBuffer.getBufferSize()).toBe(0); + }); + + it('adds session to ready set at 2 events and removes after processing', async () => { + const s = 'session_ready'; + const e1 = { + project_id: 'p3', + profile_id: 'u3', + session_id: s, + name: 'screen_view', + created_at: new Date().toISOString(), + } as any; + const e2 = { + ...e1, + created_at: new Date(Date.now() + 1000).toISOString(), + } as any; + + await eventBuffer.add(e1); + + // One event -> not ready + expect(await redis.zscore('event_buffer:ready_sessions', s)).toBeNull(); + + await eventBuffer.add(e2); + + // Two events -> ready + expect(await redis.zscore('event_buffer:ready_sessions', s)).not.toBeNull(); + + const insertSpy = vi + .spyOn(ch, 'insert') + .mockResolvedValueOnce(undefined as any); + await eventBuffer.processBuffer(); + + // After processing with one pending left, session should be removed from ready set + expect(await redis.zscore('event_buffer:ready_sessions', s)).toBeNull(); + expect(insertSpy).toHaveBeenCalled(); + }); + + it('sets last screen_view key and clears it on session_end', async () => { + const projectId = 'p4'; + const profileId = 'u4'; + const sessionId = 'session_last'; + const lastKey = `session:last_screen_view:${projectId}:${profileId}`; + + const view = { + project_id: projectId, + profile_id: profileId, + session_id: sessionId, + name: 'screen_view', + created_at: new Date().toISOString(), + } as any; + + await eventBuffer.add(view); + + // Should be set in Redis + expect(await redis.get(lastKey)).not.toBeNull(); + + const end = { + project_id: projectId, + profile_id: profileId, + session_id: sessionId, + name: 'session_end', + created_at: new Date(Date.now() + 1000).toISOString(), + } as any; + + await eventBuffer.add(end); + + const insertSpy = vi + .spyOn(ch, 'insert') + .mockResolvedValueOnce(undefined as any); + await eventBuffer.processBuffer(); + + // Key should be deleted by session_end + expect(await redis.get(lastKey)).toBeNull(); + expect(insertSpy).toHaveBeenCalled(); + }); + + it('getLastScreenView works for profile and session queries', async () => { + const projectId = 'p5'; + const profileId = 'u5'; + const sessionId = 'session_glsv'; + + const view = { + project_id: projectId, + profile_id: profileId, + session_id: sessionId, + name: 'screen_view', + created_at: new Date().toISOString(), + } as any; + + await eventBuffer.add(view); + + const byProfile = await eventBuffer.getLastScreenView({ + projectId, + profileId, + }); + + if (!byProfile) { + throw new Error('byProfile is null'); + } + + expect(byProfile.name).toBe('screen_view'); + + const bySession = await eventBuffer.getLastScreenView({ + projectId, + sessionId, + }); + + if (!bySession) { + throw new Error('bySession is null'); + } + + expect(bySession.name).toBe('screen_view'); + }); + + it('buffer counter reflects pending after processing 2 screen_view events', async () => { + const sessionId = 'session_counter'; + const a = { + project_id: 'p6', + profile_id: 'u6', + session_id: sessionId, + name: 'screen_view', + created_at: new Date().toISOString(), + } as any; + const b = { + ...a, + created_at: new Date(Date.now() + 1000).toISOString(), + } as any; + + await eventBuffer.add(a); + await eventBuffer.add(b); + + // Counter counts enqueued items + expect(await eventBuffer.getBufferSize()).toBeGreaterThanOrEqual(2); + + const insertSpy = vi + .spyOn(ch, 'insert') + .mockResolvedValueOnce(undefined as any); + await eventBuffer.processBuffer(); + + // One pending screen_view left -> counter should be 1 + expect(await eventBuffer.getBufferSize()).toBe(1); + expect(insertSpy).toHaveBeenCalled(); + }); + + it('inserts in chunks according to EVENT_BUFFER_CHUNK_SIZE', async () => { + const prev = process.env.EVENT_BUFFER_CHUNK_SIZE; + process.env.EVENT_BUFFER_CHUNK_SIZE = '1'; + const eb = new EventBuffer(); + + const e1 = { + project_id: 'pc', + name: 'ev1', + created_at: new Date().toISOString(), + } as any; + const e2 = { + project_id: 'pc', + name: 'ev2', + created_at: new Date(Date.now() + 1).toISOString(), + } as any; + + await eb.add(e1); + await eb.add(e2); + + const insertSpy = vi + .spyOn(ch, 'insert') + .mockResolvedValue(undefined as any); + + await eb.processBuffer(); + + // With chunk size 1 and two events, insert should be called twice + expect(insertSpy.mock.calls.length).toBeGreaterThanOrEqual(2); + + // Restore env + if (prev === undefined) delete process.env.EVENT_BUFFER_CHUNK_SIZE; + else process.env.EVENT_BUFFER_CHUNK_SIZE = prev; + }); + + it('counts active visitors after adding an event with profile', async () => { + const e = { + project_id: 'p7', + profile_id: 'u7', + name: 'custom', + created_at: new Date().toISOString(), + } as any; + + await eventBuffer.add(e); + + const count = await eventBuffer.getActiveVisitorCount('p7'); + expect(count).toBeGreaterThanOrEqual(1); + }); + + it('batches pending session updates (respects cap) during processBuffer', async () => { + const prev = process.env.EVENT_BUFFER_UPDATE_PENDING_SESSIONS_BATCH_SIZE; + process.env.EVENT_BUFFER_UPDATE_PENDING_SESSIONS_BATCH_SIZE = '3'; + const eb = new EventBuffer(); + + // Create many sessions each with 2 screen_view events → leaves 1 pending per session + const numSessions = 10; + const base = Date.now(); + + for (let i = 0; i < numSessions; i++) { + const sid = `batch_s_${i}`; + const e1 = { + project_id: 'p8', + profile_id: `u${i}`, + session_id: sid, + name: 'screen_view', + created_at: new Date(base + i * 10).toISOString(), + } as any; + const e2 = { + ...e1, + created_at: new Date(base + i * 10 + 1).toISOString(), + } as any; + await eb.add(e1); + await eb.add(e2); + } + + const insertSpy = vi + .spyOn(ch, 'insert') + .mockResolvedValue(undefined as any); + const evalSpy = vi.spyOn(redis as any, 'eval'); + + await eb.processBuffer(); + + // Only consider eval calls for batchUpdateSessionsScript (2 keys, second is total_count) + const batchEvalCalls = evalSpy.mock.calls.filter( + (call) => call[1] === 2 && call[3] === 'event_buffer:total_count', + ); + + const expectedCalls = Math.ceil(numSessions / 3); + expect(batchEvalCalls.length).toBeGreaterThanOrEqual(expectedCalls); + + function countSessionsInEvalCall(args: any[]): number { + let idx = 4; // ARGV starts after: script, numKeys, key1, key2 + let count = 0; + while (idx < args.length) { + if (idx + 3 >= args.length) break; + const pendingCount = Number.parseInt(String(args[idx + 3]), 10); + idx += 4 + Math.max(0, pendingCount); + count += 1; + } + return count; + } + + for (const call of batchEvalCalls) { + expect(call[1]).toBe(2); + expect(call[2]).toBe('event_buffer:ready_sessions'); + expect(call[3]).toBe('event_buffer:total_count'); + + const sessionsInThisCall = countSessionsInEvalCall(call.slice(0)); + expect(sessionsInThisCall).toBeLessThanOrEqual(3); + expect(sessionsInThisCall).toBeGreaterThan(0); + } + + expect(insertSpy).toHaveBeenCalled(); + + // Restore env + if (prev === undefined) + delete process.env.EVENT_BUFFER_UPDATE_PENDING_SESSIONS_BATCH_SIZE; + else process.env.EVENT_BUFFER_UPDATE_PENDING_SESSIONS_BATCH_SIZE = prev; + + evalSpy.mockRestore(); + insertSpy.mockRestore(); + }); + + it('flushes a lone session_end and clears the session list', async () => { + const s = 'session_only_end'; + const end = { + project_id: 'p9', + profile_id: 'u9', + session_id: s, + name: 'session_end', + created_at: new Date().toISOString(), + } as any; + + const eb = new EventBuffer(); + await eb.add(end); + + // Should be considered ready even though only 1 event (session_end) + const insertSpy = vi + .spyOn(ch, 'insert') + .mockResolvedValueOnce(undefined as any); + + await eb.processBuffer(); + + expect(insertSpy).toHaveBeenCalledWith({ + format: 'JSONEachRow', + table: 'events', + values: [end], + }); + + const sessionKey = `event_buffer:session:${s}`; + const remaining = await redis.lrange(sessionKey, 0, -1); + expect(remaining.length).toBe(0); + + insertSpy.mockRestore(); + }); +}); diff --git a/packages/db/src/buffers/event-buffer-redis.ts b/packages/db/src/buffers/event-buffer.ts similarity index 57% rename from packages/db/src/buffers/event-buffer-redis.ts rename to packages/db/src/buffers/event-buffer.ts index ef8cc6d93..caf1bb36f 100644 --- a/packages/db/src/buffers/event-buffer-redis.ts +++ b/packages/db/src/buffers/event-buffer.ts @@ -1,4 +1,4 @@ -import { getSafeJson, setSuperJson } from '@openpanel/json'; +import { getSafeJson } from '@openpanel/json'; import { type Redis, getRedisCache, @@ -38,12 +38,16 @@ import { BaseBuffer } from './base-buffer'; export class EventBuffer extends BaseBuffer { // Configurable limits + // How many days to keep buffered session metadata before cleanup private daysToKeep = process.env.EVENT_BUFFER_DAYS_TO_KEEP ? Number.parseFloat(process.env.EVENT_BUFFER_DAYS_TO_KEEP) : 3; + // How many events we attempt to FETCH per flush cycle (split across sessions/non-sessions) + // Prefer new env EVENT_BUFFER_BATCH_SIZE; fallback to legacy EVENT_BUFFER_BATCH_SIZE private batchSize = process.env.EVENT_BUFFER_BATCH_SIZE ? Number.parseInt(process.env.EVENT_BUFFER_BATCH_SIZE, 10) : 4000; + // How many events per insert chunk we send to ClickHouse (insert batch size) private chunkSize = process.env.EVENT_BUFFER_CHUNK_SIZE ? Number.parseInt(process.env.EVENT_BUFFER_CHUNK_SIZE, 10) : 1000; @@ -53,8 +57,20 @@ export class EventBuffer extends BaseBuffer { process.env.EVENT_BUFFER_UPDATE_PENDING_SESSIONS_BATCH_SIZE, 10, ) + : 300; + + // Cap of how many ready sessions to scan per flush cycle (configurable via env) + private maxSessionsPerFlush = process.env.EVENT_BUFFER_MAX_SESSIONS_PER_FLUSH + ? Number.parseInt(process.env.EVENT_BUFFER_MAX_SESSIONS_PER_FLUSH, 10) + : 500; + + // Soft time budget per flush (ms) to avoid long lock holds + private flushTimeBudgetMs = process.env.EVENT_BUFFER_FLUSH_TIME_BUDGET_MS + ? Number.parseInt(process.env.EVENT_BUFFER_FLUSH_TIME_BUDGET_MS, 10) : 1000; + private minEventsInSession = 2; + private activeVisitorsExpiration = 60 * 5; // 5 minutes private sessionEvents = ['screen_view', 'session_end']; @@ -65,113 +81,164 @@ export class EventBuffer extends BaseBuffer { // SORTED SET - Tracks all active session IDs with their timestamps private sessionSortedKey = 'event_buffer:sessions_sorted'; // sorted set of session IDs + // SORTED SET - Tracks sessions that are ready for processing (have >= minEvents) + private readySessionsKey = 'event_buffer:ready_sessions'; + + // STRING - Tracks total buffer size incrementally + protected bufferCounterKey = 'event_buffer:total_count'; + private readonly sessionKeyPrefix = 'event_buffer:session:'; // LIST - Stores events for a given session private getSessionKey(sessionId: string) { return `${this.sessionKeyPrefix}${sessionId}`; } /** - * Lua script that loops through sessions and returns a JSON-encoded list of - * session objects (sessionId and events). It stops once a total number of events - * >= batchSize is reached. It also cleans up any empty sessions. + * Optimized Lua script that processes ready sessions efficiently. + * Only fetches from sessions known to have >= minEvents. + * Limits the number of events fetched per session to avoid huge payloads. */ - private readonly processSessionsScript = ` -local sessionSortedKey = KEYS[1] + private readonly processReadySessionsScript = ` +local readySessionsKey = KEYS[1] local sessionPrefix = KEYS[2] -local batchSize = tonumber(ARGV[1]) -local minEvents = tonumber(ARGV[2]) +local maxSessions = tonumber(ARGV[1]) +local maxEventsPerSession = tonumber(ARGV[2]) +local startOffset = tonumber(ARGV[3]) or 0 local result = {} local sessionsToRemove = {} -local sessionIds = redis.call('ZRANGE', sessionSortedKey, 0, -1) + +-- Get up to maxSessions ready sessions from window [startOffset, startOffset+maxSessions-1] +local stopIndex = startOffset + maxSessions - 1 +local sessionIds = redis.call('ZRANGE', readySessionsKey, startOffset, stopIndex) local resultIndex = 1 -local totalEvents = 0 for i, sessionId in ipairs(sessionIds) do local sessionKey = sessionPrefix .. sessionId - local events = redis.call('LRANGE', sessionKey, 0, -1) + local eventCount = redis.call('LLEN', sessionKey) - if #events == 0 then + if eventCount == 0 then + -- Session is empty, remove from ready set table.insert(sessionsToRemove, sessionId) - -- If we have collected 100 sessions to remove, remove them now - if #sessionsToRemove >= 100 then - redis.call('ZREM', sessionSortedKey, unpack(sessionsToRemove)) - sessionsToRemove = {} - end - elseif #events >= minEvents then - result[resultIndex] = { sessionId = sessionId, events = events } + else + -- Fetch limited number of events to avoid huge payloads + local eventsToFetch = math.min(eventCount, maxEventsPerSession) + local events = redis.call('LRANGE', sessionKey, 0, eventsToFetch - 1) + + result[resultIndex] = { + sessionId = sessionId, + events = events, + totalEventCount = eventCount + } resultIndex = resultIndex + 1 - totalEvents = totalEvents + #events - end - - -- Only check if we should break AFTER processing the entire session - if totalEvents >= batchSize then - break end end --- Remove any remaining sessions +-- Clean up empty sessions from ready set if #sessionsToRemove > 0 then - redis.call('ZREM', sessionSortedKey, unpack(sessionsToRemove)) + redis.call('ZREM', readySessionsKey, unpack(sessionsToRemove)) end return cjson.encode(result) `; /** - * New atomic Lua script to update a session's list with pending events. - * Instead of doing a separate DEL and RPUSH (which leaves a race condition), - * this script will: - * 1. Remove the first `snapshotCount` items from the session list. - * 2. Re-insert the pending events (provided as additional arguments) - * at the head (using LPUSH in reverse order to preserve order). + * Optimized atomic Lua script to update a session's list with pending events. + * Also manages the ready_sessions set and buffer counter. * * KEYS[1] = session key - * ARGV[1] = snapshotCount (number of events that were present in our snapshot) - * ARGV[2] = pendingCount (number of pending events) - * ARGV[3..(2+pendingCount)] = the pending event strings + * KEYS[2] = ready sessions key + * KEYS[3] = buffer counter key + * ARGV[1] = sessionId + * ARGV[2] = snapshotCount (number of events that were present in our snapshot) + * ARGV[3] = pendingCount (number of pending events) + * ARGV[4] = minEventsInSession + * ARGV[5..(4+pendingCount)] = the pending event strings */ private readonly updateSessionScript = ` -local snapshotCount = tonumber(ARGV[1]) -local pendingCount = tonumber(ARGV[2]) local sessionKey = KEYS[1] +local readySessionsKey = KEYS[2] +local bufferCounterKey = KEYS[3] +local sessionId = ARGV[1] +local snapshotCount = tonumber(ARGV[2]) +local pendingCount = tonumber(ARGV[3]) +local minEventsInSession = tonumber(ARGV[4]) -- Trim the list to remove the processed (snapshot) events. redis.call("LTRIM", sessionKey, snapshotCount, -1) -- Re-insert the pending events at the head in their original order. for i = pendingCount, 1, -1 do - redis.call("LPUSH", sessionKey, ARGV[i+2]) + redis.call("LPUSH", sessionKey, ARGV[i+4]) end -return redis.call("LLEN", sessionKey) +local newLength = redis.call("LLEN", sessionKey) + +-- Update ready sessions set based on new length +if newLength >= minEventsInSession then + redis.call("ZADD", readySessionsKey, "XX", redis.call("TIME")[1], sessionId) +else + redis.call("ZREM", readySessionsKey, sessionId) +end + +-- Update buffer counter (decrement by processed events, increment by pending) +local counterChange = pendingCount - snapshotCount +if counterChange ~= 0 then + redis.call("INCRBY", bufferCounterKey, counterChange) +end + +return newLength `; /** - * Lua script that processes a batch of session updates in a single call. - * Format of updates: [sessionKey1, snapshotCount1, pendingCount1, pending1...., sessionKey2, ...] + * Optimized batch update script with counter and ready sessions management. + * KEYS[1] = ready sessions key + * KEYS[2] = buffer counter key + * ARGV format: [sessionKey1, sessionId1, snapshotCount1, pendingCount1, pending1...., sessionKey2, ...] */ private readonly batchUpdateSessionsScript = ` -local i = 1 +local readySessionsKey = KEYS[1] +local bufferCounterKey = KEYS[2] +local minEventsInSession = tonumber(ARGV[1]) +local totalCounterChange = 0 + +local i = 2 while i <= #ARGV do local sessionKey = ARGV[i] - local snapshotCount = tonumber(ARGV[i + 1]) - local pendingCount = tonumber(ARGV[i + 2]) + local sessionId = ARGV[i + 1] + local snapshotCount = tonumber(ARGV[i + 2]) + local pendingCount = tonumber(ARGV[i + 3]) -- Trim the list to remove processed events redis.call("LTRIM", sessionKey, snapshotCount, -1) -- Re-insert pending events at the head in original order if pendingCount > 0 then - local pendingEvents = {} - for j = 1, pendingCount do - table.insert(pendingEvents, ARGV[i + 2 + j]) + -- Reinsert in original order: LPUSH requires reverse iteration + for j = pendingCount, 1, -1 do + redis.call("LPUSH", sessionKey, ARGV[i + 3 + j]) end - redis.call("LPUSH", sessionKey, unpack(pendingEvents)) end - i = i + 3 + pendingCount + local newLength = redis.call("LLEN", sessionKey) + + -- Update ready sessions set based on new length + if newLength >= minEventsInSession then + redis.call("ZADD", readySessionsKey, "XX", redis.call("TIME")[1], sessionId) + else + redis.call("ZREM", readySessionsKey, sessionId) + end + + -- Track counter change + totalCounterChange = totalCounterChange + (pendingCount - snapshotCount) + + i = i + 4 + pendingCount end + +-- Update buffer counter once +if totalCounterChange ~= 0 then + redis.call("INCRBY", bufferCounterKey, totalCounterChange) +end + return "OK" `; @@ -194,9 +261,69 @@ return "OK" return multi.exec(); } + /** + * Optimized Lua script for adding events with counter management. + * KEYS[1] = session key (if session event) + * KEYS[2] = regular queue key + * KEYS[3] = sessions sorted key + * KEYS[4] = ready sessions key + * KEYS[5] = buffer counter key + * KEYS[6] = last event key (if screen_view) + * ARGV[1] = event JSON + * ARGV[2] = session_id + * ARGV[3] = event_name + * ARGV[4] = score (timestamp) + * ARGV[5] = minEventsInSession + * ARGV[6] = last event TTL (if screen_view) + */ + private readonly addEventScript = ` +local sessionKey = KEYS[1] +local regularQueueKey = KEYS[2] +local sessionsSortedKey = KEYS[3] +local readySessionsKey = KEYS[4] +local bufferCounterKey = KEYS[5] +local lastEventKey = KEYS[6] + +local eventJson = ARGV[1] +local sessionId = ARGV[2] +local eventName = ARGV[3] +local score = tonumber(ARGV[4]) +local minEventsInSession = tonumber(ARGV[5]) +local lastEventTTL = tonumber(ARGV[6] or 0) + +local counterIncrement = 1 + +if sessionId and sessionId ~= "" and (eventName == "screen_view" or eventName == "session_end") then + -- Add to session + redis.call("RPUSH", sessionKey, eventJson) + redis.call("ZADD", sessionsSortedKey, "NX", score, sessionId) + + -- Check if session is now ready for processing + local sessionLength = redis.call("LLEN", sessionKey) + if sessionLength >= minEventsInSession or eventName == "session_end" then + redis.call("ZADD", readySessionsKey, score, sessionId) + end + + -- Handle screen_view specific logic + if eventName == "screen_view" and lastEventKey ~= "" then + redis.call("SET", lastEventKey, eventJson, "EX", lastEventTTL) + elseif eventName == "session_end" and lastEventKey ~= "" then + redis.call("DEL", lastEventKey) + end +else + -- Add to regular queue + redis.call("RPUSH", regularQueueKey, eventJson) +end + +-- Increment buffer counter +redis.call("INCR", bufferCounterKey) + +return "OK" +`; + /** * Add an event into Redis. - * Combines multiple Redis operations into a single MULTI command. + * Uses optimized Lua script to reduce round trips and manage counters. */ async add(event: IClickhouseEvent, _multi?: ReturnType) { try { @@ -204,50 +331,46 @@ return "OK" const eventJson = JSON.stringify(event); const multi = _multi || redis.multi(); - if (event.session_id && this.sessionEvents.includes(event.name)) { - const sessionKey = this.getSessionKey(event.session_id); - const addEventToSession = () => { - const score = new Date(event.created_at || Date.now()).getTime(); - multi - .rpush(sessionKey, eventJson) - .zadd(this.sessionSortedKey, 'NX', score, event.session_id); - }; - - if (event.name === 'screen_view') { - multi.set( - this.getLastEventKey({ - projectId: event.project_id, - profileId: event.profile_id, - }), - eventJson, - 'EX', - 60 * 60, - ); - - addEventToSession(); - } else if (event.name === 'session_end') { - // Delete last screen view - multi.del( - this.getLastEventKey({ - projectId: event.project_id, - profileId: event.profile_id, - }), - ); - - // Check if session has any events - const eventCount = await redis.llen(sessionKey); + const isSessionEvent = + event.session_id && this.sessionEvents.includes(event.name); - if (eventCount === 0) { - // If session is empty, add to regular queue and don't track in sorted set - multi.rpush(this.regularQueueKey, eventJson); - } else { - // Otherwise add to session as normal - addEventToSession(); - } - } + if (isSessionEvent) { + const sessionKey = this.getSessionKey(event.session_id); + const score = new Date(event.created_at || Date.now()).getTime(); + const lastEventKey = + event.name === 'screen_view' + ? this.getLastEventKey({ + projectId: event.project_id, + profileId: event.profile_id, + }) + : event.name === 'session_end' + ? this.getLastEventKey({ + projectId: event.project_id, + profileId: event.profile_id, + }) + : ''; + + multi.eval( + this.addEventScript, + 6, + sessionKey, + this.regularQueueKey, + this.sessionSortedKey, + this.readySessionsKey, + this.bufferCounterKey, + lastEventKey, + eventJson, + event.session_id, + event.name, + score.toString(), + this.minEventsInSession.toString(), + '3600', // 1 hour TTL for last event + ); } else { - // All other events go to regularQueue queue - multi.rpush(this.regularQueueKey, eventJson); + // Non-session events go to regular queue + multi + .rpush(this.regularQueueKey, eventJson) + .incr(this.bufferCounterKey); } if (event.profile_id) { @@ -261,43 +384,57 @@ return "OK" if (!_multi) { await multi.exec(); } + await publishEvent('events', 'received', transformEvent(event)); } catch (error) { this.logger.error('Failed to add event to Redis buffer', { error }); } } - private async getEligableSessions({ minEventsInSession = 2 }) { + private async getEligibleSessions( + startOffset: number, + maxEventsPerSession: number, + sessionsPerPage: number, + ) { const sessionsSorted = await getRedisCache().eval( - this.processSessionsScript, + this.processReadySessionsScript, 2, // number of KEYS - this.sessionSortedKey, + this.readySessionsKey, this.sessionKeyPrefix, - (this.batchSize / 2).toString(), - minEventsInSession.toString(), + sessionsPerPage.toString(), + maxEventsPerSession.toString(), + startOffset.toString(), ); - // (A) Process session events using the Lua script. const parsed = getSafeJson< Array<{ sessionId: string; events: string[]; + totalEventCount: number; }> >(sessionsSorted as string); - const sessions: Record = {}; - if (!parsed) { - return sessions; - } + const sessions: Record< + string, + { + events: IClickhouseEvent[]; + totalEventCount: number; + } + > = {}; - if (!Array.isArray(parsed)) { + if (!parsed || !Array.isArray(parsed)) { return sessions; } for (const session of parsed) { - sessions[session.sessionId] = session.events + const events = session.events .map((e) => getSafeJson(e)) .filter((e): e is IClickhouseEvent => e !== null); + + sessions[session.sessionId] = { + events, + totalEventCount: session.totalEventCount, + }; } return sessions; @@ -343,28 +480,66 @@ return "OK" try { let now = performance.now(); - const [sessions, regularQueueEvents] = await Promise.all([ - // (A) Fetch session events - this.getEligableSessions({ minEventsInSession: 2 }), - // (B) Fetch no-session events - redis.lrange(this.regularQueueKey, 0, this.batchSize / 2 - 1), - ]); - - timer.fetchUnprocessedEvents = performance.now() - now; - now = performance.now(); + // (A) Fetch no-session events once per run + const regularQueueEvents = await redis.lrange( + this.regularQueueKey, + 0, + Math.floor(this.batchSize / 2) - 1, + ); - for (const [sessionId, sessionEvents] of Object.entries(sessions)) { - const { flush, pending } = this.processSessionEvents(sessionEvents); + // (A2) Page through ready sessions within time and budget + let sessionBudget = Math.floor(this.batchSize / 2); + let startOffset = 0; + let totalSessionEventsFetched = 0; + while (sessionBudget > 0) { + if (performance.now() - now > this.flushTimeBudgetMs) { + this.logger.debug('Stopping session paging due to time budget'); + break; + } - if (flush.length > 0) { - eventsToClickhouse.push(...flush); + const sessionsPerPage = Math.min( + this.maxSessionsPerFlush, + Math.max(1, Math.floor(sessionBudget / 2)), + ); + const perSessionBudget = Math.max( + 2, + Math.floor(sessionBudget / sessionsPerPage), + ); + + const sessionsPage = await this.getEligibleSessions( + startOffset, + perSessionBudget, + sessionsPerPage, + ); + const sessionIds = Object.keys(sessionsPage); + if (sessionIds.length === 0) { + break; } - pendingUpdates.push({ - sessionId, - snapshotCount: sessionEvents.length, - pending, - }); + for (const sessionId of sessionIds) { + const sessionData = sessionsPage[sessionId]!; + const { flush, pending } = this.processSessionEvents( + sessionData.events, + ); + + if (flush.length > 0) { + eventsToClickhouse.push(...flush); + } + + pendingUpdates.push({ + sessionId, + snapshotCount: sessionData.events.length, + pending, + }); + + // Decrease budget by fetched events for this session window + sessionBudget -= sessionData.events.length; + totalSessionEventsFetched += sessionData.events.length; + if (sessionBudget <= 0) { + break; + } + } + startOffset += sessionsPerPage; } timer.processSessionEvents = performance.now() - now; @@ -420,9 +595,11 @@ return "OK" // (F) Only after successful processing, update Redis const multi = redis.multi(); - // Clean up no-session events + // Clean up no-session events and update counter if (regularQueueEvents.length > 0) { - multi.ltrim(this.regularQueueKey, regularQueueEvents.length, -1); + multi + .ltrim(this.regularQueueKey, regularQueueEvents.length, -1) + .decrby(this.bufferCounterKey, regularQueueEvents.length); } await multi.exec(); @@ -436,10 +613,7 @@ return "OK" batchSize: this.batchSize, eventsToClickhouse: eventsToClickhouse.length, pendingSessionUpdates: pendingUpdates.length, - sessionEvents: Object.entries(sessions).reduce( - (acc, [sId, events]) => acc + events.length, - 0, - ), + sessionEventsFetched: totalSessionEventsFetched, regularEvents: regularQueueEvents.length, timer, }); @@ -609,12 +783,13 @@ return "OK" pendingUpdates, this.updatePendingSessionsBatchSize, )) { - const batchArgs: string[] = []; + const batchArgs: string[] = [this.minEventsInSession.toString()]; for (const { sessionId, snapshotCount, pending } of batch) { const sessionKey = this.getSessionKey(sessionId); batchArgs.push( sessionKey, + sessionId, snapshotCount.toString(), pending.length.toString(), ...pending.map((e) => JSON.stringify(e)), @@ -623,13 +798,16 @@ return "OK" await redis.eval( this.batchUpdateSessionsScript, - 0, // no KEYS needed + 2, // KEYS: ready sessions, buffer counter + this.readySessionsKey, + this.bufferCounterKey, ...batchArgs, ); } } public async getBufferSizeHeavy() { + // Fallback method for when counter is not available const redis = getRedisCache(); const pipeline = redis.pipeline(); @@ -668,18 +846,7 @@ return "OK" } public async getBufferSize() { - const cached = await getRedisCache().get('event_buffer:cached_count'); - if (cached) { - return Number.parseInt(cached, 10); - } - const count = await this.getBufferSizeHeavy(); - await getRedisCache().set( - 'event_buffer:cached_count', - count.toString(), - 'EX', - 15, // increase when we know it's stable - ); - return count; + return this.getBufferSizeWithCounter(() => this.getBufferSizeHeavy()); } private async incrementActiveVisitorCount( @@ -687,21 +854,13 @@ return "OK" projectId: string, profileId: string, ) { - // Add/update visitor with current timestamp as score + // Track active visitors and emit expiry events when inactive for TTL const now = Date.now(); const zsetKey = `live:visitors:${projectId}`; - return ( - multi - // To keep the count - .zadd(zsetKey, now, profileId) - // To trigger the expiration listener - .set( - `live:visitor:${projectId}:${profileId}`, - '1', - 'EX', - this.activeVisitorsExpiration, - ) - ); + const heartbeatKey = `live:visitor:${projectId}:${profileId}`; + return multi + .zadd(zsetKey, now, profileId) + .set(heartbeatKey, '1', 'EX', this.activeVisitorsExpiration); } public async getActiveVisitorCount(projectId: string): Promise { diff --git a/packages/db/src/buffers/index.ts b/packages/db/src/buffers/index.ts index 6e383dd57..17932ab95 100644 --- a/packages/db/src/buffers/index.ts +++ b/packages/db/src/buffers/index.ts @@ -1,6 +1,6 @@ -import { BotBuffer as BotBufferRedis } from './bot-buffer-redis'; -import { EventBuffer as EventBufferRedis } from './event-buffer-redis'; -import { ProfileBuffer as ProfileBufferRedis } from './profile-buffer-redis'; +import { BotBuffer as BotBufferRedis } from './bot-buffer'; +import { EventBuffer as EventBufferRedis } from './event-buffer'; +import { ProfileBuffer as ProfileBufferRedis } from './profile-buffer'; import { SessionBuffer } from './session-buffer'; export const eventBuffer = new EventBufferRedis(); diff --git a/packages/db/src/buffers/profile-buffer-redis.ts b/packages/db/src/buffers/profile-buffer.ts similarity index 90% rename from packages/db/src/buffers/profile-buffer-redis.ts rename to packages/db/src/buffers/profile-buffer.ts index 1f70ccfe3..92581e13a 100644 --- a/packages/db/src/buffers/profile-buffer-redis.ts +++ b/packages/db/src/buffers/profile-buffer.ts @@ -19,7 +19,7 @@ export class ProfileBuffer extends BaseBuffer { ? Number.parseInt(process.env.PROFILE_BUFFER_CHUNK_SIZE, 10) : 1000; - private readonly redisBufferKey = 'profile-buffer'; + private readonly redisKey = 'profile-buffer'; private readonly redisProfilePrefix = 'profile-cache:'; private redis: Redis; @@ -101,8 +101,9 @@ export class ProfileBuffer extends BaseBuffer { const result = await this.redis .multi() .set(cacheKey, JSON.stringify(mergedProfile), 'EX', cacheTtl) - .rpush(this.redisBufferKey, JSON.stringify(mergedProfile)) - .llen(this.redisBufferKey) + .rpush(this.redisKey, JSON.stringify(mergedProfile)) + .incr(this.bufferCounterKey) + .llen(this.redisKey) .exec(); if (!result) { @@ -112,7 +113,7 @@ export class ProfileBuffer extends BaseBuffer { }); return; } - const bufferLength = (result?.[2]?.[1] as number) ?? 0; + const bufferLength = (result?.[3]?.[1] as number) ?? 0; this.logger.debug('Current buffer length', { bufferLength, @@ -177,7 +178,7 @@ export class ProfileBuffer extends BaseBuffer { try { this.logger.info('Starting profile buffer processing'); const profiles = await this.redis.lrange( - this.redisBufferKey, + this.redisKey, 0, this.batchSize - 1, ); @@ -200,8 +201,12 @@ export class ProfileBuffer extends BaseBuffer { }); } - // Only remove profiles after successful insert - await this.redis.ltrim(this.redisBufferKey, profiles.length, -1); + // Only remove profiles after successful insert and update counter + await this.redis + .multi() + .ltrim(this.redisKey, profiles.length, -1) + .decrby(this.bufferCounterKey, profiles.length) + .exec(); this.logger.info('Successfully completed profile processing', { totalProfiles: profiles.length, @@ -212,6 +217,6 @@ export class ProfileBuffer extends BaseBuffer { } async getBufferSize() { - return getRedisCache().llen(this.redisBufferKey); + return this.getBufferSizeWithCounter(() => this.redis.llen(this.redisKey)); } } diff --git a/packages/db/src/buffers/session-buffer.ts b/packages/db/src/buffers/session-buffer.ts index 70ebd48b2..6e77746c1 100644 --- a/packages/db/src/buffers/session-buffer.ts +++ b/packages/db/src/buffers/session-buffer.ts @@ -1,4 +1,4 @@ -import { type Redis, getRedisCache, runEvery } from '@openpanel/redis'; +import { type Redis, getRedisCache } from '@openpanel/redis'; import { toDots } from '@openpanel/common'; import { getSafeJson } from '@openpanel/json'; @@ -61,7 +61,7 @@ export class SessionBuffer extends BaseBuffer { const duration = new Date(newSession.ended_at).getTime() - new Date(newSession.created_at).getTime(); - if (duration > 0) { + if (duration >= 0) { newSession.duration = duration; } else { this.logger.warn('Session duration is negative', { @@ -174,10 +174,12 @@ export class SessionBuffer extends BaseBuffer { for (const session of sessions) { multi.rpush(this.redisKey, JSON.stringify(session)); } + // Increment counter by number of sessions added + multi.incrby(this.bufferCounterKey, sessions.length); await multi.exec(); - // Check buffer length - const bufferLength = await this.redis.llen(this.redisKey); + // Check buffer length using counter + const bufferLength = await this.getBufferSize(); if (bufferLength >= this.batchSize) { await this.tryFlush(); @@ -216,8 +218,12 @@ export class SessionBuffer extends BaseBuffer { }); } - // Only remove events after successful insert - await this.redis.ltrim(this.redisKey, events.length, -1); + // Only remove events after successful insert and update counter + const multi = this.redis.multi(); + multi + .ltrim(this.redisKey, events.length, -1) + .decrby(this.bufferCounterKey, events.length); + await multi.exec(); this.logger.info('Processed sessions', { count: events.length, @@ -228,6 +234,6 @@ export class SessionBuffer extends BaseBuffer { } async getBufferSize() { - return getRedisCache().llen(this.redisKey); + return this.getBufferSizeWithCounter(() => this.redis.llen(this.redisKey)); } } diff --git a/packages/queue/package.json b/packages/queue/package.json index 9951df9a3..07105c24c 100644 --- a/packages/queue/package.json +++ b/packages/queue/package.json @@ -7,8 +7,10 @@ }, "dependencies": { "@openpanel/db": "workspace:*", + "@openpanel/logger": "workspace:*", "@openpanel/redis": "workspace:*", - "bullmq": "^5.8.7" + "bullmq": "^5.8.7", + "groupmq": "1.0.0-next.13" }, "devDependencies": { "@openpanel/sdk": "workspace:*", diff --git a/packages/queue/src/queues.ts b/packages/queue/src/queues.ts index 54a6dce0a..3d0413f8f 100644 --- a/packages/queue/src/queues.ts +++ b/packages/queue/src/queues.ts @@ -1,8 +1,12 @@ import { Queue, QueueEvents } from 'bullmq'; -import type { IServiceEvent, Notification, Prisma } from '@openpanel/db'; -import { getRedisQueue } from '@openpanel/redis'; +import type { IServiceEvent, Prisma } from '@openpanel/db'; +import { createLogger } from '@openpanel/logger'; +import { getRedisGroupQueue, getRedisQueue } from '@openpanel/redis'; import type { TrackPayload } from '@openpanel/sdk'; +import { Queue as GroupQueue } from 'groupmq'; + +export const queueLogger = createLogger({ name: 'queue' }); export interface EventsQueuePayloadIncomingEvent { type: 'incomingEvent'; @@ -103,6 +107,17 @@ export const eventsQueue = new Queue('events', { }, }); +export const eventsGroupQueue = new GroupQueue< + EventsQueuePayloadIncomingEvent['payload'] +>({ + logger: queueLogger, + namespace: 'group_events', + redis: getRedisGroupQueue(), + orderingDelayMs: 2000, + keepCompleted: 10, + keepFailed: 10_000, +}); + export const sessionsQueue = new Queue('sessions', { connection: getRedisQueue(), defaultJobOptions: { diff --git a/packages/redis/redis.ts b/packages/redis/redis.ts index 22b1e5ea0..769164ba7 100644 --- a/packages/redis/redis.ts +++ b/packages/redis/redis.ts @@ -8,6 +8,8 @@ const options: RedisOptions = { export { Redis }; +const REDIS_URL = process.env.REDIS_URL || 'redis://localhost:6379'; + export interface ExtendedRedis extends Redis { getJson: (key: string) => Promise; setJson: ( @@ -63,7 +65,7 @@ const createRedisClient = ( let redisCache: ExtendedRedis; export function getRedisCache() { if (!redisCache) { - redisCache = createRedisClient(process.env.REDIS_URL!, options); + redisCache = createRedisClient(REDIS_URL, options); } return redisCache; @@ -72,7 +74,7 @@ export function getRedisCache() { let redisSub: ExtendedRedis; export function getRedisSub() { if (!redisSub) { - redisSub = createRedisClient(process.env.REDIS_URL!, options); + redisSub = createRedisClient(REDIS_URL, options); } return redisSub; @@ -81,7 +83,7 @@ export function getRedisSub() { let redisPub: ExtendedRedis; export function getRedisPub() { if (!redisPub) { - redisPub = createRedisClient(process.env.REDIS_URL!, options); + redisPub = createRedisClient(REDIS_URL, options); } return redisPub; @@ -91,20 +93,32 @@ let redisQueue: ExtendedRedis; export function getRedisQueue() { if (!redisQueue) { // Use different redis for queues (self-hosting will re-use the same redis instance) - redisQueue = createRedisClient( - (process.env.QUEUE_REDIS_URL || process.env.REDIS_URL)!, - { - ...options, - enableReadyCheck: false, - maxRetriesPerRequest: null, - enableOfflineQueue: true, - }, - ); + redisQueue = createRedisClient(REDIS_URL, { + ...options, + enableReadyCheck: false, + maxRetriesPerRequest: null, + enableOfflineQueue: true, + }); } return redisQueue; } +let redisGroupQueue: ExtendedRedis; +export function getRedisGroupQueue() { + if (!redisGroupQueue) { + // Dedicated Redis connection for GroupWorker to avoid blocking BullMQ + redisGroupQueue = createRedisClient(REDIS_URL, { + ...options, + enableReadyCheck: false, + maxRetriesPerRequest: null, + enableOfflineQueue: true, + }); + } + + return redisGroupQueue; +} + export async function getLock(key: string, value: string, timeout: number) { const lock = await getRedisCache().set(key, value, 'PX', timeout, 'NX'); return lock === 'OK'; diff --git a/packages/redis/run-every.ts b/packages/redis/run-every.ts index 3d81b1a96..628060427 100644 --- a/packages/redis/run-every.ts +++ b/packages/redis/run-every.ts @@ -15,6 +15,6 @@ export async function runEvery({ return; } - getRedisCache().set(cacheKey, 'true', 'EX', interval); + await getRedisCache().set(cacheKey, '1', 'EX', interval); return fn(); } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index b96550296..10af5da4a 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -38,7 +38,7 @@ importers: version: 2.12.1 vitest: specifier: ^3.0.4 - version: 3.1.3(@types/debug@4.1.12)(@types/node@20.14.8)(jiti@2.4.1)(terser@5.27.1) + version: 3.1.3(@types/debug@4.1.12)(@types/node@20.14.8)(jiti@2.5.1)(terser@5.27.1) apps/api: dependencies: @@ -126,6 +126,9 @@ importers: fastify-raw-body: specifier: ^5.0.0 version: 5.0.0 + groupmq: + specifier: 1.0.0-next.13 + version: 1.0.0-next.13(ioredis@5.4.1) ico-to-png: specifier: ^0.2.2 version: 0.2.2 @@ -756,6 +759,9 @@ importers: express: specifier: ^4.18.2 version: 4.18.2 + groupmq: + specifier: 1.0.0-next.13 + version: 1.0.0-next.13(ioredis@5.4.1) prom-client: specifier: ^15.1.3 version: 15.1.3 @@ -1208,12 +1214,18 @@ importers: '@openpanel/db': specifier: workspace:* version: link:../db + '@openpanel/logger': + specifier: workspace:* + version: link:../logger '@openpanel/redis': specifier: workspace:* version: link:../redis bullmq: specifier: ^5.8.7 version: 5.8.7 + groupmq: + specifier: 1.0.0-next.13 + version: 1.0.0-next.13(ioredis@5.4.1) devDependencies: '@openpanel/sdk': specifier: workspace:* @@ -1280,7 +1292,7 @@ importers: devDependencies: astro: specifier: ^5.7.7 - version: 5.7.8(@types/node@20.14.8)(jiti@2.4.1)(rollup@4.40.1)(terser@5.27.1)(typescript@5.6.3) + version: 5.7.8(@types/node@20.14.8)(jiti@2.5.1)(rollup@4.40.1)(terser@5.27.1)(typescript@5.6.3) packages/sdks/express: dependencies: @@ -7823,15 +7835,6 @@ packages: supports-color: optional: true - debug@4.3.4: - resolution: {integrity: sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==} - engines: {node: '>=6.0'} - peerDependencies: - supports-color: '*' - peerDependenciesMeta: - supports-color: - optional: true - debug@4.3.7: resolution: {integrity: sha512-Er2nc/H7RrMXZBFCEim6TCmMk02Z8vLC2Rbi1KEBggpo0fS6l0S1nnapwmIi3yW/+GOJap1Krg4w0Hg80oCqgQ==} engines: {node: '>=6.0'} @@ -8834,6 +8837,12 @@ packages: resolution: {integrity: sha512-5v6yZd4JK3eMI3FqqCouswVqwugaA9r4dNZB1wwcmrD02QkV5H0y7XBQW8QwQqEaZY1pM9aqORSORhJRdNK44Q==} engines: {node: '>=6.0'} + groupmq@1.0.0-next.13: + resolution: {integrity: sha512-gPbzxXFZyeIUecEmhZWjqcODF5Xs9ZLhtAccemcD4mbeAei1CJox7gxY5eaXQ5uuu9bsBLiMFPOsSFl9/DJVRw==} + engines: {node: '>=18'} + peerDependencies: + ioredis: '>=5' + h3@1.15.3: resolution: {integrity: sha512-z6GknHqyX0h9aQaTx22VZDf6QyZn+0Nh+Ym8O/u0SGSkyF5cuTJYKlc8MkzW3Nzf9LE1ivcpmYC3FUGpywhuUQ==} @@ -9444,6 +9453,10 @@ packages: resolution: {integrity: sha512-yPBThwecp1wS9DmoA4x4KR2h3QoslacnDR8ypuFM962kI4/456Iy1oHx2RAgh4jfZNdn0bctsdadceiBUgpU1g==} hasBin: true + jiti@2.5.1: + resolution: {integrity: sha512-twQoecYPiVA5K/h6SxtORw/Bs3ar+mLUtoPSc7iMXzQzK8d7eJ/R09wmTwAjiamETn1cXYPGfNnu7DMoHgu12w==} + hasBin: true + joi@17.12.1: resolution: {integrity: sha512-vtxmq+Lsc5SlfqotnfVjlViWfOL9nt/avKNbKYizwf6gsCfq9NYY/ceYRMFD8XDdrjJ9abJyScWmhmIiy+XRtQ==} @@ -9792,10 +9805,6 @@ packages: peerDependencies: react: ^16.5.1 || ^17.0.0 || ^18.0.0 || ^19.0.0 - luxon@3.4.4: - resolution: {integrity: sha512-zobTr7akeGHnv7eBOXcRgMeCP6+uyYsczwmeRCauvpvaAltgNyTbLH/+VaEAPUeWBT+1GuNmz4wC/6jtQzbbVA==} - engines: {node: '>=12'} - luxon@3.6.1: resolution: {integrity: sha512-tJLxrKJhO2ukZ5z0gyjY1zPh3Rh88Ej9P7jNrZiHMUXHae1yvI2imgOZtL1TO8TW6biMMKfTtAOoEJANgtWBMQ==} engines: {node: '>=12'} @@ -10252,9 +10261,6 @@ packages: ms@2.0.0: resolution: {integrity: sha512-Tpp60P6IUJDTuOq/5Z8cdskzJujfwqfOTkrwIwj7IRISpnkJnT6SyJ4PCPnGMoFjC9ddhal5KVIYtAt97ix05A==} - ms@2.1.2: - resolution: {integrity: sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==} - ms@2.1.3: resolution: {integrity: sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==} @@ -10430,6 +10436,7 @@ packages: node-domexception@1.0.0: resolution: {integrity: sha512-/jKZoMpw0F8GRwl4/eLROPA3cfcXtLApP0QzLmUT/HuPCZWyB7IY9ZrMeKw2O/nFIqPQB3PVM9aYm0F312AXDQ==} engines: {node: '>=10.5.0'} + deprecated: Use your platform's native DOMException instead node-fetch-native@1.6.6: resolution: {integrity: sha512-8Mc2HhqPdlIfedsuZoc3yioPuzp6b+L5jRCRY1QzuWZh2EGJVQrGppC6V6cF0bLdbW0+O2YpqCA25aF/1lvipQ==} @@ -12035,6 +12042,7 @@ packages: source-map@0.8.0-beta.0: resolution: {integrity: sha512-2ymg6oRBpebeZi9UUNsgQ89bhx01TcTkmNTGnNO88imTmbSgy4nfujrgVEFKWpMTEGA11EDkTt7mqObTPdigIA==} engines: {node: '>= 8'} + deprecated: The work that was done in this beta branch won't be included in future versions space-separated-tokens@1.1.5: resolution: {integrity: sha512-q/JSVd1Lptzhf5bkYm4ob4iWPjx0KiRe3sRFBNrVqbJkFaBm5vbbowy1mymoPNLRa52+oadOhJ+K49wsSeSjTA==} @@ -14865,7 +14873,7 @@ snapshots: getenv: 1.0.0 glob: 7.1.6 resolve-from: 5.0.0 - semver: 7.6.3 + semver: 7.7.1 slash: 3.0.0 slugify: 1.6.6 xcode: 3.0.1 @@ -15904,7 +15912,7 @@ snapshots: '@opentelemetry/api': 1.8.0 '@opentelemetry/instrumentation': 0.51.1(@opentelemetry/api@1.8.0) '@opentelemetry/semantic-conventions': 1.27.0 - semver: 7.6.3 + semver: 7.7.1 transitivePeerDependencies: - supports-color @@ -16338,7 +16346,7 @@ snapshots: '@opentelemetry/propagator-b3': 1.24.1(@opentelemetry/api@1.8.0) '@opentelemetry/propagator-jaeger': 1.24.1(@opentelemetry/api@1.8.0) '@opentelemetry/sdk-trace-base': 1.24.1(@opentelemetry/api@1.8.0) - semver: 7.6.3 + semver: 7.7.1 '@opentelemetry/semantic-conventions@1.24.1': {} @@ -17985,7 +17993,7 @@ snapshots: hermes-profile-transformer: 0.0.6 node-stream-zip: 1.15.0 ora: 5.4.1 - semver: 7.6.3 + semver: 7.7.1 strip-ansi: 5.2.0 wcwidth: 1.0.1 yaml: 2.3.4 @@ -18051,7 +18059,7 @@ snapshots: node-fetch: 2.7.0 open: 6.4.0 ora: 5.4.1 - semver: 7.6.3 + semver: 7.7.1 shell-quote: 1.8.1 sudo-prompt: 9.2.1 transitivePeerDependencies: @@ -18080,7 +18088,7 @@ snapshots: fs-extra: 8.1.0 graceful-fs: 4.2.11 prompts: 2.4.2 - semver: 7.6.3 + semver: 7.7.1 transitivePeerDependencies: - bufferutil - encoding @@ -19246,13 +19254,13 @@ snapshots: chai: 5.2.0 tinyrainbow: 2.0.0 - '@vitest/mocker@3.1.3(vite@6.3.3(@types/node@20.14.8)(jiti@2.4.1)(terser@5.27.1))': + '@vitest/mocker@3.1.3(vite@6.3.3(@types/node@20.14.8)(jiti@2.5.1)(terser@5.27.1))': dependencies: '@vitest/spy': 3.1.3 estree-walker: 3.0.3 magic-string: 0.30.17 optionalDependencies: - vite: 6.3.3(@types/node@20.14.8)(jiti@2.4.1)(terser@5.27.1) + vite: 6.3.3(@types/node@20.14.8)(jiti@2.5.1)(terser@5.27.1) '@vitest/pretty-format@3.1.3': dependencies: @@ -19498,7 +19506,7 @@ snapshots: astring@1.8.6: {} - astro@5.7.8(@types/node@20.14.8)(jiti@2.4.1)(rollup@4.40.1)(terser@5.27.1)(typescript@5.6.3): + astro@5.7.8(@types/node@20.14.8)(jiti@2.5.1)(rollup@4.40.1)(terser@5.27.1)(typescript@5.6.3): dependencies: '@astrojs/compiler': 2.11.0 '@astrojs/internal-helpers': 0.6.1 @@ -19551,8 +19559,8 @@ snapshots: unist-util-visit: 5.0.0 unstorage: 1.16.0 vfile: 6.0.3 - vite: 6.3.3(@types/node@20.14.8)(jiti@2.4.1)(terser@5.27.1) - vitefu: 1.0.6(vite@6.3.3(@types/node@20.14.8)(jiti@2.4.1)(terser@5.27.1)) + vite: 6.3.3(@types/node@20.14.8)(jiti@2.5.1)(terser@5.27.1) + vitefu: 1.0.6(vite@6.3.3(@types/node@20.14.8)(jiti@2.5.1)(terser@5.27.1)) xxhash-wasm: 1.1.0 yargs-parser: 21.1.1 yocto-spinner: 0.2.2 @@ -20353,7 +20361,7 @@ snapshots: cron-parser@4.9.0: dependencies: - luxon: 3.4.4 + luxon: 3.6.1 cross-fetch@3.1.8: dependencies: @@ -20638,10 +20646,6 @@ snapshots: dependencies: ms: 2.1.3 - debug@4.3.4: - dependencies: - ms: 2.1.2 - debug@4.3.7: dependencies: ms: 2.1.3 @@ -22056,6 +22060,11 @@ snapshots: section-matter: 1.0.0 strip-bom-string: 1.0.0 + groupmq@1.0.0-next.13(ioredis@5.4.1): + dependencies: + cron-parser: 4.9.0 + ioredis: 5.4.1 + h3@1.15.3: dependencies: cookie-es: 1.2.2 @@ -22451,7 +22460,7 @@ snapshots: dependencies: '@ioredis/commands': 1.2.0 cluster-key-slot: 1.1.2 - debug: 4.3.4 + debug: 4.4.0 denque: 2.1.0 lodash.defaults: 4.2.0 lodash.isarguments: 3.1.0 @@ -22770,6 +22779,9 @@ snapshots: jiti@2.4.1: {} + jiti@2.5.1: + optional: true + joi@17.12.1: dependencies: '@hapi/hoek': 9.3.0 @@ -23107,8 +23119,6 @@ snapshots: dependencies: react: 18.2.0 - luxon@3.4.4: {} - luxon@3.6.1: {} magic-string@0.30.17: @@ -23938,8 +23948,6 @@ snapshots: ms@2.0.0: {} - ms@2.1.2: {} - ms@2.1.3: {} msgpackr-extract@3.0.2: @@ -27086,13 +27094,13 @@ snapshots: d3-time: 3.1.0 d3-timer: 3.0.1 - vite-node@3.1.3(@types/node@20.14.8)(jiti@2.4.1)(terser@5.27.1): + vite-node@3.1.3(@types/node@20.14.8)(jiti@2.5.1)(terser@5.27.1): dependencies: cac: 6.7.14 debug: 4.4.0 es-module-lexer: 1.7.0 pathe: 2.0.3 - vite: 6.3.3(@types/node@20.14.8)(jiti@2.4.1)(terser@5.27.1) + vite: 6.3.3(@types/node@20.14.8)(jiti@2.5.1)(terser@5.27.1) transitivePeerDependencies: - '@types/node' - jiti @@ -27107,7 +27115,7 @@ snapshots: - tsx - yaml - vite@6.3.3(@types/node@20.14.8)(jiti@2.4.1)(terser@5.27.1): + vite@6.3.3(@types/node@20.14.8)(jiti@2.5.1)(terser@5.27.1): dependencies: esbuild: 0.25.3 fdir: 6.4.4(picomatch@4.0.2) @@ -27118,17 +27126,17 @@ snapshots: optionalDependencies: '@types/node': 20.14.8 fsevents: 2.3.3 - jiti: 2.4.1 + jiti: 2.5.1 terser: 5.27.1 - vitefu@1.0.6(vite@6.3.3(@types/node@20.14.8)(jiti@2.4.1)(terser@5.27.1)): + vitefu@1.0.6(vite@6.3.3(@types/node@20.14.8)(jiti@2.5.1)(terser@5.27.1)): optionalDependencies: - vite: 6.3.3(@types/node@20.14.8)(jiti@2.4.1)(terser@5.27.1) + vite: 6.3.3(@types/node@20.14.8)(jiti@2.5.1)(terser@5.27.1) - vitest@3.1.3(@types/debug@4.1.12)(@types/node@20.14.8)(jiti@2.4.1)(terser@5.27.1): + vitest@3.1.3(@types/debug@4.1.12)(@types/node@20.14.8)(jiti@2.5.1)(terser@5.27.1): dependencies: '@vitest/expect': 3.1.3 - '@vitest/mocker': 3.1.3(vite@6.3.3(@types/node@20.14.8)(jiti@2.4.1)(terser@5.27.1)) + '@vitest/mocker': 3.1.3(vite@6.3.3(@types/node@20.14.8)(jiti@2.5.1)(terser@5.27.1)) '@vitest/pretty-format': 3.1.3 '@vitest/runner': 3.1.3 '@vitest/snapshot': 3.1.3 @@ -27145,8 +27153,8 @@ snapshots: tinyglobby: 0.2.13 tinypool: 1.0.2 tinyrainbow: 2.0.0 - vite: 6.3.3(@types/node@20.14.8)(jiti@2.4.1)(terser@5.27.1) - vite-node: 3.1.3(@types/node@20.14.8)(jiti@2.4.1)(terser@5.27.1) + vite: 6.3.3(@types/node@20.14.8)(jiti@2.5.1)(terser@5.27.1) + vite-node: 3.1.3(@types/node@20.14.8)(jiti@2.5.1)(terser@5.27.1) why-is-node-running: 2.3.0 optionalDependencies: '@types/debug': 4.1.12