Skip to content

Commit

Permalink
MB-19040 - cbdatasource logVBucketStates() for more diagnosis info
Browse files Browse the repository at this point in the history
Change-Id: Ia664671645f1d35fb96e8d022d14261ca61624e2
Reviewed-on: http://review.couchbase.org/62783
Reviewed-by: Marty Schoch <marty.schoch@gmail.com>
Tested-by: Steve Yen <steve.yen@gmail.com>
  • Loading branch information
steveyen committed Apr 13, 2016
1 parent 431be8c commit f4a0cda
Showing 1 changed file with 70 additions and 0 deletions.
70 changes: 70 additions & 0 deletions cbdatasource/cbdatasource.go
Expand Up @@ -1092,6 +1092,11 @@ func (d *bucketDataSource) worker(server string, workerCh chan []uint16) int {
d.Kick("new-worker")

for {
currVBucketsMutex.Lock()
d.logVBucketStates(server, uprOpenName, "worker, looping beg",
currVBuckets, nil)
currVBucketsMutex.Unlock()

select {
case <-sendEndCh:
atomic.AddUint64(&d.stats.TotWorkerSendEndCh, 1)
Expand All @@ -1114,7 +1119,11 @@ func (d *bucketDataSource) worker(server string, workerCh chan []uint16) int {
}

currVBucketsMutex.Lock()
d.logVBucketStates(server, uprOpenName, "refreshWorker-prior",
currVBuckets, nil)
err := d.refreshWorker(sendCh, currVBuckets, wantVBucketIDs)
d.logVBucketStates(server, uprOpenName, "refreshWorker-after",
currVBuckets, err)
currVBucketsMutex.Unlock()
if err != nil {
return cleanup(0, err)
Expand Down Expand Up @@ -1535,6 +1544,67 @@ func (d *bucketDataSource) Kick(reason string) error {

// --------------------------------------------------------------

func (d *bucketDataSource) logVBucketStates(server, uprOpenName, prefix string,
vbucketStates map[uint16]*VBucketState, errToLog error) {
if d.options.Logf == nil {
return
}

if len(vbucketStates) <= 0 {
d.options.Logf("cbdatasource: server: %s, uprOpenName: %s, %s,"+
" vbucketStates empty", server, uprOpenName, prefix)
}

// Key is VBucketState.State (ex: "", "running", "closing"),
// and value is array of vbid's.
byState := map[string]sort.IntSlice{}
for vbid, vbucketState := range vbucketStates {
byState[vbucketState.State] =
append(byState[vbucketState.State], int(vbid))
}

for state, vbids := range byState {
var buf bytes.Buffer

if len(vbids) > 0 {
first := true
emitRange := func(beg, end int) {
if !first {
fmt.Fprint(&buf, ", ")
}
if beg < end {
fmt.Fprintf(&buf, "%d-%d", beg, end)
} else {
fmt.Fprintf(&buf, "%d", beg)
}
first = false
}

sort.Sort(vbids)

vbidRangeBeg := vbids[0]
vbidRangeEnd := vbids[0]

for i := 1; i < len(vbids); i++ {
vbid := vbids[i]
if vbid > vbidRangeEnd+1 {
emitRange(vbidRangeBeg, vbidRangeEnd)
vbidRangeBeg = vbid
}
vbidRangeEnd = vbid
}

emitRange(vbidRangeBeg, vbidRangeEnd)
}

d.options.Logf("cbdatasource: server: %s, uprOpenName: %s, %s,"+
" vbucketState: %q (has %d vbuckets), %s",
server, uprOpenName, prefix, state, len(vbids), buf.String())
}
}

// --------------------------------------------------------------

type bucketWrapper struct {
b *couchbase.Bucket
}
Expand Down

0 comments on commit f4a0cda

Please sign in to comment.