Skip to content

Commit

Permalink
MB-46245 : Add a unit test for Scheduled index creation
Browse files Browse the repository at this point in the history
Change-Id: Ic9fd772230c2aeee95e684c3a6def37fa41fc759
  • Loading branch information
ksaikrishnateja committed Jun 30, 2021
1 parent 12a1b0a commit 33af27c
Show file tree
Hide file tree
Showing 2 changed files with 248 additions and 3 deletions.
148 changes: 145 additions & 3 deletions secondary/tests/framework/secondaryindex/secondaryindexmanagement.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log"
"path"
"path/filepath"
"sync"
"time"

c "github.com/couchbase/indexing/secondary/common"
Expand Down Expand Up @@ -51,6 +52,17 @@ func CreateClient(server, serviceAddr string) (*qc.GsiClient, error) {
return client, nil
}

func CreateClientWithConfig(server string, givenConfig c.Config) (*qc.GsiClient, error) {
config := givenConfig.SectionConfig("queryport.client.", true)
client, err := qc.NewGsiClient(server, config)
if err != nil {
log.Printf("Error while creating gsi client: ", err)
return nil, err
}

return client, nil
}

func GetDefnID(client *qc.GsiClient, bucket, indexName string) (defnID uint64, ok bool) {

return GetDefnID2(client, bucket, c.DEFAULT_SCOPE, c.DEFAULT_COLLECTION, indexName)
Expand All @@ -73,6 +85,33 @@ func GetDefnID2(client *qc.GsiClient, bucket, scopeName,
return uint64(c.IndexDefnId(0)), false
}

func GetDefnIdsDefault(client *qc.GsiClient, bucket string, indexNames []string) map[string]uint64 {
return GetDefnIds(client, bucket, c.DEFAULT_SCOPE, c.DEFAULT_COLLECTION, indexNames)
}

func GetDefnIds(client *qc.GsiClient, bucket, scopeName, collectionName string,
indexNames []string) map[string]uint64 {

defnIds := make(map[string]uint64, len(indexNames))
for _, i := range indexNames {
defnIds[i] = 0
}

indexes, _, _, _, err := client.Refresh()
tc.HandleError(err, "Error while listing the indexes")
for _, index := range indexes {
defn := index.Definition
if defn.Bucket != bucket || defn.Scope != scopeName || defn.Collection != collectionName {
continue
}
_, ok := defnIds[defn.Name]
if ok {
defnIds[defn.Name] = uint64(index.Definition.DefnId)
}
}
return defnIds
}

// Creates an index and waits for it to become active
func CreateSecondaryIndex(
indexName, bucketName, server, whereExpr string, indexFields []string, isPrimary bool, with []byte,
Expand Down Expand Up @@ -153,6 +192,46 @@ func CreateSecondaryIndexAsync(
skipIfExists, 0 /*timeout*/, client)
}

func CreateIndexesConcurrently(bucketName, server string,
indexNameToFieldMap map[string][]string, indexNameToWhereExp map[string]string,
indexNameToIsPrimary map[string]bool, indexNameToWith map[string][]byte,
skipIfExists bool, client *qc.GsiClient) []error {

var wg sync.WaitGroup
errList := make([]error, len(indexNameToFieldMap))

createAsync := func(in string, fields []string, errListIndex int, wg *sync.WaitGroup) {
defer wg.Done()

var where string
var with []byte
var isPrimary bool
if indexNameToWhereExp != nil {
where = indexNameToWhereExp[in]
}
if indexNameToWith != nil {
with = indexNameToWith[in]
}
if indexNameToIsPrimary != nil {
isPrimary = indexNameToIsPrimary[in]
}

err := CreateSecondaryIndexAsync(in, bucketName, server, where, fields, isPrimary, with, skipIfExists, nil)
errList[errListIndex] = err
}

errListIndex := 0
for indexName, indexFields := range indexNameToFieldMap {
wg.Add(1)
go createAsync(indexName, indexFields, errListIndex, &wg)
errListIndex++
}

wg.Wait()

return errList
}

// Todo: Remove this function and update functional tests to use BuildIndexes
func BuildIndex(indexName, bucketName, server string, indexActiveTimeoutSeconds int64) error {
client, e := GetOrCreateClient(server, "2itest")
Expand Down Expand Up @@ -233,14 +312,77 @@ func WaitTillIndexActive(defnID uint64, client *qc.GsiClient, indexActiveTimeout
state, _ := client.IndexState(defnID)

if state == c.INDEX_STATE_ACTIVE {
log.Printf("Index is now active")
log.Printf("Index is %d now active", defnID)
return nil
} else {
log.Printf("Waiting for index to go active ...")
log.Printf("Waiting for index %d to go active ...", defnID)
time.Sleep(1 * time.Second)
}
}
return nil
}

func GetStatusOfAllIndexes(server string) (indexStateMap map[string]c.IndexState, err error) {
var client *qc.GsiClient
client, err = GetOrCreateClient(server, "2itest")
if err != nil {
log.Printf("PrintStatusOfAllIndexes(): Error from GetOrCreateClient: %v ", err)
return
}

indexes, _, _, _, err := client.Refresh()
if err != nil {
log.Printf("PrintStatusOfAllIndexes(): Error from client.Refresh(): %v ", err)
return
}

indexStateMap = make(map[string]c.IndexState, len(indexes))
for _, index := range indexes {
defn := index.Definition
indexStateMap[defn.Name] = index.State
}

return
}

func WaitTillAllIndexesActive(defnIDs []uint64, client *qc.GsiClient, indexActiveTimeoutSeconds int64) error {
start := time.Now()
activeMap := make(map[uint64]bool, len(defnIDs))
for {
elapsed := time.Since(start)
if elapsed.Seconds() >= float64(indexActiveTimeoutSeconds) {
err := errors.New(fmt.Sprintf("Index did not become active after %d seconds", indexActiveTimeoutSeconds))
return err
}

for _, defnID := range defnIDs {
if active, ok := activeMap[defnID]; ok && active {
continue
}
state, _ := client.IndexState(defnID)

if state == c.INDEX_STATE_ACTIVE {
log.Printf("Index %d is now active", defnID)
activeMap[defnID] = true
} else {
log.Printf("Waiting for index %d in state %v to go active ...", defnID, state)
activeMap[defnID] = false
}
}

allActive := true
for _, active := range activeMap {
if !active {
allActive = false
break
}
}

if allActive {
return nil
}

time.Sleep(3 * time.Second)
}
}

func WaitTillAllIndexNodesActive(server string, indexerActiveTimeoutSeconds int64) error {
Expand Down
103 changes: 103 additions & 0 deletions secondary/tests/functionaltests/set20_scheduled_index_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package functionaltests

import (
"log"
"testing"
"time"

c "github.com/couchbase/indexing/secondary/common"
"github.com/couchbase/indexing/secondary/queryport/client"
tc "github.com/couchbase/indexing/secondary/tests/framework/common"
"github.com/couchbase/indexing/secondary/tests/framework/kvutility"
"github.com/couchbase/indexing/secondary/tests/framework/secondaryindex"
)

// Create multiple indexes Asynchronously atleast one of them will be scheduled and wait till it is created.
func TestScheduleIndexBasic(t *testing.T) {
log.Printf("In TestMultipleDeferredIndexes_BuildTogether()")

var client *client.GsiClient
var bucketName = "default"
var index1 = "id_company"
var index2 = "id_age"
var index3 = "id_gender"
var index4 = "id_isActive"

err := secondaryindex.DropAllSecondaryIndexes(indexManagementAddress)
FailTestIfError(err, "Error in DropAllSecondaryIndexes", t)

// Load data
docsToCreate := generateDocs(50000, "users.prod")
UpdateKVDocs(docsToCreate, docs)
log.Printf("Setting JSON docs in KV")
kvutility.SetKeyValues(docsToCreate, "default", "", clusterconfig.KVAddress)

// Disable background index creation to verify if some indexes are in SCHEDULED state
err = secondaryindex.ChangeIndexerSettings("indexer.debug.enableBackgroundIndexCreation", false, clusterconfig.Username, clusterconfig.Password, kvaddress)
tc.HandleError(err, "Error in ChangeIndexerSettings")

// Launch in another go routine as waitForScheduledIndex will block the goroutines creating scheduled index
log.Printf("Creating indexes Asynchronously")
go func() {
indexNameToFieldMap := make(map[string][]string, 4)
indexNameToFieldMap[index1] = []string{"company"}
indexNameToFieldMap[index2] = []string{"age"}
indexNameToFieldMap[index3] = []string{"gender"}
indexNameToFieldMap[index4] = []string{"isActive"}

errList := secondaryindex.CreateIndexesConcurrently(bucketName, indexManagementAddress, indexNameToFieldMap, nil, nil, nil, true, nil)
for _, err := range errList {
log.Printf("Error %v Observed when creating index", err)
}
}()

// Get all definition IDs
log.Printf("Finding definition IDs for all indexes")
client, err = secondaryindex.GetOrCreateClient(indexManagementAddress, "")
FailTestIfError(err, "Unable to get or create client", t)
retryCount := 0
retry:
defnIDMap := secondaryindex.GetDefnIdsDefault(client, bucketName, []string{index1, index2, index3, index4})
defnIDs := make([]uint64, 0)
for index, defnID := range defnIDMap {
if defnID == 0 {
if retryCount >= 5 {
t.Fatalf("Index Definition not available for %v", index)
}
retryCount++
time.Sleep(3 * time.Second)
goto retry
}
defnIDs = append(defnIDs, defnID)
}

// Print status of indexes.
log.Printf("Status of all indexes")
indexStateMap, err := secondaryindex.GetStatusOfAllIndexes(indexManagementAddress)
tc.HandleError(err, "Error while fetching status of indexes")

// Throw error if no index is in scheduled state
anyScheduledIndex := false
for idxName, idxState := range indexStateMap {
log.Printf("Index %s is in state %v", idxName, idxState)
if idxState == c.INDEX_STATE_SCHEDULED {
anyScheduledIndex = true
break
}
}

if !anyScheduledIndex {
t.Fatalf("Found no indexes in scheduled state")
}

// Enable Creation of indexes for scheduled indexes to proceed with creation.
err = secondaryindex.ChangeIndexerSettings("indexer.debug.enableBackgroundIndexCreation", true, clusterconfig.Username, clusterconfig.Password, kvaddress)
tc.HandleError(err, "Error in ChangeIndexerSettings")

// Wait till all indexes are created.
log.Printf("Waiting for all indexes to become active")
err = secondaryindex.WaitTillAllIndexesActive(defnIDs, client, defaultIndexActiveTimeout)
if err != nil {
FailTestIfError(err, "Error in WaitTillIndexActive", t)
}
}

0 comments on commit 33af27c

Please sign in to comment.