Replace queue-based indexing with pipeline coordinator #180
Merged
- Rename internal/clip to internal/ai
- Add internal/image/pipeline package with sequential coordinator
- Add internal/task package with typed task constants
- Remove old queue-based indexFiles/indexMetadata/indexContents methods
- Add new HTTP task API (POST /tasks, DELETE /tasks/{id})
- Update api.yaml and regenerate openapi bindings
- Add PIPELINE_DESIGN.md
- Update coordinator.go AddFiles comment
- Update coordinator_test.go priority order test comment
- Rewrite pipeline README.md removing Stage 3 (faces), ForceFaces, FaceWorkers, FaceDetector, HasFaces/WriteFaces, face performance notes
- Migrate unique content (channel return pattern, no EXIF thumbs rationale)
- Remove duplicate sections from README
- Delete top-level PIPELINE_DESIGN.md
…nce to NewFacesTask
- Add stopTask() API call (DELETE /tasks/{id})
- Update createTask() to accept force option
- Replace deprecated Reindex buttons with Index metadata / color+AI / all
- Add force reindex checkbox to CollectionDebug
- Add stop button to each task in TaskList
- Forward force parameter through CollectionPanel and App reload handler
- Fix stale NewFacesTask reference in NewIndexTask deprecation comment
Contributor
Pull request overview
This PR replaces the legacy queue-based indexing flow with a new sequential, prioritized indexing pipeline (internal/image/pipeline) coordinated by a single in-process coordinator, and updates the API + UI to support “force reindex”, “index all”, and task cancellation (DELETE /tasks/{id}).
Changes:
- Introduces `internal/image/pipeline` stages (files → metadata → contents) with a coordinator that deduplicates tasks and runs them sequentially.
- Extracts task lifecycle into `internal/task` and wires the coordinator into `main.go` handlers (including task stop endpoint).
- Updates UI task controls (new indexing buttons, “Force reindex” toggle, and stop button) and updates the tasks API schema/SDK.
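The dedupe-and-prioritise behaviour described above can be illustrated with a small queue. This is only a sketch under assumptions: `taskType`, `key`, `queue`, `Add`, `Next` and `Done` are hypothetical names, not the PR's actual coordinator API.

```go
package main

import (
	"sort"
	"sync"
)

// taskType is a hypothetical stand-in for the PR's typed task constants.
// Lower values run first: files, then metadata, then contents.
type taskType int

const (
	indexFiles taskType = iota
	indexMetadata
	indexContents
)

// key identifies a task for deduplication.
type key struct {
	Type       taskType
	Collection string
}

// queue holds pending tasks; a single consumer pops them one at a time.
type queue struct {
	mu      sync.Mutex
	pending []key
	seen    map[key]bool // pending or currently running
}

func newQueue() *queue {
	return &queue{seen: make(map[key]bool)}
}

// Add enqueues k unless an identical task is already pending or running.
// It reports whether the task was accepted.
func (q *queue) Add(k key) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.seen[k] {
		return false
	}
	q.seen[k] = true
	q.pending = append(q.pending, k)
	// A stable sort keeps submission order within one priority level.
	sort.SliceStable(q.pending, func(i, j int) bool {
		return q.pending[i].Type < q.pending[j].Type
	})
	return true
}

// Next pops the highest-priority task, or returns ok=false if none is pending.
func (q *queue) Next() (k key, ok bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.pending) == 0 {
		return key{}, false
	}
	k = q.pending[0]
	q.pending = q.pending[1:]
	return k, true
}

// Done marks k as finished so the same task can be submitted again.
func (q *queue) Done(k key) {
	q.mu.Lock()
	defer q.mu.Unlock()
	delete(q.seen, k)
}
```

A single worker goroutine that loops over `Next`, runs the task, and calls `Done` would give sequential execution; the key stays in `seen` while the task runs, so duplicates are rejected for in-flight work as well as queued work.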
Reviewed changes
Copilot reviewed 31 out of 32 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| ui/src/components/TaskList.vue | Adds per-task stop button calling DELETE /tasks/{id}. |
| ui/src/components/CollectionPanel.vue | Updates reload event forwarding to include (type, force). |
| ui/src/components/CollectionDebug.vue | Replaces old reindex buttons with new task types + force checkbox. |
| ui/src/App.vue | Updates reload handler to pass {force:true} to task creation. |
| ui/src/api.js | Adds stopTask() and extends createTask() to accept options (e.g. force). |
| SPLIT_PLAN.md | Documents plan to split the broader refactor branch into multiple PRs. |
| main.go | Removes global task queues; wires pipeline coordinator into /tasks handlers and adds DELETE /tasks/{id}. |
| internal/task/task.go | New task + registry types used by the pipeline coordinator. |
| internal/render/scene.go | Renames clip import/types to internal/ai. |
| internal/openapi/api.gen.go | Regenerated OpenAPI server/types (force flag, INDEX_ALL, DELETE /tasks/{id}). |
| internal/image/source.go | Removes queue-backed indexing; adds accessors used by the pipeline; renames clip→ai. |
| internal/image/search.go | Renames clip→ai and updates similarity helpers. |
| internal/image/pipeline/types.go | Defines typed stage payload structs for the pipeline. |
| internal/image/pipeline/thumbnail.go | Implements thumbnail load/generate stage with worker pool + progress. |
| internal/image/pipeline/source.go | Implements DB-sourced file candidate streams (forced vs missing-only) + metadata batching. |
| internal/image/pipeline/README.md | Adds design/usage documentation for the new pipeline. |
| internal/image/pipeline/progress.go | Adds structured progress logging with rate/ETA. |
| internal/image/pipeline/metadata.go | Implements metadata extraction stage writing directly to DB. |
| internal/image/pipeline/files.go | Implements directory walk + DB add/remove logic for file scanning. |
| internal/image/pipeline/coordinator.go | Implements sequential coordinator, priority ordering, and task stop support. |
| internal/image/pipeline/coordinator_test.go | Adds coordinator tests (sequential run, dedupe, priority order, shutdown). |
| internal/image/pipeline/contents.go | Implements color extraction + AI embedding from thumbnails. |
| internal/image/indexMetadata.go | Deletes old queue-worker metadata indexer. |
| internal/image/indexFiles.go | Deletes old file walk helper (replaced by pipeline stage). |
| internal/image/indexContents.go | Deletes old queue-worker contents indexer. |
| internal/image/database.go | Renames clip→ai and adds CountMissing helper for pipeline stage totals. |
| internal/collection/collection.go | Renames clip→ai in similarity API. |
| internal/ai/embedding.go | New embedding type + similarity/dot product helpers (renamed from clip). |
| internal/ai/client.go | New AI client (embeddings + faces API client code) under internal/ai. |
| go.mod | Adjusts prometheus client_model requirement classification (now indirect). |
| config.go | Renames config type from clip.AI to ai.AI. |
| api.yaml | Adds force flag, INDEX_ALL task type, and DELETE /tasks/{id} endpoint; wraps POST /tasks responses in {items:[...]}. |
Files not reviewed (1)
- internal/openapi/api.gen.go: Language not supported
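For readers unfamiliar with how a stop endpoint like `DELETE /tasks/{id}` is usually backed, here is a rough sketch using only the standard library. It assumes each task runs under a cancellable `context.Context`; the `registry` type, `tasksHandler`, and the 204/404 status codes are illustrative assumptions, not taken from the PR's `main.go` or the generated OpenAPI bindings.

```go
package main

import (
	"context"
	"net/http"
	"strings"
	"sync"
)

// registry maps task IDs to cancel functions. Hypothetical; the PR's
// internal/task registry may look quite different.
type registry struct {
	mu      sync.Mutex
	cancels map[string]context.CancelFunc
}

func newRegistry() *registry {
	return &registry{cancels: make(map[string]context.CancelFunc)}
}

// Start registers a task and returns the context its stages should watch.
func (r *registry) Start(id string) context.Context {
	ctx, cancel := context.WithCancel(context.Background())
	r.mu.Lock()
	r.cancels[id] = cancel
	r.mu.Unlock()
	return ctx
}

// Stop cancels the task's context; it reports false for unknown IDs.
func (r *registry) Stop(id string) bool {
	r.mu.Lock()
	cancel, ok := r.cancels[id]
	delete(r.cancels, id)
	r.mu.Unlock()
	if ok {
		cancel()
	}
	return ok
}

// tasksHandler serves DELETE /tasks/{id} when mounted at "/tasks/".
func tasksHandler(reg *registry) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		if req.Method != http.MethodDelete {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		id := strings.TrimPrefix(req.URL.Path, "/tasks/")
		if id == "" || !reg.Stop(id) {
			http.NotFound(w, req)
			return
		}
		// Assumed response: no body once cancellation has been requested.
		w.WriteHeader(http.StatusNoContent)
	})
}
```

Cancellation here only signals the task; the pipeline stages still have to observe `ctx.Done()` and return for the task to actually stop.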
Comment on lines +20 to +52

    func walkFiles(dir string, extensions []string, maxFiles int) <-chan string {
    	out := make(chan string)
    	go func() {
    		finished := metrics.Elapsed(fmt.Sprintf("index %s", dir))
    		defer finished()

    		progress := newProgress("files", 0)
    		defer progress.Done()

    		files := 0
    		err := godirwalk.Walk(dir, &godirwalk.Options{
    			Unsorted: true,
    			Callback: func(path string, _ *godirwalk.Dirent) error {
    				if strings.Contains(path, "@eaDir") {
    					return filepath.SkipDir
    				}

    				matched := false
    				for _, ext := range extensions {
    					if strings.HasSuffix(strings.ToLower(path), ext) {
    						matched = true
    						break
    					}
    				}
    				if !matched {
    					return nil
    				}

    				files++
    				progress.Inc(1)
    				out <- path
    				if maxFiles > 0 && files >= maxFiles {
    					return errSkip
Comment on lines +88 to +93

    	candidates = c
    } else {
    	// Query for missing contents
    	log.Println("index source files missing contents")
    	candidates = db.ListMissing(dirs, maxPhotos, img.Missing{Color: true, Embedding: true})
    }
Comment on lines +100 to +132

    batchSize := 100
    ids := make([]img.ImageId, 0, batchSize)
    idToPath := make(map[img.ImageId]string)
    idToMissing := make(map[img.ImageId]img.Missing)

    candidateCount := 0
    for candidate := range candidates {
    	candidateCount++
    	ids = append(ids, candidate.Id)
    	idToPath[candidate.Id] = candidate.Path
    	idToMissing[candidate.Id] = candidate.Missing

    	// Process batch when full
    	if len(ids) >= batchSize {
    		log.Printf("index source batch %d files\n", len(ids))
    		results := db.GetBatch(ids)
    		resultCount := 0
    		for result := range results {
    			resultCount++
    			missingInfo := idToMissing[result.Id]
    			select {
    			case out <- fileWithMeta{
    				fileRef: fileRef{ID: result.Id, Path: idToPath[result.Id]},
    				Info:    result.Info,
    				Tags:    nil,
    				Missing: missingInfo,
    			}:
    			case <-ctx.Done():
    				return
    			}
    		}
    		ids = ids[:0]
    	}
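The excerpt stops right after the full-batch branch, so it does not show what happens to a final partial batch. As a general illustration of the batching pattern (not the PR's code), the sketch below groups a channel into fixed-size slices and also flushes the remainder. `batch` is a hypothetical helper.

```go
package main

// batch groups items from in into slices of at most size elements and calls
// flush for each group, including the final partial one.
func batch[T any](in <-chan T, size int, flush func([]T)) {
	buf := make([]T, 0, size)
	for item := range in {
		buf = append(buf, item)
		if len(buf) >= size {
			flush(buf)
			buf = buf[:0] // reuses the backing array; flush must not retain buf
		}
	}
	if len(buf) > 0 {
		flush(buf) // remainder smaller than size
	}
}
```

With a batch size of 100, as in the excerpt, a run of 250 candidates would produce flushes of 100, 100 and 50 items.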
    ThumbnailGenerators: pipelineThumbGens,
    ThumbnailSink:       imageSource.ThumbSink(),
    AIService:           imageSource.Clip,
    ImageDecoder:        imageSource.ThumbSink(),
Comment on lines +41 to +49

    // Counter returns a channel for incrementing the task's Done counter
    func (t *Task) Counter() chan<- int {
    	counter := make(chan int, 10)
    	go func() {
    		for add := range counter {
    			t.Done += add
    		}
    	}()
    	return counter
Comment on lines +32 to +38

    <ui-icon-button
      v-if="task.id"
      icon="close"
      class="stop-button"
      title="Stop task"
      @click="stopTask(task.id)"
    ></ui-icon-button>
Comment on lines +343 to +346

All stages return error channels. Coordinator collects all errors:
- Logs first 10 errors
- Continues processing remaining items
- Returns error if any occurred
Comment on lines +118 to +131

    } else {
    	if count, ok := cfg.DB.CountMissing(dirs, img.Missing{Color: true, Embedding: true}); ok {
    		if maxPhotos > 0 && count > maxPhotos {
    			count = maxPhotos
    		}
    		t.Total = count
    		if count > 0 {
    			log.Printf("index contents extract %d files\n", count)
    		}
    	}
    }

    metaOut := fileSourceWithMetadata(ctx, cfg.DB, dirs, maxPhotos, force)
…ty, and UI stop handling
Agent-Logs-Url: https://github.com/SmilyOrg/photofield/sessions/12500dec-3761-4a3f-ab32-3f791d94d6ab
Co-authored-by: SmilyOrg <1451391+SmilyOrg@users.noreply.github.com>
Contributor
Just as a heads up, I was blocked by some firewall rules while working on your feedback. I tried to connect to one or more addresses, but was blocked by firewall rules.
If you need me to access, download, or install something from one of these locations, you can either:
- Add TaskRunner hook to Config for test injection
- Rewrite TestCoordinatorDuplicatePrevention to verify in-flight dedup using a blocking TaskRunner with started/block channels
- Rewrite TestCoordinatorList to assert post-completion empty state instead of a mid-flight snapshot
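The blocking-runner technique mentioned in this commit can be sketched as follows. A test injects a runner that signals when it has started and then waits on a channel, which holds the task in flight while the duplicate submission is checked. All names here (`taskRunner`, `coordinator`, `Submit`, `Wait`, `blockingRunner`) are hypothetical and only loosely modelled on the commit message.

```go
package main

import "sync"

// taskRunner is a hypothetical hook type, standing in for the TaskRunner
// field mentioned in the commit message.
type taskRunner func(name string)

// coordinator rejects a task while one with the same name is in flight.
type coordinator struct {
	mu       sync.Mutex
	wg       sync.WaitGroup
	inFlight map[string]bool
	run      taskRunner
}

func newCoordinator(run taskRunner) *coordinator {
	return &coordinator{inFlight: make(map[string]bool), run: run}
}

// Submit starts the task unless a duplicate is running; it reports acceptance.
func (c *coordinator) Submit(name string) bool {
	c.mu.Lock()
	if c.inFlight[name] {
		c.mu.Unlock()
		return false
	}
	c.inFlight[name] = true
	c.wg.Add(1)
	c.mu.Unlock()

	go func() {
		defer c.wg.Done()
		c.run(name)
		c.mu.Lock()
		delete(c.inFlight, name)
		c.mu.Unlock()
	}()
	return true
}

// Wait blocks until all accepted tasks have finished.
func (c *coordinator) Wait() { c.wg.Wait() }

// blockingRunner returns a runner that signals on started and then waits
// until block is closed, so a test can hold a task in flight.
func blockingRunner() (run taskRunner, started chan struct{}, block chan struct{}) {
	started = make(chan struct{}, 1)
	block = make(chan struct{})
	run = func(string) {
		started <- struct{}{}
		<-block
	}
	return run, started, block
}
```

A test would submit once, receive from `started`, assert that a second `Submit` with the same name returns false, then `close(block)` and call `Wait` before checking the post-completion state. This avoids relying on timing or mid-flight snapshots.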
Summary
Replaces the old queue-based indexing system (`globalTasks sync.Map`, `metadataQueue`, `contentsQueue`) with a new `internal/image/pipeline` package that coordinates indexing as a sequential, prioritised pipeline.

Changes
Backend
- `internal/image/pipeline/`: new package with clean stage separation:
  - `coordinator.go`: sequential task runner with priority ordering (files → metadata → contents); deduplicates concurrent requests; supports cancellation via `DELETE /tasks/{id}`
  - `files.go`: stage 1, walk directories, add/remove DB records
  - `metadata.go`: stage 2, extract EXIF/metadata with configurable workers
  - `contents.go`: stage 3, generate thumbnails, extract prominent colour and AI embeddings
  - `source.go`: file sourcing helpers (force vs. missing-only)
  - `thumbnail.go`: thumbnail load/generate abstraction
  - `progress.go`: structured progress logging with rate/ETA
  - `types.go`: pipeline-internal stage types
  - `coordinator_test.go`: sequential execution, duplicate prevention, priority ordering, shutdown tests
- `internal/task/`: new task lifecycle package extracted from `main.go`
- `internal/ai/`: renamed from `internal/clip` (embedding types/functions)
- `main.go`: wires `pipeline.Coordinator` into HTTP handlers; removes `globalTasks`, `indexCollection`, and queue-backed task stubs; adds `DELETE /tasks/{id}` handler
- `api.yaml`: adds `force` parameter to task creation, `INDEX_ALL` task type, `DELETE /tasks/{id}` endpoint; deprecates `INDEX_CONTENTS_COLOR` / `INDEX_CONTENTS_AI`
- `internal/image/database.go`: adds `CountMissing`, `GetDirsCount` helpers used by pipeline stages

Frontend
- Replaces `Reindex *` buttons with `Index metadata`, `Index color & AI`, `Index all`
- Force reindex checkbox (passes `force: true` to the API)
- Stop button per task (`DELETE /tasks/{id}`)
- `api.js`: add `stopTask()`, update `createTask()` to accept an options object