feat: bulk event ingestion (POST /track/batch) for offline-first SDKs #377
mraj602-tohands wants to merge 3 commits into
Reimplements the batch ingestion feature from PR Openpanel-dev#374 on top of current upstream/main (post buffer-perf, kafka client, ClickHouse round-robin). Adds __syncedAt property to all worker-processed events. See PR description for full architectural details. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Actionable comments posted: 1
🧹 Nitpick comments (1)
apps/worker/src/jobs/events.incoming-events.test.ts (1)
553-596: 💤 Low value

Consider adding test coverage for historical events when lock is not acquired.

This test verifies historical event handling when the lock is acquired (session_start emitted). Consider adding a companion test where `getLock` returns `false` to verify that:

- `session_start` is skipped
- The event itself is still created
- `sessionEnd` is still not scheduled (historical path)

This would complete coverage of the historical event path.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@apps/worker/src/jobs/events.incoming-events.test.ts` around lines 553 - 596, Add a new test alongside the existing historical-event test that simulates the lock NOT being acquired by mocking getLock to return false; call incomingEvent with the same historical payload and assert that createEvent is still called for the event (but there is no 'session_start' createEvent call), sessionsQueue.add is not called (no sessionEnd scheduled), and that the event createEvent call preserves deviceId/sessionId; reference the incomingEvent handler, getLock mock, createEvent mock, and sessionsQueue.add to locate and update the test harness.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@apps/worker/src/jobs/events.incoming-event.ts`:
- Around line 217-219: The referrer fields (enrichment.referrer,
enrichment.referrer_name, enrichment.referrer_type) use "?? undefined" which can
yield undefined instead of falling back to the base event; change them to use
the same fallback pattern as the other fields by replacing "enrichment.referrer
?? undefined" with "enrichment.referrer ?? baseEvent.referrer" and likewise
"enrichment.referrer_name ?? baseEvent.referrer_name" and
"enrichment.referrer_type ?? baseEvent.referrer_type" so string-type
expectations are preserved.
---
Nitpick comments:
In `@apps/worker/src/jobs/events.incoming-events.test.ts`:
- Around line 553-596: Add a new test alongside the existing historical-event
test that simulates the lock NOT being acquired by mocking getLock to return
false; call incomingEvent with the same historical payload and assert that
createEvent is still called for the event (but there is no 'session_start'
createEvent call), sessionsQueue.add is not called (no sessionEnd scheduled),
and that the event createEvent call preserves deviceId/sessionId; reference the
incomingEvent handler, getLock mock, createEvent mock, and sessionsQueue.add to
locate and update the test harness.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 91718804-8af2-4a6e-9663-17127061acf7
📒 Files selected for processing (9)
- apps/api/src/controllers/event.controller.ts
- apps/api/src/controllers/track.controller.ts
- apps/api/src/routes/track-batch.router.test.ts
- apps/api/src/routes/track.router.ts
- apps/api/src/utils/ids.ts
- apps/worker/src/jobs/events.incoming-event.ts
- apps/worker/src/jobs/events.incoming-events.test.ts
- packages/queue/src/queues.ts
- packages/validation/src/track.validation.ts
💤 Files with no reviewable changes (1)
- packages/queue/src/queues.ts
Tests use toStrictEqual, so the new __syncedAt property in event properties needs to be included in assertions. Uses expect.any(String) since the exact ISO timestamp varies per run. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…hment When session enrichment has null referrer fields, fall back to baseEvent values instead of undefined. Consistent with all other enrichment fields (path, os, browser, geo, etc.). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Closes #373.
Summary
Adds `POST /track/batch`, which accepts up to 2000 events / 10 MB per request and back-dates each event to its own `__timestamp` (up to 5 days in the past). Built so offline-capable SDKs (mobile, IoT, edge) can drain a local buffer in one round-trip and have those events land in the right historical session, not all stamped with arrival time.

What's new vs #374

- Reimplemented on top of current `main`.
- `__syncedAt` property: every worker-processed event carries `__syncedAt` (ISO 8601) in its properties, recording when the worker actually handled the event. For batch ingestion this is distinct from `createdAt` (the client's `__timestamp`), making it easy to diagnose ingestion latency and spot retried events.

Architecture
Per-row rejection instead of whole-batch failure
A 2000-event batch where one row is malformed shouldn't fail the other 1999. The controller `safeParse`s each row through `zTrackHandlerPayload`; failures become `{ index, reason: 'validation' | 'internal', error }` entries in `rejected[]`. The caller gets a 202 with `{ accepted, rejected[] }` and can re-send only the bad indices.

`alias` is rejected per-row with `reason: 'validation'`, matching the existing single-event `/track` 400 for alias. Same behaviour, surfaced consistently in both shapes.

Salts + geo fetched once per request
`buildSharedRequestContext` does the salts query and request-IP geo lookup once. `buildEventContext` then uses those for every event. Only events that override `__ip` trigger a second geo lookup. For a typical 2000-event batch this is the difference between 1 PG round-trip + 1 geo lookup vs. 4000 of each.

2000 events / 10 MB caps
Mixpanel `/import` uses the same numbers. Going higher means a single bad request can OOM a Fastify worker. Going lower forces SDKs to chunk excessively for the IoT use case: a device with a week of 1-min readings has 10080 events, which is 6 round-trips at 2000 per batch (fine) but 11 at 1000 per batch (noticeably worse on a flaky cell network).

The 10 MB body cap is enforced via Fastify's existing limit; no new mechanism needed.
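From the SDK side, the event cap and the per-row response shape could be consumed roughly as follows. This is a minimal sketch, not code from the PR: `chunkEvents` and `retryableEvents` are hypothetical helper names, the 10 MB body cap is not checked, and retrying only `'internal'` rejections is an assumed client policy.

```typescript
// Event cap taken from the PR description; the helpers below are hypothetical.
const MAX_EVENTS_PER_BATCH = 2000;

// Response shape as described in the PR: 202 with { accepted, rejected[] }.
interface BatchResponse {
  accepted: number;
  rejected: { index: number; reason: 'validation' | 'internal'; error: string }[];
}

// Split a local buffer into request-sized chunks.
function chunkEvents<T>(events: T[], size: number = MAX_EVENTS_PER_BATCH): T[][] {
  const chunks: T[][] = [];
  for (let i = 0; i < events.length; i += size) {
    chunks.push(events.slice(i, i + size));
  }
  return chunks;
}

// ASSUMPTION: only 'internal' failures are worth re-sending unchanged;
// 'validation' failures would presumably be rejected again.
function retryableEvents<T>(sent: T[], response: BatchResponse): T[] {
  return response.rejected
    .filter((r) => r.reason === 'internal')
    .map((r) => sent[r.index])
    .filter((e): e is T => e !== undefined);
}

// A week of 1-min readings: 7 * 24 * 60 = 10080 events.
const weekOfReadings = Array.from({ length: 10080 }, (_, i) => ({ name: 'reading', minute: i }));
// 6 requests: five full batches of 2000 plus one batch of 80.
const roundTrips = chunkEvents(weekOfReadings).length;
```

The indices in `rejected[]` refer to positions within the batch that was sent, so a client would need to keep each chunk around until its response arrives.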
Deterministic session_id from `__timestamp`

This is the load-bearing change for the IoT use case. Before this PR, when a client sends `__deviceId` (override path), the server returned `sessionId: ''` and let the worker re-derive a session at processing time. The worker uses `Date.now()` for that derivation. Result: a 6-hour-old buffered event lands in whatever session is currently active for that device, not the session it belonged to when captured.

After this PR:

- `getDeviceId` accepts an `eventMs` parameter (the event's own timestamp, not wall-clock now)
- … `eventMs`
- the `__deviceId` override path no longer short-circuits to empty `sessionId`
- … `deviceId` and `sessionId` populated

A batch covering 5 days of buffered readings produces 240 distinct sessions (one per 30-min bucket per device), each with `session_start` back-dated to the actual capture time. Dashboards show the activity in the right hour of the right day.

Hard 5-day floor
Same as Mixpanel's `/import` contract. The deterministic session bucket has finite memory in Redis (TTL = 30 min), and emitting `session_start` rows for events older than the analytics tool will reasonably backfill is wasteful. 5 days is the longest plausible offline window for the use cases this is designed to support.

In single-event `/track` this returns 400. In batch it surfaces as a per-row `{ reason: 'validation', error: 'event timestamp older than 5 days' }`.

Three-way worker dispatch: server / historical / live
Before: the worker had two paths, one for server events (enrich from `sessionBuffer` if a profileId is supplied) and one for everything else (extend live session or emit session_start + schedule sessionEnd).

After: a third "historical" path. An event is historical when `Date.now() - createdAt > SESSION_TIMEOUT` (30 min) and not server-side. Historical events:

- keep their `deviceId`/`sessionId`
- skip `extendSessionEndJob` (would push the live timer for whatever current session exists)
- skip `createSessionEndJob` (would schedule a 30-min close timer for a 6-hour-old session, which is meaningless)
- emit `session_start` once per bucket via the lock

This keeps live session state and historical backfill in completely separate code paths. A device that's actively online while a batch of historical events is being processed for it doesn't get its current session disturbed.
Redis lock for session_start dedup
Two scenarios where duplicate `session_start` rows leak in without a lock:

- Live race: N events for the same `(projectId, deviceId)` arrive at the API in parallel. All see no Redis sessionEnd key. All queue with no active session. The worker, even with sequential per-device processing, has all N events trying to emit session_start before any of them has finished writing.
- Historical batch: a single batch contains 50 events spanning the same 30-min bucket. Each one independently tries to emit session_start.
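A rough model of how a per-session lock closes the historical-batch case is sketched below. Several parts are assumptions: `sessionIdFor` is a hypothetical stand-in for the real derivation (which is not shown in this description), and `InMemoryLock` imitates a Redis `SET ... NX PX` lock in memory. Only the key format `session_start:{projectId}:{sessionId}` and the 30-min TTL are taken from this section.

```typescript
const SESSION_TIMEOUT_MS = 30 * 60 * 1000; // 30 min, per the PR

// ASSUMPTION: the real session ID derivation is not shown in the PR text.
// Here it is the device ID plus the index of the 30-min bucket.
function sessionIdFor(deviceId: string, eventMs: number): string {
  return `${deviceId}:${Math.floor(eventMs / SESSION_TIMEOUT_MS)}`;
}

// In-memory stand-in for a Redis "SET key value NX PX ttl" lock.
class InMemoryLock {
  private expiries = new Map<string, number>();

  // Returns true only for the first caller while the key is unexpired.
  acquire(key: string, ttlMs: number, nowMs: number): boolean {
    const expiresAt = this.expiries.get(key);
    if (expiresAt !== undefined && expiresAt > nowMs) return false;
    this.expiries.set(key, nowMs + ttlMs);
    return true;
  }
}

// Counts how many session_start rows a batch would emit with the lock in place.
function countSessionStarts(
  projectId: string,
  deviceId: string,
  eventTimesMs: number[],
  lock: InMemoryLock,
  nowMs: number,
): number {
  let emitted = 0;
  for (const eventMs of eventTimesMs) {
    const key = `session_start:${projectId}:${sessionIdFor(deviceId, eventMs)}`;
    if (lock.acquire(key, SESSION_TIMEOUT_MS, nowMs)) emitted += 1;
  }
  return emitted;
}

// 50 events, 30 s apart, starting on a bucket boundary: all in one bucket.
const bucketStart = Date.UTC(2024, 0, 1, 12, 0, 0);
const sameBucket = Array.from({ length: 50 }, (_, i) => bucketStart + i * 30_000);
const startsForOneBucket = countSessionStarts('proj', 'dev-1', sameBucket, new InMemoryLock(), Date.now());
```

Because the key includes the session ID rather than the device ID, events from the same device in different buckets acquire different locks and each bucket still gets its own `session_start`.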
The lock key is `session_start:{projectId}:{sessionId}`, TTL = SESSION_TIMEOUT (30 min). First caller acquires, emits, others skip. The lock-not-acquired branch still calls `createSessionEndJob` (idempotent on jobId) so the session still closes cleanly.

Keyed on `sessionId`, not `deviceId`, so two events from the same device but different historical buckets each get their own `session_start`.

`__syncedAt`: server-stamped sync time

Every event processed by the worker now gets `__syncedAt: new Date().toISOString()` in its properties. This is set at worker processing time (not API receipt time), so for batch ingestion the delta between `createdAt` (the client's `__timestamp`) and `__syncedAt` includes both network latency and queue wait time.

Use cases:

- diagnosing ingestion latency
- spotting retried events (same `createdAt`, different `__syncedAt`)

Stored as a regular property (not a ClickHouse column), so no migration is needed.
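Both use cases reduce to simple comparisons once events are read back. The sketch below assumes a simplified `StoredEvent` shape and assumes that `name` plus `createdAt` is enough to identify "the same event"; neither is confirmed by this PR, and `syncLagMs` and `findRetried` are hypothetical helpers.

```typescript
// Hypothetical minimal event shape; only createdAt and properties.__syncedAt come from the PR.
interface StoredEvent {
  name: string;
  createdAt: string; // ISO 8601, the client's __timestamp
  properties: { __syncedAt: string }; // ISO 8601, stamped by the worker
}

// Delay between capture on the device and processing by the worker, in ms.
function syncLagMs(event: StoredEvent): number {
  return Date.parse(event.properties.__syncedAt) - Date.parse(event.createdAt);
}

// Groups events by (name, createdAt) and returns the groups that carry more
// than one distinct __syncedAt, i.e. rows that were probably sent twice.
// ASSUMPTION: name + createdAt identifies an event; real data may need more fields.
function findRetried(events: StoredEvent[]): StoredEvent[][] {
  const groups = new Map<string, StoredEvent[]>();
  for (const e of events) {
    const key = `${e.name}|${e.createdAt}`;
    const group = groups.get(key) ?? [];
    group.push(e);
    groups.set(key, group);
  }
  return Array.from(groups.values()).filter(
    (g) => new Set(g.map((e) => e.properties.__syncedAt)).size > 1,
  );
}

const buffered: StoredEvent = {
  name: 'reading',
  createdAt: '2024-01-01T06:00:00.000Z',
  properties: { __syncedAt: '2024-01-01T12:00:05.000Z' },
};
const lagMs = syncLagMs(buffered); // 6 h 5 s = 21_605_000 ms
```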
Bounded batch concurrency
Events within a batch are processed in chunks of 50 (`BATCH_CONCURRENCY`). Each event hits Redis (session lookup + queue add) and may trigger a geo lookup if `__ip` is overridden. Unbounded `Promise.all` over 2000 events can spike Redis pool usage and the geo provider's rate budget. 50 keeps the pipeline saturated without thundering-herd behaviour.

What this PR does NOT do

- `session_start` is deduped via the Redis lock, but events themselves are not. This is deliberate so the dedup design can be discussed in isolation. Listed as out-of-scope in #373 (Bulk event ingestion (POST /track/batch) for offline-first SDKs - IoT, mobile, edge).

Production verification
Deployed against a self-hosted instance and ran ~80 assertions across 24 scenarios. All passing with ClickHouse cross-checks for session_id distribution, session_start counts, and bucket boundaries:
- … `/track`).
- `__ip` override: distinct geo per event in CH within one shared session.

Test plan

- `pnpm vitest run apps/api/src/routes/track-batch.router.test.ts`: 14 test cases
- `pnpm vitest run apps/worker/src/jobs/events.incoming-events.test.ts`: existing + 2 new tests
- `/track` regression: no behavioural change
- `__syncedAt` appears in event properties in dashboard

🤖 Generated with Claude Code