Add data compliance: retention engine, orphan cleanup, GDPR export #498
2witstudios merged 8 commits into master from
Conversation
Automated cleanup for 10 tables with expiresAt columns, including sessions, verification tokens, socket tokens, email tokens, pulse summaries, page versions, drive backups, invitations, page permissions, and AI usage logs. Respects isPinned flag for versions/backups. Adds expiresAt column + index to aiUsageLogs table. Adds cron endpoint at /api/cron/retention-cleanup. Fixes #460 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
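For orientation, here is a minimal sketch of what one step of such a retention cleanup can look like with Drizzle, assuming a `sessions` table with an `expiresAt` timestamp column as described above; the actual engine covers ten tables and honors the `isPinned` flag, which this sketch omits.

```ts
// Minimal retention-cleanup sketch. The `sessions` table and its `expiresAt`
// column are assumptions based on the PR description, not the actual schema.
import { lt } from 'drizzle-orm';
import type { NodePgDatabase } from 'drizzle-orm/node-postgres';
import { sessions } from '@pagespace/db'; // assumed export

export async function cleanupExpiredSessions(db: NodePgDatabase): Promise<number> {
  // Delete every row whose expiry is in the past and count what was removed.
  const deleted = await db
    .delete(sessions)
    .where(lt(sessions.expiresAt, new Date()))
    .returning({ id: sessions.id });
  return deleted.length;
}
```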
Content-addressed storage aware orphan detection using LEFT JOINs across filePages, channelMessages, and pages tables. Adds delete endpoints to processor service and cron job to find/purge orphans. Fixes #457 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
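A sketch of the orphan query this approach implies, written against Drizzle's `sql` tag; the table and column names are copied from the query excerpt quoted later in this review, not verified against the full schema.

```ts
// Orphan-detection sketch: a file row is an orphan when no filePages link,
// channelMessages reference, or page filePath still points at it.
import { sql } from 'drizzle-orm';
import type { NodePgDatabase } from 'drizzle-orm/node-postgres';

export async function findOrphanCandidates(db: NodePgDatabase) {
  const result = await db.execute(sql`
    SELECT f.id, f."storagePath", f."driveId", f."sizeBytes"
    FROM files f
    LEFT JOIN file_pages fp ON fp."fileId" = f.id
    LEFT JOIN channel_messages cm ON cm."fileId" = f.id
    LEFT JOIN pages p ON p."filePath" = f."storagePath" AND p."filePath" IS NOT NULL
    WHERE fp."fileId" IS NULL AND cm."fileId" IS NULL AND p.id IS NULL
  `);
  return result.rows;
}
```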
Self-service endpoint at GET /api/account/export for GDPR Art. 15/20 data subject access requests. Collects profile, drives, pages, messages (AI chat, channel, conversation, DM), files metadata, activity logs, AI usage, and tasks into a streamed ZIP archive. Rate limited to 1 export per 24 hours. Fixes #461 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
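A rough sketch of how such an endpoint can stream the archive, assuming the `archiver` package used by the PR; the per-section file names and the input shape are illustrative, not taken from the actual route.

```ts
// Streaming-ZIP sketch: one JSON file per data category, streamed back as the
// archive is built. Section names and the helper signature are assumptions.
import archiver from 'archiver';

export function buildExportResponse(data: Record<string, unknown>): Response {
  const archive = archiver('zip', { zlib: { level: 9 } });

  const stream = new ReadableStream<Uint8Array>({
    start(controller) {
      archive.on('data', (chunk: Buffer) => controller.enqueue(new Uint8Array(chunk)));
      archive.on('end', () => controller.close());
      archive.on('error', (err) => controller.error(err));
    },
  });

  // e.g. profile.json, drives.json, pages.json, ...
  for (const [name, section] of Object.entries(data)) {
    archive.append(JSON.stringify(section, null, 2), { name: `${name}.json` });
  }
  void archive.finalize();

  return new Response(stream, {
    headers: {
      'Content-Type': 'application/zip',
      'Content-Disposition': 'attachment; filename="account-export.zip"',
    },
  });
}
```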
📝 Walkthrough

Implements data subject export (GDPR Art. 15/20), orphaned file detection and cleanup, and a retention policy engine. Adds a DELETE file API in the processor, exports user data as ZIP via new endpoint, runs periodic cron jobs to clean expired data and orphaned files, and introduces database retention tracking with schema changes and utility libraries for compliance workflows.
Sequence Diagrams

```mermaid
sequenceDiagram
participant User
participant WebAPI as Web API<br/>/account/export
participant DataCollector as Data Collector<br/>(gdpr-export)
participant Database as Database
participant Archiver as Archiver
participant Response
User->>WebAPI: GET /api/account/export (session auth)
WebAPI->>WebAPI: Validate rate limit (24h per user)
WebAPI->>DataCollector: collectAllUserData(userId)
DataCollector->>Database: Query profile, drives, pages
DataCollector->>Database: Query messages, files, activity
DataCollector->>Database: Query AI usage, tasks
Database-->>DataCollector: Return all data
DataCollector-->>WebAPI: AllUserData object
WebAPI->>Archiver: Create ZIP with 8 JSON files
Archiver->>Archiver: Serialize & compress data
Archiver-->>WebAPI: ZIP stream
WebAPI->>Response: Stream ZIP with headers
Response-->>User: ZIP file
```
```mermaid
sequenceDiagram
participant Cron as Cron Schedule
participant WebCron as Web Cron API<br/>/cleanup-orphaned-files
participant Database as Database
participant DriveService as Drive Service<br/>(token generation)
participant Processor as Processor API<br/>DELETE /api/files
participant Storage as File Storage
Cron->>WebCron: POST /api/cron/cleanup-orphaned-files
WebCron->>WebCron: Validate cron secret
WebCron->>Database: SELECT orphaned files
Database-->>WebCron: List of orphans
loop For each orphan with storagePath
WebCron->>WebCron: Extract contentHash from path
WebCron->>DriveService: Get token for FILE_DELETE_SCOPES
DriveService-->>WebCron: Token (30s window)
WebCron->>Processor: DELETE /api/files/{contentHash}
Processor->>Storage: Delete file & cache
Storage-->>Processor: Deletion result
Processor-->>WebCron: Success/failure
WebCron->>WebCron: Track outcome
end
WebCron->>Database: Hard-delete all orphan records
WebCron-->>Cron: JSON summary<br/>(success, counts, timestamp)
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 7
🤖 Fix all issues with AI agents
In `@apps/processor/src/api/delete-file.ts`:
- Around line 12-38: The delete-file route currently returns success:true even
when nothing was deleted; update the router.delete handler to check the result
of contentStore.deleteOriginalAndCache(contentHash) and if both originalDeleted
and cacheDeleted are false respond with res.status(404).json({ error: 'Content
not found', contentHash }) (and log this case) instead of the 200 success
response; otherwise keep the existing 200 response and payload. Reference: the
async router.delete handler and the contentStore.deleteOriginalAndCache call to
locate the change.
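A sketch of the handler shape this suggestion points at, assuming an Express-style router in the processor; `contentStore` is declared as a placeholder since only the result shape of `deleteOriginalAndCache` is known from the comment above.

```ts
// Sketch of the suggested 404 handling; route path and logging are illustrative.
import { Router } from 'express';

declare const contentStore: {
  deleteOriginalAndCache(
    contentHash: string,
  ): Promise<{ originalDeleted: boolean; cacheDeleted: boolean }>;
};

const router: Router = Router();

router.delete('/api/files/:contentHash', async (req, res) => {
  const { contentHash } = req.params;
  const { originalDeleted, cacheDeleted } = await contentStore.deleteOriginalAndCache(contentHash);

  if (!originalDeleted && !cacheDeleted) {
    // Nothing existed for this hash, so report that instead of a blanket 200.
    console.warn(`delete-file: no content found for ${contentHash}`);
    return res.status(404).json({ error: 'Content not found', contentHash });
  }

  return res.json({ success: true, contentHash, originalDeleted, cacheDeleted });
});

export default router;
```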
In `@apps/web/src/app/api/account/export/route.ts`:
- Around line 62-64: The current archive.on('data', (chunk: Buffer) => {
writer.write(chunk); }) is dropping backpressure because writer.write() returns
a Promise that must be awaited; update the export flow to await writes or use
proper piping: either convert the Node archiver stream to a web ReadableStream
(e.g., use Readable.toWeb(archive) or wrap archive in a new ReadableStream that
uses archive.on('data') in its pull) and then pipeTo the writable, or inside the
existing handler use the writer.ready/await writer.write(chunk) sequence and
await writer.ready before each write (or queue writes sequentially) and close
the writer on 'end'/'finish' events; target symbols: archive and writer.write to
implement this backpressure-safe solution.
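One of the options mentioned above, sketched out: hand the archiver stream to `Readable.toWeb` so backpressure is handled by Node rather than by manual `writer.write()` calls. This only illustrates the suggested alternative; the PR's follow-up commit instead uses a `ReadableStream` controller.

```ts
// Backpressure-safe option: bridge the Node stream to a web stream. The
// archiver instance is a Transform, so it satisfies Readable here.
import { Readable } from 'node:stream';

export function zipResponseFromArchive(archive: Readable): Response {
  // Readable.toWeb propagates backpressure between the consumer and the
  // underlying Node stream; the cast only bridges Node vs DOM stream types.
  const body = Readable.toWeb(archive) as unknown as ReadableStream<Uint8Array>;

  return new Response(body, {
    headers: {
      'Content-Type': 'application/zip',
      'Content-Disposition': 'attachment; filename="export.zip"',
    },
  });
}
```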
In `@apps/web/src/app/api/cron/cleanup-orphaned-files/route.ts`:
- Around line 53-55: Detect when the 64-hex segment lookup falls back to
orphan.id (i.e., contentHash === orphan.id) and emit a clear warning including
the orphan.id and orphan.storagePath so operators can investigate malformed
storage paths; specifically, after computing contentHash from
orphan.storagePath, if it equals orphan.id call the existing logger (same logger
used elsewhere in this file) to warn that the storagePath lacked a valid 64-hex
content hash and that this record will be skipped/flagged instead of passed to
processor.isValidContentHash / delete logic to avoid silently leaving an
unreachable file.
- Around line 49-81: The fetch call inside the orphan-processing loop (where you
call createDriveServiceToken and then
fetch(`${PROCESSOR_URL}/api/files/${contentHash}`)) has no timeout and can hang;
add an AbortController per-iteration with a setTimeout (e.g. 10-30s) and pass
controller.signal to fetch, clear the timer on completion, and treat an aborted
request like other failures (push orphan.id to failedPhysicalDeletes and log a
timeout-specific message); ensure you catch the abort error in the existing
try/catch so timeouts are handled the same as other fetch failures.
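An illustrative shape for the per-request timeout, using `AbortSignal.timeout` (which the follow-up commit adopted) rather than a manual `AbortController` plus `setTimeout`; the URL, headers, and logging are placeholders.

```ts
// Per-request timeout sketch for the processor delete call.
async function deleteOrphanFromProcessor(
  processorUrl: string,
  contentHash: string,
  token: string,
): Promise<boolean> {
  try {
    const res = await fetch(`${processorUrl}/api/files/${contentHash}`, {
      method: 'DELETE',
      headers: { Authorization: `Bearer ${token}` },
      // Abort if the processor does not answer within 10 seconds.
      signal: AbortSignal.timeout(10_000),
    });
    return res.ok;
  } catch (err) {
    // A timeout rejects the fetch promise, so it lands here and is treated
    // like any other failed physical delete.
    console.warn(`processor delete failed for ${contentHash}:`, err);
    return false;
  }
}
```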
In `@packages/lib/src/compliance/export/gdpr-export.ts`:
- Around line 178-202: collectUserPages currently returns every page in drives
the user is a member of which can leak other users' data; update
collectUserPages to restrict exports to the data subject's own pages by adding a
filter on the page owner/author/editor fields (e.g., pages.createdBy,
pages.authorId, pages.updatedBy or similar—inspect your pages schema) so the
database.select/.where call includes eq(pages.createdBy, userId) or
eq(pages.authorId, userId) (and/or any editor relationship you deem necessary),
or alternatively make the function explicitly accept an includeWorkspaceData
flag and document the behavior clearly as "workspace data the user had access
to" if exporting all drive pages is intentional; reference collectUserPages,
collectUserDrives, and the pages columns to locate where to apply the filter or
adjust the API/documentation.
- Around line 178-202: collectUserPages currently calls collectUserDrives again
and issues N+1 queries; change its signature to accept driveIds:
collectUserPages(database: DB, driveIds: string[]): Promise<UserPageExport[]>,
remove the internal call to collectUserDrives, and replace the per-drive loop
with a single query using inArray(pages.driveId, driveIds) to fetch all pages at
once; update the caller collectAllUserData to fetch drives first via
collectUserDrives(database, userId), map to driveIds and pass that array into
collectUserPages, and add the import for inArray from 'drizzle-orm' where
needed.
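The suggested signature change, sketched: the function takes pre-fetched `driveIds` and issues a single `inArray` query. The selected columns and return type are elided; only the shape of the `where` clause follows the reviewer's wording.

```ts
// Batched page collection sketch; `pages` is an assumed schema export.
import { inArray } from 'drizzle-orm';
import type { NodePgDatabase } from 'drizzle-orm/node-postgres';
import { pages } from '@pagespace/db'; // assumed export

export async function collectUserPages(database: NodePgDatabase, driveIds: string[]) {
  if (driveIds.length === 0) return [];
  // One round-trip for all drives instead of one query per drive.
  return database
    .select()
    .from(pages)
    .where(inArray(pages.driveId, driveIds));
}
```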
- Around line 382-406: collectAllUserData currently calls collectUserDrives and
collectUserPages in parallel, but collectUserPages itself fetches drives (hidden
dependency), causing duplicate work and wasted parallelism; fix by first
awaiting collectUserDrives(database, userId), then pass the resulting drives (or
their IDs) into collectUserPages (e.g., change collectUserPages signature to
collectUserPages(database, userId, drivesOrDriveIds)) and remove the internal
drive fetch from collectUserPages; update any other call sites of
collectUserPages accordingly so drives are provided instead of being fetched
internally.
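A sketch of the reordered orchestration the last two comments describe: drives are awaited first, their IDs feed `collectUserPages`, and the remaining collectors still run in parallel. Collector names come from this review; the signatures and return types below are stand-ins.

```ts
// Orchestration sketch only: the declared signatures are placeholders for the
// real collectors in gdpr-export.ts.
declare function collectUserDrives(db: unknown, userId: string): Promise<{ id: string }[]>;
declare function collectUserPages(db: unknown, driveIds: string[]): Promise<unknown[]>;
declare function collectUserFiles(db: unknown, userId: string): Promise<unknown[]>;
declare function collectUserActivity(db: unknown, userId: string): Promise<unknown[]>;
declare function collectUserTasks(db: unknown, userId: string): Promise<unknown[]>;

export async function collectAllUserData(db: unknown, userId: string) {
  // Drives are a dependency of the pages collector, so resolve them first
  // instead of letting collectUserPages re-query them internally.
  const drives = await collectUserDrives(db, userId);
  const driveIds = drives.map((d) => d.id);

  const [pages, files, activity, tasks] = await Promise.all([
    collectUserPages(db, driveIds),
    collectUserFiles(db, userId),
    collectUserActivity(db, userId),
    collectUserTasks(db, userId),
  ]);

  return { drives, pages, files, activity, tasks };
}
```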
🧹 Nitpick comments (15)
apps/web/package.json (1)
67-67: Move `@types/archiver` to `devDependencies`. Type definition packages are only needed at compile time and should not be shipped as runtime dependencies.
Proposed fix
Move `"@types/archiver": "^7.0.0"` from `dependencies` to `devDependencies`.

apps/web/src/app/api/cron/cleanup-orphaned-files/route.ts (1)
57-62: A new service token is minted per orphan. If many orphans share the same `driveId`, this creates redundant token overhead. Consider grouping orphans by `driveId` and reusing tokens within each group, especially since the token TTL is only 30 seconds.

apps/processor/src/cache/content-store.ts (1)
513-532: Consider adding `force: true` to `fs.rm` for clearer intent. Without `force: true`, `fs.rm` throws `ENOENT` when the directory doesn't exist. The `catch` block handles this by returning `false`, but this conflates "not found" with genuine errors (e.g., permission denied). Adding `force: true` would make `false` returns more meaningful — they'd only indicate actual failures.

Proposed fix

```diff
 try {
-  await fs.rm(dir, { recursive: true });
+  await fs.rm(dir, { recursive: true, force: true });
   return true;
 } catch {
   return false;
 }
```

Apply the same change in `deleteCache` (Line 548).

apps/web/src/app/api/account/export/route.ts (2)
8-10: In-memory rate limit won't survive restarts or scale across instances. The Map-based rate limit resets on every deploy/restart and is per-process, so in a multi-instance deployment a user can bypass the 24-hour cooldown. Since the export triggers a heavy DB query plus ZIP generation, this could be abused. Consider persisting the last-export timestamp in the database (e.g., a column on `users` or a lightweight `user_exports` table) so the rate limit survives restarts and works across instances. This can be a follow-up (see the sketch below).
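A sketch of what the persisted check could look like, assuming a hypothetical `userExports` table with `userId` and `lastExportAt` columns that does not exist in the current schema.

```ts
// Hypothetical persistent rate limit; `userExports` is only a placeholder for
// the suggested follow-up table.
import { eq } from 'drizzle-orm';
import type { NodePgDatabase } from 'drizzle-orm/node-postgres';

declare const userExports: any; // stand-in for a real Drizzle pgTable

const EXPORT_COOLDOWN_MS = 24 * 60 * 60 * 1000;

export async function canExport(db: NodePgDatabase, userId: string): Promise<boolean> {
  const [row] = await db
    .select({ lastExportAt: userExports.lastExportAt })
    .from(userExports)
    .where(eq(userExports.userId, userId))
    .limit(1);

  // Allow when there is no prior export or the 24-hour window has elapsed.
  return !row || Date.now() - new Date(row.lastExportAt).getTime() >= EXPORT_COOLDOWN_MS;
}
```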
51-52: Rate limit is recorded before the ZIP stream is consumed by the client. If streaming fails after Line 52, the user is rate-limited for 24 hours despite not receiving a complete export. Moving the timestamp recording earlier (before finalize) is a deliberate tradeoff to prevent repeated expensive DB queries, so this is acceptable — just worth documenting with a brief comment explaining the reasoning.
packages/lib/src/compliance/file-cleanup/orphan-detector.test.ts (2)
142-196: Missing `isFileOrphaned` test for files referenced via `channelMessages`. `findOrphanedFileRecords` has tests for both `filePages` and `channelMessages` references, but `isFileOrphaned` only tests the `filePages` path. Adding a case like `'returns false for a file referenced by channelMessages'` would ensure symmetry and guard against regressions if the query logic diverges.
38-41: Test cleanup relies on per-test manual deletes that won't run on assertion failure.Each test manually deletes its created records at the end, but if an assertion fails, those cleanup lines are skipped. Consider moving per-test entity cleanup (files, pages, filePages, channelMessages) into the
afterEachblock, or using a helper that tracks created records for guaranteed teardown.packages/lib/src/compliance/export/gdpr-export.test.ts (1)
8-18: `collectUserFiles` and `collectUserActivity` are imported but lack dedicated test blocks. Both are imported (Lines 13-14) and exercised indirectly through `collectAllUserData`, but they have no `describe` blocks verifying their individual behavior (e.g., that file metadata or activity logs are returned correctly for a known user). Adding targeted tests would increase confidence and catch regressions in those collectors independently.

apps/web/src/app/api/cron/retention-cleanup/route.ts (1)
27-36: Consider adding a timeout or batch-size guard for large-scale cleanups. If any table accumulates a large backlog of expired rows (e.g., after the retention engine is first deployed against an existing database), `runRetentionCleanup` could be long-running and may exceed the cron timeout. Consider whether the underlying cleanup functions should support a `LIMIT` parameter so the work can be spread across multiple cron invocations.

packages/lib/src/compliance/retention/retention-engine.test.ts (1)
258-273: Consider adding a negative test for non-PENDING drive invitations. The PR description states the engine only deletes `PENDING` invitations. A test inserting an `ACCEPTED` or `DECLINED` invitation with a past `expiresAt` and asserting it's not deleted would guard against regressions in the status filter.

packages/lib/src/compliance/file-cleanup/orphan-detector.ts (3)
1-4: Unused imports: `and`, `isNull`, `filePages`, `channelMessages`. `and` and `isNull` (from drizzle-orm) and `filePages`/`channelMessages` (from `@pagespace/db`) are imported but never referenced as Drizzle symbols — the raw SQL queries use string table names instead. Only `eq`, `sql`, and `files` are actually used.

🧹 Clean up unused imports

```diff
-import { eq, sql, and, isNull } from 'drizzle-orm';
+import { eq, sql } from 'drizzle-orm';
 import type { NodePgDatabase } from 'drizzle-orm/node-postgres';
-import { files, filePages } from '@pagespace/db';
-import { channelMessages } from '@pagespace/db';
+import { files } from '@pagespace/db';
```
26-49: Unbounded result set from orphan detection query.
`findOrphanedFileRecords` returns all orphaned files in a single query with no `LIMIT`. In a system with significant file churn, this could return thousands of rows, causing high memory usage and a long-running query. Consider adding a configurable limit (e.g., a `batchSize` parameter) so the cron job processes orphans in manageable chunks.

♻️ Add an optional limit parameter

```diff
-export async function findOrphanedFileRecords(database: DB): Promise<OrphanedFile[]> {
+export async function findOrphanedFileRecords(database: DB, limit = 1000): Promise<OrphanedFile[]> {
   const result = await database.execute(sql`
     SELECT f.id, f."storagePath", f."driveId", f."sizeBytes"
     FROM files f
     LEFT JOIN file_pages fp ON fp."fileId" = f.id
     LEFT JOIN channel_messages cm ON cm."fileId" = f.id
     LEFT JOIN pages p ON p."filePath" = f."storagePath" AND p."filePath" IS NOT NULL
     WHERE fp."fileId" IS NULL AND cm."fileId" IS NULL AND p.id IS NULL
+    LIMIT ${limit}
   `);
```
75-87: N individual DELETEs instead of a single batched statement.
`deleteFileRecords` issues one `DELETE` per file ID, resulting in N round-trips to the database. For a batch of orphaned files this is unnecessarily slow. Drizzle's `inArray` operator can delete all matching rows in a single statement.

♻️ Batch delete with `inArray`

```diff
+import { inArray } from 'drizzle-orm';
+
 export async function deleteFileRecords(database: DB, fileIds: string[]): Promise<number> {
   if (fileIds.length === 0) return 0;
-  let deleted = 0;
-  for (const id of fileIds) {
-    const result = await database
-      .delete(files)
-      .where(eq(files.id, id))
-      .returning({ id: files.id });
-    deleted += result.length;
-  }
-  return deleted;
+  const result = await database
+    .delete(files)
+    .where(inArray(files.id, fileIds))
+    .returning({ id: files.id });
+  return result.length;
 }
```

If the list can be very large, chunk the IDs (e.g., batches of 500) to stay within PostgreSQL's parameter limit.
packages/lib/src/compliance/retention/retention-engine.ts (1)
22-29: `.returning()` materializes all deleted IDs in memory. For high-volume tables like `sessions`, expiration cleanup could delete tens of thousands of rows at once. `.returning({ id })` loads every deleted ID into Node.js memory just to count them. If the row counts are expected to stay modest this is fine, but for unbounded growth scenarios consider either:

- Adding a `LIMIT` to the delete (process in batches), or
- Using raw SQL with `` sql`DELETE ... WHERE ... RETURNING count(*)` `` or checking `result.rowCount` if available from the driver.

This applies to all 10 cleanup functions following the same pattern.
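A sketch of the `rowCount` variant, assuming the node-postgres driver, where a non-`returning` delete resolves to a pg `QueryResult`; table and column names carry the same assumptions as elsewhere in this review.

```ts
// Count deletions via the driver's rowCount instead of materializing IDs.
import { lt } from 'drizzle-orm';
import type { NodePgDatabase } from 'drizzle-orm/node-postgres';
import { sessions } from '@pagespace/db'; // assumed export

export async function deleteExpiredSessions(db: NodePgDatabase): Promise<number> {
  const result = await db
    .delete(sessions)
    .where(lt(sessions.expiresAt, new Date()));
  // rowCount is reported by the pg driver; no deleted rows are sent back.
  return result.rowCount ?? 0;
}
```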
packages/lib/src/compliance/export/gdpr-export.ts (1)
349-380: N+1 query pattern for task items. One query per task list fetches items individually. This can be combined into a single query using `inArray`.

♻️ Batch fetch task items

```diff
 export async function collectUserTasks(database: DB, userId: string): Promise<UserTaskExport[]> {
   const lists = await database
     .select({
       id: taskLists.id,
       title: taskLists.title,
     })
     .from(taskLists)
     .where(eq(taskLists.userId, userId));

+  if (lists.length === 0) return [];
+
+  const listIds = lists.map(l => l.id);
+  const allItems = await database
+    .select({
+      id: taskItems.id,
+      title: taskItems.title,
+      status: taskItems.status,
+      priority: taskItems.priority,
+      createdAt: taskItems.createdAt,
+      taskListId: taskItems.taskListId,
+    })
+    .from(taskItems)
+    .where(inArray(taskItems.taskListId, listIds));
+
+  const itemsByList = new Map<string, typeof allItems>();
+  for (const item of allItems) {
+    const group = itemsByList.get(item.taskListId) ?? [];
+    group.push(item);
+    itemsByList.set(item.taskListId, group);
+  }
+
   const result: UserTaskExport[] = [];
   for (const list of lists) {
-    const items = await database
-      .select({
-        id: taskItems.id,
-        title: taskItems.title,
-        status: taskItems.status,
-        priority: taskItems.priority,
-        createdAt: taskItems.createdAt,
-      })
-      .from(taskItems)
-      .where(eq(taskItems.taskListId, list.id));
-
     result.push({
       listId: list.id,
       listTitle: list.title,
-      items,
+      items: (itemsByList.get(list.id) ?? []).map(({ taskListId, ...rest }) => rest),
     });
   }
   return result;
 }
```
Replace per-record loops with batch inArray queries in deleteFileRecords, collectUserPages, and collectUserTasks. Remove unused drizzle-orm and schema imports. Eliminate redundant collectUserDrives call in collectAllUserData. Restore .worktrees/ and .codename-grove/ gitignore exclusions with trailing newline. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ch timeout, hash fallback

- delete-file.ts: Return 404 when neither original nor cache existed. Fix pre-existing TS2742 with explicit Router type annotation.
- export/route.ts: Replace TransformStream+writer with ReadableStream controller to avoid unawaited backpressure promises.
- cleanup-orphaned-files/route.ts: Add AbortSignal.timeout(10s) to processor fetch calls. Skip orphans with malformed storagePath instead of falling back to orphan.id. Fix missing userId arg in createDriveServiceToken call.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Self-service GDPR data export UI wired to the existing /api/account/export endpoint, with rate-limit handling and blob download. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Renumber our migration 0078_sturdy_the_enforcers → 0079 to resolve conflict with master's 0078_fat_fallen_one (AI provider security). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Both PRs generated migrations from the same parent snapshot (0077), creating a fork. Re-chain 0079 as child of 0078 and fix journal timestamp ordering. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Summary
- Retention policy engine: automated cleanup for 10 tables with `expiresAt` columns (sessions, tokens, page versions, drive backups, invitations, AI usage logs, etc). Respects `isPinned` flag. Cron endpoint at `/api/cron/retention-cleanup`. Fixes "No data retention policy engine" #460
- Orphaned file cleanup: content-addressed storage aware orphan detection across `filePages`, `channelMessages`, and `pages`. Processor service delete endpoint + cron job to find and purge orphaned files from both DB and disk. Fixes "Orphaned files persist forever after page deletion or account removal" #457
- Data subject export: `GET /api/account/export` endpoint (GDPR Art. 15/20). Streams ZIP archive with profile, drives, pages, messages (AI chat, channel, conversation, DM), file metadata, activity logs, AI usage, and tasks. Rate limited to 1 per 24h. Fixes "No data subject export (GDPR Art. 15/20)" #461

Schema change
- Adds `expiresAt` column + index to `aiUsageLogs` table (migration `0078`)

Review feedback addressed
- delete-file.ts: return 404 when neither original nor cache existed (instead of always responding `success: true`). Fixed pre-existing TS2742 with explicit Router type annotation.
- export/route.ts: replaced `TransformStream` + unawaited `writer.write()` with a `ReadableStream` controller to avoid backpressure issues.
- cleanup-orphaned-files/route.ts: added `AbortSignal.timeout(10s)` to processor fetch calls. Skip orphans with malformed `storagePath` (log warning + add to failures) instead of falling back to `orphan.id`. Fixed missing `userId` arg in `createDriveServiceToken`.
- Batched `inArray` queries in `collectUserPages`, `collectUserTasks`, and `deleteFileRecords`. Eliminated redundant `collectUserDrives` call in `collectAllUserData`. Removed unused imports (`or`, `and`, `isNull`, `filePages`, `channelMessages`).
- Restored `.worktrees/` and `.codename-grove/` exclusions with trailing newline.

Test plan
- `pnpm typecheck` — all packages pass (lib, db, web, processor)
- `curl -H "Authorization: Bearer $CRON_SECRET" http://localhost:3000/api/cron/retention-cleanup`
- `curl -H "Authorization: Bearer $CRON_SECRET" http://localhost:3000/api/cron/cleanup-orphaned-files`
- `GET /api/account/export` → downloads ZIP
- `pnpm db:migrate` to apply the `aiUsageLogs.expiresAt` migration

🤖 Generated with Claude Code