feat: use groupmq instead of bullmq for incoming events #206
Conversation
Walkthrough
Adds GroupMQ-based grouped event queuing and worker, a dedicated Redis client for group queues, conditional enqueueing in API controllers to group or regular queues, grouped processing in the worker via a pure handler, mock script grouping/parallel dispatch, buffer refactors with counter-backed sizing, and dependency/workflow updates.
Sequence Diagram(s)
sequenceDiagram
  autonumber
  participant Client
  participant API as API Controller
  participant Redis
  participant EQ as eventsQueue (BullMQ)
  participant GQ as eventsGroupQueue (GroupMQ)
  Client->>API: POST /event or /track
  API->>Redis: exists('group_queue')
  alt group_queue present
    API->>GQ: enqueue({ groupId, orderMs, data: { headers, event payload } })
  else
    API->>EQ: enqueue({ payload, backoff?, attempts? })
  end
  note over API: payload includes timestamp & isTimestampFromThePast
sequenceDiagram
  autonumber
  participant GQ as eventsGroupQueue
  participant GW as GroupWorker
  participant Job as incomingEventPure
  participant S as Services
  GQ-->>GW: deliver ordered grouped payload
  GW->>GW: log(groupId, timestamp)
  GW->>Job: incomingEventPure(payload, job?, token?)
  Job->>S: resolve session / create event / notify
  S-->>Job: result
  Job-->>GW: ack/complete
sequenceDiagram
  autonumber
  participant Mock as scripts/mock.ts
  participant Redis
  participant Track as trackit
  Mock->>Redis: FLUSH
  Note over Mock: initial 1s delay
  loop per session
    Mock->>Mock: group tracks by `parallel` key
    alt parallel group (size > 1)
      par parallel dispatch
        Mock->>Track: send event payload (parallel)
      and
        Mock->>Track: send event payload (parallel)
      end
    else sequential
      Mock->>Track: send event payload (sequential)
    end
  end
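As a rough illustration of the routing decision in the first diagram, the controller-side logic could look like the sketch below. Names are taken from the review comments further down; treat the exact add() signatures as assumptions to verify against the PR.

async function enqueueIncomingEvent(p: {
  projectId: string;
  profileId?: string;
  currentDeviceId: string;
  previousDeviceId: string;
  headers: Record<string, string | undefined>;
  event: Record<string, unknown>;
  geo: unknown;
  timestamp: string;
  isTimestampFromThePast: boolean;
}) {
  // Route to the grouped queue only when the Redis flag key exists.
  const useGroupQueue = (await getRedisCache().exists('group_queue')) > 0;
  const event = { ...p.event, timestamp: p.timestamp, isTimestampFromThePast: p.isTimestampFromThePast };
  if (useGroupQueue) {
    await eventsGroupQueue.add({
      groupId: p.profileId ? `${p.projectId}:${p.profileId}` : p.currentDeviceId,
      orderMs: new Date(p.timestamp).getTime(),
      data: { projectId: p.projectId, headers: p.headers, event, geo: p.geo, currentDeviceId: p.currentDeviceId, previousDeviceId: p.previousDeviceId },
    });
  } else {
    await eventsQueue.add(
      'event',
      { type: 'incomingEvent', payload: { projectId: p.projectId, headers: p.headers, event, geo: p.geo, currentDeviceId: p.currentDeviceId, previousDeviceId: p.previousDeviceId } },
      { attempts: 3, backoff: { type: 'exponential', delay: 200 } },
    );
  }
}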
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️  Outside diff range comments (1)
apps/api/src/controllers/track.controller.ts (1)
90-161: Fix timestamp handling for dedupe & enqueue paths.
`getTimestamp` now returns an object, but we keep passing it as-is to `checkDuplicatedEvent` and to the queue payload builders. That means `payload.timestamp` becomes an object (stringifies to "[object Object]"), breaking the dedupe key and flattening ordering metadata for the group queue. We need to destructure the helper result into a plain timestamp string and boolean before reusing it, then update downstream calls to use those scalars.
Apply this diff:

-    const timestamp = getTimestamp(
-      request.timestamp,
-      request.body.payload,
-    );
+    const { timestamp, isTimestampFromThePast } = getTimestamp(
+      request.timestamp,
+      request.body.payload,
+    );
@@
-      timestamp,
-      isTimestampFromThePast: timestamp.isTimestampFromThePast,
+      timestamp,
+      isTimestampFromThePast,

(Repeat the same replacement for every spot that still reads `timestamp.timestamp` or `timestamp.isTimestampFromThePast`.)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
- pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (12)
- apps/api/package.json (1 hunks)
- apps/api/scripts/mock.ts (5 hunks)
- apps/api/src/controllers/event.controller.ts (3 hunks)
- apps/api/src/controllers/healthcheck.controller.ts (1 hunks)
- apps/api/src/controllers/track.controller.ts (3 hunks)
- apps/worker/package.json (1 hunks)
- apps/worker/src/boot-workers.ts (5 hunks)
- apps/worker/src/index.ts (2 hunks)
- apps/worker/src/jobs/events.incoming-event.ts (4 hunks)
- packages/queue/package.json (1 hunks)
- packages/queue/src/queues.ts (2 hunks)
- packages/redis/redis.ts (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (6)
packages/queue/src/queues.ts (1)
packages/redis/redis.ts (1)
getRedisGroupQueue(109-124)
apps/api/src/controllers/track.controller.ts (2)
packages/redis/redis.ts (1)
getRedisCache (64-70)
packages/queue/src/queues.ts (2)
eventsGroupQueue(107-115)
eventsQueue(95-105)
apps/worker/src/jobs/events.incoming-event.ts (4)
packages/queue/src/queues.ts (1)
EventsQueuePayloadIncomingEvent (8-27)
packages/db/src/services/event.service.ts (1)
IServiceEvent (138-175)
packages/redis/redis.ts (1)
getLock (126-129)
apps/worker/src/utils/session-handler.ts (1)
createSessionStart(15-25)
apps/api/scripts/mock.ts (1)
packages/redis/redis.ts (1)
getRedisCache(64-70)
apps/worker/src/boot-workers.ts (2)
packages/queue/src/queues.ts (2)
EventsQueuePayloadIncomingEvent(8-27)
eventsGroupQueue (107-115)
apps/worker/src/jobs/events.incoming-event.ts (1)
incomingEventPure(52-215)
apps/worker/src/index.ts (1)
packages/queue/src/queues.ts (1)
eventsGroupQueue(107-115)
await getRedisCache().flushdb();
await new Promise((resolve) => setTimeout(resolve, 1000));
Don't flush the shared Redis by default.
flushdb() wipes every key in the selected Redis database. Running this helper script against a shared dev/staging instance would erase caches, queues, rate limits—everything—before the mock traffic even starts. Please gate the flush behind an explicit opt-in (env flag or CLI switch) so we cannot accidentally destroy state.
-  await getRedisCache().flushdb();
+  if (process.env.MOCK_FLUSH_REDIS === 'true') {
+    await getRedisCache().flushdb();
+  }
🤖 Prompt for AI Agents
In apps/api/scripts/mock.ts around lines 264-265, the code unconditionally calls
getRedisCache().flushdb() which will wipe a shared Redis DB; change this to
require an explicit opt-in before flushing: read an env var (e.g.
MOCK_FLUSH_REDIS==='true') or a CLI flag, log a warning that a destructive flush
will be performed, and only call flushdb() when the flag is set; if not set,
skip the flush and keep the 1s delay (or remove it) so the script is safe to run
against shared dev/staging instances.
// Sequential execution for individual tracks
for (const track of group.tracks) {
  const { name, parallel, ...properties } = track;
  screenView.track.payload.name = name ?? '';
  screenView.track.payload.properties.__referrer =
    session.referrer ?? '';
  if (name === 'screen_view') {
    screenView.track.payload.properties.__path =
      (screenView.headers.origin ?? '') + (properties.path ?? '');
  } else {
    screenView.track.payload.name = track.name ?? '';
    screenView.track.payload.properties = properties;
  }
  screenView.headers['x-client-ip'] = session.ip;
  screenView.headers['user-agent'] = session.userAgent;
  await trackit(screenView);
}
Reset event payload per sequential track.
The sequential path keeps mutating the shared screenView template. After a non-screen event runs, its custom properties (e.g. element, articleId) remain on the object, so the next screen_view inherits and sends those stray keys. Clone the template per iteration—like the parallel branch already does—to isolate payloads.
-        for (const track of group.tracks) {
-          const { name, parallel, ...properties } = track;
-          screenView.track.payload.name = name ?? '';
-          screenView.track.payload.properties.__referrer =
-            session.referrer ?? '';
-          if (name === 'screen_view') {
-            screenView.track.payload.properties.__path =
-              (screenView.headers.origin ?? '') + (properties.path ?? '');
-          } else {
-            screenView.track.payload.name = track.name ?? '';
-            screenView.track.payload.properties = properties;
-          }
-          screenView.headers['x-client-ip'] = session.ip;
-          screenView.headers['user-agent'] = session.userAgent;
-          await trackit(screenView);
-        }
+        for (const track of group.tracks) {
+          const { name, parallel: _parallel, ...properties } = track;
+          const event = JSON.parse(JSON.stringify(screenView));
+          event.track.payload.name = name ?? '';
+          event.track.payload.properties.__referrer = session.referrer ?? '';
+          if (name === 'screen_view') {
+            event.track.payload.properties.__path =
+              (event.headers.origin ?? '') + (properties.path ?? '');
+          } else {
+            event.track.payload.properties = properties;
+          }
+          event.headers['x-client-ip'] = session.ip;
+          event.headers['user-agent'] = session.userAgent;
+          await trackit(event);
+        }
🤖 Prompt for AI Agents
In apps/api/scripts/mock.ts around lines 463 to 479 the sequential loop mutates
the shared screenView template so non-screen_event properties persist into
subsequent iterations; fix by creating a fresh clone of the screenView template
at the top of each loop iteration (use a deep clone method consistent with the
project, e.g. structuredClone or JSON.parse(JSON.stringify(...)) if supported),
then set name, properties.__referrer, headers (x-client-ip and user-agent) and
for screen_view set properties.__path on that cloned object before calling await
trackit(clonedScreenView); this ensures each track uses an isolated payload and
prevents stray keys from leaking between iterations.
const logger = status === 200 ? request.log.info : request.log.error;
logger('healthcheck', {
  status,
  redis: redisRes,
  db: dbRes,
  queue: queueRes,
  ch: chRes,
});
🛠️ Refactor suggestion | 🟠 Major
🧩 Analysis chain
Verify logger method binding to prevent potential runtime errors.
Assigning request.log.info or request.log.error to a variable may lose the this context if these methods are not bound. If the Fastify logger implementation relies on this, calling logger(...) could throw a runtime error.
Consider using a safer pattern that preserves context:
- const logger = status === 200 ? request.log.info : request.log.error;
- logger('healthcheck', {
+ const logMethod = status === 200 ? 'info' : 'error';
+ request.log[logMethod]('healthcheck', {
   status,
   redis: redisRes,
   db: dbRes,
   queue: queueRes,
   ch: chRes,
- });
+ });

Alternatively, verify that Fastify's pino-based logger methods are bound and safe to destructure.
Run the following script to check if similar patterns exist elsewhere in the codebase and verify this is an established pattern:
🏁 Script executed:
#!/bin/bash
# Search for similar logger assignment patterns in the codebase
rg -n --type=ts -C2 'const\s+\w+\s*=.*\.log\.(info|error|warn|debug)'

Length of output: 453
Bind or access logger methods to preserve context
Pino’s log methods rely on their this context; destructuring them (e.g. const logger = request.log.info) detaches this and may throw at runtime. Either bind the method:
const logger = status === 200
  ? request.log.info.bind(request.log)
  : request.log.error.bind(request.log);
logger('healthcheck', {
  status,
  redis: redisRes,
  db: dbRes,
  queue: queueRes,
  ch: chRes,
});or use bracket notation:
const level = status === 200 ? 'info' : 'error';
request.log[level]('healthcheck', {
  status,
  redis: redisRes,
  db: dbRes,
  queue: queueRes,
  ch: chRes,
});🤖 Prompt for AI Agents
In apps/api/src/controllers/healthcheck.controller.ts around lines 38 to 45, the
code assigns request.log.info/error to a variable which detaches Pino’s `this`
context and can throw at runtime; instead, either bind the chosen method to
request.log before calling it or avoid extraction by selecting the level name
and calling request.log[level](...) directly so the logger retains its context
and works reliably; update the code to use .bind(request.log) for the chosen
function or to compute the level string ('info'|'error') and call
request.log[level](...) when logging the healthcheck object.
if (job) {
  const lock = await getLock(
    `create-session-end:${currentDeviceId}`,
    'locked',
    1000,
  );

  if (!lock) {
    await job.moveToDelayed(Date.now() + 50, token);
    throw new DelayedError();
  }
}
Restore the session lock even when running without a BullMQ job
Gating the getLock/moveToDelayed branch behind if (job) means every path that calls incomingEventPure without a BullMQ Job (the new GroupWorker path) now skips the Redis lock entirely. GroupMQ still has visibility-timeout retries, so a timed-out attempt can run in parallel with the original attempt. Once that happens we’ll gladly create duplicate session_start events because nothing guards createSessionStart anymore. Please keep the lock acquisition outside the Bull-specific branch and make the group worker provide or emulate the delay/retry behavior so we retain the de-duplication guard.
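A minimal sketch of that shape, assuming the getLock, DelayedError, and createSessionStart helpers referenced elsewhere in this review (how the group worker emulates the delay/retry is left open):

// Sketch: take the per-device lock on every path; only BullMQ jobs can
// reschedule themselves, but both paths throw so the guard still holds.
const lock = await getLock(
  `create-session-end:${currentDeviceId}`,
  'locked',
  1000,
);

if (!lock) {
  if (job) {
    // BullMQ path keeps its delayed-retry behavior
    await job.moveToDelayed(Date.now() + 50, token);
  }
  // GroupMQ path relies on the thrown error to trigger its own retry
  throw new DelayedError();
}

await createSessionStart({ payload });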
🤖 Prompt for AI Agents
In apps/worker/src/jobs/events.incoming-event.ts around lines 193-204, the Redis
lock logic must run regardless of whether a BullMQ Job exists: move the getLock
call outside the if (job) branch and always attempt to acquire the lock with the
same key and timeout; if lock acquisition fails, preserve existing behavior for
jobs (call job.moveToDelayed(...)) and for non-job callers throw DelayedError so
the GroupWorker can emulate delay/retry; ensure the lock is released in a
finally block after processing to avoid deadlocks.
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
- pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (4)
- apps/api/package.json (1 hunks)
- apps/worker/package.json (1 hunks)
- apps/worker/src/metrics.ts (1 hunks)
- packages/queue/package.json (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- packages/queue/package.json
- apps/worker/package.json
🧰 Additional context used
🧬 Code graph analysis (1)
apps/worker/src/metrics.ts (1)
packages/queue/src/queues.ts (4)
eventsQueue(95-105)
sessionsQueue(117-122)
cronQueue(127-132)
eventsGroupQueue(107-115)
🔇 Additional comments (1)
apps/worker/src/metrics.ts (1)
21-21: Verify GroupQueue compatibility with BullMQ Queue metrics
GroupQueue comes from an external package and isn't defined in this repo. Manually confirm that the groupmq package's GroupQueue class (or its TypeScript definitions) includes getActiveCount(), getDelayedCount(), getFailedCount(), getCompletedCount(), getWaitingCount(), and a name property before using it interchangeably with BullMQ's Queue.
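One way to make that verification concrete is a narrow structural type both queue classes must satisfy. This is a sketch only; whether GroupQueue actually exposes these methods is exactly the open question.

// Hypothetical minimal surface the metrics code relies on.
interface MetricsQueue {
  name: string;
  getActiveCount(): Promise<number>;
  getDelayedCount(): Promise<number>;
  getFailedCount(): Promise<number>;
  getCompletedCount(): Promise<number>;
  getWaitingCount(): Promise<number>;
}

// If GroupQueue is missing any of these, this assignment fails to compile.
const monitoredQueues: MetricsQueue[] = [
  eventsQueue,
  sessionsQueue,
  cronQueue,
  eventsGroupQueue,
];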
        
          
apps/api/package.json (Outdated)
      | "fastify": "^5.2.1", | ||
| "fastify-metrics": "^12.1.0", | ||
| "fastify-raw-body": "^5.0.0", | ||
| "groupmq": "1.0.0-next.5", | 
Blocker: Verify groupmq@1.0.0-next.5 exists before merging
I can only find groupmq published up to 1.0.0-next.1; there’s no evidence that 1.0.0-next.5 is on the public registry. If this tag isn’t available, installs will fail and the API won’t build. Please confirm the package/tag is published (or adjust to an existing release) before landing this PR.(libraries.io)
🤖 Prompt for AI Agents
In apps/api/package.json at line 41, the dependency was changed to "groupmq":
"1.0.0-next.5" but that version isn't available on the public registry; verify
and resolve by one of the following: confirm and publish groupmq@1.0.0-next.5 to
the registry, or change package.json to a published version (e.g., 1.0.0-next.1
or a stable tag) and then run npm/yarn install to update package-lock.json or
yarn.lock accordingly; ensure CI/build passes after updating the lockfile and
commit the lockfile change before merging.
        
          
apps/worker/src/metrics.ts (Outdated)
register.registerMetric(
  new client.Gauge({
-    name: `${queue.name}_active_count`,
+    name: `${queue}_active_count`,
Critical: Fix metric name generation to use queue.name.
Line 26 incorrectly changed from ${queue.name}_active_count to ${queue}_active_count, which will stringify the entire queue object instead of its name. This is inconsistent with the other metric registrations (lines 37, 48, 59, 70) which correctly use ${queue.name}.
Apply this diff to fix the metric name generation:
-      name: `${queue}_active_count`,
+      name: `${queue.name}_active_count`,
🤖 Prompt for AI Agents
In apps/worker/src/metrics.ts around line 26 the metric name was built using
`${queue}_active_count` which stringifies the queue object; change it to use the
queue's name property by replacing `${queue}` with `${queue.name}` so the metric
becomes `${queue.name}_active_count`, matching the other registrations and
ensuring correct metric keys.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
- pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (4)
- apps/api/package.json (1 hunks)
- apps/worker/package.json (1 hunks)
- apps/worker/src/metrics.ts (1 hunks)
- packages/queue/package.json (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- apps/worker/package.json
- apps/api/package.json
- apps/worker/src/metrics.ts
        
          
packages/queue/package.json (Outdated)
      | "bullmq": "^5.8.7", | ||
| "groupmq": "1.0.0-next.5" | 
Do not pin to an unpublished GroupMQ release.
1.0.0-next.5 is not on npm yet (latest is 1.0.0-next.4 as of Oct 1, 2025), so installs will fail and the queue package won't build. Please stick to a published version or wait until next.5 ships.(libraries.io)
Apply this diff to reference the latest available build:
-    "groupmq": "1.0.0-next.5"
+    "groupmq": "1.0.0-next.4"📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| "bullmq": "^5.8.7", | |
| "groupmq": "1.0.0-next.5" | |
| "bullmq": "^5.8.7", | |
| "groupmq": "1.0.0-next.4" | 
🤖 Prompt for AI Agents
In packages/queue/package.json around lines 11-12, the dependency "groupmq":
"1.0.0-next.5" references an unpublished release and will break installs; change
it to a published version (for example "groupmq": "1.0.0-next.4" or a caret
range like "^1.0.0-next.4"), save the file, run your package manager to update
the lockfile (npm/yarn/pnpm), and commit the updated package.json and lockfile
so CI and local installs succeed.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️  Outside diff range comments (1)
apps/worker/src/boot-workers.ts (1)
70-119: Type casts bypass safety and may break observability for GroupWorker.
Every event listener casts worker to BullMQ's Worker type to attach listeners for 'error', 'closed', 'ready', 'failed', 'completed', and 'ioredis:close'. If GroupWorker doesn't implement these events with the same signatures:
- Listeners will silently fail to attach or fire
- The group worker's errors, lifecycle, and job status won't be logged
- Monitoring will be incomplete
Consider one of these approaches:
Option 1: Separate lifecycle management

  const workers = [
    sessionsWorker,
    eventsWorker,
    cronWorker,
    notificationWorker,
    miscWorker,
-   eventsGroupWorker,
  ];

  workers.forEach((worker) => {
-   (worker as Worker).on('error', (error) => {
+   worker.on('error', (error) => {
      logger.error('worker error', {
        worker: worker.name,
        error,
      });
    });
    // ... other listeners without casts
  });

+ // Separate event handling for GroupWorker
+ eventsGroupWorker.on('error', (error) => {
+   logger.error('group worker error', { worker: 'eventsGroupWorker', error });
+ });
+ // Add other GroupWorker-specific events

Option 2: Type guard with interface abstraction
Define a minimal common interface and use a type guard to safely attach compatible events.
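A sketch of what Option 2 could look like. The listener names mirror the ones above; treat the WorkerLike shape as an assumption to check against both worker classes.

// Hypothetical least-common-denominator for lifecycle listeners.
type LifecycleEvent = 'error' | 'closed' | 'ready' | 'failed' | 'completed';

interface WorkerLike {
  name?: string;
  on(event: LifecycleEvent, listener: (...args: unknown[]) => void): unknown;
}

function hasLifecycleEvents(w: unknown): w is WorkerLike {
  return typeof (w as WorkerLike)?.on === 'function';
}

for (const worker of workers) {
  if (hasLifecycleEvents(worker)) {
    worker.on('error', (error) =>
      logger.error('worker error', { worker: worker.name, error }),
    );
  }
}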
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- apps/worker/src/boot-workers.ts (5 hunks)
- packages/queue/src/queues.ts (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
apps/worker/src/boot-workers.ts (2)
packages/queue/src/queues.ts (2)
EventsQueuePayloadIncomingEvent(8-27)
eventsGroupQueue(107-116)apps/worker/src/jobs/events.incoming-event.ts (1)
incomingEventPure(52-215)
packages/queue/src/queues.ts (1)
packages/redis/redis.ts (1)
getRedisGroupQueue(109-124)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: build-and-push-worker
- GitHub Check: build-and-push-api
🔇 Additional comments (3)
apps/worker/src/boot-workers.ts (2)
5-21: LGTM! The new imports are appropriate for the GroupMQ integration, bringing in the necessary types, queue instances, and worker class.
61-68: Ensure consistent Worker interface for eventsGroupWorker
In apps/worker/src/boot-workers.ts (lines 61-68), the workers array now mixes BullMQ Worker instances and your eventsGroupWorker. Confirm that eventsGroupWorker implements the same lifecycle event methods (on, emit for 'completed', 'failed', etc.) or refactor it to match the BullMQ Worker interface.
packages/queue/src/queues.ts (1)
3-6: LGTM! The imports are well-structured. Aliasing Queue as GroupQueue from 'groupmq' clearly distinguishes it from BullMQ's Queue class.
const eventsGroupWorker = new GroupWorker<
  EventsQueuePayloadIncomingEvent['payload']
>({
  logger: true,
  queue: eventsGroupQueue,
  handler: async (job) => {
    logger.info('processing event (group queue)', {
      groupId: job.groupId,
      timestamp: job.data.event.timestamp,
    });
    await incomingEventPure(job.data);
  },
});
eventsGroupWorker.run();
🧩 Analysis chain
Critical: Handler doesn't pass job context to incomingEventPure.
The handler only passes job.data, but incomingEventPure expects optional job and token parameters for critical functionality:
- Session creation locking (lines 155-162 in events.incoming-event.ts) requires job.moveToDelayed()
- Without the job object, the DelayedError retry mechanism will fail
- This could lead to race conditions and duplicate session creation
Verify whether GroupWorker's job object is compatible with BullMQ's Job interface:
🏁 Script executed:
#!/bin/bash
# Check GroupWorker job API and moveToDelayed support
rg -nP -A5 -B2 'class.*Job|interface.*Job' node_modules/groupmq/ || echo "groupmq not installed locally"
# Check if incomingEventPure actually requires job parameter in practice
ast-grep --pattern 'job.moveToDelayed($$$)'
# Find other usages of incomingEventPure to understand typical invocation
rg -nP -C3 'incomingEventPure\('

Length of output: 1393
Critical: Pass the job context to incomingEventPure
In apps/worker/src/boot-workers.ts the handler currently calls:
await incomingEventPure(job.data);

This omits the job object, so internal calls to job.moveToDelayed() in incomingEventPure will fail, breaking delayed retry/locking.
Update to:
await incomingEventPure(job.data, job);

🤖 Prompt for AI Agents
In apps/worker/src/boot-workers.ts around lines 33 to 46 the GroupWorker handler
calls incomingEventPure(job.data) omitting the job context; this prevents
internal uses of job.moveToDelayed() inside incomingEventPure and breaks delayed
retries/locking. Update the handler to pass the job as the second argument by
calling incomingEventPure(job.data, job), ensuring the function receives the job
object for any job-level operations; keep await and error propagation as-is.
Actionable comments posted: 4
♻️ Duplicate comments (3)
apps/worker/src/jobs/events.incoming-event.ts (1)
193-204: Critical: Race condition persists—lock must run for GroupMQ too.
As flagged in the previous review, gating the lock acquisition behind if (job) leaves the GroupWorker path (where job is undefined) unprotected. When GroupMQ retries a timed-out message, the original and retry can run concurrently without a lock, creating duplicate session_start events.
Move the lock acquisition outside the if (job) conditional:

   if (!sessionEnd) {
     // Too avoid several created sessions we just throw if a lock exists
     // This will than retry the job
-    if (job) {
-      const lock = await getLock(
-        `create-session-end:${currentDeviceId}`,
-        'locked',
-        1000,
-      );
-
-      if (!lock) {
+    const lock = await getLock(
+      `create-session-end:${currentDeviceId}`,
+      'locked',
+      1000,
+    );
+
+    if (!lock) {
+      if (job) {
         await job.moveToDelayed(Date.now() + 50, token);
-        throw new DelayedError();
       }
+      throw new DelayedError();
     }

     await createSessionStart({ payload });
   }

This ensures both BullMQ and GroupMQ paths acquire the lock, while BullMQ jobs still get the delayed retry behavior.
apps/api/scripts/mock.ts (2)
264-265: Don't flush shared Redis by default.
This was already flagged. Gate the flush behind an explicit opt-in (env/flag).
463-479: Clone payload per sequential track to avoid property leakage.
Also previously flagged. Deep-clone the template each loop to prevent cross-event mutations.
🧹 Nitpick comments (3)
apps/worker/src/index.ts (1)
38-38: Remove type assertion or fix type compatibility.
The as any type assertion on line 38 bypasses TypeScript's type checking, which can hide potential runtime issues.
Verify whether BullBoardGroupMQAdapter properly implements the expected Bull Board adapter interface, and either:
- Fix the types in the groupmq package if you control it
- Create a proper type definition file if the package lacks types
- Document why the type assertion is necessary if there's a legitimate incompatibility

#!/bin/bash
# Check if BullBoardGroupMQAdapter exports proper TypeScript types
rg -nP --type=ts 'export.*BullBoardGroupMQAdapter' node_modules/groupmq -A 5
packages/db/src/buffers/index.ts (1)
1-3: Consider clarifying naming convention.
The imports rename classes with a Redis suffix (e.g., BotBuffer as BotBufferRedis) while importing from non-Redis-named files (./bot-buffer). This naming inconsistency may cause confusion.
Consider either:
- Removing the Redis suffix from the renamed imports if the implementations are the primary buffer classes
- Aligning the file names with the class names if the Redis suffix is meaningful

-import { BotBuffer as BotBufferRedis } from './bot-buffer';
-import { EventBuffer as EventBufferRedis } from './event-buffer';
-import { ProfileBuffer as ProfileBufferRedis } from './profile-buffer';
+import { BotBuffer } from './bot-buffer';
+import { EventBuffer } from './event-buffer';
+import { ProfileBuffer } from './profile-buffer';
 import { SessionBuffer } from './session-buffer';

-export const eventBuffer = new EventBufferRedis();
-export const profileBuffer = new ProfileBufferRedis();
-export const botBuffer = new BotBufferRedis();
+export const eventBuffer = new EventBuffer();
+export const profileBuffer = new ProfileBuffer();
+export const botBuffer = new BotBuffer();
 export const sessionBuffer = new SessionBuffer();
apps/api/src/controllers/event.controller.ts (1)
63-107: Explicit boolean cast for Redis EXISTS; consider a namespaced feature flag and light caching.
Same reasoning as in track.controller.

-    const isGroupQueue = await getRedisCache().exists('group_queue');
+    const isGroupQueue =
+      (await getRedisCache().exists('group_queue')) > 0;

Also, both controllers duplicate enqueue logic. Consider extracting a shared helper to reduce drift.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
- pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (22)
- .github/workflows/docker-build.yml (6 hunks)
- apps/api/package.json (1 hunks)
- apps/api/scripts/mock.ts (5 hunks)
- apps/api/src/controllers/event.controller.ts (3 hunks)
- apps/api/src/controllers/track.controller.ts (3 hunks)
- apps/worker/package.json (1 hunks)
- apps/worker/src/boot-workers.ts (5 hunks)
- apps/worker/src/index.ts (2 hunks)
- apps/worker/src/jobs/events.incoming-event.ts (4 hunks)
- apps/worker/src/metrics.ts (1 hunks)
- package.json (1 hunks)
- packages/db/src/buffers/base-buffer.ts (4 hunks)
- packages/db/src/buffers/bot-buffer.ts (3 hunks)
- packages/db/src/buffers/event-buffer.test.ts (1 hunks)
- packages/db/src/buffers/event-buffer.ts (13 hunks)
- packages/db/src/buffers/index.ts (1 hunks)
- packages/db/src/buffers/profile-buffer.ts (5 hunks)
- packages/db/src/buffers/session-buffer.ts (5 hunks)
- packages/queue/package.json (1 hunks)
- packages/queue/src/queues.ts (2 hunks)
- packages/redis/redis.ts (5 hunks)
- packages/redis/run-every.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
- packages/queue/package.json
- apps/worker/src/boot-workers.ts
- packages/queue/src/queues.ts
- apps/worker/package.json
- apps/worker/src/metrics.ts
- apps/api/package.json
🧰 Additional context used
🧬 Code graph analysis (11)
packages/db/src/buffers/profile-buffer.ts (1)
packages/redis/redis.ts (1)
getRedisCache(66-72)
apps/api/src/controllers/track.controller.ts (2)
packages/redis/redis.ts (1)
getRedisCache (66-72)
packages/queue/src/queues.ts (2)
eventsGroupQueue(110-119)
eventsQueue(98-108)
apps/worker/src/jobs/events.incoming-event.ts (4)
packages/queue/src/queues.ts (1)
EventsQueuePayloadIncomingEvent (11-30)
packages/db/src/services/event.service.ts (1)
IServiceEvent (138-175)
packages/redis/redis.ts (1)
getLock (122-125)
apps/worker/src/utils/session-handler.ts (1)
createSessionStart(15-25)
packages/db/src/buffers/bot-buffer.ts (1)
packages/redis/redis.ts (1)
getRedisCache(66-72)
apps/api/src/controllers/event.controller.ts (3)
apps/api/src/controllers/track.controller.ts (2)
getTimestamp(55-82)
getStringHeaders (18-37)
packages/redis/redis.ts (1)
getRedisCache (66-72)
packages/queue/src/queues.ts (2)
eventsGroupQueue(110-119)
eventsQueue(98-108)
packages/db/src/buffers/event-buffer.test.ts (3)
packages/redis/redis.ts (1)
getRedisCache (66-72)
packages/db/src/buffers/event-buffer.ts (1)
EventBuffer (39-885)
packages/db/src/clickhouse/client.ts (1)
ch(130-140)
apps/api/scripts/mock.ts (1)
packages/redis/redis.ts (1)
getRedisCache(66-72)
packages/db/src/buffers/base-buffer.ts (2)
packages/redis/run-every.ts (1)
runEvery (3-20)
packages/redis/redis.ts (1)
getRedisCache(66-72)
packages/db/src/buffers/event-buffer.ts (4)
packages/db/src/services/event.service.ts (2)
IClickhouseEvent(61-95)
transformEvent (97-131)
packages/redis/redis.ts (2)
Redis(9-9)
getRedisCache (66-72)
packages/redis/publisher.ts (1)
publishEvent (28-36)
packages/json/index.ts (1)
getSafeJson(3-9)
apps/worker/src/index.ts (1)
packages/queue/src/queues.ts (1)
eventsGroupQueue(110-119)
packages/redis/run-every.ts (1)
packages/redis/redis.ts (1)
getRedisCache(66-72)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: lint-and-test
🔇 Additional comments (24)
apps/worker/src/jobs/events.incoming-event.ts (3)
45-50: LGTM! Clean delegation pattern.
The wrapper function correctly delegates to incomingEventPure while maintaining backward compatibility.
52-56: Good signature for dual usage (BullMQ and GroupMQ).
Making the job parameter optional allows the function to work with both BullMQ and GroupMQ workflows. However, ensure the lock acquisition logic (lines 193-204) properly handles the case when job is undefined.
162-162: Correct payload propagation.
Both calls to createEventAndNotify correctly pass jobPayload instead of job.data.payload, maintaining consistency with the new pure function design.
Also applies to: 208-208
package.json (1)
9-9: Verify impact on local development workflow.
The test script changed from vitest (watch mode) to vitest run (single run). This is suitable for CI/CD and the pre-push hook, but developers accustomed to watch mode during local development may need to adjust their workflow.
Consider documenting this change if it affects the team's development practices, or provide a separate test:watch script for local development:

 "scripts": {
   "test": "vitest run",
+  "test:watch": "vitest",

apps/worker/src/index.ts (1)
9-9: LGTM! New GroupQueue integration added correctly.
The imports of eventsGroupQueue and BullBoardGroupMQAdapter properly integrate the new GroupMQ-based event processing into the worker's Bull Board dashboard.
Also applies to: 17-17
packages/redis/run-every.ts (1)
18-18: LGTM! Improved Redis operation reliability.
The cache set operation is now properly awaited, ensuring the interval lock is established before executing the function. The value change from 'true' to '1' is also appropriate for Redis conventions.
packages/db/src/buffers/bot-buffer.ts (3)
27-32: LGTM! Atomic buffer operations prevent race conditions.
The addition now uses Redis multi/exec to atomically enqueue the event and increment the counter. This ensures the counter stays synchronized with the actual buffer size even under concurrent access.
68-72: LGTM! Atomic removal with counter decrement.
The buffer processing correctly uses multi/exec to atomically trim the list and decrement the counter after successful ClickHouse insertion. This maintains counter consistency even if failures occur.
83-86: LGTM! Counter-based size tracking improves efficiency.
The new getBufferSizeWithCounter approach avoids expensive LLEN operations by maintaining a counter. The fallback to LLEN ensures correctness if the counter is missing.
packages/db/src/buffers/session-buffer.ts (5)
1-1: LGTM! Unused import removed.
The runEvery import is no longer used and has been correctly removed.
64-64: LGTM! Zero duration now accepted.
The change from duration > 0 to duration >= 0 appropriately allows sessions with zero duration (e.g., single-page visits or instant bounces).
177-178: LGTM! Counter-based tracking added.
The counter increment by sessions.length maintains accurate buffer size tracking consistent with the atomic operations pattern used across other buffers.
222-226: LGTM! Atomic removal maintains counter consistency.
The multi/exec transaction atomically trims the buffer and decrements the counter, ensuring consistency even under concurrent access or partial failures.
237-237: LGTM! Counter-based size tracking implemented.
The delegation to getBufferSizeWithCounter aligns with the consistent buffering pattern across the codebase and improves performance by avoiding repeated LLEN calls.
packages/redis/redis.ts (2)
11-11: LGTM! DRY improvement with REDIS_URL constant.
Extracting the Redis URL to a constant eliminates repetition and makes future configuration changes easier.
107-120: LGTM! Dedicated Redis connection for GroupQueue.
The new getRedisGroupQueue() function correctly creates a dedicated Redis client to avoid blocking BullMQ operations. The configuration matches getRedisQueue(), ensuring consistent behavior, and the singleton pattern prevents connection leaks.
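For context, the pattern being praised generally reduces to a lazily created, purpose-specific client. A sketch only, not the actual redis.ts contents; the options shown are assumptions.

import { Redis } from 'ioredis';

let redisGroupQueue: Redis | undefined;

export function getRedisGroupQueue(): Redis {
  if (!redisGroupQueue) {
    // Dedicated connection so blocking group-queue commands never share a
    // socket with BullMQ or the cache client.
    redisGroupQueue = new Redis(REDIS_URL, { maxRetriesPerRequest: null });
  }
  return redisGroupQueue;
}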
packages/db/src/buffers/profile-buffer.ts (4)
23-23: LGTM! Counter key for buffer size tracking added.
The bufferCounterKey field enables counter-based buffer size tracking consistent with other buffer implementations.
106-106: LGTM! Counter increment and adjusted index.
The counter increment at line 106 maintains buffer size tracking, and the adjusted result index at line 117 (from 2 to 3) correctly accounts for the new incr operation in the multi transaction.
Also applies to: 117-117
205-210: LGTM! Atomic removal with counter consistency.
The multi/exec transaction atomically trims processed profiles and decrements the counter, maintaining consistency under concurrent access.
221-229: LGTM! Counter-based size with fallback initialization.
The getBufferSize() implementation properly handles the counter lifecycle: returns the counter if present, otherwise initializes it from the actual list length. This ensures correctness while benefiting from counter-based performance.
apps/api/src/controllers/track.controller.ts (2)
9-10: Imports look good.
New queue/cache imports align with the grouped path.
286-331: Coerce Redis EXISTS result to boolean; consider a namespaced feature key.
exists returns a number (0/1). Cast explicitly and avoid generic key collisions.
Apply this minimal fix:

-    const isGroupQueue = await getRedisCache().exists('group_queue');
+    const isGroupQueue =
+      (await getRedisCache().exists('group_queue')) > 0;

Optionally: use a namespaced flag (e.g., feature:group_queue:enabled) and cache it with a short TTL to cut per-request roundtrips. I can draft a helper if desired.
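A possible shape for such a helper, sketched under the assumptions above (the key name and TTL are placeholders):

// Sketch: memoize the flag for a few seconds to avoid a Redis round-trip
// on every request.
let cached: { value: boolean; expiresAt: number } | undefined;

export async function isGroupQueueEnabled(): Promise<boolean> {
  const now = Date.now();
  if (cached && cached.expiresAt > now) {
    return cached.value;
  }
  const value = (await getRedisCache().exists('feature:group_queue:enabled')) > 0;
  cached = { value, expiresAt: now + 5_000 };
  return value;
}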
apps/api/src/controllers/event.controller.ts (2)
6-8: Imports match the new grouping flow.
20-23: Timestamp handling reuse LGTM.
constructor(options: {
  name: string;
  onFlush: () => Promise<void>;
  bufferCounterKey?: string;
}) {
  this.logger = createLogger({ name: options.name });
  this.name = options.name;
  this.lockKey = `lock:${this.name}`;
  this.onFlush = options.onFlush;
  this.bufferCounterKey = `${this.name}:buffer:count`;
}
Constructor ignores provided bufferCounterKey.
options.bufferCounterKey is accepted but not used, breaking custom counter keys in subclasses.
Apply:
   constructor(options: {
     name: string;
     onFlush: () => Promise<void>;
     bufferCounterKey?: string;
   }) {
@@
-    this.bufferCounterKey = `${this.name}:buffer:count`;
+    this.bufferCounterKey =
+      options.bufferCounterKey ?? `${this.name}:buffer:count`;
   }
🤖 Prompt for AI Agents
In packages/db/src/buffers/base-buffer.ts around lines 14 to 24, the constructor
accepts options.bufferCounterKey but always overwrites this.bufferCounterKey
with `${this.name}:buffer:count`; update the constructor to use the provided
options.bufferCounterKey when present (e.g., this.bufferCounterKey =
options.bufferCounterKey ?? `${this.name}:buffer:count`) so subclasses can pass
custom counter keys while preserving the default behavior.
beforeEach(async () => {
  await redis.flushall();
});
🧩 Analysis chain
Use flushdb (not flushall) and ensure test Redis isolation.
flushall wipes all DBs; safer to clear only the selected DB, and only in a dedicated test Redis instance.
Suggested change:
-beforeEach(async () => {
-  await redis.flushall();
-});
+beforeEach(async () => {
+  await redis.flushdb();
+});Please confirm tests run against a dedicated Redis (e.g., REDIS_URL points to a test DB/container). To verify, run:
🏁 Script executed:
#!/bin/bash
rg -n -C2 -i 'REDIS_URL|redis.*test|vitest' --type ts --type tsx --type js --type json

Length of output: 27

🏁 Script executed:
#!/bin/bash
rg -n -C2 -i 'REDIS_URL|redis.*test|vitest' -g '*.ts' -g '*.js' -g '*.json'

Length of output: 19951

🏁 Script executed:
#!/bin/bash
sed -n '1,50p' vitest.shared.ts

Length of output: 653
Use flushdb() and isolate tests on a dedicated Redis
Replace in packages/db/src/buffers/event-buffer.test.ts:
-beforeEach(async () => {
-  await redis.flushall();
-});
+beforeEach(async () => {
+  await redis.flushdb();
+});

Your Vitest setup doesn't override REDIS_URL (defaults to redis://localhost:6379, DB 0), so tests run against your local Redis. Configure a dedicated test instance or add env.REDIS_URL in your Vitest config to point to a test DB/container.
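If the config route is chosen, a minimal sketch could look like this (assuming Vitest's test.env option; the URL and DB number are placeholders):

// vitest.shared.ts (sketch)
import { defineConfig } from 'vitest/config';

export default defineConfig({
  test: {
    env: {
      REDIS_URL: 'redis://localhost:6379/15', // dedicated test DB
    },
  },
});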
🤖 Prompt for AI Agents
In packages/db/src/buffers/event-buffer.test.ts around lines 54–56, the test
currently calls redis.flushall(), which clears all Redis databases and may hit a
developer's local Redis; change the test to call redis.flushdb() instead and
ensure your Vitest environment is pointed at a dedicated test Redis by adding or
overriding env.REDIS_URL in your Vitest config (e.g., use a test container or a
different DB number in the URL) so tests run against an isolated instance;
update the beforeEach to use flushdb() and add/update Vitest environment
configuration to set REDIS_URL to the test Redis URL.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️  Outside diff range comments (2)
packages/db/src/buffers/bot-buffer.ts (1)
56-59: Filter out invalid JSON before ClickHouse insert.
getSafeJson can return null; inserting nulls will break inserts or data quality.

-    const parsedEvents = events.map((e) =>
-      getSafeJson<IClickhouseBotEvent>(e),
-    );
+    const parsedEvents = events
+      .map((e) => getSafeJson<IClickhouseBotEvent>(e))
+      .filter((e): e is IClickhouseBotEvent => e !== null);

packages/db/src/buffers/profile-buffer.ts (1)
192-195: Filter out invalid JSON before ClickHouse insert.
getSafeJson can return null; ensure only valid profiles are inserted.

-    const parsedProfiles = profiles.map((p) =>
-      getSafeJson<IClickhouseProfile>(p),
-    );
+    const parsedProfiles = profiles
+      .map((p) => getSafeJson<IClickhouseProfile>(p))
+      .filter((p): p is IClickhouseProfile => p !== null);
🧹 Nitpick comments (2)
packages/db/src/buffers/profile-buffer.ts (1)
87-92: Avoid logging full profiles (PII-heavy) at debug.
Reduce payload to IDs/flags to minimize PII in logs.

-      this.logger.debug('Merged profile will be inserted', {
-        mergedProfile,
-        existingProfile,
-        profile,
-      });
+      this.logger.debug('Merged profile will be inserted', {
+        projectId: profile.project_id,
+        profileId: profile.id,
+        changed: !!existingProfile,
+      });

packages/db/src/buffers/event-buffer.ts (1)
389-398: Batch 'received' publishes to reduce round-trips (optional).
When add() is called in bulk, we still publish per-event. Consider accumulating to a pub multi and exec once to cut I/O.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- packages/db/src/buffers/base-buffer.ts (5 hunks)
- packages/db/src/buffers/bot-buffer.ts (3 hunks)
- packages/db/src/buffers/event-buffer.test.ts (1 hunks)
- packages/db/src/buffers/event-buffer.ts (13 hunks)
- packages/db/src/buffers/profile-buffer.ts (6 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- packages/db/src/buffers/event-buffer.test.ts
🧰 Additional context used
🧬 Code graph analysis (2)
packages/db/src/buffers/event-buffer.ts (4)
packages/db/src/services/event.service.ts (2)
IClickhouseEvent(61-95)
transformEvent (97-131)
packages/redis/redis.ts (2)
Redis (9-9)
getRedisCache (66-72)
packages/redis/publisher.ts (1)
publishEvent (28-36)
packages/json/index.ts (1)
getSafeJson(3-9)
packages/db/src/buffers/base-buffer.ts (2)
packages/redis/run-every.ts (1)
runEvery (3-20)
packages/redis/redis.ts (1)
getRedisCache(66-72)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: lint-and-test
🔇 Additional comments (6)
packages/db/src/buffers/base-buffer.ts (1)
33-78: Counter-backed size: good NaN handling and resync pattern.
Reading/initializing the Redis counter with validation and runEvery resync looks solid. The reset-on-error in tryFlush is a good drift guard.
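For readers outside the diff, the praised pattern is roughly the following (a sketch, not the actual base-buffer code):

import type { Redis } from 'ioredis';

// Sketch: prefer the counter key; fall back to the real list length and
// reseed the counter when it is missing or not a sane number.
async function getBufferSizeWithCounter(
  redis: Redis,
  counterKey: string,
  fallback: () => Promise<number>,
): Promise<number> {
  const raw = await redis.get(counterKey);
  const counted = raw === null ? Number.NaN : Number.parseInt(raw, 10);
  if (!Number.isNaN(counted) && counted >= 0) {
    return counted;
  }
  const actual = await fallback(); // e.g. () => redis.llen(listKey)
  await redis.set(counterKey, String(actual));
  return actual;
}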
packages/db/src/buffers/bot-buffer.ts (2)
27-33: Atomic enqueue + counter increment.
Using a single multi/exec for rpush + incr is the right move to keep queue and counter in sync.
82-84: Leverage counter-aware size with LLEN fallback.
Delegating to getBufferSizeWithCounter(() => llen(...)) is correct and consistent with the new pattern.
packages/db/src/buffers/profile-buffer.ts (1)
101-107: Indexing result for LLEN looks correct.
LLEN at position 3 is right for the multi chain; consider handling per-command errors if needed.
Also applies to: 116-116
packages/db/src/buffers/event-buffer.ts (2)
510-553: Paging logic honors per-session ≥2 budget.
Good fix ensuring at least two events per session and time-bounded scanning.
608-613: Counter decrement aligned with trimmed regular events.
LTRIM + DECRBY mirrors the fetched window correctly.
logger: ILogger;
lockKey: string;
lockTimeout = 60;
onFlush: () => void;
🛠️ Refactor suggestion | 🟠 Major
Type the onFlush field as Promise-returning.
Property is declared as () => void but used with await. Make it () => Promise<void> for correct typing.
-  onFlush: () => void;
+  onFlush: () => Promise<void>;
🤖 Prompt for AI Agents
In packages/db/src/buffers/base-buffer.ts around line 10, the onFlush property
is currently typed as "() => void" but callers await it; change the declaration
to "() => Promise<void>" so TypeScript knows it returns a promise. Update the
interface/type declaration so onFlush: () => Promise<void>, and ensure any
implementing classes/objects return/forward a Promise<void> (or are declared
async) to satisfy the new signature.
Actionable comments posted: 0
♻️ Duplicate comments (2)
apps/api/src/controllers/track.controller.ts (2)
283-284: Same Redis check concerns apply here.
The Redis existence check on line 283 has the same error handling and performance concerns flagged in event.controller.ts (line 63). Consider applying the same error handling and caching recommendations.
See the review comment on event.controller.ts lines 63-64 for details.
322-328: Same backoff delay concern applies here.
The 200ms backoff delay differs from the default 1000ms, same as in event.controller.ts (lines 102-109). Ensure this is intentional and consistent across both controllers.
See the review comment on event.controller.ts lines 102-109 for details.
🧹 Nitpick comments (6)
apps/api/src/controllers/event.controller.ts (4)
7-7: Unused import: getLock.
The getLock import is not used anywhere in this file. Consider removing it to keep the imports clean.
Apply this diff to remove the unused import:

-import { getLock, getRedisCache } from '@openpanel/redis';
+import { getRedisCache } from '@openpanel/redis';

63-64: Consider error handling and performance for Redis check.
The Redis existence check on every event ingestion could impact latency and lacks error handling if Redis is unavailable. Consider:
- Wrapping the check in a try-catch with a fallback to the non-group path
- Caching the flag value with a short TTL to reduce Redis calls per event
Example error handling:

-    const isGroupQueue = await getRedisCache().exists('group_queue');
+    let isGroupQueue = false;
+    try {
+      isGroupQueue = await getRedisCache().exists('group_queue');
+    } catch (error) {
+      // Log error and fall back to non-group queue
+      console.error('Redis check failed, using non-group queue', error);
+    }
68-83: Clamp excessively old timestamps in getTimestamp
getTimestamp only rejects future dates—events with arbitrarily old timestamps still yield orderMs, which can disrupt groupQueue ordering. Enforce a lower bound (e.g. no older than 7 days) or clearly document the allowed timestamp range.
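A sketch of such a lower bound (the 7-day window is only the example bound mentioned above, not a decided value):

// Sketch: clamp event timestamps to a sane window before deriving orderMs.
const MAX_EVENT_AGE_MS = 7 * 24 * 60 * 60 * 1000;

function clampTimestamp(candidate: Date, now: Date = new Date()): Date {
  if (candidate.getTime() > now.getTime()) {
    return now; // future timestamps are already rejected upstream
  }
  if (now.getTime() - candidate.getTime() > MAX_EVENT_AGE_MS) {
    return new Date(now.getTime() - MAX_EVENT_AGE_MS); // too old: clamp
  }
  return candidate;
}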
102-109: Review exponential backoff configuration (apps/api/src/controllers/event.controller.ts:102-109)
- Confirm that the 200 ms min backoff (vs the 1000 ms default in packages/queue/src/queues.ts) is intentional.
- Recommend aligning with best practices: implement truncated exponential backoff with a max cap, introduce jitter, and add observability around retry attempts and delays.
apps/api/src/controllers/track.controller.ts (2)
10-10: Unused import: getLock.
The getLock import is not used in this file. This is the same issue as in event.controller.ts. Consider removing it to keep the imports clean.
Apply this diff to remove the unused import:

-import { getLock, getRedisCache } from '@openpanel/redis';
+import { getRedisCache } from '@openpanel/redis';

283-331: Consider extracting common queuing logic.
The conditional queuing logic (lines 283-331) is duplicated between this file and event.controller.ts (lines 63-110). Consider extracting a shared helper function to reduce duplication and ensure consistent behavior.
Example refactor:

// In a shared util file
async function enqueueEvent(params: {
  projectId: string;
  headers: Record<string, string | undefined>;
  event: any;
  geo: GeoLocation;
  currentDeviceId: string;
  previousDeviceId: string;
  timestamp: string;
  isTimestampFromThePast: boolean;
  profileId?: string;
}) {
  const isGroupQueue = await getRedisCache().exists('group_queue');
  if (isGroupQueue) {
    const groupId = params.profileId
      ? `${params.projectId}:${params.profileId}`
      : params.currentDeviceId;
    await eventsGroupQueue.add({
      orderMs: new Date(params.timestamp).getTime(),
      data: {
        projectId: params.projectId,
        headers: params.headers,
        event: {
          ...params.event,
          timestamp: params.timestamp,
          isTimestampFromThePast: params.isTimestampFromThePast,
        },
        geo: params.geo,
        currentDeviceId: params.currentDeviceId,
        previousDeviceId: params.previousDeviceId,
      },
      groupId,
    });
  } else {
    await eventsQueue.add(
      'event',
      {
        type: 'incomingEvent',
        payload: {
          projectId: params.projectId,
          headers: params.headers,
          event: {
            ...params.event,
            timestamp: params.timestamp,
            isTimestampFromThePast: params.isTimestampFromThePast,
          },
          geo: params.geo,
          currentDeviceId: params.currentDeviceId,
          previousDeviceId: params.previousDeviceId,
        },
      },
      {
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 200,
        },
      },
    );
  }
}
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- apps/api/src/controllers/event.controller.ts (3 hunks)
- apps/api/src/controllers/track.controller.ts (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
apps/api/src/controllers/track.controller.ts (2)
packages/redis/redis.ts (1)
getRedisCache (66-72)
packages/queue/src/queues.ts (2)
eventsGroupQueue(110-119)
eventsQueue(98-108)
apps/api/src/controllers/event.controller.ts (4)
apps/api/src/controllers/track.controller.ts (2)
getTimestamp(56-83)
getStringHeaders (19-38)
apps/api/src/utils/get-client-ip.ts (1)
getClientIp (6-8)
packages/redis/redis.ts (1)
getRedisCache (66-72)
packages/queue/src/queues.ts (2)
eventsGroupQueue(110-119)
eventsQueue(98-108)
🔇 Additional comments (3)
apps/api/src/controllers/event.controller.ts (3)
20-27: LGTM: Improved timestamp and headers handling.
The destructured timestamp handling and consistent use of getStringHeaders improve code clarity and align with the pattern used in track.controller.ts.
65-67: LGTM: GroupId construction prevents cross-project grouping.
The groupId construction correctly prefixes with projectId when a profileId exists, ensuring events for the same profile across different projects are not inadvertently grouped together.
85-110: LGTM: Non-group path maintains backward compatibility.
The non-group queue path correctly maintains the existing incomingEvent payload structure and retry configuration, ensuring backward compatibility with existing queue consumers.
Summary by CodeRabbit
New Features
Improvements
Refactor
Tests
Chores