Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SetSettings for settings not from flags #38

Merged
merged 3 commits into from
Jul 4, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,43 @@ func main() {
}
```

Here is a simple worker with settings:

```go
package main

import (
"fmt"
"github.com/benmanns/goworker"
)

func myFunc(queue string, args ...interface{}) error {
fmt.Printf("From %s, %v\n", queue, args)
return nil
}

func init() {
settings := goworker.WorkerSettings{
Uri: "redis://localhost:6379/",
Connections: 100,
Queues: []string{"myqueue", "delimited", "queues"},
UseNumber: true,
ExitOnComplete: false,
Concurrency: 2,
Namespace: "resque:",
Interval: 5.0,
}
goworker.SetSettings(workerSettings)
goworker.Register("MyClass", myFunc)
}

func main() {
if err := goworker.Work(); err != nil {
fmt.Println("Error:", err)
}
}
```

goworker worker functions receive the queue they are serving and a slice of interfaces. To use them as parameters to other functions, use Go type assertions to convert them into usable types.

```go
Expand Down
40 changes: 13 additions & 27 deletions flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,35 +91,21 @@ import (
"strings"
)

var (
queuesString string
queues queuesFlag
intervalFloat float64
interval intervalFlag
concurrency int
connections int
uri string
namespace string
exitOnComplete bool
isStrict bool
useNumber bool
)

// Namespace returns the namespace flag for goworker. You
// can use this with the GetConn and PutConn functions to
// operate on the same namespace that goworker uses.
func Namespace() string {
return namespace
return workerSettings.Namespace
}

