Skip to content

Commit

Permalink
Fixed example scripts.
Browse files Browse the repository at this point in the history
  • Loading branch information
prataprc committed Nov 24, 2014
1 parent 2d9d584 commit 8205990
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 54 deletions.
47 changes: 11 additions & 36 deletions examples/upr_bench/bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ var options struct {
}

var done = make(chan bool, 16)
var rch = make(chan []interface{}, 10000)

func argParse() string {
var buckets string
Expand Down Expand Up @@ -76,14 +75,14 @@ func usage() {

func main() {
cluster := argParse()
ch := make(chan *couchbase.UprFeed, 10)
rch := make(chan []interface{}, 10000)
for _, bucket := range options.buckets {
go startBucket(cluster, bucket, ch)
go startBucket(cluster, bucket, rch)
}
receive(ch)
receive(rch)
}

func startBucket(cluster, bucketn string, ch chan *couchbase.UprFeed) int {
func startBucket(cluster, bucketn string, rch chan []interface{}) int {
defer func() {
if r := recover(); r != nil {
fmt.Printf("%s:\n%s\n", r, debug.Stack())
Expand All @@ -99,16 +98,11 @@ func startBucket(cluster, bucketn string, ch chan *couchbase.UprFeed) int {
mf(err, "- upr")

vbnos := listOfVbnos(options.maxVbno)

flogs, err := b.GetFailoverLogs(vbnos)
mf(err, "- upr failoverlogs")

if options.printflogs {
printFlogs(vbnos, flogs)
}

ch <- uprFeed

go startUpr(uprFeed, flogs)

for {
Expand All @@ -132,23 +126,7 @@ func startUpr(uprFeed *couchbase.UprFeed, flogs couchbase.FailoverLog) {
}
}

func endUpr(uprFeed *couchbase.UprFeed, vbnos []uint16) error {
for _, vbno := range vbnos {
if err := uprFeed.UprCloseStream(vbno, uint16(0)); err != nil {
mf(err, "- UprCloseStream()")
return err
}
}
return nil
}

func mf(err error, msg string) {
if err != nil {
log.Fatalf("%v: %v", msg, err)
}
}

func receive(ch chan *couchbase.UprFeed) {
func receive(rch chan []interface{}) {
// bucket -> Opcode -> #count
counts := make(map[string]map[mcd.CommandCode]int)

Expand All @@ -158,13 +136,9 @@ func receive(ch chan *couchbase.UprFeed) {
}

finTimeout := time.After(time.Millisecond * time.Duration(options.duration))
uprFeeds := make([]*couchbase.UprFeed, 0)
loop:
for {
select {
case uprFeed := <-ch:
uprFeeds = append(uprFeeds, uprFeed)

case msg, ok := <-rch:
if ok == false {
break loop
Expand All @@ -189,14 +163,9 @@ loop:
common.Infof("\n")

case <-finTimeout:
for _, uprFeed := range uprFeeds {
endUpr(uprFeed, listOfVbnos(options.maxVbno))
}
break loop
}
}
fmt.Println("sleep wait ....")
time.Sleep(10000 * time.Millisecond)
}

func sprintCounts(counts map[mcd.CommandCode]int) string {
Expand Down Expand Up @@ -226,3 +195,9 @@ func printFlogs(vbnos []uint16, flogs couchbase.FailoverLog) {
}
common.Infof("\n")
}

func mf(err error, msg string) {
if err != nil {
log.Fatalf("%v: %v", msg, err)
}
}
71 changes: 53 additions & 18 deletions examples/upr_feed/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"net/url"
"os"
"runtime/pprof"
"strconv"
"strings"
"time"

mcd "github.com/couchbase/gomemcached"
Expand All @@ -25,17 +27,22 @@ var options struct {
maxVb int
tick int
debug bool
closeVbs []string
cpuprofile string
memprofile string
}

func argParse() {
var closeVbs string

flag.StringVar(&options.bucket, "bucket", "default",
"bucket to connect to (defaults to username)")
"bucket to connect to")
flag.IntVar(&options.maxVb, "maxvb", 1024,
"number configured vbuckets")
flag.IntVar(&options.tick, "tick", 1000,
"timer tick in mS to log information")
flag.StringVar(&closeVbs, "close", "",
"comma separated list of vbucket numbers")
flag.BoolVar(&options.debug, "debug", false,
"number configured vbuckets")
flag.StringVar(&options.cpuprofile, "cpuprofile", "",
Expand All @@ -51,6 +58,7 @@ func argParse() {
} else {
options.clusterAddr = args[0]
}
options.closeVbs = strings.Split(closeVbs, ",")
}

func usage() {
Expand Down Expand Up @@ -100,42 +108,80 @@ func main() {
return
}

opaque := uint16(10)
// request stream for all vbuckets
for i := 0; i < options.maxVb; i++ {
err := feed.UprRequestStream(
uint16(i) /*vbno*/, uint16(0) /*opaque*/, 0 /*flag*/, 0, /*vbuuid*/
uint16(i) /*vbno*/, opaque, 0 /*flag*/, 0, /*vbuuid*/
0 /*seqStart*/, 0xFFFFFFFFFFFFFFFF /*seqEnd*/, 0 /*snaps*/, 0)
if err != nil {
fmt.Printf("%s", err.Error())
}
}

// observe the mutations from the channel.
events(feed, 2000)

opaque += 1
if len(options.closeVbs) > 0 {
for _, vb := range options.closeVbs {
if len(vb) > 0 {
vbno, err := strconv.Atoi(vb)
if err != nil {
log.Fatal(err)
}
if err := feed.UprCloseStream(uint16(vbno), opaque); err != nil {
log.Printf("error while closing stream %d: %v", vbno, err)
}
}
}
}

events(feed, 100000)

feed.Close()
}

func events(feed *couchbase.UprFeed, timeoutMs int) {
var timeout <-chan time.Time

mutations := 0
done := true
tick := time.Tick(time.Duration(options.tick) * time.Millisecond)
var mutations = 0
if timeoutMs > 0 {
timeout = time.Tick(time.Duration(timeoutMs) * time.Millisecond)
}

loop:
for {
select {
case e := <-feed.C:
if e.Opcode == mcd.UPR_MUTATION {
mutations += 1
} else {
log.Printf("Received {%s, %d(vb), %d(opq), %s}\n",
e.Opcode, e.VBucket, e.Opaque, e.Status)
}
handleEvent(e)
done = false

case <-tick:
log.Printf("Mutation count %d", mutations)
if timeout == nil && done {
break loop
}
done = true

case <-timeout:
break loop
}
}
feed.Close()
}

func handleEvent(e *mc.UprEvent) {
if e.Opcode == mcd.UPR_MUTATION && options.debug {
log.Printf("got mutation %s", e.Value)
}
if e.Opcode == mcd.UPR_STREAMEND {
log.Printf("received Stream end for vbucket %d", e.VBucket)
}
}

func addKVset(b *couchbase.Bucket, count int) {
Expand Down Expand Up @@ -178,14 +224,3 @@ func mf(err error, msg string) {
log.Fatalf("%v: %v", msg, err)
}
}

// after receving 1000 mutations close some streams
//if callOnce == false {
// for i := 0; i < options.maxVb; i = i + 4 {
// log.Printf("closing stream for vbucket %d", i)
// if err := feed.UprCloseStream(uint16(i), uint16(0)); err != nil {
// log.Printf("received error while closing stream %d", i)
// }
// }
// callOnce = true
//}

0 comments on commit 8205990

Please sign in to comment.