Skip to content

Commit

Permalink
MB-17094 Inconsistent timestamps in query.log
Browse files Browse the repository at this point in the history
Change-Id: I7bd495f264dcb8461bb0c3097d0a193a872bce65
Reviewed-on: http://review.couchbase.org/59759
Reviewed-by: Gerald Sangudi <gerald@couchbase.com>
Tested-by: Sitaram Vemulapalli <sitaram.vemulapalli@couchbase.com>
  • Loading branch information
sitaramv committed Feb 11, 2016
1 parent 4402746 commit 3ea76d4
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 52 deletions.
14 changes: 7 additions & 7 deletions client.go
Expand Up @@ -29,14 +29,14 @@ import (
"errors"
"fmt"
"io"
"log"
"runtime"
"strings"
"sync"
"time"

"github.com/couchbase/gomemcached"
"github.com/couchbase/gomemcached/client" // package name is 'memcached'
"github.com/couchbase/goutils/logging"
)

// Mutation Token
Expand All @@ -57,7 +57,7 @@ func slowLog(startTime time.Time, format string, args ...interface{}) {
if elapsed := time.Now().Sub(startTime); elapsed > SlowServerCallWarningThreshold {
pc, _, _, _ := runtime.Caller(2)
caller := runtime.FuncForPC(pc).Name()
log.Printf("go-couchbase: "+format+" in "+caller+" took "+elapsed.String(), args...)
logging.Infof("go-couchbase: "+format+" in "+caller+" took "+elapsed.String(), args...)
}
}

