-
Couldn't load subscription status.
- Fork 244
new importer #214
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
lindesvard
wants to merge
1
commit into
main
Choose a base branch
from
feature/import
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
new importer #214
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,338 @@ | ||
| # Import Package Code Review | ||
|
|
||
| ## Summary | ||
| Successfully implemented a flexible, provider-based import system for analytics data. The architecture is solid, but there are several improvements we can make. | ||
|
|
||
| --- | ||
|
|
||
| ## 🔴 Critical Issues | ||
|
|
||
| ### 1. **Type Safety in Import Job** | ||
| **Location:** `apps/worker/src/jobs/import.ts` | ||
|
|
||
| **Issue:** Extensive use of `any` types throughout the file | ||
| ```typescript | ||
| const eventBatch: any[] = []; // Line 39 | ||
| provider: any, // Line 113 | ||
| events: any[], // Line 169 | ||
| ``` | ||
|
|
||
| **Fix:** Create proper types | ||
| ```typescript | ||
| import type { BaseImportProvider } from '@openpanel/importer'; | ||
|
|
||
| async function processBatch<T = unknown>( | ||
| rawEvents: T[], | ||
| provider: BaseImportProvider<T>, | ||
| sessionReconstructor: SessionReconstructor, | ||
| jobLogger: Logger, | ||
| batchNumber: number | ||
| ) | ||
| ``` | ||
|
|
||
| --- | ||
|
|
||
| ## 🟡 Medium Priority Issues | ||
|
|
||
| ### 2. **Missing UUID Dependency** | ||
| **Location:** `packages/importer/package.json` | ||
|
|
||
| **Issue:** `uuid` is used but not in dependencies | ||
| ```json | ||
| "dependencies": { | ||
| "csv-parser": "^3.0.0", | ||
| "ramda": "^0.29.1" | ||
| // uuid is missing! | ||
| } | ||
| ``` | ||
|
|
||
| **Fix:** Add to dependencies | ||
| ```json | ||
| "dependencies": { | ||
| "@openpanel/db": "workspace:*", | ||
| "@openpanel/common": "workspace:*", | ||
| "@openpanel/queue": "workspace:*", | ||
| "csv-parser": "^3.0.0", | ||
| "ramda": "^0.29.1", | ||
| "uuid": "^9.0.1" | ||
| } | ||
| ``` | ||
|
|
||
| ### 3. **Session Reconstructor Mutates Input** | ||
| **Location:** `packages/importer/src/session-reconstructor.ts:23` | ||
|
|
||
| **Issue:** Directly mutates the event object | ||
| ```typescript | ||
| event.session_id = session.id; // Mutates input | ||
| ``` | ||
|
|
||
| **Fix:** Return new objects instead | ||
| ```typescript | ||
| const processedEvent = { | ||
| ...event, | ||
| session_id: session.id | ||
| }; | ||
| results.push(processedEvent); | ||
| ``` | ||
|
|
||
| ### 4. **Missing Error Handling for Invalid Timestamps** | ||
| **Location:** `packages/importer/src/session-reconstructor.ts:18,24,39` | ||
|
|
||
| **Issue:** No validation for invalid dates | ||
| ```typescript | ||
| new Date(event.created_at).getTime() // Could be NaN | ||
| ``` | ||
|
|
||
| **Fix:** Add validation | ||
| ```typescript | ||
| private getTimestamp(dateString: string | undefined): number { | ||
| if (!dateString) return Date.now(); | ||
| const timestamp = new Date(dateString).getTime(); | ||
| return Number.isNaN(timestamp) ? Date.now() : timestamp; | ||
| } | ||
| ``` | ||
|
|
||
| ### 5. **Missing Types Export** | ||
| **Location:** `packages/importer/package.json` | ||
|
|
||
| **Issue:** `@types/csv-parser` not in devDependencies | ||
| ```json | ||
| "devDependencies": { | ||
| "@types/node": "^20.0.0", | ||
| "typescript": "^5.0.0", | ||
| "vitest": "^1.0.0" | ||
| // Missing @types/csv-parser | ||
| } | ||
| ``` | ||
|
|
||
| **Fix:** Add missing type definitions | ||
|
|
||
| ### 6. **Import Job Progress Calculation** | ||
| **Location:** `apps/worker/src/jobs/import.ts:70` | ||
|
|
||
| **Issue:** `totalBatches` calculation is inaccurate during processing | ||
| ```typescript | ||
| totalBatches: Math.ceil(totalEvents / BATCH_SIZE) // Changes as totalEvents grows | ||
| ``` | ||
|
|
||
| **Fix:** Calculate total batches at the end or estimate better | ||
|
|
||
| --- | ||
|
|
||
| ## 🟢 Nice-to-Have Improvements | ||
|
|
||
| ### 7. **Configurable Session Timeout** | ||
| **Current:** Hardcoded 30-minute timeout | ||
| ```typescript | ||
| private readonly SESSION_TIMEOUT = 30 * 60 * 1000; | ||
| ``` | ||
|
|
||
| **Improvement:** Make it configurable per provider | ||
| ```typescript | ||
| constructor(private readonly sessionTimeout = 30 * 60 * 1000) {} | ||
| ``` | ||
|
|
||
| ### 8. **Better Logging for Skipped Events** | ||
| **Current:** Logs entire raw event | ||
| ```typescript | ||
| jobLogger.warn('Skipping invalid event', { rawEvent }); | ||
| ``` | ||
|
|
||
| **Improvement:** Log only relevant fields to avoid noise | ||
| ```typescript | ||
| jobLogger.warn('Skipping invalid event', { | ||
| event_id: rawEvent.event_id, | ||
| session_id: rawEvent.session_id, | ||
| reason: 'validation_failed' | ||
| }); | ||
| ``` | ||
|
|
||
| ### 9. **Memory Cleanup Optimization** | ||
| **Current:** Cleanup runs on every event | ||
| ```typescript | ||
| this.cleanupStaleSessions(currentTimestamp); // Called for EVERY event | ||
| ``` | ||
|
|
||
| **Improvement:** Cleanup periodically (e.g., every 1000 events) | ||
| ```typescript | ||
| if (this.processedCount % 1000 === 0) { | ||
| this.cleanupStaleSessions(currentTimestamp); | ||
| } | ||
| ``` | ||
|
|
||
| ### 10. **Provider Registry Pattern** | ||
| **Current:** Switch statement for providers | ||
| ```typescript | ||
| function createProvider(providerName: string, config: any) { | ||
| switch (providerName) { | ||
| case 'umami': return new UmamiProvider(config); | ||
| default: throw new Error(`Unknown provider: ${providerName}`); | ||
| } | ||
| } | ||
| ``` | ||
|
|
||
| **Improvement:** Registry pattern for extensibility | ||
| ```typescript | ||
| class ProviderRegistry { | ||
| private static providers = new Map<string, typeof BaseImportProvider>(); | ||
|
|
||
| static register(name: string, provider: typeof BaseImportProvider) { | ||
| this.providers.set(name, provider); | ||
| } | ||
|
|
||
| static create(name: string, config: ImportConfig) { | ||
| const Provider = this.providers.get(name); | ||
| if (!Provider) throw new Error(`Unknown provider: ${name}`); | ||
| return new Provider(config); | ||
| } | ||
| } | ||
|
|
||
| // Usage: | ||
| ProviderRegistry.register('umami', UmamiProvider); | ||
| ProviderRegistry.register('mixpanel', MixpanelProvider); | ||
| ``` | ||
|
|
||
| ### 11. **Database Transaction Support** | ||
| **Current:** Events inserted individually | ||
| ```typescript | ||
| await Promise.all(events.map(event => createEvent(event))); | ||
| ``` | ||
|
|
||
| **Improvement:** Consider batch insert optimization or transaction support | ||
| ```typescript | ||
| await createEventsBatch(events); // Batch insert to ClickHouse | ||
| ``` | ||
|
|
||
| ### 12. **Import Metadata Tracking** | ||
| **Issue:** Import table created in schema but never used | ||
|
|
||
| **Improvement:** Update import status in database | ||
| ```typescript | ||
| // At start | ||
| await db.import.update({ | ||
| where: { id: importId }, | ||
| data: { status: 'processing', totalEvents: 0 } | ||
| }); | ||
|
|
||
| // During processing | ||
| await db.import.update({ | ||
| where: { id: importId }, | ||
| data: { | ||
| processedEvents, | ||
| totalEvents, | ||
| status: 'processing' | ||
| } | ||
| }); | ||
|
|
||
| // On completion | ||
| await db.import.update({ | ||
| where: { id: importId }, | ||
| data: { | ||
| status: 'completed', | ||
| completedAt: new Date() | ||
| } | ||
| }); | ||
| ``` | ||
|
|
||
| ### 13. **Test Coverage** | ||
| **Current:** Basic unit tests created | ||
|
|
||
| **Improvement:** Add | ||
| - Integration tests with real Redis | ||
| - Performance tests with large datasets | ||
| - Error recovery tests (resume failed imports) | ||
|
|
||
| ### 14. **Provider-Specific Configuration** | ||
| **Current:** All providers share same config structure | ||
|
|
||
| **Improvement:** Allow provider-specific options | ||
| ```typescript | ||
| interface ImportConfig { | ||
| projectId: string; | ||
| provider: string; | ||
| sourceType: 'file' | 'api'; | ||
| sourceLocation: string; | ||
| providerOptions?: Record<string, unknown>; // Provider-specific config | ||
| } | ||
|
|
||
| // Usage for Umami: | ||
| providerOptions: { | ||
| dateFormat: 'ISO8601', | ||
| timezone: 'UTC', | ||
| customFieldMapping: { ... } | ||
| } | ||
| ``` | ||
|
|
||
| ### 15. **Resumable Imports** | ||
| **Current:** No resume capability | ||
|
|
||
| **Improvement:** Track progress and allow resume | ||
| ```typescript | ||
| interface ImportProgress { | ||
| lastProcessedEventId?: string; | ||
| lastBatchNumber?: number; | ||
| checkpointTimestamp?: string; | ||
| } | ||
|
|
||
| // Store checkpoint after each batch | ||
| await saveImportCheckpoint(importId, { | ||
| lastProcessedEventId: events[events.length - 1].id, | ||
| lastBatchNumber: currentBatch, | ||
| checkpointTimestamp: new Date().toISOString() | ||
| }); | ||
| ``` | ||
|
|
||
| --- | ||
|
|
||
| ## 📊 Architecture Strengths | ||
|
|
||
| ✅ **Clean separation of concerns** - Provider, session reconstruction, and job logic well separated | ||
| ✅ **Extensible design** - Easy to add new providers | ||
| ✅ **Memory efficient** - Streaming approach with batching | ||
| ✅ **Good error handling** - Provider hooks for custom error handling | ||
| ✅ **Reuses existing buffers** - Integrates well with existing event processing | ||
| ✅ **Comprehensive tests** - Good test coverage structure | ||
|
|
||
| --- | ||
|
|
||
| ## 🎯 Priority Recommendations | ||
|
|
||
| **Must Fix (Before Production):** | ||
| 1. Add `uuid` to package.json dependencies | ||
| 2. Fix type safety in import.ts | ||
| 3. Add validation for invalid timestamps | ||
| 4. Implement import metadata tracking | ||
|
|
||
| **Should Fix (Soon):** | ||
| 5. Make session timeout configurable | ||
| 6. Improve memory cleanup efficiency | ||
| 7. Add @types/csv-parser to devDependencies | ||
|
|
||
| **Nice to Have (Future):** | ||
| 8. Implement provider registry pattern | ||
| 9. Add resumable imports feature | ||
| 10. Optimize batch insert performance | ||
| 11. Add comprehensive integration tests | ||
|
|
||
| --- | ||
|
|
||
| ## 📝 Code Quality Metrics | ||
|
|
||
| - **Type Safety:** 6/10 (needs improvement) | ||
| - **Error Handling:** 8/10 (good, could be better) | ||
| - **Testability:** 9/10 (excellent) | ||
| - **Extensibility:** 9/10 (excellent) | ||
| - **Performance:** 8/10 (good, streaming approach) | ||
| - **Documentation:** 7/10 (good comments, could use more) | ||
|
|
||
| --- | ||
|
|
||
| ## 🚀 Next Steps | ||
|
|
||
| 1. Fix critical issues (uuid dependency, type safety) | ||
| 2. Add import metadata tracking to database | ||
| 3. Test with larger datasets (100k+ events) | ||
| 4. Add monitoring/observability (metrics, alerts) | ||
| 5. Document provider creation guide for future providers | ||
| 6. Consider adding import preview feature (validate without importing) | ||
|
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix color contrast for accessibility.
White text on a yellow-500 background likely fails WCAG AA contrast requirements (4.5:1). Yellow backgrounds conventionally require dark text for proper readability.
Apply this diff to fix the contrast issue:
Alternatively, use a darker yellow background with white text:
📝 Committable suggestion
🤖 Prompt for AI Agents