Skip to content

Commit

Permalink
MB-52992 Integrate ns_server config with rebalance logic
Browse files Browse the repository at this point in the history
ns_server would share the destination location for shard
rebalance over three config settings:

a. blob_storage_scheme
b. blob_storage_bucket
c. blob_storage_prefix

The final destination can be computed as a concatenation of
blob_storage_scheme + blob_storage_bucket + blob_storage_prefix

Change-Id: Iea0fb8c36c2b7e4d84a12faacdd4c516ffda30f2
  • Loading branch information
varunv-cb committed Oct 4, 2022
1 parent cb13c01 commit 9b5b337
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 1 deletion.
14 changes: 14 additions & 0 deletions secondary/indexer/rebalancer.go
Expand Up @@ -388,8 +388,10 @@ func (r *Rebalancer) initRebalAsync() {
start := time.Now()

if c.IsServerlessDeployment() {

r.transferTokens, hostToIndexToRemove, err = planner.ExecuteTenantAwareRebalance(cfg["clusterAddr"].String(),
*r.topologyChange, r.nodeUUID)

} else {
r.transferTokens, hostToIndexToRemove, err = planner.ExecuteRebalance(cfg["clusterAddr"].String(), *r.topologyChange,
r.nodeUUID, onEjectOnly, disableReplicaRepair, threshold, timeout, cpuProfile,
Expand All @@ -406,6 +408,18 @@ func (r *Rebalancer) initRebalAsync() {
}
if len(r.transferTokens) == 0 {
r.transferTokens = nil
} else if c.IsServerlessDeployment() {
destination, err := getDestinationFromConfig(r.config.Load())
if err != nil {
l.Errorf("Rebalancer::initRebalAsync err: %v", err)
go r.finishRebalance(err)
return
}
// Populate destination in transfer tokens
for _, token := range r.transferTokens {
token.Destination = destination
}
l.Infof("Rebalancer::initRebalAsync Populated destination: %v in all transfer tokens", destination)
}

elapsed := time.Since(start)
Expand Down
25 changes: 25 additions & 0 deletions secondary/indexer/shard_rebalancer.go
Expand Up @@ -199,6 +199,19 @@ func (sr *ShardRebalancer) initRebalAsync() {

if len(sr.transferTokens) == 0 {
sr.transferTokens = nil
} else {
destination, err := getDestinationFromConfig(sr.config.Load())
if err != nil {
l.Errorf("ShardRebalancer::initRebalAsync err: %v", err)
go sr.finishRebalance(err)
return
}
// Populate destination in transfer tokens
for _, token := range sr.transferTokens {
token.Destination = destination
}
l.Infof("ShardRebalancer::initRebalAsync Populated destination: %v in all transfer tokens", destination)

}

break loop
Expand All @@ -209,6 +222,18 @@ func (sr *ShardRebalancer) initRebalAsync() {
go sr.doRebalance()
}

func getDestinationFromConfig(cfg c.Config) (string, error) {
blobStorageScheme := cfg["settings.rebalance.blob_storage_scheme"].String()
blobStorageBucket := cfg["settings.rebalance.blob_storage_bucket"].String()
blobStoragePrefix := cfg["settings.rebalance.blob_storage_prefix"].String()

destination := blobStorageScheme + blobStorageBucket + blobStoragePrefix
if len(destination) == 0 {
return "", errors.New("Empty destination for shard rebalancer")
}
return destination, nil
}

// processTokens is invoked by observeRebalance() method
// processTokens invokes processShardTokens of ShardRebalancer
func (sr *ShardRebalancer) processShardTokens(kve metakv.KVEntry) error {
Expand Down
3 changes: 2 additions & 1 deletion secondary/planner/executor.go
Expand Up @@ -3297,7 +3297,8 @@ func ExecuteTenantAwareRebalanceInternal(clusterUrl string,

filterSolution(p.Result.Placement)

transferTokens, err := genShardTransferToken(p.Result, masterId, topologyChange, deleteNodes)
transferTokens, err := genShardTransferToken(p.Result, masterId,
topologyChange, deleteNodes)
if err != nil {
return nil, nil, err
}
Expand Down

0 comments on commit 9b5b337

Please sign in to comment.