Skip to content

Commit

Permalink
feat(worker): suppor request and handle job
Browse files Browse the repository at this point in the history
Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>
  • Loading branch information
appleboy committed Apr 9, 2022
1 parent 9018340 commit a3998eb
Showing 1 changed file with 21 additions and 38 deletions.
59 changes: 21 additions & 38 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package redisdb
import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -153,16 +152,6 @@ func (w *Worker) Shutdown() error {
return nil
}

// Capacity for channel
func (w *Worker) Capacity() int {
return 0
}

// Usage for count of channel usage
func (w *Worker) Usage() int {
return 0
}

// Queue send notification to queue
func (w *Worker) Queue(job core.QueuedMessage) error {
if atomic.LoadInt32(&w.stopFlag) == 1 {
Expand All @@ -182,41 +171,35 @@ func (w *Worker) Queue(job core.QueuedMessage) error {

// Run start the worker
func (w *Worker) Run(task core.QueuedMessage) error {
for {
// check queue status
select {
case <-w.stop:
return nil
default:
}
data, _ := task.(queue.Job)

select {
case m, ok := <-w.channel:
select {
case <-w.stop:
return nil
default:
}
if err := w.handle(data); err != nil {
return err
}

return nil
}

// Request a new task
func (w *Worker) Request() (core.QueuedMessage, error) {
clock := 0
loop:
for {
select {
case task, ok := <-w.channel:
if !ok {
return fmt.Errorf("redis pubsub: channel=%s closed", w.opts.channelName)
return nil, queue.ErrQueueHasBeenClosed
}

var data queue.Job
if err := json.Unmarshal([]byte(m.Payload), &data); err != nil {
w.opts.logger.Error("json unmarshal error: ", err)
continue
}
if err := w.handle(data); err != nil {
w.opts.logger.Error("handle job error: ", err)
_ = json.Unmarshal([]byte(task.Payload), &data)
return data, nil
case <-time.After(1 * time.Second):
if clock == 5 {
break loop
}
case <-w.stop:
return nil
clock += 1
}
}
}

// Request a new task
func (w *Worker) Request() (core.QueuedMessage, error) {
return nil, queue.ErrNoTaskInQueue
}

0 comments on commit a3998eb

Please sign in to comment.