Expand Down Expand Up @@ -213,7 +213,7 @@ func (b *Bucket) doBulkGet(vb uint16, keys []string,

if len(b.VBServerMap().VBucketMap) < int(vb) {
//fatal
log.Printf("go-couchbase: vbmap smaller than requested vbucket number. vb %d vbmap len %d", vb, len(b.VBServerMap().VBucketMap))
logging.Errorf("go-couchbase: vbmap smaller than requested vbucket number. vb %d vbmap len %d", vb, len(b.VBServerMap().VBucketMap))
err := fmt.Errorf("vbmap smaller than requested vbucket")
ech <- err
return
Expand All @@ -224,7 +224,7 @@ func (b *Bucket) doBulkGet(vb uint16, keys []string,

if masterID < 0 {
// fatal
log.Printf("No master node available for vb %d", vb)
logging.Errorf("No master node available for vb %d", vb)
err := fmt.Errorf("No master node available for vb %d", vb)
ech <- err
return
Expand All @@ -237,14 +237,14 @@ func (b *Bucket) doBulkGet(vb uint16, keys []string,
conn, err := pool.Get()
if err != nil {
if isAuthError(err) {
log.Printf(" Fatal Auth Error %v", err)
logging.Errorf(" Fatal Auth Error %v", err)
ech <- err
return err
} else if isConnError(err) {
// for a connection error, refresh right away
b.Refresh()
}
log.Printf("Pool Get returned %v", err)
logging.Infof("Pool Get returned %v", err)
// retry
return nil
}
Expand All @@ -271,7 +271,7 @@ func (b *Bucket) doBulkGet(vb uint16, keys []string,
return nil
}

log.Printf("Connection Error: %s. Refreshing bucket", err.Error())
logging.Errorf("Connection Error: %s. Refreshing bucket", err.Error())
b.Refresh()
// retry
return nil
Expand Down
4 changes: 2 additions & 2 deletions conn_pool.go
Expand Up @@ -2,7 +2,7 @@ package couchbase

import (
"errors"
"log"
"github.com/couchbase/goutils/logging"
"time"

"github.com/couchbase/gomemcached"
Expand Down Expand Up @@ -65,7 +65,7 @@ func defaultMkConn(host string, ah AuthHandler) (*memcached.Client, error) {
if EnableMutationToken == true {
res, err := conn.EnableMutationToken()
if err != nil || res.Status != gomemcached.SUCCESS {
log.Printf("Warning: Unable to enable mutation token %v", err)
logging.Warnf("Unable to enable mutation token %v", err)
}
}

Expand Down
14 changes: 7 additions & 7 deletions ddocs.go
Expand Up @@ -4,8 +4,8 @@ import (
"bytes"
"encoding/json"
"fmt"
"github.com/couchbase/goutils/logging"
"io/ioutil"
"log"
"net/http"
)

Expand Down Expand Up @@ -124,7 +124,7 @@ func (b *Bucket) PutDDoc(docname string, value interface{}) error {

lastNode = selectedNode

log.Printf(" Trying with selected node %d", selectedNode)
logging.Infof(" Trying with selected node %d", selectedNode)
j, err := json.Marshal(value)
if err != nil {
return err
Expand All @@ -149,7 +149,7 @@ func (b *Bucket) PutDDoc(docname string, value interface{}) error {
body, _ := ioutil.ReadAll(res.Body)
Err = fmt.Errorf("error installing view: %v / %s",
res.Status, body)
log.Printf(" Error in PutDDOC %v. Retrying...", Err)
logging.Errorf(" Error in PutDDOC %v. Retrying...", Err)
res.Body.Close()
b.Refresh()
continue
Expand Down Expand Up @@ -182,7 +182,7 @@ func (b *Bucket) GetDDoc(docname string, into interface{}) error {
}

lastNode = selectedNode
log.Printf(" Trying with selected node %d", selectedNode)
logging.Infof(" Trying with selected node %d", selectedNode)

req, err := http.NewRequest("GET", ddocU, nil)
if err != nil {
Expand All @@ -202,7 +202,7 @@ func (b *Bucket) GetDDoc(docname string, into interface{}) error {
body, _ := ioutil.ReadAll(res.Body)
Err = fmt.Errorf("error reading view: %v / %s",
res.Status, body)
log.Printf(" Error in GetDDOC %v Retrying...", Err)
logging.Errorf(" Error in GetDDOC %v Retrying...", Err)
b.Refresh()
res.Body.Close()
continue
Expand Down Expand Up @@ -240,7 +240,7 @@ func (b *Bucket) DeleteDDoc(docname string) error {
}

lastNode = selectedNode
log.Printf(" Trying with selected node %d", selectedNode)
logging.Infof(" Trying with selected node %d", selectedNode)

req, err := http.NewRequest("DELETE", ddocU, nil)
if err != nil {
Expand All @@ -259,7 +259,7 @@ func (b *Bucket) DeleteDDoc(docname string) error {
if res.StatusCode != 200 {
body, _ := ioutil.ReadAll(res.Body)
Err = fmt.Errorf("error deleting view : %v / %s", res.Status, body)
log.Printf(" Error in DeleteDDOC %v. Retrying ... ", Err)
logging.Errorf(" Error in DeleteDDOC %v. Retrying ... ", Err)
b.Refresh()
res.Body.Close()
continue
Expand Down
10 changes: 5 additions & 5 deletions observe.go
Expand Up @@ -2,7 +2,7 @@ package couchbase

import (
"fmt"
"log"
"github.com/couchbase/goutils/logging"
"sync"
)

Expand Down Expand Up @@ -102,7 +102,7 @@ func (b *Bucket) ObserveAndPersistPoll(vb uint16, vbuuid uint64, seqNo uint64) (
return fmt.Errorf("Not enough healthy nodes in the cluster"), false
}

log.Printf("Node list %v", nodes)
logging.Infof("Node list %v", nodes)

if b.ds.Observe >= ObserveReplicateOne {
// create a job for each host
Expand Down Expand Up @@ -178,7 +178,7 @@ func (b *Bucket) ObserveAndPersistPoll(vb uint16, vbuuid uint64, seqNo uint64) (
}

case Err := <-errChan:
log.Printf("Error in Observe/Persist %v", Err.err)
logging.Errorf("Error in Observe/Persist %v", Err.err)
err = fmt.Errorf("Error in Observe/Persist job %v", Err.err)
nj--
ObservePersistPool.Put(Err.job)
Expand Down Expand Up @@ -232,7 +232,7 @@ func (b *Bucket) OPJobPoll() {

job.resultChan <- job
case <-OPJobDone:
log.Printf("Observe Persist Poller exitting")
logging.Infof("Observe Persist Poller exitting")
ok = false
}
}
Expand All @@ -243,7 +243,7 @@ func (b *Bucket) GetNodeList(vb uint16) []string {

vbm := b.VBServerMap()
if len(vbm.VBucketMap) < int(vb) {
log.Printf("vbmap smaller than vblist")
logging.Infof("vbmap smaller than vblist")
return nil
}

Expand Down
16 changes: 8 additions & 8 deletions pools.go
Expand Up @@ -7,9 +7,9 @@ import (
"errors"
"fmt"
atomic "github.com/couchbase/go-couchbase/platform"
"github.com/couchbase/goutils/logging"
"io"
"io/ioutil"
"log"
"math/rand"
"net/http"
"net/url"
Expand Down Expand Up @@ -203,11 +203,11 @@ type NodeServices struct {
}

type BucketNotFoundError struct {
bucket string
bucket string
}

func (e *BucketNotFoundError) Error() string {
return fmt.Sprint("No bucket named " + e.bucket)
return fmt.Sprint("No bucket named " + e.bucket)
}

type BucketAuth struct {
Expand Down Expand Up @@ -263,8 +263,8 @@ func (b Bucket) HealthyNodes() []Node {
nodes = append(nodes, n)
}
if n.Status != "healthy" { // log non-healthy node
log.Printf("Non-healthy node; node details:")
log.Printf("Hostname=%v, Status=%v, CouchAPIBase=%v, ThisNode=%v", n.Hostname, n.Status, n.CouchAPIBase, n.ThisNode)
logging.Infof("Non-healthy node; node details:")
logging.Infof("Hostname=%v, Status=%v, CouchAPIBase=%v, ThisNode=%v", n.Hostname, n.Status, n.CouchAPIBase, n.ThisNode)
}
}

Expand Down Expand Up @@ -832,15 +832,15 @@ func (b *Bucket) Close() {

func bucketFinalizer(b *Bucket) {
if b.connPools != nil {
log.Printf("Warning: Finalizing a bucket with active connections.")
logging.Warnf("Finalizing a bucket with active connections.")
}
}

// GetBucket gets a bucket from within this pool.
func (p *Pool) GetBucket(name string) (*Bucket, error) {
rv, ok := p.BucketMap[name]
if !ok {
return nil, &BucketNotFoundError{bucket:name}
return nil, &BucketNotFoundError{bucket: name}
}
runtime.SetFinalizer(&rv, bucketFinalizer)
err := rv.Refresh()
Expand All @@ -854,7 +854,7 @@ func (p *Pool) GetBucket(name string) (*Bucket, error) {
func (p *Pool) GetBucketWithAuth(bucket, username, password string) (*Bucket, error) {
rv, ok := p.BucketMap[bucket]
if !ok {
return nil, &BucketNotFoundError{bucket:bucket}
return nil, &BucketNotFoundError{bucket: bucket}
}
runtime.SetFinalizer(&rv, bucketFinalizer)
rv.ah = newBucketAuth(username, password, bucket)
Expand Down
14 changes: 7 additions & 7 deletions streaming.go
Expand Up @@ -3,9 +3,9 @@ package couchbase
import (
"encoding/json"
"fmt"
"github.com/couchbase/goutils/logging"
"io"
"io/ioutil"
"log"
"math/rand"
"net"
"net/http"
Expand Down Expand Up @@ -63,7 +63,7 @@ func (b *Bucket) RunBucketUpdater(notify NotifyFn) {
if notify != nil {
notify(b.Name, err)
}
log.Printf(" Bucket Updater exited with err %v", err)
logging.Errorf(" Bucket Updater exited with err %v", err)
}
}()
}
Expand Down Expand Up @@ -93,7 +93,7 @@ func (b *Bucket) UpdateBucket() error {
for {

if failures == MAX_RETRY_COUNT {
log.Printf(" Maximum failures reached. Exiting loop...")
logging.Errorf(" Maximum failures reached. Exiting loop...")
return fmt.Errorf("Max failures reached. Last Error %v", returnErr)
}

Expand All @@ -106,7 +106,7 @@ func (b *Bucket) UpdateBucket() error {
node := nodes[(startNode)%len(nodes)]

streamUrl := fmt.Sprintf("http://%s/pools/default/bucketsStreaming/%s", node.Hostname, b.Name)
log.Printf(" Trying with %s", streamUrl)
logging.Infof(" Trying with %s", streamUrl)
req, err := http.NewRequest("GET", streamUrl, nil)
if err != nil {
return err
Expand All @@ -124,7 +124,7 @@ func (b *Bucket) UpdateBucket() error {

if res.StatusCode != 200 {
bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512))
log.Printf("Failed to connect to host, unexpected status code: %v. Body %s", res.StatusCode, bod)
logging.Errorf("Failed to connect to host, unexpected status code: %v. Body %s", res.StatusCode, bod)
res.Body.Close()
returnErr = fmt.Errorf("Failed to connect to host. Status %v Body %s", res.StatusCode, bod)
failures++
Expand All @@ -138,7 +138,7 @@ func (b *Bucket) UpdateBucket() error {

err := dec.Decode(&tmpb)
if err != nil {
log.Printf(" Unable to decode response %v", err)
logging.Errorf(" Unable to decode response %v", err)
returnErr = err
res.Body.Close()
break
Expand Down Expand Up @@ -186,7 +186,7 @@ func (b *Bucket) UpdateBucket() error {
atomic.StorePointer(&b.nodeList, unsafe.Pointer(&tmpb.NodesJSON))
b.Unlock()

log.Printf("Got new configuration for bucket %s", b.Name)
logging.Infof("Got new configuration for bucket %s", b.Name)

}
// we are here because of an error
Expand Down
8 changes: 4 additions & 4 deletions tap.go
@@ -1,7 +1,7 @@
package couchbase

import (
"log"
"github.com/couchbase/goutils/logging"
"time"

"github.com/couchbase/gomemcached/client"
Expand Down Expand Up @@ -66,7 +66,7 @@ func (feed *TapFeed) run() {
}

// On error, try to refresh the bucket in case the list of nodes changed:
log.Printf("go-couchbase: TAP connection lost; reconnecting to bucket %q in %v",
logging.Infof("go-couchbase: TAP connection lost; reconnecting to bucket %q in %v",
feed.bucket.Name, retryInterval)
err := feed.bucket.Refresh()
bucketOK = err == nil
Expand All @@ -88,7 +88,7 @@ func (feed *TapFeed) connectToNodes() (killSwitch chan bool, err error) {
var singleFeed *memcached.TapFeed
singleFeed, err = serverConn.StartTapFeed(feed.args)
if err != nil {
log.Printf("go-couchbase: Error connecting to tap feed of %s: %v", serverConn.host, err)
logging.Errorf("go-couchbase: Error connecting to tap feed of %s: %v", serverConn.host, err)
feed.closeNodeFeeds()
return
}
Expand All @@ -105,7 +105,7 @@ func (feed *TapFeed) forwardTapEvents(singleFeed *memcached.TapFeed, killSwitch
case event, ok := <-singleFeed.C:
if !ok {
if singleFeed.Error != nil {
log.Printf("go-couchbase: Tap feed from %s failed: %v", host, singleFeed.Error)
logging.Errorf("go-couchbase: Tap feed from %s failed: %v", host, singleFeed.Error)
}
killSwitch <- true
return
Expand Down

0 comments on commit 3ea76d4

Please sign in to comment.