Commit

add fetcher and prioritySampler interfaces, change fetchJob
Vadim Inshakov committed Feb 1, 2023
1 parent ce9fbe8 commit 19083cb
Showing 1 changed file with 66 additions and 6 deletions.
worker.go (72 changes: 66 additions & 6 deletions)
@@ -11,6 +11,16 @@ import (

const fetchKeysPerJobType = 6

type fetcher interface {
Do(c redis.Conn, keysAndArgs ...interface{}) (interface{}, error)
}

type prioritySampler interface {
Add(priority uint, redisJobs string, redisJobsInProg string, redisJobsPaused string, redisJobsLock string, redisJobsLockInfo string, redisJobsMaxConcurrency string)
Sample() []sampleItem
GetSamples() []sampleItem
}

type worker struct {
workerID string
poolID string
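The fetcher and prioritySampler interfaces added above decouple the worker from redigo's concrete types: redigo's (*redis.Script).Do(c redis.Conn, keysAndArgs ...interface{}) (interface{}, error) already matches fetcher, so the production script keeps satisfying it, while tests can swap in a double. A minimal sketch of such a double, not part of this commit (stubFetcher is a hypothetical name and assumes it sits in the same package as worker.go, which already imports redigo):

// stubFetcher is a hypothetical test double satisfying the fetcher interface.
// Interface satisfaction in Go is structural, so no explicit reference to
// fetcher is needed.
type stubFetcher struct {
	gotKeysAndArgs []interface{} // records the KEYS/ARGV the worker built
	reply          interface{}   // canned reply to hand back
	err            error         // canned error to hand back
}

func (s *stubFetcher) Do(c redis.Conn, keysAndArgs ...interface{}) (interface{}, error) {
	s.gotKeysAndArgs = keysAndArgs
	return s.reply, s.err
}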
@@ -20,7 +30,7 @@ type worker struct {
middleware []*middlewareHandler
contextType reflect.Type

-redisFetchScript *redis.Script
+redisFetchScript fetcher
sampler prioritySampler
*observer

@@ -64,9 +74,9 @@ func newWorker(namespace string, poolID string, pool Pool, contextType reflect.T
// note: can't be called while the thing is started
func (w *worker) updateMiddlewareAndJobTypes(middleware []*middlewareHandler, jobTypes map[string]*jobType) {
w.middleware = middleware
-sampler := prioritySampler{}
+sampler := &prioritySamplerInPlaceImpl{}
for _, jt := range jobTypes {
-sampler.add(jt.Priority,
+sampler.Add(jt.Priority,
redisKeyJobs(w.namespace, jt.Name),
redisKeyJobsInProgress(w.namespace, w.poolID, jt.Name),
redisKeyJobsPaused(w.namespace, jt.Name),
@@ -140,14 +150,64 @@ func (w *worker) loop() {
}
}

func (w *worker) fetchJobOld() (*Job, error) {
// resort queues
// NOTE: we could optimize this to only resort every second, or something.
w.sampler.Sample()
samples := w.sampler.GetSamples()
numKeys := len(samples) * fetchKeysPerJobType
var scriptArgs = make([]interface{}, 0, numKeys+1)

for _, s := range samples {
scriptArgs = append(scriptArgs, s.redisJobs, s.redisJobsInProg, s.redisJobsPaused, s.redisJobsLock, s.redisJobsLockInfo, s.redisJobsMaxConcurrency) // KEYS[1-6 * N]
}
scriptArgs = append(scriptArgs, w.poolID) // ARGV[1]
conn := w.pool.Get()
defer conn.Close()

values, err := redis.Values(w.redisFetchScript.Do(conn, scriptArgs...))
if err == redis.ErrNil {
return nil, nil
} else if err != nil {
return nil, err
}

if len(values) != 3 {
return nil, fmt.Errorf("need 3 elements back")
}

rawJSON, ok := values[0].([]byte)
if !ok {
return nil, fmt.Errorf("response msg not bytes")
}

dequeuedFrom, ok := values[1].([]byte)
if !ok {
return nil, fmt.Errorf("response queue not bytes")
}

inProgQueue, ok := values[2].([]byte)
if !ok {
return nil, fmt.Errorf("response in prog not bytes")
}

job, err := newJobOld(rawJSON, dequeuedFrom, inProgQueue)
if err != nil {
return nil, err
}

return job, nil
}

func (w *worker) fetchJob() (*Job, error) {
// resort queues
// NOTE: we could optimize this to only resort every second, or something.
-w.sampler.sample()
-numKeys := len(w.sampler.samples) * fetchKeysPerJobType
+w.sampler.Sample()
+samples := w.sampler.GetSamples()
+numKeys := len(samples) * fetchKeysPerJobType
var scriptArgs = make([]interface{}, 0, numKeys+1)

-for _, s := range w.sampler.samples {
+for _, s := range samples {
scriptArgs = append(scriptArgs, s.redisJobs, s.redisJobsInProg, s.redisJobsPaused, s.redisJobsLock, s.redisJobsLockInfo, s.redisJobsMaxConcurrency) // KEYS[1-6 * N]
}
scriptArgs = append(scriptArgs, w.poolID) // ARGV[1]
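
On the sampler side, the Sample/GetSamples split used by fetchJob above can likewise be faked. A rough sketch, not part of the commit (fakeSampler is a hypothetical name; it must live in the same package because sampleItem's fields are unexported, and it fills only the six key fields that appear in this diff):

// fakeSampler is a hypothetical prioritySampler implementation returning a
// fixed set of samples, so fetchJob can be exercised without real sampling.
type fakeSampler struct {
	items []sampleItem
}

func (f *fakeSampler) Add(priority uint, redisJobs, redisJobsInProg, redisJobsPaused,
	redisJobsLock, redisJobsLockInfo, redisJobsMaxConcurrency string) {
	// priority is dropped here; fetchJob only reads the six Redis key fields.
	f.items = append(f.items, sampleItem{
		redisJobs:               redisJobs,
		redisJobsInProg:         redisJobsInProg,
		redisJobsPaused:         redisJobsPaused,
		redisJobsLock:           redisJobsLock,
		redisJobsLockInfo:       redisJobsLockInfo,
		redisJobsMaxConcurrency: redisJobsMaxConcurrency,
	})
}

func (f *fakeSampler) Sample() []sampleItem     { return f.items }
func (f *fakeSampler) GetSamples() []sampleItem { return f.items }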
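
Both fetch functions expect the script reply to be exactly three []byte values: the raw job JSON, the queue it was dequeued from, and the in-progress queue; anything else fails the length or type checks above, and redis.ErrNil maps to (nil, nil). A purely illustrative way to wire the stubFetcher sketched earlier (the JSON payload and key names below are made up):

func exampleStubReplies() (withJob, noJob *stubFetcher) {
	withJob = &stubFetcher{
		reply: []interface{}{
			[]byte(`{"name":"example_job","t":1675209600,"args":{}}`), // raw job JSON (illustrative)
			[]byte("work:jobs:example_job"),                           // dequeuedFrom key (illustrative)
			[]byte("work:jobs:example_job:pool-id:inprogress"),        // inProgQueue key (illustrative)
		},
	}
	noJob = &stubFetcher{err: redis.ErrNil} // makes fetchJob return (nil, nil)
	return withJob, noJob
}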