Permalink
Browse files

Fix concurrency issues

Some commands, including the common delete, were circumventing synchronization.
  • Loading branch information...
Evan Shaw
Evan Shaw committed Nov 18, 2011
1 parent f8c9d23 commit ba78f8a05463addf4c16ef8f5dc83eb5297b42c0
Showing with 91 additions and 11 deletions.
  1. +21 −11 beanstalk.go
  2. +70 −0 beanstalk_test.go
View
@@ -166,6 +166,16 @@ func (c *Conn) Close() os.Error {
return c.textConn.Close()
}
+// anyCmd sends a command that doesn't depend on tube watched or tube used.
+func (c *Conn) anyCmd(response, format string, a ...interface{}) *result {
+ id := c.textConn.Next()
+ c.textConn.StartRequest(id)
+ defer c.textConn.EndRequest(id)
+ c.textConn.StartResponse(id)
+ defer c.textConn.EndResponse(id)
+ return c.cmd(response, format, a...)
+}
+
func (c *Conn) consumerCmd(watching map[string]bool, response, format string, a ...interface{}) *result {
id := c.textConn.Next()
c.textConn.StartRequest(id)
@@ -411,19 +421,19 @@ func (r result) checkForList(c *Conn) ([]string, os.Error) {
// Get a copy of the specified job.
func (c *Conn) Peek(id uint64) (*Job, os.Error) {
- return c.cmd("FOUND", "peek %d", id).checkForJob(c)
+ return c.anyCmd("FOUND", "peek %d", id).checkForJob(c)
}
func (c *Conn) Stats() (map[string]string, os.Error) {
- return c.cmd("OK", "stats").checkForDict(c)
+ return c.anyCmd("OK", "stats").checkForDict(c)
}
func (c *Conn) ListTubes() ([]string, os.Error) {
- return c.cmd("OK", "list-tubes").checkForList(c)
+ return c.anyCmd("OK", "list-tubes").checkForList(c)
}
func (c *Conn) ListTubesWatched() ([]string, os.Error) {
- return c.cmd("OK", "list-tubes-watched").checkForList(c)
+ return c.anyCmd("OK", "list-tubes-watched").checkForList(c)
}
var nameChars = []byte("-ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789\\+/;.$_()")
@@ -502,7 +512,7 @@ func (t Tube) PeekBuried() (*Job, os.Error) {
func (t Tube) Stats() (map[string]string, os.Error) {
// Note: do not use t.cmd -- this doesn't depend on the "currently
// used" tube.
- return t.c.cmd("OK", "stats-tube %s", t.Name).checkForDict(t.c)
+ return t.c.anyCmd("OK", "stats-tube %s", t.Name).checkForDict(t.c)
}
// Kick up to n jobs in tube t.
@@ -514,32 +524,32 @@ func (t Tube) Kick(n uint64) (uint64, os.Error) {
func (t Tube) Pause(µs uint64) os.Error {
// Note: do not use t.cmd -- this doesn't depend on the "currently
// used" tube.
- r := t.c.cmd("PAUSED", "pause-tube %s %d", t.Name, µs)
+ r := t.c.anyCmd("PAUSED", "pause-tube %s %d", t.Name, µs)
return r.checkForWord(t.c)
}
// Delete job j.
func (j Job) Delete() os.Error {
- return j.c.cmd("DELETED", "delete %d", j.Id).checkForWord(j.c)
+ return j.c.anyCmd("DELETED", "delete %d", j.Id).checkForWord(j.c)
}
// Touch job j.
func (j Job) Touch() os.Error {
- return j.c.cmd("TOUCHED", "touch %d", j.Id).checkForWord(j.c)
+ return j.c.anyCmd("TOUCHED", "touch %d", j.Id).checkForWord(j.c)
}
// Bury job j and change its priority to pri.
func (j Job) Bury(pri uint32) os.Error {
- return j.c.cmd("BURIED", "bury %d %d", j.Id, pri).checkForWord(j.c)
+ return j.c.anyCmd("BURIED", "bury %d %d", j.Id, pri).checkForWord(j.c)
}
// Release job j, changing its priority to pri and its delay to delay.
func (j Job) Release(pri uint32, delay uint64) os.Error {
- r := j.c.cmd("RELEASED", "release %d %d %d", j.Id, pri, delay)
+ r := j.c.anyCmd("RELEASED", "release %d %d %d", j.Id, pri, delay)
return r.checkForWord(j.c)
}
// Get statistics on job j.
func (j Job) Stats() (map[string]string, os.Error) {
- return j.c.cmd("OK", "stats-job %d", j.Id).checkForDict(j.c)
+ return j.c.anyCmd("OK", "stats-job %d", j.Id).checkForDict(j.c)
}
View
@@ -356,6 +356,76 @@ func TestCommands(t *testing.T) {
}
}
+// These tests should be run with gotest's test.cpu flag > 1. Otherwise they're
+// not testing everything.
+const goroutines = 100
+
+func TestConcurrentPut(t *testing.T) {
+ done := make(chan bool)
+ reply := strings.Repeat("INSERTED 1\r\n", goroutines)
+ rw, buf := responder(reply)
+ c := newConn("<fake>", rw, nil)
+
+ for i := 0; i < goroutines; i++ {
+ go func() {
+ id, err := c.Put("body", 0, 0, 0)
+ if err != nil {
+ t.Errorf("received error: %v", err)
+ }
+ if id != 1 {
+ t.Errorf("expected ID 1, got: %v", id)
+ }
+ done <- true
+ }()
+ }
+ for i := 0; i < goroutines; i++ {
+ <-done
+ }
+ expected := strings.Repeat("put 0 0 0 4\r\nbody\r\n", goroutines)
+ if buf.String() != expected {
+ t.Errorf("expected commands:\n%q\ngot:\n%q", expected, buf.String())
+ }
+}
+
+func TestConcurrentReserve(t *testing.T) {
+ done := make(chan bool)
+ reply := strings.Repeat("RESERVED 1 4\r\nbody\r\nDELETED\r\n", goroutines)
+ rw, buf := responder(reply)
+ c := newConn("<fake>", rw, nil)
+
+ for i := 0; i < goroutines; i++ {
+ go func() {
+ job, err := c.Reserve()
+ if err != nil {
+ t.Errorf("error on reserve: %v", err)
+ }
+ if job == nil {
+ t.Errorf("job is nil")
+ done <- true
+ return
+ }
+ if job.Id != 1 {
+ t.Errorf("expected ID 1, got: %v", job.Id)
+ }
+ if job.Body != "body" {
+ t.Errorf("expected body %q, got %q", "body", job.Body)
+ }
+ err = job.Delete()
+ if err != nil {
+ t.Errorf("error on delete: %v", err)
+ }
+ done <- true
+ }()
+ }
+ for i := 0; i < goroutines; i++ {
+ <-done
+ }
+ expected := strings.Repeat("reserve-with-timeout 4000000000000000\r\ndelete 1\r\n", goroutines)
+ if buf.String() != expected {
+ t.Errorf("expected commands:\n%q\ngot:\n%q", expected, buf.String())
+ }
+}
+
// tests which should result in errors
var errorTests = []struct {
name string

0 comments on commit ba78f8a

Please sign in to comment.