func init() {
flag.StringVar(&queuesString, "queues", "", "a comma-separated list of Resque queues")
flag.StringVar(&workerSettings.QueuesString, "queues", "", "a comma-separated list of Resque queues")

flag.Float64Var(&intervalFloat, "interval", 5.0, "sleep interval when no jobs are found")
flag.Float64Var(&workerSettings.IntervalFloat, "interval", 5.0, "sleep interval when no jobs are found")

flag.IntVar(&concurrency, "concurrency", 25, "the maximum number of concurrently executing jobs")
flag.IntVar(&workerSettings.Concurrency, "concurrency", 25, "the maximum number of concurrently executing jobs")

flag.IntVar(&connections, "connections", 2, "the maximum number of connections to the Redis database")
flag.IntVar(&workerSettings.Connections, "connections", 2, "the maximum number of connections to the Redis database")

redisProvider := os.Getenv("REDIS_PROVIDER")
var redisEnvUri string
Expand All @@ -131,28 +117,28 @@ func init() {
if redisEnvUri == "" {
redisEnvUri = "redis://localhost:6379/"
}
flag.StringVar(&uri, "uri", redisEnvUri, "the URI of the Redis server")
flag.StringVar(&workerSettings.Uri, "uri", redisEnvUri, "the URI of the Redis server")

flag.StringVar(&namespace, "namespace", "resque:", "the Redis namespace")
flag.StringVar(&workerSettings.Namespace, "namespace", "resque:", "the Redis namespace")

flag.BoolVar(&exitOnComplete, "exit-on-complete", false, "exit when the queue is empty")
flag.BoolVar(&workerSettings.ExitOnComplete, "exit-on-complete", false, "exit when the queue is empty")

flag.BoolVar(&useNumber, "use-number", false, "use json.Number instead of float64 when decoding numbers in JSON. will default to true soon")
flag.BoolVar(&workerSettings.UseNumber, "use-number", false, "use json.Number instead of float64 when decoding numbers in JSON. will default to true soon")
}

func flags() error {
if !flag.Parsed() {
flag.Parse()
}
if err := queues.Set(queuesString); err != nil {
if err := workerSettings.Queues.Set(workerSettings.QueuesString); err != nil {
return err
}
if err := interval.SetFloat(intervalFloat); err != nil {
if err := workerSettings.Interval.SetFloat(workerSettings.IntervalFloat); err != nil {
return err
}
isStrict = strings.IndexRune(queuesString, '=') == -1
workerSettings.IsStrict = strings.IndexRune(workerSettings.QueuesString, '=') == -1

if !useNumber {
if !workerSettings.UseNumber {
logger.Warn("== DEPRECATION WARNING ==")
logger.Warn(" Currently, encoding/json decodes numbers as float64.")
logger.Warn(" This can cause numbers to lose precision as they are read from the Resque queue.")
Expand Down
30 changes: 25 additions & 5 deletions goworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,26 @@ var (
initialized bool
)

var workerSettings WorkerSettings

type WorkerSettings struct {
QueuesString string
Queues queuesFlag
IntervalFloat float64
Interval intervalFlag
Concurrency int
Connections int
Uri string
Namespace string
ExitOnComplete bool
IsStrict bool
UseNumber bool
}

func SetSettings(settings WorkerSettings) {
workerSettings = settings
}

// Init initializes the goworker process. This will be
// called by the Work function, but may be used by programs
// that wish to access goworker functions and configuration
Expand All @@ -39,7 +59,7 @@ func Init() error {
}
ctx = context.Background()

pool = newRedisPool(uri, connections, connections, time.Minute)
pool = newRedisPool(workerSettings.Uri, workerSettings.Connections, workerSettings.Connections, time.Minute)

initialized = true
}
Expand Down Expand Up @@ -103,16 +123,16 @@ func Work() error {

quit := signals()

poller, err := newPoller(queues, isStrict)
poller, err := newPoller(workerSettings.Queues, workerSettings.IsStrict)
if err != nil {
return err
}
jobs := poller.poll(time.Duration(interval), quit)
jobs := poller.poll(time.Duration(workerSettings.Interval), quit)

var monitor sync.WaitGroup

for id := 0; id < concurrency; id++ {
worker, err := newWorker(strconv.Itoa(id), queues)
for id := 0; id < workerSettings.Concurrency; id++ {
worker, err := newWorker(strconv.Itoa(id), workerSettings.Queues)
if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (p *poller) getJob(conn *RedisConn) (*Job, error) {
for _, queue := range p.queues(p.isStrict) {
logger.Debugf("Checking %s", queue)

reply, err := conn.Do("LPOP", fmt.Sprintf("%squeue:%s", namespace, queue))
reply, err := conn.Do("LPOP", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, queue))
if err != nil {
return nil, err
}
Expand All @@ -37,7 +37,7 @@ func (p *poller) getJob(conn *RedisConn) (*Job, error) {
job := &Job{Queue: queue}

decoder := json.NewDecoder(bytes.NewReader(reply.([]byte)))
if useNumber {
if workerSettings.UseNumber {
decoder.UseNumber()
}

Expand Down Expand Up @@ -98,7 +98,7 @@ func (p *poller) poll(interval time.Duration, quit <-chan bool) <-chan *Job {
return
}
if job != nil {
conn.Send("INCR", fmt.Sprintf("%sstat:processed:%v", namespace, p))
conn.Send("INCR", fmt.Sprintf("%sstat:processed:%v", workerSettings.Namespace, p))
conn.Flush()
PutConn(conn)
select {
Expand All @@ -115,13 +115,13 @@ func (p *poller) poll(interval time.Duration, quit <-chan bool) <-chan *Job {
return
}

conn.Send("LPUSH", fmt.Sprintf("%squeue:%s", namespace, job.Queue), buf)
conn.Send("LPUSH", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, job.Queue), buf)
conn.Flush()
return
}
} else {
PutConn(conn)
if exitOnComplete {
if workerSettings.ExitOnComplete {
return
}
logger.Debugf("Sleeping for %v", interval)
Expand Down
22 changes: 11 additions & 11 deletions process.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,42 +34,42 @@ func (p *process) String() string {
}

func (p *process) open(conn *RedisConn) error {
conn.Send("SADD", fmt.Sprintf("%sworkers", namespace), p)
conn.Send("SET", fmt.Sprintf("%sstat:processed:%v", namespace, p), "0")
conn.Send("SET", fmt.Sprintf("%sstat:failed:%v", namespace, p), "0")
conn.Send("SADD", fmt.Sprintf("%sworkers", workerSettings.Namespace), p)
conn.Send("SET", fmt.Sprintf("%sstat:processed:%v", workerSettings.Namespace, p), "0")
conn.Send("SET", fmt.Sprintf("%sstat:failed:%v", workerSettings.Namespace, p), "0")
conn.Flush()

return nil
}

func (p *process) close(conn *RedisConn) error {
logger.Infof("%v shutdown", p)
conn.Send("SREM", fmt.Sprintf("%sworkers", namespace), p)
conn.Send("DEL", fmt.Sprintf("%sstat:processed:%s", namespace, p))
conn.Send("DEL", fmt.Sprintf("%sstat:failed:%s", namespace, p))
conn.Send("SREM", fmt.Sprintf("%sworkers", workerSettings.Namespace), p)
conn.Send("DEL", fmt.Sprintf("%sstat:processed:%s", workerSettings.Namespace, p))
conn.Send("DEL", fmt.Sprintf("%sstat:failed:%s", workerSettings.Namespace, p))
conn.Flush()

return nil
}

func (p *process) start(conn *RedisConn) error {
conn.Send("SET", fmt.Sprintf("%sworker:%s:started", namespace, p), time.Now().String())
conn.Send("SET", fmt.Sprintf("%sworker:%s:started", workerSettings.Namespace, p), time.Now().String())
conn.Flush()

return nil
}

func (p *process) finish(conn *RedisConn) error {
conn.Send("DEL", fmt.Sprintf("%sworker:%s", namespace, p))
conn.Send("DEL", fmt.Sprintf("%sworker:%s:started", namespace, p))
conn.Send("DEL", fmt.Sprintf("%sworker:%s", workerSettings.Namespace, p))
conn.Send("DEL", fmt.Sprintf("%sworker:%s:started", workerSettings.Namespace, p))
conn.Flush()

return nil
}

func (p *process) fail(conn *RedisConn) error {
conn.Send("INCR", fmt.Sprintf("%sstat:failed", namespace))
conn.Send("INCR", fmt.Sprintf("%sstat:failed:%s", namespace, p))
conn.Send("INCR", fmt.Sprintf("%sstat:failed", workerSettings.Namespace))
conn.Send("INCR", fmt.Sprintf("%sstat:failed:%s", workerSettings.Namespace, p))
conn.Flush()

return nil
Expand Down
8 changes: 4 additions & 4 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (w *worker) start(conn *RedisConn, job *Job) error {
return err
}

conn.Send("SET", fmt.Sprintf("%sworker:%s", namespace, w), buffer)
conn.Send("SET", fmt.Sprintf("%sworker:%s", workerSettings.Namespace, w), buffer)
logger.Debugf("Processing %s since %s [%v]", work.Queue, work.RunAt, work.Payload.Class)

return w.process.start(conn)
Expand All @@ -57,14 +57,14 @@ func (w *worker) fail(conn *RedisConn, job *Job, err error) error {
if err != nil {
return err
}
conn.Send("RPUSH", fmt.Sprintf("%sfailed", namespace), buffer)
conn.Send("RPUSH", fmt.Sprintf("%sfailed", workerSettings.Namespace), buffer)

return w.process.fail(conn)
}

func (w *worker) succeed(conn *RedisConn, job *Job) error {
conn.Send("INCR", fmt.Sprintf("%sstat:processed", namespace))
conn.Send("INCR", fmt.Sprintf("%sstat:processed:%s", namespace, w))
conn.Send("INCR", fmt.Sprintf("%sstat:processed", workerSettings.Namespace))
conn.Send("INCR", fmt.Sprintf("%sstat:processed:%s", workerSettings.Namespace, w))

return nil
}
Expand Down
6 changes: 3 additions & 3 deletions worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ func TestEnqueue(t *testing.T) {
},
}

queues = []string{queueName}
useNumber = true
exitOnComplete = true
workerSettings.Queues = []string{queueName}
workerSettings.UseNumber = true
workerSettings.ExitOnComplete = true

err := Enqueue(expectedJob)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func Enqueue(job *Job) error {
logger.Criticalf("Cant marshal payload on enqueue")
return err
}
err = conn.Send("RPUSH", fmt.Sprintf("%squeue:%s", namespace, job.Queue), buffer)
err = conn.Send("RPUSH", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, job.Queue), buffer)
if err != nil {
logger.Criticalf("Cant push to queue")
return err
Expand Down