Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
MB-54943 create and upload pause metadata
background: instead of recreating paused info from obj store
we can save some computation and upload pause metadata to obj store

expected: struct to capture pause metadata.
this metadata includes information about indexer version and
nodes that performed pause activity with information about
the shards it copied

stubs/future-work: collection of information from metakv tokens

Change-Id: Iaddb43ef4ce28787e137f107eea994aea53581bb
  • Loading branch information
NightWing1998 committed Jan 10, 2023
1 parent 64158e8 commit 735b591
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 13 deletions.
20 changes: 14 additions & 6 deletions secondary/indexer/pause_pauser.go
Expand Up @@ -723,21 +723,30 @@ func (this *Pauser) run(master bool) {
var err error
reader := bytes.NewReader(nil)

dbg := true // TODO: use system config here

/////////////////////////////////////////////
// Work done by master only
/////////////////////////////////////////////

if master {
this.task.pauseMetadata.setVersionNoLock(common.GetLocalInternalVersion().String())
// Write the version.json file to the archive
byteSlice = []byte(fmt.Sprintf("{\"version\":%v}\n", ARCHIVE_VERSION))
reader.Reset(byteSlice)
err = Upload(this.task.archivePath, FILENAME_VERSION, reader)
// byteSlice = []byte(fmt.Sprintf("{\"version\":%v}\n", ARCHIVE_VERSION))
// reader.Reset(byteSlice)
data, err := json.Marshal(this.task.pauseMetadata)
if err != nil {
this.failPause("Pauser::run:", "Marshal PauseMetadata", err)
return
}
reader.Reset(common.ChecksumAndCompress(data,!dbg))
err = Upload(this.task.archivePath, FILENAME_PAUSE_METADATA, reader)
if err != nil {
this.failPause("Pauser::run:", "Upload "+FILENAME_VERSION, err)
this.failPause("Pauser::run:", "Upload "+FILENAME_PAUSE_METADATA, err)
return
}

logging.Tracef("Pauser::run: indexer version successfully uploaded to %v%v for taskId %v", this.task.archivePath, FILENAME_VERSION, this.task.taskId)
logging.Tracef("Pauser::run: indexer version successfully uploaded to %v%v for taskId %v", this.task.archivePath, FILENAME_PAUSE_METADATA, this.task.taskId)

// Notify the followers to start working on this task
this.pauseMgr.RestNotifyPause(this.otherIndexAddrs, this.task)
Expand All @@ -749,7 +758,6 @@ func (this *Pauser) run(master bool) {

// nodePath is the path to the node-specific archive subdirectory for the current node
nodePath := this.task.archivePath + this.nodeDir
dbg := true // TODO: use system config here

// Get the index metadata from all nodes and write it as a single file to the archive
byteSlice, indexMetadata, err := this.restGetLocalIndexMetadataBinary(!dbg)
Expand Down
51 changes: 49 additions & 2 deletions secondary/indexer/pause_service_manager.go
Expand Up @@ -135,8 +135,8 @@ const FILENAME_METADATA = "indexMetadata.json"
// FILENAME_STATS is the name of the file to write containing persisted index stats from ONE node.
const FILENAME_STATS = "indexStats.json"

// FILENAME_VERSION is the name of the file to write containing the ARCHIVE_VERSION.
const FILENAME_VERSION = "version.json"
// FILENAME_PAUSE_METADATA is the name of the file to write containing the PauseMetadata.
const FILENAME_PAUSE_METADATA = "pauseMetadata.json"

////////////////////////////////////////////////////////////////////////////////////////////////////
// Enums
Expand Down Expand Up @@ -214,6 +214,48 @@ func (this bucketStateEnum) String() string {
// Type definitions
////////////////////////////////////////////////////////////////////////////////////////////////////

// this information should be considered read only during resume
// created by pause leader node
type PauseMetadata struct {
// nodeId gives node info
// map[nodeId]->shardIds gives information about shardIds copied from node
// map[shardId] -> string are obj store paths where each shard is saved
Data map[service.NodeID][]common.ShardId `json:"metadata"`

// cluster Version during data creation
Version string `json:"version"`

lock *sync.RWMutex
}

func NewPauseMetadata() *PauseMetadata {
return &PauseMetadata{
// size value should be max nodes in a subcluster
Data: make(map[service.NodeID][]common.ShardId, 2),
lock: &sync.RWMutex{},
}
}

func (pm *PauseMetadata) setVersionNoLock(ver string) {
pm.Version = ver
}

func (pm *PauseMetadata) addShardNoLock(nodeId service.NodeID, shardId common.ShardId) {
nodeMap, ok := pm.Data[nodeId]
if !ok {
// size value should be no of shards per bucket
pm.Data[nodeId] = make([]common.ShardId, 0, 2)
nodeMap = pm.Data[nodeId]
}
pm.Data[nodeId] = append(nodeMap, shardId)
}

func (pm *PauseMetadata) addShard(nodeId service.NodeID, shardId common.ShardId) {
pm.lock.Lock()
defer pm.lock.Unlock()
pm.addShardNoLock(nodeId, shardId)
}

// taskObj represents one task of any type in the taskType enum below. This is the GSI internal
// representation, not the GetTaskList return format.
//
Expand Down Expand Up @@ -258,6 +300,10 @@ type taskObj struct {
// ctx holds the context object that can be used for task context
ctx context.Context
cancelFunc context.CancelFunc

// PauseMetadata stores the metadata about the pause
// this is only created and saved by the master
pauseMetadata *PauseMetadata
}

// NewTaskObj is the constructor for the taskObj class. If the parameters are not valid, it will
Expand Down Expand Up @@ -295,6 +341,7 @@ func (this *taskObj) TaskObjSetFailed(errMsg string) {

func (this *taskObj) setMasterNoLock() {
this.master = true
this.pauseMetadata = NewPauseMetadata()
}

func (this *taskObj) updateTaskTypeNoLock(newTaskType service.TaskType) {
Expand Down
14 changes: 9 additions & 5 deletions secondary/tests/functionaltests/set14_rebalance_test.go
Expand Up @@ -10,7 +10,9 @@ import (
"time"

"github.com/couchbase/cbauth/service"
"github.com/couchbase/indexing/secondary/common"
c "github.com/couchbase/indexing/secondary/common"
"github.com/couchbase/indexing/secondary/indexer"
cluster "github.com/couchbase/indexing/secondary/tests/framework/clusterutility"
tc "github.com/couchbase/indexing/secondary/tests/framework/common"
"github.com/couchbase/indexing/secondary/tests/framework/secondaryindex"
Expand Down Expand Up @@ -840,16 +842,18 @@ func TestPause(t *testing.T) {
// Pause is async so give some time for it to write its files to archive
time.Sleep(5 * time.Second)

// Read and verify /tmp/TestPause/version.json
filePath := archivePath + "version.json"
// Read and verify /tmp/TestPause/pauseMetadata.json
filePath := archivePath + indexer.FILENAME_PAUSE_METADATA
versionJson, err := tc.ReadFileToString(filePath)
if err != nil {
t.Fatalf("%v Pause ReadFileToString(%v) returned error: %v", _TestPause, filePath, err)
}
const expectedJson = "{\"version\":100}\n"
if versionJson != expectedJson {
expectedJson := common.GetLocalInternalVersion()
pauseMetadata := indexer.NewPauseMetadata()
json.Unmarshal([]byte(versionJson[8:]), pauseMetadata)
if !expectedJson.Equals(c.InternalVersion(pauseMetadata.Version)) {
t.Fatalf("%v Pause expected versionJson '%v', got '%v", _TestPause, expectedJson,
versionJson)
versionJson[8:])
}

// kjc Extend this as more Pause functionality is completed. Cancel the Pause in the mean time
Expand Down

0 comments on commit 735b591

Please sign in to comment.