Skip to content

Commit

Permalink
Add to GH-47 test program
Browse files Browse the repository at this point in the history
  • Loading branch information
lukebakken committed Feb 8, 2016
1 parent f1286f2 commit a573bbb
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 6 deletions.
4 changes: 2 additions & 2 deletions async.go
Expand Up @@ -52,11 +52,11 @@ func (a *Async) done(err error) {
a.Error = err
}
if a.Done != nil {
logDebug("[Cluster]", "signaling a.Done channel with '%s'", a.Command.Name())
logDebug("[Async]", "signaling a.Done channel with '%s'", a.Command.Name())
a.Done <- a.Command
}
if a.Wait != nil {
logDebug("[Cluster]", "signaling a.Wait WaitGroup for '%s'", a.Command.Name())
logDebug("[Async]", "signaling a.Wait WaitGroup for '%s'", a.Command.Name())
a.Wait.Done()
}
}
52 changes: 48 additions & 4 deletions examples/gh-47/main.go
Expand Up @@ -47,7 +47,7 @@ func ErrExit(err error) {
}

func init() {
c := 1024
c := 524288
b := make([]byte, c)
_, err := rand.Read(b)
if err != nil {
Expand All @@ -57,7 +57,7 @@ func init() {
}

func keepAlive(c *riak.Cluster, sc chan struct{}) {
tck := time.NewTicker(time.Second * 5)
tck := time.NewTicker(time.Second * 1)
dc := make(chan riak.Command, minConnections)

defer func() {
Expand Down Expand Up @@ -103,7 +103,7 @@ func keepAlive(c *riak.Cluster, sc chan struct{}) {
}

func storeData(c *riak.Cluster, sc chan struct{}) {
tck := time.NewTicker(time.Millisecond * 500)
tck := time.NewTicker(time.Millisecond * 125)
dc := make(chan riak.Command, minConnections)

defer func() {
Expand Down Expand Up @@ -155,6 +155,50 @@ func storeData(c *riak.Cluster, sc chan struct{}) {
}
}

func listKeys(c *riak.Cluster, sc chan struct{}) {
tck := time.NewTicker(time.Millisecond * 500)
dc := make(chan riak.Command, minConnections)

defer func() {
tck.Stop()
close(dc)
}()

LogDebug("[GH-47/ListKeys]", "Starting worker process")
defer LogDebug("[GH-47/ListKeys]", "Stopped worker process")

for !stopping {
select {
case <-sc:
LogDebug("[GH-47/ListKeys]", "Stopping worker process")
stopping = true
break
case cmd := <-dc:
lk := cmd.(*riak.ListKeysCommand)
kc := 0
if lk.Response != nil {
kc = len(lk.Response.Keys)
}
LogDebug("[GH-47/ListKeys]", "%v completed, keys: %d", cmd.Name(), kc)
case <-tck.C:
svc, err := riak.NewListKeysCommandBuilder().
WithBucket("gh-47").
WithStreaming(false).
Build()
if err != nil {
ErrExit(err)
}
a := &riak.Async{
Command: svc,
Done: dc,
}
if err := c.ExecuteAsync(a); err != nil {
LogErr("[GH-47/ListKeys]", err)
}
}
}
}

func main() {
riak.EnableDebugLogging = true

Expand Down Expand Up @@ -207,7 +251,7 @@ func main() {

go keepAlive(c, sc)
go storeData(c, sc)
// TODO go fetchData(c, sc)
go listKeys(c, sc)

defer func() {
stopping = true
Expand Down

0 comments on commit a573bbb

Please sign in to comment.