diff --git a/secondary/indexer/pause_pauser.go b/secondary/indexer/pause_pauser.go index e2d6e8412..e8500ecf6 100644 --- a/secondary/indexer/pause_pauser.go +++ b/secondary/indexer/pause_pauser.go @@ -723,21 +723,30 @@ func (this *Pauser) run(master bool) { var err error reader := bytes.NewReader(nil) + dbg := true // TODO: use system config here + ///////////////////////////////////////////// // Work done by master only ///////////////////////////////////////////// if master { + this.task.pauseMetadata.setVersionNoLock(common.GetLocalInternalVersion().String()) // Write the version.json file to the archive - byteSlice = []byte(fmt.Sprintf("{\"version\":%v}\n", ARCHIVE_VERSION)) - reader.Reset(byteSlice) - err = Upload(this.task.archivePath, FILENAME_VERSION, reader) + // byteSlice = []byte(fmt.Sprintf("{\"version\":%v}\n", ARCHIVE_VERSION)) + // reader.Reset(byteSlice) + data, err := json.Marshal(this.task.pauseMetadata) + if err != nil { + this.failPause("Pauser::run:", "Marshal PauseMetadata", err) + return + } + reader.Reset(common.ChecksumAndCompress(data,!dbg)) + err = Upload(this.task.archivePath, FILENAME_PAUSE_METADATA, reader) if err != nil { - this.failPause("Pauser::run:", "Upload "+FILENAME_VERSION, err) + this.failPause("Pauser::run:", "Upload "+FILENAME_PAUSE_METADATA, err) return } - logging.Tracef("Pauser::run: indexer version successfully uploaded to %v%v for taskId %v", this.task.archivePath, FILENAME_VERSION, this.task.taskId) + logging.Tracef("Pauser::run: indexer version successfully uploaded to %v%v for taskId %v", this.task.archivePath, FILENAME_PAUSE_METADATA, this.task.taskId) // Notify the followers to start working on this task this.pauseMgr.RestNotifyPause(this.otherIndexAddrs, this.task) @@ -749,7 +758,6 @@ func (this *Pauser) run(master bool) { // nodePath is the path to the node-specific archive subdirectory for the current node nodePath := this.task.archivePath + this.nodeDir - dbg := true // TODO: use system config here // Get the index metadata from all nodes and write it as a single file to the archive byteSlice, indexMetadata, err := this.restGetLocalIndexMetadataBinary(!dbg) diff --git a/secondary/indexer/pause_service_manager.go b/secondary/indexer/pause_service_manager.go index e336d6c23..2986dabd7 100644 --- a/secondary/indexer/pause_service_manager.go +++ b/secondary/indexer/pause_service_manager.go @@ -135,8 +135,8 @@ const FILENAME_METADATA = "indexMetadata.json" // FILENAME_STATS is the name of the file to write containing persisted index stats from ONE node. const FILENAME_STATS = "indexStats.json" -// FILENAME_VERSION is the name of the file to write containing the ARCHIVE_VERSION. -const FILENAME_VERSION = "version.json" +// FILENAME_PAUSE_METADATA is the name of the file to write containing the PauseMetadata. +const FILENAME_PAUSE_METADATA = "pauseMetadata.json" //////////////////////////////////////////////////////////////////////////////////////////////////// // Enums @@ -214,6 +214,48 @@ func (this bucketStateEnum) String() string { // Type definitions //////////////////////////////////////////////////////////////////////////////////////////////////// +// this information should be considered read only during resume +// created by pause leader node +type PauseMetadata struct { + // nodeId gives node info + // map[nodeId]->shardIds gives information about shardIds copied from node + // map[shardId] -> string are obj store paths where each shard is saved + Data map[service.NodeID][]common.ShardId `json:"metadata"` + + // cluster Version during data creation + Version string `json:"version"` + + lock *sync.RWMutex +} + +func NewPauseMetadata() *PauseMetadata { + return &PauseMetadata{ + // size value should be max nodes in a subcluster + Data: make(map[service.NodeID][]common.ShardId, 2), + lock: &sync.RWMutex{}, + } +} + +func (pm *PauseMetadata) setVersionNoLock(ver string) { + pm.Version = ver +} + +func (pm *PauseMetadata) addShardNoLock(nodeId service.NodeID, shardId common.ShardId) { + nodeMap, ok := pm.Data[nodeId] + if !ok { + // size value should be no of shards per bucket + pm.Data[nodeId] = make([]common.ShardId, 0, 2) + nodeMap = pm.Data[nodeId] + } + pm.Data[nodeId] = append(nodeMap, shardId) +} + +func (pm *PauseMetadata) addShard(nodeId service.NodeID, shardId common.ShardId) { + pm.lock.Lock() + defer pm.lock.Unlock() + pm.addShardNoLock(nodeId, shardId) +} + // taskObj represents one task of any type in the taskType enum below. This is the GSI internal // representation, not the GetTaskList return format. // @@ -258,6 +300,10 @@ type taskObj struct { // ctx holds the context object that can be used for task context ctx context.Context cancelFunc context.CancelFunc + + // PauseMetadata stores the metadata about the pause + // this is only created and saved by the master + pauseMetadata *PauseMetadata } // NewTaskObj is the constructor for the taskObj class. If the parameters are not valid, it will @@ -295,6 +341,7 @@ func (this *taskObj) TaskObjSetFailed(errMsg string) { func (this *taskObj) setMasterNoLock() { this.master = true + this.pauseMetadata = NewPauseMetadata() } func (this *taskObj) updateTaskTypeNoLock(newTaskType service.TaskType) { diff --git a/secondary/tests/functionaltests/set14_rebalance_test.go b/secondary/tests/functionaltests/set14_rebalance_test.go index c8a47a688..d8fbbbccb 100644 --- a/secondary/tests/functionaltests/set14_rebalance_test.go +++ b/secondary/tests/functionaltests/set14_rebalance_test.go @@ -10,7 +10,9 @@ import ( "time" "github.com/couchbase/cbauth/service" + "github.com/couchbase/indexing/secondary/common" c "github.com/couchbase/indexing/secondary/common" + "github.com/couchbase/indexing/secondary/indexer" cluster "github.com/couchbase/indexing/secondary/tests/framework/clusterutility" tc "github.com/couchbase/indexing/secondary/tests/framework/common" "github.com/couchbase/indexing/secondary/tests/framework/secondaryindex" @@ -840,16 +842,18 @@ func TestPause(t *testing.T) { // Pause is async so give some time for it to write its files to archive time.Sleep(5 * time.Second) - // Read and verify /tmp/TestPause/version.json - filePath := archivePath + "version.json" + // Read and verify /tmp/TestPause/pauseMetadata.json + filePath := archivePath + indexer.FILENAME_PAUSE_METADATA versionJson, err := tc.ReadFileToString(filePath) if err != nil { t.Fatalf("%v Pause ReadFileToString(%v) returned error: %v", _TestPause, filePath, err) } - const expectedJson = "{\"version\":100}\n" - if versionJson != expectedJson { + expectedJson := common.GetLocalInternalVersion() + pauseMetadata := indexer.NewPauseMetadata() + json.Unmarshal([]byte(versionJson[8:]), pauseMetadata) + if !expectedJson.Equals(c.InternalVersion(pauseMetadata.Version)) { t.Fatalf("%v Pause expected versionJson '%v', got '%v", _TestPause, expectedJson, - versionJson) + versionJson[8:]) } // kjc Extend this as more Pause functionality is completed. Cancel the Pause in the mean time