Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ var logFormat string
var removeSingleAssetStacks bool
var includeVideos bool
var includeVideosFlagSet bool
var preventSelfRekt bool
var preventSelfRektFlagSet bool
var stackConcurrency int
var stackConcurrencyFlagSet bool
var filterAlbumIDs []string
var filterTakenAfter string
var filterTakenBefore string
Expand Down Expand Up @@ -156,6 +160,8 @@ func logStartupSummary(logger *logrus.Logger) {
"withDeleted": withDeleted,
"removeSingleAssetStacks": removeSingleAssetStacks,
"includeVideos": includeVideos,
"preventSelfRekt": preventSelfRekt,
"stackConcurrency": stackConcurrency,
"criteria": criteria,
"parentFilenamePromote": parentFilenamePromote,
"parentExtPromote": parentExtPromote,
Expand Down Expand Up @@ -203,6 +209,12 @@ func logStartupSummary(logger *logrus.Logger) {
if includeVideos {
summary = append(summary, "include-videos=true")
}
if preventSelfRekt {
summary = append(summary, "prevent-self-rekt=true")
}
if stackConcurrency > 1 {
summary = append(summary, fmt.Sprintf("stack-concurrency=%d", stackConcurrency))
}
if criteria != "" {
summary = append(summary, fmt.Sprintf("criteria=%s", criteria))
}
Expand Down Expand Up @@ -300,6 +312,21 @@ func LoadEnvForTesting() LoadEnvConfig {
includeVideos = envInclude == "true"
}
}
if !preventSelfRektFlagSet {
if envPrevent := os.Getenv("PREVENT_SELF_REKT"); envPrevent != "" {
preventSelfRekt = envPrevent == "true"
}
}
if !stackConcurrencyFlagSet {
if envVal := os.Getenv("STACK_CONCURRENCY"); envVal != "" {
if n, err := strconv.Atoi(envVal); err == nil {
stackConcurrency = n
}
}
}
if stackConcurrency < 1 {
stackConcurrency = 1
}
if parentFilenamePromote == "" || parentFilenamePromote == utils.DefaultParentFilenamePromoteString {
if envVal := os.Getenv("PARENT_FILENAME_PROMOTE"); envVal != "" {
parentFilenamePromote = envVal
Expand Down
2 changes: 1 addition & 1 deletion cmd/duplicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func runDuplicates(cmd *cobra.Command, args []string) {
if i > 0 {
logger.Infof("\n")
}
client := immich.NewClient(apiURL, key, false, false, true, withArchived, withDeleted, false, includeVideos, nil, "", "", logger)
client := immich.NewClient(apiURL, key, false, false, true, withArchived, withDeleted, false, includeVideos, stackConcurrency, nil, "", "", logger)
if client == nil {
logger.Errorf("Invalid client for API key: %s", key)
continue
Expand Down
2 changes: 1 addition & 1 deletion cmd/fixtrash.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func runFixTrash(cmd *cobra.Command, args []string) {
if i > 0 {
logger.Infof("\n")
}
client := immich.NewClient(apiURL, key, false, false, dryRun, withArchived, withDeleted, false, includeVideos, nil, "", "", logger)
client := immich.NewClient(apiURL, key, false, false, dryRun, withArchived, withDeleted, false, includeVideos, stackConcurrency, nil, "", "", logger)
if client == nil {
logger.Errorf("Invalid client for API key: %s", key)
continue
Expand Down
8 changes: 8 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ func bindFlags(rootCmd *cobra.Command) {
rootCmd.PersistentFlags().StringVar(&logFormat, "log-format", "", "Log format: text, json (or set LOG_FORMAT env var)")
rootCmd.PersistentFlags().BoolVar(&removeSingleAssetStacks, "remove-single-asset-stacks", false, "Remove stacks with only one asset (or set REMOVE_SINGLE_ASSET_STACKS=true)")
rootCmd.PersistentFlags().BoolVar(&includeVideos, "include-videos", false, "Include VIDEO assets alongside IMAGE in stacking (or set INCLUDE_VIDEOS=true)")
rootCmd.PersistentFlags().BoolVar(&preventSelfRekt, "prevent-self-rekt", false, "Insert a 50ms delay before each stack write to prevent overwhelming Immich on huge libraries (or set PREVENT_SELF_REKT=true)")
rootCmd.PersistentFlags().IntVar(&stackConcurrency, "stack-concurrency", 0, "Parallel stack writes (default 1 = sequential; values like 10-20 speed up large libraries; or set STACK_CONCURRENCY)")
rootCmd.PersistentFlags().StringSliceVar(&filterAlbumIDs, "filter-album-ids", nil, "Filter by album IDs or names, comma-separated (or set FILTER_ALBUM_IDS env var)")
rootCmd.PersistentFlags().StringVar(&filterTakenAfter, "filter-taken-after", "", "Filter assets taken after date, ISO 8601 (or set FILTER_TAKEN_AFTER env var)")
rootCmd.PersistentFlags().StringVar(&filterTakenBefore, "filter-taken-before", "", "Filter assets taken before date, ISO 8601 (or set FILTER_TAKEN_BEFORE env var)")
Expand Down Expand Up @@ -86,6 +88,12 @@ func CreateRootCommand() *cobra.Command {
if cmd.Flags().Lookup("include-videos") != nil && cmd.Flags().Lookup("include-videos").Changed {
includeVideosFlagSet = true
}
if cmd.Flags().Lookup("prevent-self-rekt") != nil && cmd.Flags().Lookup("prevent-self-rekt").Changed {
preventSelfRektFlagSet = true
}
if cmd.Flags().Lookup("stack-concurrency") != nil && cmd.Flags().Lookup("stack-concurrency").Changed {
stackConcurrencyFlagSet = true
}
},
}

Expand Down
218 changes: 122 additions & 96 deletions cmd/stacker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package main

import (
"strings"
"sync"
"time"

"github.com/majorfi/immich-stack/pkg/immich"
Expand Down Expand Up @@ -174,7 +175,7 @@ func runStacker(cmd *cobra.Command, args []string) {
if i > 0 {
logger.Infof("\n")
}
client := immich.NewClient(apiURL, key, resetStacks, replaceStacks, dryRun, withArchived, withDeleted, removeSingleAssetStacks, includeVideos, filterAlbumIDs, filterTakenAfter, filterTakenBefore, logger)
client := immich.NewClient(apiURL, key, resetStacks, replaceStacks, dryRun, withArchived, withDeleted, removeSingleAssetStacks, includeVideos, stackConcurrency, filterAlbumIDs, filterTakenAfter, filterTakenBefore, logger)
if client == nil {
logger.Errorf("Invalid client for API key: %s", key)
continue
Expand Down Expand Up @@ -225,110 +226,135 @@ func runStackerOnce(client *immich.Client, logger *logrus.Logger, ownerID string
logger.Fatalf("Error stacking assets: %v", err)
}

/**********************************************************************************************
** Process each candidate stack. When stackConcurrency > 1, multiple stacks are written in
** parallel using a bounded semaphore — the sequence WITHIN each stack (delete children →
** modify) stays ordered, only different stacks run concurrently. Default concurrency = 1
** preserves the historical sequential behavior. See issue #53.
**********************************************************************************************/
concurrency := max(stackConcurrency, 1)
sem := make(chan struct{}, concurrency)
var wg sync.WaitGroup
for i, stack := range stacks {
_, _, newStackIDs := getParentAndChildrenIDs(stack)
_, _, originalStackIDs := getOriginalStackIDs(stack)
wg.Add(1)
sem <- struct{}{}
go func(i int, stack []utils.TAsset) {
defer wg.Done()
defer func() { <-sem }()
processStack(client, logger, i, len(stacks), stack)
}(i, stack)
}
wg.Wait()
}

/******************************************************************************************
** Adding debug logs
******************************************************************************************/
{
logger.Debugf("--------------------------------")
logger.Debugf("%d/%d Key: %s", i+1, len(stacks), stack[0].OriginalFileName)
logger.WithFields(logrus.Fields{
"Name": stack[0].OriginalFileName,
"ID": stack[0].ID,
"Time": stack[0].LocalDateTime,
}).Debugf("\tParent")
for _, child := range stack[1:] {
logger.WithFields(logrus.Fields{
"Name": child.OriginalFileName,
"ID": child.ID,
"Time": child.LocalDateTime,
}).Debugf("\tChild")
}
}
/**************************************************************************************************
** processStack runs the per-stack pipeline: compare against existing state, optionally delete
** child stacks (for replace mode), throttle if requested, and call ModifyStack. Designed to be
** safe to invoke from many goroutines in parallel — operates on its own stack slice and uses
** the client/logger which are themselves goroutine-safe.
**************************************************************************************************/
func processStack(client *immich.Client, logger *logrus.Logger, i int, total int, stack []utils.TAsset) {
_, _, newStackIDs := getParentAndChildrenIDs(stack)
_, _, originalStackIDs := getOriginalStackIDs(stack)

/******************************************************************************************
** Doing standard stacker checks.
******************************************************************************************/
if !isValidStack(newStackIDs) {
logger.Debugf("\t⚠️ Invalid stack: %s", stack[0].OriginalFileName)
continue
}
if !needsStackUpdate(originalStackIDs, newStackIDs) {
logger.Debugf("\tℹ️ No update needed for stack: %s", stack[0].OriginalFileName)
continue
}
childrenWithStack, hasChildrenWithStack := getChildrenWithStack(stack)
if hasChildrenWithStack && !replaceStacks {
logger.Debugf("\tℹ️ No replaceStacks, skipping stack: %s", stack[0].OriginalFileName)
continue
}
/**********************************************************************************************
** Adding debug logs
**********************************************************************************************/
logger.Debugf("--------------------------------")
logger.Debugf("%d/%d Key: %s", i+1, total, stack[0].OriginalFileName)
logger.WithFields(logrus.Fields{
"Name": stack[0].OriginalFileName,
"ID": stack[0].ID,
"Time": stack[0].LocalDateTime,
}).Debugf("\tParent")
for _, child := range stack[1:] {
logger.WithFields(logrus.Fields{
"Name": child.OriginalFileName,
"ID": child.ID,
"Time": child.LocalDateTime,
}).Debugf("\tChild")
}

/******************************************************************************************
** Adding info logs, but only if we are not in debug mode.
******************************************************************************************/
{
if !logger.IsLevelEnabled(logrus.DebugLevel) {
logger.Infof("--------------------------------")
logger.Infof("%d/%d Key: %s", i+1, len(stacks), stack[0].OriginalFileName)
}
if !logger.IsLevelEnabled(logrus.DebugLevel) {
logger.WithFields(logrus.Fields{
"Name": stack[0].OriginalFileName,
"ID": stack[0].ID,
"Time": stack[0].LocalDateTime,
}).Infof("\tParent")
for _, child := range stack[1:] {
logger.WithFields(logrus.Fields{
"Name": child.OriginalFileName,
"ID": child.ID,
"Time": child.LocalDateTime,
}).Infof("\tChild")
}
}
}
/**********************************************************************************************
** Doing standard stacker checks.
**********************************************************************************************/
if !isValidStack(newStackIDs) {
logger.Debugf("\t⚠️ Invalid stack: %s", stack[0].OriginalFileName)
return
}
if !needsStackUpdate(originalStackIDs, newStackIDs) {
logger.Debugf("\tℹ️ No update needed for stack: %s", stack[0].OriginalFileName)
return
}
childrenWithStack, hasChildrenWithStack := getChildrenWithStack(stack)
if hasChildrenWithStack && !replaceStacks {
logger.Debugf("\tℹ️ No replaceStacks, skipping stack: %s", stack[0].OriginalFileName)
return
}

/******************************************************************************************
** Add comparison debug logging.
******************************************************************************************/
if logger.IsLevelEnabled(logrus.DebugLevel) {
logger.Debugf("\tStack comparison:")
logger.Debugf("\t Original: %v", originalStackIDs)
logger.Debugf("\t Expected: %v", newStackIDs)
logger.Debugf("\t REPLACE_STACKS: %v", replaceStacks)
/**********************************************************************************************
** Adding info logs, but only if we are not in debug mode.
**********************************************************************************************/
if !logger.IsLevelEnabled(logrus.DebugLevel) {
logger.Infof("--------------------------------")
logger.Infof("%d/%d Key: %s", i+1, total, stack[0].OriginalFileName)
logger.WithFields(logrus.Fields{
"Name": stack[0].OriginalFileName,
"ID": stack[0].ID,
"Time": stack[0].LocalDateTime,
}).Infof("\tParent")
for _, child := range stack[1:] {
logger.WithFields(logrus.Fields{
"Name": child.OriginalFileName,
"ID": child.ID,
"Time": child.LocalDateTime,
}).Infof("\tChild")
}
}

/******************************************************************************************
** Delete children stacks if replaceStacks is true.
******************************************************************************************/
if replaceStacks {
for _, childID := range childrenWithStack {
client.DeleteStack(childID, utils.REASON_REPLACE_CHILD_STACK_WITH_NEW_ONE)
}
}
/**********************************************************************************************
** Add comparison debug logging.
**********************************************************************************************/
if logger.IsLevelEnabled(logrus.DebugLevel) {
logger.Debugf("\tStack comparison:")
logger.Debugf("\t Original: %v", originalStackIDs)
logger.Debugf("\t Expected: %v", newStackIDs)
logger.Debugf("\t REPLACE_STACKS: %v", replaceStacks)
}

/******************************************************************************************
** Determine action type for logging.
******************************************************************************************/
var actionMsg string
if len(originalStackIDs) == 0 {
actionMsg = "\t🆕 Creating new stack"
} else if replaceStacks && len(childrenWithStack) > 0 {
actionMsg = "\t🔄 Replacing existing stack (deleted child stacks)"
} else {
actionMsg = "\t✏️ Updating stack configuration"
/**********************************************************************************************
** Delete children stacks if replaceStacks is true.
**********************************************************************************************/
if replaceStacks {
for _, childID := range childrenWithStack {
client.DeleteStack(childID, utils.REASON_REPLACE_CHILD_STACK_WITH_NEW_ONE)
}
logger.Info(actionMsg)
}

/******************************************************************************************
** Modify the stack after a little delay to avoid self-rekt.
******************************************************************************************/
time.Sleep(100 * time.Millisecond)
if err := client.ModifyStack(newStackIDs); err != nil {
logger.Errorf("Error modifying stack: %v", err)
}
/**********************************************************************************************
** Determine action type for logging.
**********************************************************************************************/
var actionMsg string
if len(originalStackIDs) == 0 {
actionMsg = "\t🆕 Creating new stack"
} else if replaceStacks && len(childrenWithStack) > 0 {
actionMsg = "\t🔄 Replacing existing stack (deleted child stacks)"
} else {
actionMsg = "\t✏️ Updating stack configuration"
}
logger.Info(actionMsg)

/**********************************************************************************************
** Optional throttle between stack writes. Default is no delay — empirically Immich has no
** rate limit on POST /stacks and handles bursts fine. Users with very large libraries on
** slow hosting can opt in to a 50ms gap via PREVENT_SELF_REKT=true if they observe upstream
** errors. See issue #53.
**********************************************************************************************/
if preventSelfRekt {
time.Sleep(50 * time.Millisecond)
}
if err := client.ModifyStack(newStackIDs); err != nil {
logger.Errorf("Error modifying stack: %v", err)
}
}

Expand All @@ -346,7 +372,7 @@ func runCronLoopForAllUsers(apiKeys []string, apiURL string, logger *logrus.Logg
if i > 0 {
logger.Infof("\n")
}
client := immich.NewClient(apiURL, key, resetStacks, replaceStacks, dryRun, withArchived, withDeleted, removeSingleAssetStacks, includeVideos, filterAlbumIDs, filterTakenAfter, filterTakenBefore, logger)
client := immich.NewClient(apiURL, key, resetStacks, replaceStacks, dryRun, withArchived, withDeleted, removeSingleAssetStacks, includeVideos, stackConcurrency, filterAlbumIDs, filterTakenAfter, filterTakenBefore, logger)
if client == nil {
logger.Errorf("Invalid client for API key: %s", key)
continue
Expand Down
4 changes: 4 additions & 0 deletions cmd/stacker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ func resetGlobalConfig() {
removeSingleAssetStacks = false
includeVideos = false
includeVideosFlagSet = false
preventSelfRekt = false
preventSelfRektFlagSet = false
stackConcurrency = 0
stackConcurrencyFlagSet = false
}

func clearEnvironment() {
Expand Down
Loading
Loading