Skip to content

Commit

Permalink
Merge remote-tracking branch 'couchbase/unstable' into HEAD
Browse files Browse the repository at this point in the history
http: //ci2i-unstable.northscale.in/gsi-09.07.2020-05.30.pass.html
Change-Id: Iabb9da08fc941350ca29d2084eec8ac7f78146bb
  • Loading branch information
jeelanp2003 committed Jul 9, 2020
2 parents 21a566f + 6be5f42 commit 4b8e777
Show file tree
Hide file tree
Showing 19 changed files with 631 additions and 142 deletions.
10 changes: 10 additions & 0 deletions secondary/common/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,16 @@ func (c *ClusterInfoCache) GetNodeUUID(nid NodeId) string {
return c.nodes[nid].NodeUUID
}

func (c *ClusterInfoCache) GetNodeIdByUUID(uuid string) (NodeId, bool) {
for nid, node := range c.nodes {
if node.NodeUUID == uuid {
return NodeId(nid), true
}
}

return NodeId(-1), false
}

func (c *ClusterInfoCache) GetNodesByServiceType(srvc string) (nids []NodeId) {
for i, svs := range c.nodesvs {
if _, ok := svs.Services[srvc]; ok {
Expand Down
7 changes: 7 additions & 0 deletions secondary/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,13 @@ var SystemConfig = Config{
false, // mutable
false, // case-insensitive
},
"projector.evalStatLoggingThreshold": ConfigValue{
200, // 200 microseconds
"Threshold after which index evaluator stats will be logged in projector logs (In microseconds)",
200, // 200 microseconds
false, // mutable
false, // case-insensitive
},
"projector.gogc": ConfigValue{
100, // 100 percent
"set GOGC percent",
Expand Down
8 changes: 7 additions & 1 deletion secondary/common/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ const (
INDEX_STATE_ERROR
// Nil State (used for no-op / invalid) -- not a persistent state
INDEX_STATE_NIL
// Scheduled state: used for the indexes scheduled for creation.
// Not a persistent state.
INDEX_STATE_SCHEDULED
)

func (s IndexState) String() string {
Expand All @@ -119,6 +122,8 @@ func (s IndexState) String() string {
return "INDEX_STATE_DELETED"
case INDEX_STATE_ERROR:
return "INDEX_STATE_ERROR"
case INDEX_STATE_SCHEDULED:
return "INDEX_STATE_SCHEDULED"
default:
return "INDEX_STATE_UNKNOWN"
}
Expand Down Expand Up @@ -495,7 +500,8 @@ func (idx IndexInstMap) String() string {
for i, index := range idx {
str += fmt.Sprintf("\tInstanceId: %v ", i)
str += fmt.Sprintf("Name: %v ", index.Defn.Name)
str += fmt.Sprintf("Bucket: %v ", index.Defn.Bucket)
str += fmt.Sprintf("Keyspace: %v/%v/%v ", index.Defn.Bucket,
index.Defn.Scope, index.Defn.Collection)
str += fmt.Sprintf("State: %v ", index.State)
str += fmt.Sprintf("Stream: %v ", index.Stream)
str += fmt.Sprintf("RState: %v ", index.RState)
Expand Down
221 changes: 169 additions & 52 deletions secondary/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5747,10 +5747,11 @@ func (idx *indexer) bootstrap1(snapshotNotifych chan IndexSnapshot) error {
}

//if any index in MAINT_STREAM has nil snapshot, it needs
//to be reset. The index was able to clear its snapshot
//on rollback but couldn't reset the metadata before crash.
//index disk snapshot is created for every index before
//moving to active state.
//to be reset. Either:
//1. The index was able to clear its snapshot on rollback
//but couldn't reset the metadata before crash.
//2. The index never created a disk snapshot as the disk
//snapshot happens only at 10mins interval.
func (idx *indexer) findAndResetEmptySnapshotIndex() {

for instId, index := range idx.indexInstMap {
Expand Down Expand Up @@ -5814,9 +5815,9 @@ func (idx *indexer) createRealInstIdMap() common.IndexInstMap {

func (idx *indexer) cleanupOrphanIndexes() {
storageDir := idx.config["storage_dir"].String()
pattern := GetIndexPathPattern()

flist, err := filepath.Glob(filepath.Join(storageDir, pattern))
mode := common.GetStorageMode()
flist, err := ListSlices(mode, storageDir)
if err != nil {
logging.Warnf("Error %v during cleaning up the orphan indexes.", err)
return
Expand Down Expand Up @@ -5864,7 +5865,7 @@ func (idx *indexer) cleanupOrphanIndexes() {

go func() {
for _, f := range orphanIndexList {
if err := os.RemoveAll(f); err != nil {
if err := DestroySlice(mode, f); err != nil {
logging.Warnf("Error %v while removing orphan index data for %v.", err, f)
} else {
logging.Infof("Cleaned up the orphan index slice %v.", f)
Expand Down Expand Up @@ -6344,53 +6345,12 @@ func (idx *indexer) backupCorruptIndexDataFiles(indexInst *common.IndexInst,
return
}

indexPath := IndexPath(indexInst, partnId, sliceId)

// delete old backups if any
deleteOldBackups := func() error {
files, err := ioutil.ReadDir(corruptDataDir)
if err != nil {
logging.Errorf("Indexer::backupCorruptIndexDataFiles %v %v error %v while taking backup:ReadDir %v",
indexInst.InstId, partnId, err, corruptDataDir)
return err
}

for _, f := range files {
if strings.HasSuffix(f.Name(), indexPath) {
fpath := filepath.Join(corruptDataDir, f.Name())
logging.Infof("Indexer::backupCorruptIndexDataFiles %v %v deleting path %v",
indexInst.InstId, partnId, fpath)
if err = os.RemoveAll(fpath); err != nil {
logging.Errorf("Indexer::backupCorruptIndexDataFiles %v %v error %v while removing old backup",
indexInst.InstId, partnId, err, corruptDataDir)
return err
}
}
}
return nil
}

if err := deleteOldBackups(); err != nil {
needsDataCleanup = true
return
}

srcPath := filepath.Join(storageDir, indexPath)
t := time.Now()
strTime := fmt.Sprintf("%d-%02d-%02dT%02d-%02d-%02d-%03d", t.Year(), t.Month(),
t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond()/1000/1000)
destIndexPath := strTime + "_" + indexPath
destPath := filepath.Join(corruptDataDir, destIndexPath)

if err := os.Rename(srcPath, destPath); err != nil {
logging.Errorf("Indexer::backupCorruptIndexDataFiles %v %v error %v while taking backup:Rename(%v, %v)",
indexInst.InstId, partnId, err, srcPath, destPath)
err := MoveSlice(common.IndexTypeToStorageMode(indexInst.Defn.Using), indexInst, partnId, sliceId, storageDir, corruptDataDir)
if err != nil {
needsDataCleanup = true
return
}

logging.Infof("Indexer::backupCorruptIndexDataFiles %v %v backup is stroed at %v",
indexInst.InstId, partnId, destPath)
needsDataCleanup = false
return
}
Expand Down Expand Up @@ -6636,7 +6596,7 @@ func (idx *indexer) upgradeSingleIndex(inst *common.IndexInst, storageMode commo
partnDefnList := inst.Pc.GetAllPartitions()
for _, partnDefn := range partnDefnList {
path := filepath.Join(storage_dir, IndexPath(inst, partnDefn.GetPartitionId(), SliceId(0)))
if err := os.RemoveAll(path); err != nil {
if err := DestroySlice(common.IndexTypeToStorageMode(inst.Defn.Using), path); err != nil {
common.CrashOnError(err)
}
}
Expand Down Expand Up @@ -6774,7 +6734,7 @@ func (idx *indexer) forceCleanupPartitionData(inst *common.IndexInst, partitionI

storage_dir := idx.config["storage_dir"].String()
path := filepath.Join(storage_dir, IndexPath(inst, partitionId, sliceId))
return os.RemoveAll(path)
return DestroySlice(common.IndexTypeToStorageMode(inst.Defn.Using), path)
}

//On warmup, if an index is found in MAINT_STREAM and state INITIAL
Expand Down Expand Up @@ -7476,6 +7436,163 @@ func NewSlice(id SliceId, indInst *common.IndexInst, partnInst *PartitionInst,
return
}

func DestroySlice(mode common.StorageMode, path string) error {

switch mode {
case common.MOI, common.FORESTDB, common.NOT_SET:
return os.RemoveAll(path)
case common.PLASMA:
return DestroyPlasmaSlice(path)
}

return fmt.Errorf("unable to delete instance %v : unrecognized storage type %v", path, mode)
}

func ListSlices(mode common.StorageMode, storageDir string) ([]string, error) {

listFiles := func() ([]string, error) {
pattern := GetIndexPathPattern()
return filepath.Glob(filepath.Join(storageDir, pattern))
}

switch mode {
case common.MOI, common.FORESTDB, common.NOT_SET:
return listFiles()
case common.PLASMA:
return ListPlasmaSlices()
}
return nil, fmt.Errorf("unable to list instance : unrecognized storage type %v", mode)
}

func MoveSlice(mode common.StorageMode, indexInst *common.IndexInst, partnId common.PartitionId, sliceId SliceId, sourceDir string, targetDir string) error {

// Given any path, rename() will add a timestamp to the first sub-directory
// after sourceDir. The renamed sub-directory will be added to the targetDir
// to form a full path (as the backup dir). This function expects that the
// first sub-directory is distinctive enough to differentiate among different
// index files. This function call needs to be stable since it can be called
// multiple times for the same path during each bootstrap. So it uses
// indexer uptime as timestamp.
rename := func(path string) (string, error) {

if !strings.HasPrefix(path, sourceDir) {
return "", fmt.Errorf("path (%v) does not contain source directory (%v)", path, sourceDir)
}

sourceDirLen := len(sourceDir)
if path[sourceDirLen] == filepath.Separator {
sourceDirLen++
}

if len(path) <= sourceDirLen {
return "", fmt.Errorf("path (%v) does not contain source directory (%v)", path, sourceDir)
}

indexPath := path[sourceDirLen:]

strTime := fmt.Sprintf("%d-%02d-%02dT%02d-%02d-%02d-%03d", uptime.Year(), uptime.Month(),
uptime.Day(), uptime.Hour(), uptime.Minute(), uptime.Second(), uptime.Nanosecond()/1000/1000)

destIndexPath := strTime + "_" + indexPath
return filepath.Join(targetDir, destIndexPath), nil
}

clean := func(srcPath string) {
deleteOldBackups(targetDir, sourceDir, srcPath)
}

switch mode {
case common.MOI, common.FORESTDB, common.NOT_SET:
return moveIndexFile(indexInst, partnId, sliceId, sourceDir, targetDir)
case common.PLASMA:
indexPath := IndexPath(indexInst, partnId, sliceId)
srcPath := filepath.Join(sourceDir, indexPath)
return BackupCorruptedPlasmaSlice(srcPath, rename, clean)
}
return fmt.Errorf("unable to move instance : unrecognized storage type %v", mode)
}

func moveIndexFile(indexInst *common.IndexInst, partnId common.PartitionId, sliceId SliceId, sourceDir string, targetDir string) error {
indexPath := IndexPath(indexInst, partnId, sliceId)
srcPath := filepath.Join(sourceDir, indexPath)

if err := deleteOldBackups(targetDir, sourceDir, srcPath); err != nil {
logging.Errorf("Indexer::moveIndexFile %v %v error %v while taking backup %v",
indexInst.InstId, partnId, err, targetDir)
return err
}

t := time.Now()
strTime := fmt.Sprintf("%d-%02d-%02dT%02d-%02d-%02d-%03d", t.Year(), t.Month(),
t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond()/1000/1000)
destIndexPath := strTime + "_" + indexPath
destPath := filepath.Join(targetDir, destIndexPath)

if err := os.Rename(srcPath, destPath); err != nil {
logging.Errorf("Indexer::moveIndexFile %v %v error %v while taking backup:Rename(%v, %v)",
indexInst.InstId, partnId, err, srcPath, destPath)
return err
}

logging.Infof("Indexer::moveIndexFile %v %v is moved to %v",
indexInst.InstId, partnId, destPath)

return nil
}

// This function deletes old backups if any. This function works in a pair with rename function in MoveSlice().
// 1) Given any path, it finds the first sub-directory after the sourceDir.
// 2) It will then iterate through the sub-directories under targetDir.
// 3) If the target sub-directory name matches the sub-directory in (1), then it creates a new path using the target sub-directory name.
// 4) The new path in (4) is removed.
func deleteOldBackups(targetDir string, sourceDir string, srcPath string) error {

strip := func(prefix string, paths []string) []string {
dirs := strings.Split(prefix, string(filepath.Separator))
return paths[len(dirs):]
}

join := func(prefix string, paths []string, startPos int) string {
if len(paths) <= startPos {
return filepath.Join(targetDir, prefix)
}

paths = paths[startPos:]
result := filepath.Join(targetDir, prefix)
for _, path := range paths {
result = filepath.Join(result, path)
}
return result
}

if !strings.HasPrefix(srcPath, sourceDir) {
return fmt.Errorf("path (%v) does not contain source directory (%v)", srcPath, sourceDir)
}

indexPaths := strings.Split(srcPath, string(filepath.Separator))
indexPaths = strip(sourceDir, indexPaths)

files, err := ioutil.ReadDir(targetDir)
if err != nil {
logging.Errorf("Indexer::deleteOldBackups encounter error %v while taking backup:ReadDir %v",
err, targetDir)
return err
}

for _, f := range files {
if strings.HasSuffix(f.Name(), indexPaths[0]) {
fpath := join(f.Name(), indexPaths, 1)
logging.Infof("Indexer::deleteOldBackups deleting path %v", fpath)
if err = os.RemoveAll(fpath); err != nil {
logging.Errorf("Indexer::deleteOldBackups error %v while removing old backup %v",
err, fpath)
return err
}
}
}
return nil
}

func (idx *indexer) setProfilerOptions(config common.Config) {
// CPU-profiling
cpuProfile, ok := config["settings.cpuProfile"]
Expand Down
15 changes: 15 additions & 0 deletions secondary/indexer/plasma_community.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,18 @@ func NewPlasmaSlice(storage_dir string, path string, sliceId SliceId, idxDefn co
func deleteFreeWriters(instId common.IndexInstId) {
// do nothing
}

func DestroyPlasmaSlice(path string) error {
// do nothing
return nil
}

func ListPlasmaSlices() ([]string, error) {
// do nothing
return nil, nil
}

func BackupCorruptedPlasmaSlice(string, func(string) (string, error), func(string)) error {
// do nothing
return nil
}
12 changes: 12 additions & 0 deletions secondary/indexer/plasma_enterprise.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,15 @@ func NewPlasmaSlice(storage_dir string, path string, sliceId SliceId, idxDefn co
idxDefn, idxInstId, partitionId, isPrimary, numPartitions,
sysconf, idxStats, indexerStats)
}

func DestroyPlasmaSlice(path string) error {
return destroyPlasmaSlice(path)
}

func ListPlasmaSlices() ([]string, error) {
return listPlasmaSlices()
}

func BackupCorruptedPlasmaSlice(prefix string, rename func(string) (string, error), clean func(string)) error {
return backupCorruptedPlasmaSlice(prefix, rename, clean)
}
Loading

0 comments on commit 4b8e777

Please sign in to comment.