Skip to content

Commit

Permalink
MB-51825: Pass numVBuckets to Storage from indexer
Browse files Browse the repository at this point in the history
Change-Id: Ic105822c120447d00669cc2ee231274781571854
  • Loading branch information
ksaikrishnateja committed Sep 3, 2022
1 parent 9f5873f commit a36626b
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 26 deletions.
63 changes: 46 additions & 17 deletions secondary/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func NewIndexer(config common.Config) (Indexer, Message) {
idx.stats = NewIndexerStats()
idx.initFromConfig()

logging.Infof("Indexer::NewIndexer Starting with Vbuckets %v", idx.config["numVbuckets"].Int())
logging.Infof("Indexer::NewIndexer done initializing from config")

useCInfoLite := idx.config["use_cinfo_lite"].Bool()
idx.cinfoProvider, err = common.NewClusterInfoProvider(useCInfoLite, clusterAddr,
Expand Down Expand Up @@ -5154,22 +5154,43 @@ func (idx *indexer) initPartnInstance(indexInst common.IndexInst,

//add a single slice per partition for now
var slice Slice
idx.cinfoProviderLock.RLock()
ephemeral, err := idx.cinfoProvider.IsEphemeral(indexInst.Defn.Bucket)
idx.cinfoProviderLock.RUnlock()
var err error
var ephemeral bool
var numVBuckets int

func() {
idx.cinfoProviderLock.RLock()
defer idx.cinfoProviderLock.RUnlock()

ephemeral, err = idx.cinfoProvider.IsEphemeral(indexInst.Defn.Bucket)
if err != nil {
logging.Errorf("Indexer::initPartnInstance Failed to check bucket type ephemeral: %v\n", err)
return
}

numVBuckets, err = idx.cinfoProvider.GetNumVBuckets(indexInst.Defn.Bucket)
if err != nil {
logging.Errorf("Indexer::initPartnInstance Failed to get number of vBuckets: %v\n", err)
return
}
}()
if err != nil {
logging.Errorf("Indexer::initPartnInstance Failed to check bucket type ephemeral: %v\n", err)
} else {
slice, err = NewSlice(SliceId(0), &indexInst, &partnInst, idx.config, idx.stats, ephemeral, !bootstrapPhase, idx.meteringMgr)
}
errStr := fmt.Sprintf("Error getting cluster info for creating slice %v", err)
err1 := errors.New(errStr)

if err == nil {
partnInst.Sc.AddSlice(0, slice)
logging.Infof("Indexer::initPartnInstance Initialized Slice: \n\t Index: %v Slice: %v",
indexInst.InstId, slice)
if respCh != nil {
respCh <- &MsgError{
err: Error{code: ERROR_INDEXER_INTERNAL_ERROR,
severity: FATAL,
cause: err1,
category: INDEXER}}
}
return nil, nil, err1
}

partnInstMap[partnDefn.GetPartitionId()] = partnInst
} else {
slice, err = NewSlice(SliceId(0), &indexInst, &partnInst, idx.config, idx.stats, ephemeral, !bootstrapPhase,
idx.meteringMgr, numVBuckets)
if err != nil {
if bootstrapPhase && err == errStorageCorrupted {
errStr := fmt.Sprintf("storage corruption for indexInst %v partnDefn %v", indexInst, partnDefn)
logging.Errorf("Indexer:: initPartnInstance %v", errStr)
Expand All @@ -5190,6 +5211,12 @@ func (idx *indexer) initPartnInstance(indexInst common.IndexInst,
}
return nil, nil, err1
}

partnInst.Sc.AddSlice(0, slice)
logging.Infof("Indexer::initPartnInstance Initialized Slice: \n\t Index: %v Slice: %v",
indexInst.InstId, slice)

partnInstMap[partnDefn.GetPartitionId()] = partnInst
}

return partnInstMap, failedPartnInstances, nil
Expand Down Expand Up @@ -6740,6 +6767,7 @@ func (idx *indexer) startKeyspaceIdStream(streamId common.StreamId, keyspaceId s
if streamId == common.INIT_STREAM {
// Asyncronously compute the KV timestamp
go idx.computeKeyspaceBuildTsAsync(clustAddr, keyspaceId, cid, numVb, streamId, mutex)

}

idx.internalRecvCh <- &MsgRecovery{mType: INDEXER_RECOVERY_DONE,
Expand Down Expand Up @@ -8952,7 +8980,8 @@ func (idx *indexer) memoryUsedStorage() int64 {
}

func NewSlice(id SliceId, indInst *common.IndexInst, partnInst *PartitionInst,
conf common.Config, stats *IndexerStats, ephemeral, isNew bool, meteringMgr *MeteringThrottlingMgr) (slice Slice, err error) {
conf common.Config, stats *IndexerStats, ephemeral, isNew bool,
meteringMgr *MeteringThrottlingMgr, numVBuckets int) (slice Slice, err error) {

isInitialBuild := func() bool {
return indInst.State == common.INDEX_STATE_INITIAL || indInst.State == common.INDEX_STATE_CATCHUP ||
Expand All @@ -8976,13 +9005,13 @@ func NewSlice(id SliceId, indInst *common.IndexInst, partnInst *PartitionInst,
switch indInst.Defn.Using {
case common.MemDB, common.MemoryOptimized:
slice, err = NewMemDBSlice(path, id, indInst.Defn, instId, partitionId, indInst.Defn.IsPrimary, !ephemeral, numPartitions, conf,
stats.GetPartitionStats(indInst.InstId, partitionId))
stats.GetPartitionStats(indInst.InstId, partitionId), numVBuckets)
case common.ForestDB:
slice, err = NewForestDBSlice(path, id, indInst.Defn, instId, partitionId, indInst.Defn.IsPrimary, numPartitions, conf,
stats.GetPartitionStats(indInst.InstId, partitionId))
case common.PlasmaDB:
slice, err = NewPlasmaSlice(storage_dir, log_dir, path, id, indInst.Defn, instId, partitionId, indInst.Defn.IsPrimary, numPartitions, conf,
stats.GetPartitionStats(indInst.InstId, partitionId), stats, isNew, isInitialBuild(), meteringMgr)
stats.GetPartitionStats(indInst.InstId, partitionId), stats, isNew, isInitialBuild(), meteringMgr, numVBuckets)
}

return
Expand Down
8 changes: 4 additions & 4 deletions secondary/indexer/memdb_slice_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,9 @@ type memdbSlice struct {

// NewMemDBSlice is the constructor for memdbSlice.
func NewMemDBSlice(path string, sliceId SliceId, idxDefn common.IndexDefn,
idxInstId common.IndexInstId, partitionId common.PartitionId,
isPrimary bool, hasPersistance bool, numPartitions int,
sysconf common.Config, idxStats *IndexStats) (*memdbSlice, error) {
idxInstId common.IndexInstId, partitionId common.PartitionId, isPrimary bool,
hasPersistance bool, numPartitions int, sysconf common.Config, idxStats *IndexStats,
numVBuckets int) (*memdbSlice, error) {

info, err := iowrap.Os_Stat(path)
if err != nil || err == nil && info.IsDir() {
Expand Down Expand Up @@ -219,7 +219,7 @@ func NewMemDBSlice(path string, sliceId SliceId, idxDefn common.IndexDefn,
mdb.numWriters = sysconf["numSliceWriters"].Int()
mdb.maxRollbacks = sysconf["settings.moi.recovery.max_rollbacks"].Int()
mdb.maxDiskSnaps = sysconf["recovery.max_disksnaps"].Int()
mdb.numVbuckets = sysconf["numVbuckets"].Int()
mdb.numVbuckets = numVBuckets
mdb.clusterAddr = sysconf["clusterAddr"].String()
mdb.exposeItemCopy = sysconf["moi.exposeItemCopy"].Bool()

Expand Down
2 changes: 1 addition & 1 deletion secondary/indexer/plasma_community.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ var errStorageCorrupted = fmt.Errorf("Storage corrupted and unrecoverable")
func NewPlasmaSlice(storage_dir string, log_dir string, path string, sliceId SliceId, idxDefn common.IndexDefn,
idxInstId common.IndexInstId, partitionId common.PartitionId, isPrimary bool, numPartitions int,
sysconf common.Config, idxStats *IndexStats, indexerStats *IndexerStats, isNew bool, isInitialBuild bool,
meteringMgr *MeteringThrottlingMgr) (Slice, error) {
meteringMgr *MeteringThrottlingMgr, numVBuckets int) (Slice, error) {
panic("Plasma is only supported in Enterprise Edition")
}

Expand Down
5 changes: 3 additions & 2 deletions secondary/indexer/plasma_enterprise.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ func NewPlasmaSlice(storage_dir string, log_dir string, path string, sliceId Sli
idxInstId common.IndexInstId, partitionId common.PartitionId,
isPrimary bool, numPartitions int,
sysconf common.Config, idxStats *IndexStats, indexerStats *IndexerStats, isNew bool, isInitialBuild bool,
meteringMgr *MeteringThrottlingMgr) (*plasmaSlice, error) {
meteringMgr *MeteringThrottlingMgr, numVBuckets int) (*plasmaSlice, error) {

return newPlasmaSlice(storage_dir, log_dir, path, sliceId,
idxDefn, idxInstId, partitionId, isPrimary, numPartitions,
sysconf, idxStats, indexerStats, isNew, isInitialBuild, meteringMgr)
sysconf, idxStats, indexerStats, isNew, isInitialBuild, meteringMgr, numVBuckets)
}

func DestroyPlasmaSlice(storageDir string, path string) error {
Expand Down
4 changes: 2 additions & 2 deletions secondary/indexer/plasma_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func newPlasmaSlice(storage_dir string, log_dir string, path string, sliceId Sli
idxInstId common.IndexInstId, partitionId common.PartitionId,
isPrimary bool, numPartitions int,
sysconf common.Config, idxStats *IndexStats, indexerStats *IndexerStats, isNew bool, isInitialBuild bool,
meteringMgr *MeteringThrottlingMgr) (*plasmaSlice, error) {
meteringMgr *MeteringThrottlingMgr, numVBuckets int) (*plasmaSlice, error) {

slice := &plasmaSlice{}

Expand Down Expand Up @@ -207,7 +207,7 @@ func newPlasmaSlice(storage_dir string, log_dir string, path string, sliceId Sli
slice.maxNumWriters = sysconf["numSliceWriters"].Int()
slice.hasPersistence = !sysconf["plasma.disablePersistence"].Bool()
slice.clusterAddr = sysconf["clusterAddr"].String()
slice.numVbuckets = sysconf["numVbuckets"].Int()
slice.numVbuckets = numVBuckets

slice.maxRollbacks = sysconf["settings.plasma.recovery.max_rollbacks"].Int()
slice.maxDiskSnaps = sysconf["recovery.max_disksnaps"].Int()
Expand Down

0 comments on commit a36626b

Please sign in to comment.