MB-26597 : GSI - Add Log Redaction Support - Part 2
Added log redaction support for the following packages:
cmd
common
indexer

Change-Id: If0ce20b5b186cf22f20d071d1acaf3bf73e6d57b
prathibha-cb committed Jan 24, 2018
1 parent 0586f6e commit c7aecc7
Showing 14 changed files with 139 additions and 105 deletions.
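Every change in this commit follows one pattern: a value that can contain user data (secondary key expressions, WHERE clauses, mutation keys, document ids, usernames) is wrapped in logging.TagUD or logging.TagStrUD before it reaches a format verb. The sketch below illustrates what such helpers could look like; it is an assumption based on Couchbase's log redaction conventions, not the actual secondary/logging source, and the <ud> marker format is likewise assumed.

```go
package logging

import "fmt"

// udWrapper holds a value that must be fenced off as user data.
type udWrapper struct{ v interface{} }

// String satisfies fmt.Stringer, so the wrapper renders correctly under
// both %v and %s, the two verbs used at the call sites in this commit.
func (u udWrapper) String() string {
	return fmt.Sprintf("<ud>%v</ud>", u.v)
}

// TagUD marks an arbitrary value (expressions, mutation keys) as user data.
func TagUD(data interface{}) udWrapper { return udWrapper{data} }

// TagStrUD marks a byte slice as user data, rendering it as text rather
// than as a numeric byte dump.
func TagStrUD(data []byte) udWrapper { return udWrapper{string(data)} }
```

A redaction pass over collected logs can then hash or strip everything between the markers while leaving system data (slice ids, stream ids, error values) readable.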
2 changes: 1 addition & 1 deletion secondary/cmd/indexer/main.go
@@ -68,7 +68,7 @@ func main() {
// setup cbauth
if *auth != "" {
up := strings.Split(*auth, ":")
logging.Tracef("Initializing cbauth with user %v for cluster %v\n", up[0], *cluster)
logging.Tracef("Initializing cbauth with user %v for cluster %v\n", logging.TagUD(up[0]), *cluster)
if _, err := cbauth.InternalRetryDefaultInit(*cluster, up[0], up[1]); err != nil {
logging.Fatalf("Failed to initialize cbauth: %s", err)
}
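With wrappers like the above, the changed trace line keeps the cluster address readable while fencing off only the username. A hypothetical run (output format assumed):

```go
package main

import (
	"fmt"

	"github.com/couchbase/indexing/secondary/logging"
)

func main() {
	user, cluster := "Administrator", "127.0.0.1:9000"
	// Same shape as the Tracef call site above.
	fmt.Printf("Initializing cbauth with user %v for cluster %v\n",
		logging.TagUD(user), cluster)
	// Expected shape of the output:
	//   Initializing cbauth with user <ud>Administrator</ud> for cluster 127.0.0.1:9000
}
```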
2 changes: 1 addition & 1 deletion secondary/common/cluster_info.go
@@ -160,7 +160,7 @@ func (c *ClusterInfoCache) Fetch() error {
// node being added (but not yet rebalanced in)
addNodes = append(addNodes, n)
} else {
logging.Warnf("ClsuterInfoCache: unrecognized node membership %v", n.ClusterMembership)
logging.Warnf("ClusterInfoCache: unrecognized node membership %v", n.ClusterMembership)
}

// Find the minimum cluster compatibility
41 changes: 21 additions & 20 deletions secondary/common/index.go
@@ -12,6 +12,7 @@ package common
import (
"encoding/json"
"fmt"
"github.com/couchbase/indexing/secondary/logging"
"strings"
)

@@ -231,11 +232,11 @@ func (idx IndexDefn) String() string {
str += fmt.Sprintf("IsPrimary: %v ", idx.IsPrimary)
str += fmt.Sprintf("NumReplica: %v ", idx.NumReplica)
str += fmt.Sprintf("InstVersion: %v ", idx.InstVersion)
str += fmt.Sprintf("\n\t\tSecExprs: %v ", idx.SecExprs)
str += fmt.Sprintf("\n\t\tSecExprs: %v ", logging.TagUD(idx.SecExprs))
str += fmt.Sprintf("\n\t\tDesc: %v", idx.Desc)
str += fmt.Sprintf("\n\t\tPartitionScheme: %v ", idx.PartitionScheme)
str += fmt.Sprintf("PartitionKeys: %v ", idx.PartitionKeys)
str += fmt.Sprintf("WhereExpr: %v ", idx.WhereExpr)
str += fmt.Sprintf("WhereExpr: %v ", logging.TagUD(idx.WhereExpr))
str += fmt.Sprintf("RetainDeletedXATTR: %v ", idx.RetainDeletedXATTR)
return str

@@ -245,23 +246,23 @@ func (idx IndexDefn) String() string {
// field. It is a shallow copy (e.g. does not clone field 'Nodes').
func (idx IndexDefn) Clone() *IndexDefn {
return &IndexDefn{
- DefnId:          idx.DefnId,
- Name:            idx.Name,
- Using:           idx.Using,
- Bucket:          idx.Bucket,
- BucketUUID:      idx.BucketUUID,
- IsPrimary:       idx.IsPrimary,
- SecExprs:        idx.SecExprs,
- Desc:            idx.Desc,
- ExprType:        idx.ExprType,
- PartitionScheme: idx.PartitionScheme,
- PartitionKeys:   idx.PartitionKeys,
- WhereExpr:       idx.WhereExpr,
- Deferred:        idx.Deferred,
- Immutable:       idx.Immutable,
- Nodes:           idx.Nodes,
- IsArrayIndex:    idx.IsArrayIndex,
- NumReplica:      idx.NumReplica,
+ DefnId:             idx.DefnId,
+ Name:               idx.Name,
+ Using:              idx.Using,
+ Bucket:             idx.Bucket,
+ BucketUUID:         idx.BucketUUID,
+ IsPrimary:          idx.IsPrimary,
+ SecExprs:           idx.SecExprs,
+ Desc:               idx.Desc,
+ ExprType:           idx.ExprType,
+ PartitionScheme:    idx.PartitionScheme,
+ PartitionKeys:      idx.PartitionKeys,
+ WhereExpr:          idx.WhereExpr,
+ Deferred:           idx.Deferred,
+ Immutable:          idx.Immutable,
+ Nodes:              idx.Nodes,
+ IsArrayIndex:       idx.IsArrayIndex,
+ NumReplica:         idx.NumReplica,
RetainDeletedXATTR: idx.RetainDeletedXATTR,
}
}
@@ -517,7 +518,7 @@ func IsEquivalentIndex(d1, d2 *IndexDefn) bool {
d1.ExprType != d2.ExprType ||
d1.PartitionScheme != d2.PartitionScheme ||
d1.WhereExpr != d2.WhereExpr ||
- d1.RetainDeletedXATTR != d2.RetainDeletedXATTR {
+ d1.RetainDeletedXATTR != d2.RetainDeletedXATTR {

return false
}
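The String() and IsEquivalentIndex changes above draw the metadata/user-data boundary explicitly: index name, bucket, and option flags stay in clear text, while SecExprs and WhereExpr, which embed document field names and literals, are tagged. A hypothetical rendering with the assumed marker format:

```go
package main

import (
	"fmt"

	"github.com/couchbase/indexing/secondary/common"
)

func main() {
	defn := common.IndexDefn{
		Name:      "idx_age",         // metadata: stays readable
		Bucket:    "default",         // metadata: stays readable
		SecExprs:  []string{"`age`"}, // user data: tagged
		WhereExpr: "`age` > 21",      // user data: tagged
	}
	fmt.Println(defn.String())
	// Expected fragments of the output:
	//   Name: idx_age ... SecExprs: <ud>[`age`]</ud> ... WhereExpr: <ud>`age` > 21</ud>
}
```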
22 changes: 11 additions & 11 deletions secondary/indexer/flusher.go
@@ -342,7 +342,7 @@ func (f *flusher) flushSingleMutation(mut *MutationKeys, streamId common.StreamId)
func (f *flusher) flush(mutk *MutationKeys, streamId common.StreamId) {

logging.LazyTrace(func() string {
return fmt.Sprintf("Flusher::flush Flushing Stream %v Mutations %v", streamId, mutk)
return fmt.Sprintf("Flusher::flush Flushing Stream %v Mutations %v", streamId, logging.TagUD(mutk))
})

var processedUpserts []common.IndexInstId
@@ -353,7 +353,7 @@ func (f *flusher) flush(mutk *MutationKeys, streamId common.StreamId) {
if idxInst, ok = f.indexInstMap[mut.uuid]; !ok {
logging.LazyTrace(func() string {
return fmt.Sprintf("Flusher::flush Unknown Index Instance Id %v. "+
"Skipped Mutation Key %v", mut.uuid, mut.key)
"Skipped Mutation Key %v", mut.uuid, logging.TagUD(mut.key))
})
continue
}
@@ -363,7 +363,7 @@ func (f *flusher) flush(mutk *MutationKeys, streamId common.StreamId) {
logging.LazyTrace(func() string {
return fmt.Sprintf("Flusher::flush Found Mutation For IndexId: %v Stream: %v In "+
"Stream: %v. Skipped Mutation Key %v", idxInst.InstId, idxInst.Stream,
- streamId, mut.key)
+ streamId, logging.TagUD(mut.key))
})
continue
}
@@ -373,7 +373,7 @@ func (f *flusher) flush(mutk *MutationKeys, streamId common.StreamId) {
if idxInst.State == common.INDEX_STATE_DELETED {
logging.LazyTrace(func() string {
return fmt.Sprintf("Flusher::flush Found Mutation For IndexId: %v In "+
"DELETED State. Skipped Mutation Key %v", idxInst.InstId, mut.key)
"DELETED State. Skipped Mutation Key %v", idxInst.InstId, logging.TagUD(mut.key))
})
continue
}
@@ -412,7 +412,7 @@ func (f *flusher) flush(mutk *MutationKeys, streamId common.StreamId) {

default:
logging.Errorf("Flusher::flush Unknown mutation type received. Skipped %v",
- mut.key)
+ logging.TagUD(mut.key))
}
}
}
@@ -427,7 +427,7 @@ func (f *flusher) processUpsert(mut *Mutation, docid []byte, meta *MutationMeta)
var ok bool
if partnInstMap, ok = f.indexPartnMap[mut.uuid]; !ok {
logging.Errorf("Flusher::processUpsert Missing Partition Instance Map"+
"for IndexInstId: %v. Skipped Mutation Key: %v", mut.uuid, mut.key)
"for IndexInstId: %v. Skipped Mutation Key: %v", mut.uuid, logging.TagUD(mut.key))
return
}

@@ -436,16 +436,16 @@ func (f *flusher) processUpsert(mut *Mutation, docid []byte, meta *MutationMeta)
if err := slice.Insert(mut.key, docid, meta); err != nil {
logging.Errorf("Flusher::processUpsert Error indexing Key: %s "+
"docid: %s in Slice: %v. Error: %v. Skipped.",
- mut.key, docid, slice.Id(), err)
+ logging.TagUD(mut.key), logging.TagStrUD(docid), slice.Id(), err)

if err2 := slice.Delete(docid, meta); err2 != nil {
logging.Errorf("Flusher::processUpsert Error removing entry due to error %v Key: %s "+
"docid: %s in Slice: %v. Error: %v", err, mut.key, docid, slice.Id(), err2)
"docid: %s in Slice: %v. Error: %v", err, logging.TagUD(mut.key), logging.TagStrUD(docid), slice.Id(), err2)
}
}
} else {
logging.Debugf("Flusher::processUpsert Partition Instance not found "+
"for Id: %v Skipped Mutation Key: %v", partnId, mut.key)
"for Id: %v Skipped Mutation Key: %v", partnId, logging.TagUD(mut.key))
}

}
@@ -456,15 +456,15 @@ func (f *flusher) processDelete(mut *Mutation, docid []byte, meta *MutationMeta)
var ok bool
if partnInstMap, ok = f.indexPartnMap[mut.uuid]; !ok {
logging.Errorf("Flusher:processDelete Missing Partition Instance Map"+
"for IndexInstId: %v. Skipped Mutation Key: %v", mut.uuid, mut.key)
"for IndexInstId: %v. Skipped Mutation Key: %v", mut.uuid, logging.TagUD(mut.key))
return
}

for _, partnInst := range partnInstMap {
slice := partnInst.Sc.GetSliceByIndexKey(common.IndexKey(mut.key))
if err := slice.Delete(docid, meta); err != nil {
logging.Errorf("Flusher::processDelete Error Deleting DocId: %v "+
"from Slice: %v", docid, slice.Id())
"from Slice: %v", logging.TagStrUD(docid), slice.Id())
}
}
}
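One detail worth noting in flusher.go: the tagging calls sit inside the LazyTrace closures, so even the small cost of wrapping a key is only paid when trace logging is actually on. A minimal sketch of that pattern (assumed mechanics; the real LazyTrace lives in secondary/logging):

```go
package main

import "fmt"

// lazyTrace mimics the lazy-evaluation idea: the message closure runs only
// when tracing is enabled, keeping Sprintf and TagUD off the hot path.
func lazyTrace(traceEnabled bool, fn func() string) {
	if traceEnabled {
		fmt.Println(fn())
	}
}

func main() {
	key := []byte(`{"name":"alice"}`)
	lazyTrace(false, func() string {
		// Not evaluated on this run: no formatting or tagging cost.
		return fmt.Sprintf("Flusher::flush Flushing key %s", string(key))
	})
}
```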
42 changes: 21 additions & 21 deletions secondary/indexer/forestdb_slice_writer.go
@@ -330,7 +330,7 @@ loop:

default:
logging.Errorf("ForestDBSlice::handleCommandsWorker \n\tSliceId %v IndexInstId %v Received "+
"Unknown Command %v", fdb.id, fdb.idxInstId, c)
"Unknown Command %v", fdb.id, fdb.idxInstId, logging.TagUD(c))
}

fdb.idxStats.numItemsFlushed.Add(int64(nmut))
@@ -369,15 +369,15 @@ func (fdb *fdbSlice) insert(key []byte, rawKey []byte, docid []byte, workerId int)
func (fdb *fdbSlice) insertPrimaryIndex(key []byte, docid []byte, workerId int) (nmut int) {
var err error

logging.Tracef("ForestDBSlice::insert \n\tSliceId %v IndexInstId %v Set Key - %s", fdb.id, fdb.idxInstId, docid)
logging.Tracef("ForestDBSlice::insert \n\tSliceId %v IndexInstId %v Set Key - %s", fdb.id, fdb.idxInstId, logging.TagStrUD(docid))

//check if the docid exists in the main index
t0 := time.Now()
if _, err = fdb.main[workerId].GetKV(key); err == nil {
fdb.idxStats.Timings.stKVGet.Put(time.Now().Sub(t0))
//skip
logging.Tracef("ForestDBSlice::insert \n\tSliceId %v IndexInstId %v Key %v Already Exists. "+
"Primary Index Update Skipped.", fdb.id, fdb.idxInstId, string(docid))
"Primary Index Update Skipped.", fdb.id, fdb.idxInstId, logging.TagStrUD(docid))
} else if err != nil && err != forestdb.FDB_RESULT_KEY_NOT_FOUND {
fdb.checkFatalDbError(err)
logging.Errorf("ForestDBSlice::insert \n\tSliceId %v IndexInstId %v Error locating "+
@@ -388,7 +388,7 @@ func (fdb *fdbSlice) insertPrimaryIndex(key []byte, docid []byte, workerId int)
if err = fdb.main[workerId].SetKV(key, nil); err != nil {
fdb.checkFatalDbError(err)
logging.Errorf("ForestDBSlice::insert \n\tSliceId %v IndexInstId %v Error in Main Index Set. "+
"Skipped Key %s. Error %v", fdb.id, fdb.idxInstId, string(docid), err)
"Skipped Key %s. Error %v", fdb.id, fdb.idxInstId, logging.TagStrUD(docid), err)
}
fdb.idxStats.Timings.stKVSet.Put(time.Now().Sub(t0))
atomic.AddInt64(&fdb.insert_bytes, int64(len(key)))
@@ -416,7 +416,7 @@ func (fdb *fdbSlice) insertSecIndex(key []byte, docid []byte, workerId int) (nmut int)
//in mutation, skip it.
if bytes.Equal(oldkey, key) {
logging.Tracef("ForestDBSlice::insert \n\tSliceId %v IndexInstId %v Received Unchanged Key for "+
"Doc Id %v. Key %v. Skipped.", fdb.id, fdb.idxInstId, string(docid), key)
"Doc Id %v. Key %v. Skipped.", fdb.id, fdb.idxInstId, logging.TagStrUD(docid), logging.TagStrUD(key))
return
}

@@ -460,7 +460,7 @@ func (fdb *fdbSlice) insertSecIndex(key []byte, docid []byte, workerId int) (nmut int)
if err = fdb.back[workerId].SetKV(docid, key); err != nil {
fdb.checkFatalDbError(err)
logging.Errorf("ForestDBSlice::insert \n\tSliceId %v IndexInstId %v Error in Back Index Set. "+
"Skipped Key %s. Value %v. Error %v", fdb.id, fdb.idxInstId, string(docid), key, err)
"Skipped Key %s. Value %v. Error %v", fdb.id, fdb.idxInstId, logging.TagStrUD(docid), logging.TagStrUD(key), err)
return
}
fdb.idxStats.Timings.stKVSet.Put(time.Now().Sub(t0))
@@ -499,7 +499,7 @@ func (fdb *fdbSlice) insertSecArrayIndex(key []byte, rawKey []byte, docid []byte
if oldkey != nil {
if bytes.Equal(oldkey, key) {
logging.Tracef("ForestDBSlice::insert \n\tSliceId %v IndexInstId %v Received Unchanged Key for "+
"Doc Id %s. Key %v. Skipped.", fdb.id, fdb.idxInstId, docid, key)
"Doc Id %s. Key %v. Skipped.", fdb.id, fdb.idxInstId, logging.TagStrUD(docid), logging.TagStrUD(key))
return
}

@@ -522,7 +522,7 @@ func (fdb *fdbSlice) insertSecArrayIndex(key []byte, rawKey []byte, docid []byte
if oldEntriesBytes, oldKeyCount, _, err = ArrayIndexItems(oldkey, fdb.arrayExprPosition,
tmpBuf, fdb.isArrayDistinct, false); err != nil {
logging.Errorf("ForestDBSlice::insert SliceId %v IndexInstId %v Error in retrieving "+
"compostite old secondary keys. Skipping docid:%s Error: %v", fdb.id, fdb.idxInstId, docid, err)
"compostite old secondary keys. Skipping docid:%s Error: %v", fdb.id, fdb.idxInstId, logging.TagStrUD(docid), err)
return fdb.deleteSecArrayIndex(docid, workerId)
}
}
@@ -539,7 +539,7 @@ func (fdb *fdbSlice) insertSecArrayIndex(key []byte, rawKey []byte, docid []byte
(*tmpBufPtr)[:0], fdb.isArrayDistinct, true)
if err != nil {
logging.Errorf("ForestDBSlice::insert SliceId %v IndexInstId %v Error in creating "+
"compostite new secondary keys. Skipping docid:%s Error: %v", fdb.id, fdb.idxInstId, docid, err)
"compostite new secondary keys. Skipping docid:%s Error: %v", fdb.id, fdb.idxInstId, logging.TagStrUD(docid), err)
return fdb.deleteSecArrayIndex(docid, workerId)
}
}
@@ -578,7 +578,7 @@ func (fdb *fdbSlice) insertSecArrayIndex(key []byte, rawKey []byte, docid []byte
encBufPool.Put(tmpBufPtr)
// TODO: Handle skipped item here
logging.Errorf("ForestDBSlice::insert SliceId %v IndexInstId %v Error forming entry "+
"to be deleted from main index. Skipping docid:%s Error: %v", fdb.id, fdb.idxInstId, docid, err)
"to be deleted from main index. Skipping docid:%s Error: %v", fdb.id, fdb.idxInstId, logging.TagStrUD(docid), err)
return fdb.deleteSecArrayIndex(docid, workerId)
}
keysToBeDeleted = append(keysToBeDeleted, keyToBeDeleted)
@@ -598,7 +598,7 @@ func (fdb *fdbSlice) insertSecArrayIndex(key []byte, rawKey []byte, docid []byte
encBufPool.Put(tmpBufPtr)
// TODO: Handle skipped item here
logging.Errorf("ForestDBSlice::insert SliceId %v IndexInstId %v Error forming entry "+
"to be added to main index. Skipping docid:%s Error: %v", fdb.id, fdb.idxInstId, docid, err)
"to be added to main index. Skipping docid:%s Error: %v", fdb.id, fdb.idxInstId, logging.TagStrUD(docid), err)
return fdb.deleteSecArrayIndex(docid, workerId)
}
keysToBeAdded = append(keysToBeAdded, keyToBeAdded)
@@ -624,7 +624,7 @@ func (fdb *fdbSlice) insertSecArrayIndex(key []byte, rawKey []byte, docid []byte
if err = fdb.main[workerId].SetKV(keyToBeAdded, nil); err != nil {
fdb.checkFatalDbError(err)
logging.Errorf("ForestDBSlice::insert \n\tSliceId %v IndexInstId %v Error in Main Index Set. "+
"Skipped Key %v. Error %v", fdb.id, fdb.idxInstId, key, err)
"Skipped Key %v. Error %v", fdb.id, fdb.idxInstId, logging.TagStrUD(key), err)
return
}
fdb.idxStats.Timings.stKVSet.Put(time.Now().Sub(t0))
@@ -655,7 +655,7 @@ func (fdb *fdbSlice) insertSecArrayIndex(key []byte, rawKey []byte, docid []byte
if err = fdb.back[workerId].SetKV(docid, key); err != nil {
fdb.checkFatalDbError(err)
logging.Errorf("ForestDBSlice::insert \n\tSliceId %v IndexInstId %v Error in Back Index Set. "+
"Skipped Key %s. Value %v. Error %v", fdb.id, fdb.idxInstId, string(docid), key, err)
"Skipped Key %s. Value %v. Error %v", fdb.id, fdb.idxInstId, logging.TagStrUD(docid), logging.TagStrUD(key), err)
return
}
fdb.idxStats.Timings.stKVSet.Put(time.Now().Sub(t0))
@@ -702,7 +702,7 @@ func (fdb *fdbSlice) deletePrimaryIndex(docid []byte, workerId int) (nmut int) {
fdb.checkFatalDbError(err)
logging.Errorf("ForestDBSlice::delete \n\tSliceId %v IndexInstId %v. Error deleting "+
"entry from main index for Doc %s. Error %v", fdb.id, fdb.idxInstId,
- docid, err)
+ logging.TagStrUD(docid), err)
return
}
fdb.idxStats.Timings.stKVDelete.Put(time.Now().Sub(t0))
@@ -723,15 +723,15 @@ func (fdb *fdbSlice) deleteSecIndex(docid []byte, workerId int) (nmut int) {
if olditm, err = fdb.getBackIndexEntry(docid, workerId); err != nil {
fdb.checkFatalDbError(err)
logging.Errorf("ForestDBSlice::delete \n\tSliceId %v IndexInstId %v. Error locating "+
"backindex entry for Doc %s. Error %v", fdb.id, fdb.idxInstId, docid, err)
"backindex entry for Doc %s. Error %v", fdb.id, fdb.idxInstId, logging.TagStrUD(docid), err)
return
}

//if the oldkey is nil, nothing needs to be done. This is the case of deletes
//which happened before index was created.
if olditm == nil {
logging.Tracef("ForestDBSlice::delete \n\tSliceId %v IndexInstId %v Received NIL Key for "+
"Doc Id %v. Skipped.", fdb.id, fdb.idxInstId, docid)
"Doc Id %v. Skipped.", fdb.id, fdb.idxInstId, logging.TagStrUD(docid))
return
}

@@ -741,7 +741,7 @@ func (fdb *fdbSlice) deleteSecIndex(docid []byte, workerId int) (nmut int) {
fdb.checkFatalDbError(err)
logging.Errorf("ForestDBSlice::delete \n\tSliceId %v IndexInstId %v. Error deleting "+
"entry from main index for Doc %s. Key %v. Error %v", fdb.id, fdb.idxInstId,
- docid, olditm, err)
+ logging.TagStrUD(docid), logging.TagStrUD(olditm), err)
return
}
fdb.idxStats.Timings.stKVDelete.Put(time.Now().Sub(t0))
@@ -752,7 +752,7 @@ func (fdb *fdbSlice) deleteSecIndex(docid []byte, workerId int) (nmut int) {
if err = fdb.back[workerId].DeleteKV(docid); err != nil {
fdb.checkFatalDbError(err)
logging.Errorf("ForestDBSlice::delete \n\tSliceId %v IndexInstId %v. Error deleting "+
"entry from back index for Doc %s. Error %v", fdb.id, fdb.idxInstId, docid, err)
"entry from back index for Doc %s. Error %v", fdb.id, fdb.idxInstId, logging.TagStrUD(docid), err)
return
}
fdb.idxStats.Timings.stKVDelete.Put(time.Now().Sub(t0))
Expand All @@ -768,13 +768,13 @@ func (fdb *fdbSlice) deleteSecArrayIndex(docid []byte, workerId int) (nmut int)
if olditm, err = fdb.getBackIndexEntry(docid, workerId); err != nil {
fdb.checkFatalDbError(err)
logging.Errorf("ForestDBSlice::delete \n\tSliceId %v IndexInstId %v. Error locating "+
"backindex entry for Doc %s. Error %v", fdb.id, fdb.idxInstId, docid, err)
"backindex entry for Doc %s. Error %v", fdb.id, fdb.idxInstId, logging.TagStrUD(docid), err)
return
}

if olditm == nil {
logging.Tracef("ForestDBSlice::delete \n\tSliceId %v IndexInstId %v Received NIL Key for "+
"Doc Id %v. Skipped.", fdb.id, fdb.idxInstId, docid)
"Doc Id %v. Skipped.", fdb.id, fdb.idxInstId, logging.TagStrUD(docid))
return
}

@@ -844,7 +844,7 @@ func (fdb *fdbSlice) deleteSecArrayIndex(docid []byte, workerId int) (nmut int)
if err = fdb.back[workerId].DeleteKV(docid); err != nil {
fdb.checkFatalDbError(err)
logging.Errorf("ForestDBSlice::delete \n\tSliceId %v IndexInstId %v. Error deleting "+
"entry from back index for Doc %s. Error %v", fdb.id, fdb.idxInstId, docid, err)
"entry from back index for Doc %s. Error %v", fdb.id, fdb.idxInstId, logging.TagStrUD(docid), err)
return
}
fdb.idxStats.Timings.stKVDelete.Put(time.Now().Sub(t0))
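A last note on the docid changes in this file: a raw []byte formatted with %v prints as a numeric byte dump, which is presumably why these call sites use TagStrUD rather than TagUD for document ids: converting to string first keeps the redacted value readable under either verb. A small demo with the same assumed wrapper shape:

```go
package main

import "fmt"

type ud struct{ v interface{} }

func (u ud) String() string { return fmt.Sprintf("<ud>%v</ud>", u.v) }

func main() {
	docid := []byte("user::alice")
	fmt.Printf("%v\n", ud{docid})         // <ud>[117 115 101 114 ...]</ud>: a byte dump
	fmt.Printf("%v\n", ud{string(docid)}) // <ud>user::alice</ud>: what TagStrUD-style wrapping yields
}
```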
