diff --git a/secondary/indexer/rebalancer.go b/secondary/indexer/rebalancer.go index 63b13f578..23ed329b0 100644 --- a/secondary/indexer/rebalancer.go +++ b/secondary/indexer/rebalancer.go @@ -388,8 +388,10 @@ func (r *Rebalancer) initRebalAsync() { start := time.Now() if c.IsServerlessDeployment() { + r.transferTokens, hostToIndexToRemove, err = planner.ExecuteTenantAwareRebalance(cfg["clusterAddr"].String(), *r.topologyChange, r.nodeUUID) + } else { r.transferTokens, hostToIndexToRemove, err = planner.ExecuteRebalance(cfg["clusterAddr"].String(), *r.topologyChange, r.nodeUUID, onEjectOnly, disableReplicaRepair, threshold, timeout, cpuProfile, @@ -406,6 +408,18 @@ func (r *Rebalancer) initRebalAsync() { } if len(r.transferTokens) == 0 { r.transferTokens = nil + } else if c.IsServerlessDeployment() { + destination, err := getDestinationFromConfig(r.config.Load()) + if err != nil { + l.Errorf("Rebalancer::initRebalAsync err: %v", err) + go r.finishRebalance(err) + return + } + // Populate destination in transfer tokens + for _, token := range r.transferTokens { + token.Destination = destination + } + l.Infof("Rebalancer::initRebalAsync Populated destination: %v in all transfer tokens", destination) } elapsed := time.Since(start) diff --git a/secondary/indexer/shard_rebalancer.go b/secondary/indexer/shard_rebalancer.go index 46ad1d329..ac5c98e69 100644 --- a/secondary/indexer/shard_rebalancer.go +++ b/secondary/indexer/shard_rebalancer.go @@ -199,6 +199,19 @@ func (sr *ShardRebalancer) initRebalAsync() { if len(sr.transferTokens) == 0 { sr.transferTokens = nil + } else { + destination, err := getDestinationFromConfig(sr.config.Load()) + if err != nil { + l.Errorf("ShardRebalancer::initRebalAsync err: %v", err) + go sr.finishRebalance(err) + return + } + // Populate destination in transfer tokens + for _, token := range sr.transferTokens { + token.Destination = destination + } + l.Infof("ShardRebalancer::initRebalAsync Populated destination: %v in all transfer tokens", destination) + } break loop @@ -209,6 +222,18 @@ func (sr *ShardRebalancer) initRebalAsync() { go sr.doRebalance() } +func getDestinationFromConfig(cfg c.Config) (string, error) { + blobStorageScheme := cfg["settings.rebalance.blob_storage_scheme"].String() + blobStorageBucket := cfg["settings.rebalance.blob_storage_bucket"].String() + blobStoragePrefix := cfg["settings.rebalance.blob_storage_prefix"].String() + + destination := blobStorageScheme + blobStorageBucket + blobStoragePrefix + if len(destination) == 0 { + return "", errors.New("Empty destination for shard rebalancer") + } + return destination, nil +} + // processTokens is invoked by observeRebalance() method // processTokens invokes processShardTokens of ShardRebalancer func (sr *ShardRebalancer) processShardTokens(kve metakv.KVEntry) error { diff --git a/secondary/planner/executor.go b/secondary/planner/executor.go index 45766c3e4..1201b2856 100644 --- a/secondary/planner/executor.go +++ b/secondary/planner/executor.go @@ -3297,7 +3297,8 @@ func ExecuteTenantAwareRebalanceInternal(clusterUrl string, filterSolution(p.Result.Placement) - transferTokens, err := genShardTransferToken(p.Result, masterId, topologyChange, deleteNodes) + transferTokens, err := genShardTransferToken(p.Result, masterId, + topologyChange, deleteNodes) if err != nil { return nil, nil, err }