Permalink
Browse files

`cmdstalk -all` watches newly created tubes every 10 seconds.

  • Loading branch information...
pda committed May 7, 2014
1 parent 5c8d328 commit 431ac5fc3a34b32384e05fbab03a6de82ab7f572
Showing with 51 additions and 18 deletions.
  1. +5 −2 README.md
  2. +46 −3 broker/broker_dispatcher.go
  3. +0 −13 bs/bs.go
View
@@ -45,7 +45,11 @@ cmdstalk -help
# -cmd="": Command to run in worker.
# -tubes=[default]: Comma separated list of tubes.
cmdstalk -cmd="/path/to/your/worker --your=flags --here" --tubes="your-tube"
# Watch three specific tubes.
cmdstalk -cmd="/path/to/your/worker --your=flags --here" -tubes="one,two,three"
# Watch all current and future tubes.
cmdstalk -all -cmd="cat"
```
@@ -79,7 +83,6 @@ TODO
* Retry back-off delay.
* Configurable concurrency per tube.
* Ship linux binary; GitHub releases?
* Poll for new tubes created after launch.
[beanstalkd]: http://kr.github.io/beanstalkd/
@@ -1,23 +1,35 @@
package broker
import (
"github.com/99designs/cmdstalk/bs"
"time"
"github.com/kr/beanstalk"
)
const (
// ListTubeDelay is the time between sending list-tube to beanstalkd
// to discover and watch newly created tubes.
ListTubeDelay = 10 * time.Second
)
type BrokerDispatcher struct {
address string
cmd string
conn *beanstalk.Conn
tubeSet map[string]bool
}
func NewBrokerDispatcher(address, cmd string) *BrokerDispatcher {
return &BrokerDispatcher{
address: address,
cmd: cmd,
tubeSet: make(map[string]bool),
}
}
// RunTube runs a broker for the specified tube.
func (bd *BrokerDispatcher) RunTube(tube string) {
bd.tubeSet[tube] = true
go func() {
b := New(bd.address, tube, bd.cmd, nil)
b.Run(nil)
@@ -32,6 +44,37 @@ func (bd *BrokerDispatcher) RunTubes(tubes []string) {
}
// RunAllTubes polls beanstalkd, running a broker as new tubes are created.
func (bd *BrokerDispatcher) RunAllTubes() {
bd.RunTubes(bs.MustConnectAndListTubes(bd.address))
func (bd *BrokerDispatcher) RunAllTubes() (err error) {
conn, err := beanstalk.Dial("tcp", bd.address)
if err == nil {
bd.conn = conn
} else {
return
}
go func() {
ticker := time.Tick(ListTubeDelay)
for _ = range ticker {
if e := bd.watchNewTubes(); e != nil {
// ignore error
}
}
}()
return
}
func (bd *BrokerDispatcher) watchNewTubes() (err error) {
tubes, err := bd.conn.ListTubes()
if err != nil {
return
}
for _, tube := range tubes {
if !bd.tubeSet[tube] {
bd.RunTube(tube)
}
}
return
}
View
@@ -16,19 +16,6 @@ const (
DeadlineSoonDelay = 1 * time.Second
)
// Connect to beanstalkd and list tubes. Panic on error.
func MustConnectAndListTubes(address string) []string {
conn, err := beanstalk.Dial("tcp", address)
if err != nil {
panic(err)
}
tubes, err := conn.ListTubes()
if err != nil {
panic(err)
}
return tubes
}
// reserve-with-timeout until there's a job or something panic-worthy.
// Handles beanstalk.ErrTimeout by retrying immediately.
// Handles beanstalk.ErrDeadline by sleeping DeadlineSoonDelay before retry.

0 comments on commit 431ac5f

Please sign in to comment.