diff --git a/cmd/config.go b/cmd/config.go index bed861d..1eaa221 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -37,6 +37,10 @@ var logFormat string var removeSingleAssetStacks bool var includeVideos bool var includeVideosFlagSet bool +var preventSelfRekt bool +var preventSelfRektFlagSet bool +var stackConcurrency int +var stackConcurrencyFlagSet bool var filterAlbumIDs []string var filterTakenAfter string var filterTakenBefore string @@ -156,6 +160,8 @@ func logStartupSummary(logger *logrus.Logger) { "withDeleted": withDeleted, "removeSingleAssetStacks": removeSingleAssetStacks, "includeVideos": includeVideos, + "preventSelfRekt": preventSelfRekt, + "stackConcurrency": stackConcurrency, "criteria": criteria, "parentFilenamePromote": parentFilenamePromote, "parentExtPromote": parentExtPromote, @@ -203,6 +209,12 @@ func logStartupSummary(logger *logrus.Logger) { if includeVideos { summary = append(summary, "include-videos=true") } + if preventSelfRekt { + summary = append(summary, "prevent-self-rekt=true") + } + if stackConcurrency > 1 { + summary = append(summary, fmt.Sprintf("stack-concurrency=%d", stackConcurrency)) + } if criteria != "" { summary = append(summary, fmt.Sprintf("criteria=%s", criteria)) } @@ -300,6 +312,21 @@ func LoadEnvForTesting() LoadEnvConfig { includeVideos = envInclude == "true" } } + if !preventSelfRektFlagSet { + if envPrevent := os.Getenv("PREVENT_SELF_REKT"); envPrevent != "" { + preventSelfRekt = envPrevent == "true" + } + } + if !stackConcurrencyFlagSet { + if envVal := os.Getenv("STACK_CONCURRENCY"); envVal != "" { + if n, err := strconv.Atoi(envVal); err == nil { + stackConcurrency = n + } + } + } + if stackConcurrency < 1 { + stackConcurrency = 1 + } if parentFilenamePromote == "" || parentFilenamePromote == utils.DefaultParentFilenamePromoteString { if envVal := os.Getenv("PARENT_FILENAME_PROMOTE"); envVal != "" { parentFilenamePromote = envVal diff --git a/cmd/duplicates.go b/cmd/duplicates.go index 664e13f..fae3c79 100644 --- a/cmd/duplicates.go +++ b/cmd/duplicates.go @@ -47,7 +47,7 @@ func runDuplicates(cmd *cobra.Command, args []string) { if i > 0 { logger.Infof("\n") } - client := immich.NewClient(apiURL, key, false, false, true, withArchived, withDeleted, false, includeVideos, nil, "", "", logger) + client := immich.NewClient(apiURL, key, false, false, true, withArchived, withDeleted, false, includeVideos, stackConcurrency, nil, "", "", logger) if client == nil { logger.Errorf("Invalid client for API key: %s", key) continue diff --git a/cmd/fixtrash.go b/cmd/fixtrash.go index 2b0bc8c..b5583b8 100644 --- a/cmd/fixtrash.go +++ b/cmd/fixtrash.go @@ -51,7 +51,7 @@ func runFixTrash(cmd *cobra.Command, args []string) { if i > 0 { logger.Infof("\n") } - client := immich.NewClient(apiURL, key, false, false, dryRun, withArchived, withDeleted, false, includeVideos, nil, "", "", logger) + client := immich.NewClient(apiURL, key, false, false, dryRun, withArchived, withDeleted, false, includeVideos, stackConcurrency, nil, "", "", logger) if client == nil { logger.Errorf("Invalid client for API key: %s", key) continue diff --git a/cmd/main.go b/cmd/main.go index b26f1ac..8ca3beb 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -33,6 +33,8 @@ func bindFlags(rootCmd *cobra.Command) { rootCmd.PersistentFlags().StringVar(&logFormat, "log-format", "", "Log format: text, json (or set LOG_FORMAT env var)") rootCmd.PersistentFlags().BoolVar(&removeSingleAssetStacks, "remove-single-asset-stacks", false, "Remove stacks with only one asset (or set REMOVE_SINGLE_ASSET_STACKS=true)") rootCmd.PersistentFlags().BoolVar(&includeVideos, "include-videos", false, "Include VIDEO assets alongside IMAGE in stacking (or set INCLUDE_VIDEOS=true)") + rootCmd.PersistentFlags().BoolVar(&preventSelfRekt, "prevent-self-rekt", false, "Insert a 50ms delay before each stack write to prevent overwhelming Immich on huge libraries (or set PREVENT_SELF_REKT=true)") + rootCmd.PersistentFlags().IntVar(&stackConcurrency, "stack-concurrency", 0, "Parallel stack writes (default 1 = sequential; values like 10-20 speed up large libraries; or set STACK_CONCURRENCY)") rootCmd.PersistentFlags().StringSliceVar(&filterAlbumIDs, "filter-album-ids", nil, "Filter by album IDs or names, comma-separated (or set FILTER_ALBUM_IDS env var)") rootCmd.PersistentFlags().StringVar(&filterTakenAfter, "filter-taken-after", "", "Filter assets taken after date, ISO 8601 (or set FILTER_TAKEN_AFTER env var)") rootCmd.PersistentFlags().StringVar(&filterTakenBefore, "filter-taken-before", "", "Filter assets taken before date, ISO 8601 (or set FILTER_TAKEN_BEFORE env var)") @@ -86,6 +88,12 @@ func CreateRootCommand() *cobra.Command { if cmd.Flags().Lookup("include-videos") != nil && cmd.Flags().Lookup("include-videos").Changed { includeVideosFlagSet = true } + if cmd.Flags().Lookup("prevent-self-rekt") != nil && cmd.Flags().Lookup("prevent-self-rekt").Changed { + preventSelfRektFlagSet = true + } + if cmd.Flags().Lookup("stack-concurrency") != nil && cmd.Flags().Lookup("stack-concurrency").Changed { + stackConcurrencyFlagSet = true + } }, } diff --git a/cmd/stacker.go b/cmd/stacker.go index 4dbaa10..d9969a8 100644 --- a/cmd/stacker.go +++ b/cmd/stacker.go @@ -7,6 +7,7 @@ package main import ( "strings" + "sync" "time" "github.com/majorfi/immich-stack/pkg/immich" @@ -174,7 +175,7 @@ func runStacker(cmd *cobra.Command, args []string) { if i > 0 { logger.Infof("\n") } - client := immich.NewClient(apiURL, key, resetStacks, replaceStacks, dryRun, withArchived, withDeleted, removeSingleAssetStacks, includeVideos, filterAlbumIDs, filterTakenAfter, filterTakenBefore, logger) + client := immich.NewClient(apiURL, key, resetStacks, replaceStacks, dryRun, withArchived, withDeleted, removeSingleAssetStacks, includeVideos, stackConcurrency, filterAlbumIDs, filterTakenAfter, filterTakenBefore, logger) if client == nil { logger.Errorf("Invalid client for API key: %s", key) continue @@ -225,110 +226,135 @@ func runStackerOnce(client *immich.Client, logger *logrus.Logger, ownerID string logger.Fatalf("Error stacking assets: %v", err) } + /********************************************************************************************** + ** Process each candidate stack. When stackConcurrency > 1, multiple stacks are written in + ** parallel using a bounded semaphore — the sequence WITHIN each stack (delete children → + ** modify) stays ordered, only different stacks run concurrently. Default concurrency = 1 + ** preserves the historical sequential behavior. See issue #53. + **********************************************************************************************/ + concurrency := max(stackConcurrency, 1) + sem := make(chan struct{}, concurrency) + var wg sync.WaitGroup for i, stack := range stacks { - _, _, newStackIDs := getParentAndChildrenIDs(stack) - _, _, originalStackIDs := getOriginalStackIDs(stack) + wg.Add(1) + sem <- struct{}{} + go func(i int, stack []utils.TAsset) { + defer wg.Done() + defer func() { <-sem }() + processStack(client, logger, i, len(stacks), stack) + }(i, stack) + } + wg.Wait() +} - /****************************************************************************************** - ** Adding debug logs - ******************************************************************************************/ - { - logger.Debugf("--------------------------------") - logger.Debugf("%d/%d Key: %s", i+1, len(stacks), stack[0].OriginalFileName) - logger.WithFields(logrus.Fields{ - "Name": stack[0].OriginalFileName, - "ID": stack[0].ID, - "Time": stack[0].LocalDateTime, - }).Debugf("\tParent") - for _, child := range stack[1:] { - logger.WithFields(logrus.Fields{ - "Name": child.OriginalFileName, - "ID": child.ID, - "Time": child.LocalDateTime, - }).Debugf("\tChild") - } - } +/************************************************************************************************** +** processStack runs the per-stack pipeline: compare against existing state, optionally delete +** child stacks (for replace mode), throttle if requested, and call ModifyStack. Designed to be +** safe to invoke from many goroutines in parallel — operates on its own stack slice and uses +** the client/logger which are themselves goroutine-safe. +**************************************************************************************************/ +func processStack(client *immich.Client, logger *logrus.Logger, i int, total int, stack []utils.TAsset) { + _, _, newStackIDs := getParentAndChildrenIDs(stack) + _, _, originalStackIDs := getOriginalStackIDs(stack) - /****************************************************************************************** - ** Doing standard stacker checks. - ******************************************************************************************/ - if !isValidStack(newStackIDs) { - logger.Debugf("\t⚠️ Invalid stack: %s", stack[0].OriginalFileName) - continue - } - if !needsStackUpdate(originalStackIDs, newStackIDs) { - logger.Debugf("\tℹ️ No update needed for stack: %s", stack[0].OriginalFileName) - continue - } - childrenWithStack, hasChildrenWithStack := getChildrenWithStack(stack) - if hasChildrenWithStack && !replaceStacks { - logger.Debugf("\tℹ️ No replaceStacks, skipping stack: %s", stack[0].OriginalFileName) - continue - } + /********************************************************************************************** + ** Adding debug logs + **********************************************************************************************/ + logger.Debugf("--------------------------------") + logger.Debugf("%d/%d Key: %s", i+1, total, stack[0].OriginalFileName) + logger.WithFields(logrus.Fields{ + "Name": stack[0].OriginalFileName, + "ID": stack[0].ID, + "Time": stack[0].LocalDateTime, + }).Debugf("\tParent") + for _, child := range stack[1:] { + logger.WithFields(logrus.Fields{ + "Name": child.OriginalFileName, + "ID": child.ID, + "Time": child.LocalDateTime, + }).Debugf("\tChild") + } - /****************************************************************************************** - ** Adding info logs, but only if we are not in debug mode. - ******************************************************************************************/ - { - if !logger.IsLevelEnabled(logrus.DebugLevel) { - logger.Infof("--------------------------------") - logger.Infof("%d/%d Key: %s", i+1, len(stacks), stack[0].OriginalFileName) - } - if !logger.IsLevelEnabled(logrus.DebugLevel) { - logger.WithFields(logrus.Fields{ - "Name": stack[0].OriginalFileName, - "ID": stack[0].ID, - "Time": stack[0].LocalDateTime, - }).Infof("\tParent") - for _, child := range stack[1:] { - logger.WithFields(logrus.Fields{ - "Name": child.OriginalFileName, - "ID": child.ID, - "Time": child.LocalDateTime, - }).Infof("\tChild") - } - } - } + /********************************************************************************************** + ** Doing standard stacker checks. + **********************************************************************************************/ + if !isValidStack(newStackIDs) { + logger.Debugf("\t⚠️ Invalid stack: %s", stack[0].OriginalFileName) + return + } + if !needsStackUpdate(originalStackIDs, newStackIDs) { + logger.Debugf("\tℹ️ No update needed for stack: %s", stack[0].OriginalFileName) + return + } + childrenWithStack, hasChildrenWithStack := getChildrenWithStack(stack) + if hasChildrenWithStack && !replaceStacks { + logger.Debugf("\tℹ️ No replaceStacks, skipping stack: %s", stack[0].OriginalFileName) + return + } - /****************************************************************************************** - ** Add comparison debug logging. - ******************************************************************************************/ - if logger.IsLevelEnabled(logrus.DebugLevel) { - logger.Debugf("\tStack comparison:") - logger.Debugf("\t Original: %v", originalStackIDs) - logger.Debugf("\t Expected: %v", newStackIDs) - logger.Debugf("\t REPLACE_STACKS: %v", replaceStacks) + /********************************************************************************************** + ** Adding info logs, but only if we are not in debug mode. + **********************************************************************************************/ + if !logger.IsLevelEnabled(logrus.DebugLevel) { + logger.Infof("--------------------------------") + logger.Infof("%d/%d Key: %s", i+1, total, stack[0].OriginalFileName) + logger.WithFields(logrus.Fields{ + "Name": stack[0].OriginalFileName, + "ID": stack[0].ID, + "Time": stack[0].LocalDateTime, + }).Infof("\tParent") + for _, child := range stack[1:] { + logger.WithFields(logrus.Fields{ + "Name": child.OriginalFileName, + "ID": child.ID, + "Time": child.LocalDateTime, + }).Infof("\tChild") } + } - /****************************************************************************************** - ** Delete children stacks if replaceStacks is true. - ******************************************************************************************/ - if replaceStacks { - for _, childID := range childrenWithStack { - client.DeleteStack(childID, utils.REASON_REPLACE_CHILD_STACK_WITH_NEW_ONE) - } - } + /********************************************************************************************** + ** Add comparison debug logging. + **********************************************************************************************/ + if logger.IsLevelEnabled(logrus.DebugLevel) { + logger.Debugf("\tStack comparison:") + logger.Debugf("\t Original: %v", originalStackIDs) + logger.Debugf("\t Expected: %v", newStackIDs) + logger.Debugf("\t REPLACE_STACKS: %v", replaceStacks) + } - /****************************************************************************************** - ** Determine action type for logging. - ******************************************************************************************/ - var actionMsg string - if len(originalStackIDs) == 0 { - actionMsg = "\t🆕 Creating new stack" - } else if replaceStacks && len(childrenWithStack) > 0 { - actionMsg = "\t🔄 Replacing existing stack (deleted child stacks)" - } else { - actionMsg = "\t✏️ Updating stack configuration" + /********************************************************************************************** + ** Delete children stacks if replaceStacks is true. + **********************************************************************************************/ + if replaceStacks { + for _, childID := range childrenWithStack { + client.DeleteStack(childID, utils.REASON_REPLACE_CHILD_STACK_WITH_NEW_ONE) } - logger.Info(actionMsg) + } - /****************************************************************************************** - ** Modify the stack after a little delay to avoid self-rekt. - ******************************************************************************************/ - time.Sleep(100 * time.Millisecond) - if err := client.ModifyStack(newStackIDs); err != nil { - logger.Errorf("Error modifying stack: %v", err) - } + /********************************************************************************************** + ** Determine action type for logging. + **********************************************************************************************/ + var actionMsg string + if len(originalStackIDs) == 0 { + actionMsg = "\t🆕 Creating new stack" + } else if replaceStacks && len(childrenWithStack) > 0 { + actionMsg = "\t🔄 Replacing existing stack (deleted child stacks)" + } else { + actionMsg = "\t✏️ Updating stack configuration" + } + logger.Info(actionMsg) + + /********************************************************************************************** + ** Optional throttle between stack writes. Default is no delay — empirically Immich has no + ** rate limit on POST /stacks and handles bursts fine. Users with very large libraries on + ** slow hosting can opt in to a 50ms gap via PREVENT_SELF_REKT=true if they observe upstream + ** errors. See issue #53. + **********************************************************************************************/ + if preventSelfRekt { + time.Sleep(50 * time.Millisecond) + } + if err := client.ModifyStack(newStackIDs); err != nil { + logger.Errorf("Error modifying stack: %v", err) } } @@ -346,7 +372,7 @@ func runCronLoopForAllUsers(apiKeys []string, apiURL string, logger *logrus.Logg if i > 0 { logger.Infof("\n") } - client := immich.NewClient(apiURL, key, resetStacks, replaceStacks, dryRun, withArchived, withDeleted, removeSingleAssetStacks, includeVideos, filterAlbumIDs, filterTakenAfter, filterTakenBefore, logger) + client := immich.NewClient(apiURL, key, resetStacks, replaceStacks, dryRun, withArchived, withDeleted, removeSingleAssetStacks, includeVideos, stackConcurrency, filterAlbumIDs, filterTakenAfter, filterTakenBefore, logger) if client == nil { logger.Errorf("Invalid client for API key: %s", key) continue diff --git a/cmd/stacker_test.go b/cmd/stacker_test.go index 154814e..c355149 100644 --- a/cmd/stacker_test.go +++ b/cmd/stacker_test.go @@ -39,6 +39,10 @@ func resetGlobalConfig() { removeSingleAssetStacks = false includeVideos = false includeVideosFlagSet = false + preventSelfRekt = false + preventSelfRektFlagSet = false + stackConcurrency = 0 + stackConcurrencyFlagSet = false } func clearEnvironment() { diff --git a/docs/troubleshooting.md b/docs/troubleshooting.md index 4e3d7a2..a5ce5ad 100644 --- a/docs/troubleshooting.md +++ b/docs/troubleshooting.md @@ -172,6 +172,61 @@ INCLUDE_VIDEOS=true This resolves [issue #54](https://github.com/Majorfi/immich-stack/issues/54). +### Slow Stacking on Large Libraries + +**Symptoms:** + +- A single stacking run takes 30 minutes or more on a library with thousands of stacks +- CPU usage during the run is near 0% (the tool is waiting, not computing) +- Reset operations (`RESET_STACKS=true`) take comparable time on big libraries + +**Cause:** + +Historically the tool inserted a 100 ms pause before every `POST /stacks` call to avoid +hammering Immich. For libraries with 10k+ stacks, that single pause dominated wall-clock +time (e.g., 21 000 stacks × 100 ms ≈ 35 minutes of pure sleep). See [issue #53](https://github.com/Majorfi/immich-stack/issues/53). + +**What changed:** + +- The preemptive sleep is no longer applied by default — empirically Immich has no rate + limit on `POST /stacks` and handles bursts of hundreds of requests per second cleanly. +- Stack writes can now run in parallel via `STACK_CONCURRENCY` (default `1`, i.e., sequential). +- Both the main stacking loop AND the `RESET_STACKS` / `REMOVE_SINGLE_ASSET_STACKS` cleanup + paths respect the same concurrency setting. + +**Configuration:** + +```sh +# Default — sequential writes, no artificial delay +STACK_CONCURRENCY=1 + +# Recommended for libraries above 10k stacks — 10× speedup typical +STACK_CONCURRENCY=10 + +# Aggressive — use only if your Immich host is sized for it +STACK_CONCURRENCY=20 + +# Safety throttle — opt in if you observe upstream errors on a slow host +# (inserts a 50 ms pause before each write; combinable with STACK_CONCURRENCY) +PREVENT_SELF_REKT=true +``` + +**Expect interleaved logs when `STACK_CONCURRENCY > 1`:** + +Each individual log line stays intact (the logger is goroutine-safe), but the per-stack +sequence ("1/N Key: …", "Parent …", "Child …", "Creating new stack") is no longer printed +together for a given stack — lines from different stacks interleave. This is the visible +trade-off for the speedup. Drop back to `STACK_CONCURRENCY=1` if you need sequential logs +for debugging a specific stack. + +**When to enable `PREVENT_SELF_REKT`:** + +- Self-hosted Immich on a single low-power machine (e.g., Raspberry Pi) where the database + starts queueing under sustained writes +- You see repeated `502 Bad Gateway` errors during stack writes +- You're combining `STACK_CONCURRENCY > 10` with a constrained host and want a per-write + cooldown + ### Grouping Issues **Symptoms:** @@ -299,6 +354,7 @@ If you experienced this issue, update to the latest version and verify: ``` 1. Files with numbers beyond your promote list are handled automatically: + - If you specify `0000,0001,0002,0003` but have files up to `0999`, they will be sorted correctly at position 999 1. Understanding `sequence:X` behavior: @@ -473,16 +529,19 @@ docker logs -f immich-stack ## Best Practices 1. **Testing** + - Always use dry run mode first - Test with small asset sets - Verify criteria before production 1. **Monitoring** + - Enable debug logging - Monitor resource usage - Check operation results 1. **Maintenance** + - Regular stack cleanup - API key rotation - Configuration review diff --git a/pkg/immich/client.go b/pkg/immich/client.go index 36c65b3..f35707a 100644 --- a/pkg/immich/client.go +++ b/pkg/immich/client.go @@ -9,6 +9,7 @@ import ( "net/http" "net/url" "strconv" + "sync" "time" "github.com/majorfi/immich-stack/pkg/utils" @@ -72,6 +73,7 @@ type Client struct { withDeleted bool removeSingleAssetStacks bool includeVideos bool + stackConcurrency int filterAlbumIDs []string filterTakenAfter string filterTakenBefore string @@ -97,7 +99,10 @@ type Client struct { ** @param logger - Logger instance for output ** @return *Client - Configured Immich client instance **************************************************************************************************/ -func NewClient(apiURL, apiKey string, resetStacks bool, replaceStacks bool, dryRun bool, withArchived bool, withDeleted bool, removeSingleAssetStacks bool, includeVideos bool, filterAlbumIDs []string, filterTakenAfter string, filterTakenBefore string, logger *logrus.Logger) *Client { +func NewClient(apiURL, apiKey string, resetStacks bool, replaceStacks bool, dryRun bool, withArchived bool, withDeleted bool, removeSingleAssetStacks bool, includeVideos bool, stackConcurrency int, filterAlbumIDs []string, filterTakenAfter string, filterTakenBefore string, logger *logrus.Logger) *Client { + if stackConcurrency < 1 { + stackConcurrency = 1 + } if apiKey == "" { return nil } @@ -137,6 +142,7 @@ func NewClient(apiURL, apiKey string, resetStacks bool, replaceStacks bool, dryR withDeleted: withDeleted, removeSingleAssetStacks: removeSingleAssetStacks, includeVideos: includeVideos, + stackConcurrency: stackConcurrency, filterAlbumIDs: filterAlbumIDs, filterTakenAfter: filterTakenAfter, filterTakenBefore: filterTakenBefore, @@ -254,18 +260,40 @@ func (c *Client) FetchAllStacks() (map[string]utils.TStack, error) { } } - // Handle single-asset stacks and reset if needed - for _, stack := range stacks { - if c.resetStacks { - c.logger.Debugf("🔄 Resetting stack %s", stack.PrimaryAssetID) - if err := c.DeleteStack(stack.ID, utils.REASON_RESET_STACK); err != nil { - c.logger.Errorf("Error deleting stack: %v", err) - } - } else if c.removeSingleAssetStacks && len(stack.Assets) <= 1 { - if err := c.DeleteStack(stack.ID, utils.REASON_DELETE_STACK_WITH_ONE_ASSET); err != nil { - c.logger.Errorf("Error deleting stack: %v", err) + // Handle single-asset stacks and reset if needed. + // Capture the flags locally — c.resetStacks is mutated AFTER this block (line below), + // so goroutines reading the field directly could race with that mutation if a future + // refactor moves the reset earlier. Capturing is defensive and makes intent explicit. + shouldReset := c.resetStacks + shouldRemoveSingle := c.removeSingleAssetStacks + { + concurrency := max(c.stackConcurrency, 1) + sem := make(chan struct{}, concurrency) + var wg sync.WaitGroup + for _, stack := range stacks { + var reason string + switch { + case shouldReset: + reason = utils.REASON_RESET_STACK + case shouldRemoveSingle && len(stack.Assets) <= 1: + reason = utils.REASON_DELETE_STACK_WITH_ONE_ASSET + default: + continue } + wg.Add(1) + sem <- struct{}{} + go func(stack utils.TStack, reason string) { + defer wg.Done() + defer func() { <-sem }() + if reason == utils.REASON_RESET_STACK { + c.logger.Debugf("🔄 Resetting stack %s", stack.PrimaryAssetID) + } + if err := c.DeleteStack(stack.ID, reason); err != nil { + c.logger.Errorf("Error deleting stack: %v", err) + } + }(stack, reason) } + wg.Wait() } if c.resetStacks { diff --git a/pkg/immich/client_concurrency_test.go b/pkg/immich/client_concurrency_test.go new file mode 100644 index 0000000..87e3dfd --- /dev/null +++ b/pkg/immich/client_concurrency_test.go @@ -0,0 +1,161 @@ +package immich + +import ( + "io" + "net/http" + "strconv" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +/************************************************************************************************** +** concurrencyProbe is an HTTP transport that counts in-flight DELETE /stacks/{id} calls so a +** test can assert that the bounded-concurrency semaphore in FetchAllStacks actually caps +** parallelism at the configured value. The same goroutine pattern is used by the main stacker +** loop in cmd/, so validating it here covers both code paths. +** +** Each DELETE increments `inFlight`, sleeps briefly to force overlap, then decrements. +** `peak` tracks the running max via atomic CAS to be race-free. +**************************************************************************************************/ +type concurrencyProbe struct { + inFlight atomic.Int64 + peak atomic.Int64 + delay time.Duration + stacksBody string + deleteCount atomic.Int64 +} + +func (p *concurrencyProbe) RoundTrip(req *http.Request) (*http.Response, error) { + switch { + case req.Method == http.MethodGet && strings.HasSuffix(req.URL.Path, "/stacks"): + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader(p.stacksBody)), + }, nil + case req.Method == http.MethodDelete && strings.Contains(req.URL.Path, "/stacks/"): + p.deleteCount.Add(1) + now := p.inFlight.Add(1) + for { + cur := p.peak.Load() + if now <= cur || p.peak.CompareAndSwap(cur, now) { + break + } + } + time.Sleep(p.delay) + p.inFlight.Add(-1) + return &http.Response{ + StatusCode: http.StatusNoContent, + Body: io.NopCloser(strings.NewReader("")), + }, nil + } + return &http.Response{ + StatusCode: http.StatusNotFound, + Body: io.NopCloser(strings.NewReader(`{}`)), + }, nil +} + +func TestFetchAllStacksResetRespectsStackConcurrency(t *testing.T) { + // Build a fake /stacks response with 40 stacks so we have enough work to observe parallelism. + var sb strings.Builder + sb.WriteString("[") + for i := 0; i < 40; i++ { + if i > 0 { + sb.WriteString(",") + } + sb.WriteString(`{"id":"stack-`) + sb.WriteString(strconv.Itoa(i)) + sb.WriteString(`","primaryAssetId":"asset-`) + sb.WriteString(strconv.Itoa(i)) + sb.WriteString(`","assets":[{"id":"asset-`) + sb.WriteString(strconv.Itoa(i)) + sb.WriteString(`"}]}`) + } + sb.WriteString("]") + stacksBody := sb.String() + + for _, tt := range []struct { + name string + concurrency int + }{ + {name: "sequential (1)", concurrency: 1}, + {name: "moderate (5)", concurrency: 5}, + {name: "high (15)", concurrency: 15}, + } { + t.Run(tt.name, func(t *testing.T) { + logger := logrus.New() + logger.SetOutput(io.Discard) + + probe := &concurrencyProbe{ + delay: 20 * time.Millisecond, + stacksBody: stacksBody, + } + client := &Client{ + apiKey: "test", + apiURL: "http://test/api", + logger: logger, + resetStacks: true, + stackConcurrency: tt.concurrency, + client: &http.Client{Transport: probe}, + } + + _, err := client.FetchAllStacks() + require.NoError(t, err) + + require.EqualValues(t, 40, probe.deleteCount.Load(), + "every stack should be deleted exactly once") + + peak := probe.peak.Load() + assert.LessOrEqual(t, int(peak), tt.concurrency, + "peak in-flight DELETE /stacks/{id} (%d) exceeded configured limit (%d)", + peak, tt.concurrency) + + // Sanity: with concurrency > 1 we want real overlap, otherwise the test isn't + // proving anything beyond sequential behavior. + if tt.concurrency > 1 { + assert.Greater(t, int(peak), 1, + "with concurrency=%d the probe should observe ≥ 2 in-flight calls (got %d) — either the delay is too small or the parallelism is broken", + tt.concurrency, peak) + } + }) + } +} + +func TestFetchAllStacksResetGuardsZeroConcurrency(t *testing.T) { + // Regression test: a Client constructed directly (bypassing NewClient) with + // stackConcurrency = 0 must not deadlock on an unbuffered semaphore. The defensive + // `max(c.stackConcurrency, 1)` inside FetchAllStacks keeps it safe. + logger := logrus.New() + logger.SetOutput(io.Discard) + + probe := &concurrencyProbe{ + delay: 1 * time.Millisecond, + stacksBody: `[{"id":"s1","primaryAssetId":"a1","assets":[{"id":"a1"}]}]`, + } + client := &Client{ + apiKey: "test", + apiURL: "http://test/api", + logger: logger, + resetStacks: true, + stackConcurrency: 0, + client: &http.Client{Transport: probe}, + } + + done := make(chan struct{}) + go func() { + _, _ = client.FetchAllStacks() + close(done) + }() + select { + case <-done: + assert.EqualValues(t, 1, probe.deleteCount.Load()) + case <-time.After(5 * time.Second): + t.Fatal("FetchAllStacks deadlocked with stackConcurrency=0 — defensive guard missing or broken") + } +} + diff --git a/pkg/immich/client_test.go b/pkg/immich/client_test.go index 464fd31..17757ac 100644 --- a/pkg/immich/client_test.go +++ b/pkg/immich/client_test.go @@ -58,7 +58,7 @@ func TestNewClient(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { // Act - client := NewClient(tt.apiURL, tt.apiKey, tt.resetStacks, tt.replaceStacks, tt.dryRun, true, false, false, false, nil, "", "", logrus.New()) + client := NewClient(tt.apiURL, tt.apiKey, tt.resetStacks, tt.replaceStacks, tt.dryRun, true, false, false, false, 1, nil, "", "", logrus.New()) // Assert if tt.wantErr { @@ -892,7 +892,7 @@ func TestNewClientWithFilterParams(t *testing.T) { client := NewClient( "http://test.com", "test-key", - false, false, false, false, false, false, false, + false, false, false, false, false, false, false, 1, tt.filterAlbumIDs, tt.filterTakenAfter, tt.filterTakenBefore, @@ -1689,7 +1689,7 @@ func TestNewClientEdgeCases(t *testing.T) { client := NewClient( tt.apiURL, tt.apiKey, - false, false, false, false, false, false, false, + false, false, false, false, false, false, false, 1, nil, "", "", tt.logger, )