diff --git a/README.md b/README.md index 83d8d81b..9c257811 100644 --- a/README.md +++ b/README.md @@ -211,6 +211,14 @@ job, err = enqueuer.EnqueueUnique("clear_cache", work.Q{"object_id_": "123"}) // job, err = enqueuer.EnqueueUniqueIn("clear_cache", 300, work.Q{"object_id_": "789"}) // job != nil (diff id) ``` +Alternatively, you can provide your own key for making a job unique. When another job is enqueued with the same key as a job already in the queue, it will simply update the arguments. +```go +enqueuer := work.NewEnqueuer("my_app_namespace", redisPool) +job, err := enqueuer.EnqueueUniqueByKey("clear_cache", work.Q{"object_id_": "123"}, map[string]interface{}{"my_key": "586"}) +job, err = enqueuer.EnqueueUniqueInByKey("clear_cache", 300, work.Q{"object_id_": "789"}, map[string]interface{}{"my_key": "586"}) +``` +For information on how this map will be serialized to form a unique key, see (https://golang.org/pkg/encoding/json/#Marshal). + ### Periodic Enqueueing (Cron) You can periodically enqueue jobs on your gocraft/work cluster using your worker pool. The [scheduling specification](https://godoc.org/github.com/robfig/cron#hdr-CRON_Expression_Format) uses a Cron syntax where the fields represent seconds, minutes, hours, day of the month, month, and week of the day, respectively. Even if you have multiple worker pools on different machines, they'll all coordinate and only enqueue your job once. diff --git a/enqueue.go b/enqueue.go index 31c5b0a0..0d2f763e 100644 --- a/enqueue.go +++ b/enqueue.go @@ -105,83 +105,48 @@ func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, args map[stri // In order to add robustness to the system, jobs are only unique for 24 hours after they're enqueued. This is mostly relevant for scheduled jobs. 
// EnqueueUnique returns the job if it was enqueued and nil if it wasn't func (e *Enqueuer) EnqueueUnique(jobName string, args map[string]interface{}) (*Job, error) { - uniqueKey, err := redisKeyUniqueJob(e.Namespace, jobName, args) - if err != nil { - return nil, err - } + return e.EnqueueUniqueByKey(jobName, args, nil) +} - job := &Job{ - Name: jobName, - ID: makeIdentifier(), - EnqueuedAt: nowEpochSeconds(), - Args: args, - Unique: true, - } +// EnqueueUniqueIn enqueues a unique job in the scheduled job queue for execution in secondsFromNow seconds. See EnqueueUnique for the semantics of unique jobs. +func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, args map[string]interface{}) (*ScheduledJob, error) { + return e.EnqueueUniqueInByKey(jobName, secondsFromNow, args, nil) +} - rawJSON, err := job.serialize() +// EnqueueUniqueByKey enqueues a job unless a job is already enqueued with the same name and key, updating arguments. +// The already-enqueued job can be in the normal work queue or in the scheduled job queue. +// Once a worker begins processing a job, another job with the same name and key can be enqueued again. +// Any failed jobs in the retry queue or dead queue don't count against the uniqueness -- so if a job fails and is retried, two unique jobs with the same name and arguments can be enqueued at once. +// In order to add robustness to the system, jobs are only unique for 24 hours after they're enqueued. This is mostly relevant for scheduled jobs. 
+// EnqueueUniqueByKey returns the job if it was enqueued and nil if it wasn't +func (e *Enqueuer) EnqueueUniqueByKey(jobName string, args map[string]interface{}, keyMap map[string]interface{}) (*Job, error) { + enqueue, job, err := e.uniqueJobHelper(jobName, args, keyMap) if err != nil { return nil, err } - conn := e.Pool.Get() - defer conn.Close() - - if err := e.addToKnownJobs(conn, jobName); err != nil { - return nil, err - } - - scriptArgs := make([]interface{}, 0, 3) - scriptArgs = append(scriptArgs, e.queuePrefix+jobName) // KEY[1] - scriptArgs = append(scriptArgs, uniqueKey) // KEY[2] - scriptArgs = append(scriptArgs, rawJSON) // ARGV[1] + res, err := enqueue(nil) - res, err := redis.String(e.enqueueUniqueScript.Do(conn, scriptArgs...)) if res == "ok" && err == nil { return job, nil } return nil, err } -// EnqueueUniqueIn enqueues a unique job in the scheduled job queue for execution in secondsFromNow seconds. See EnqueueUnique for the semantics of unique jobs. -func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, args map[string]interface{}) (*ScheduledJob, error) { - uniqueKey, err := redisKeyUniqueJob(e.Namespace, jobName, args) - if err != nil { - return nil, err - } - - job := &Job{ - Name: jobName, - ID: makeIdentifier(), - EnqueuedAt: nowEpochSeconds(), - Args: args, - Unique: true, - } - - rawJSON, err := job.serialize() +// EnqueueUniqueInByKey enqueues a job in the scheduled job queue that is unique on specified key for execution in secondsFromNow seconds. See EnqueueUnique for the semantics of unique jobs. 
+// Subsequent calls with same key will update arguments +func (e *Enqueuer) EnqueueUniqueInByKey(jobName string, secondsFromNow int64, args map[string]interface{}, keyMap map[string]interface{}) (*ScheduledJob, error) { + enqueue, job, err := e.uniqueJobHelper(jobName, args, keyMap) if err != nil { return nil, err } - conn := e.Pool.Get() - defer conn.Close() - - if err := e.addToKnownJobs(conn, jobName); err != nil { - return nil, err - } - scheduledJob := &ScheduledJob{ RunAt: nowEpochSeconds() + secondsFromNow, Job: job, } - scriptArgs := make([]interface{}, 0, 4) - scriptArgs = append(scriptArgs, redisKeyScheduled(e.Namespace)) // KEY[1] - scriptArgs = append(scriptArgs, uniqueKey) // KEY[2] - scriptArgs = append(scriptArgs, rawJSON) // ARGV[1] - scriptArgs = append(scriptArgs, scheduledJob.RunAt) // ARGV[2] - - res, err := redis.String(e.enqueueUniqueInScript.Do(conn, scriptArgs...)) - + res, err := enqueue(&scheduledJob.RunAt) if res == "ok" && err == nil { return scheduledJob, nil } @@ -213,3 +178,68 @@ func (e *Enqueuer) addToKnownJobs(conn redis.Conn, jobName string) error { return nil } + +type enqueueFnType func(*int64) (string, error) + +func (e *Enqueuer) uniqueJobHelper(jobName string, args map[string]interface{}, keyMap map[string]interface{}) (enqueueFnType, *Job, error) { + useDefaultKeys := false + if keyMap == nil { + useDefaultKeys = true + keyMap = args + } + + uniqueKey, err := redisKeyUniqueJob(e.Namespace, jobName, keyMap) + if err != nil { + return nil, nil, err + } + + job := &Job{ + Name: jobName, + ID: makeIdentifier(), + EnqueuedAt: nowEpochSeconds(), + Args: args, + Unique: true, + UniqueKey: uniqueKey, + } + + rawJSON, err := job.serialize() + if err != nil { + return nil, nil, err + } + + enqueueFn := func(runAt *int64) (string, error) { + conn := e.Pool.Get() + defer conn.Close() + + if err := e.addToKnownJobs(conn, jobName); err != nil { + return "", err + } + + scriptArgs := []interface{}{} + script := e.enqueueUniqueScript + + 
scriptArgs = append(scriptArgs, e.queuePrefix+jobName) // KEY[1] + scriptArgs = append(scriptArgs, uniqueKey) // KEY[2] + scriptArgs = append(scriptArgs, rawJSON) // ARGV[1] + if useDefaultKeys { + // keying on arguments so arguments can't be updated + // we'll just get them off the original job so to save space, make this "1" + scriptArgs = append(scriptArgs, "1") // ARGV[2] + } else { + // we'll use this for updated arguments since the job on the queue + // doesn't get updated + scriptArgs = append(scriptArgs, rawJSON) // ARGV[2] + } + + if runAt != nil { // Scheduled job so different job queue with additional arg + scriptArgs[0] = redisKeyScheduled(e.Namespace) // KEY[1] + scriptArgs = append(scriptArgs, *runAt) // ARGV[3] + + script = e.enqueueUniqueInScript + } + + return redis.String(script.Do(conn, scriptArgs...)) + } + + return enqueueFn, job, nil +} diff --git a/enqueue_test.go b/enqueue_test.go index 610a56d6..8ca41fa6 100644 --- a/enqueue_test.go +++ b/enqueue_test.go @@ -230,3 +230,127 @@ func TestEnqueueUniqueIn(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, job) } + +func TestEnqueueUniqueByKey(t *testing.T) { + var arg3 string + var arg4 string + + pool := newTestPool(":6379") + ns := "work" + cleanKeyspace(ns, pool) + enqueuer := NewEnqueuer(ns, pool) + var mutex = &sync.Mutex{} + job, err := enqueuer.EnqueueUniqueByKey("wat", Q{"a": 3, "b": "foo"}, Q{"key": "123"}) + assert.NoError(t, err) + if assert.NotNil(t, job) { + assert.Equal(t, "wat", job.Name) + assert.True(t, len(job.ID) > 10) // Something is in it + assert.True(t, job.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds + assert.True(t, job.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds + assert.Equal(t, "foo", job.ArgString("b")) + assert.EqualValues(t, 3, job.ArgInt64("a")) + assert.NoError(t, job.ArgError()) + } + + job, err = enqueuer.EnqueueUniqueByKey("wat", Q{"a": 3, "b": "bar"}, Q{"key": "123"}) + assert.NoError(t, err) + assert.Nil(t, job) + + job, err = 
enqueuer.EnqueueUniqueByKey("wat", Q{"a": 4, "b": "baz"}, Q{"key": "124"}) + assert.NoError(t, err) + assert.NotNil(t, job) + + job, err = enqueuer.EnqueueUniqueByKey("taw", nil, Q{"key": "125"}) + assert.NoError(t, err) + assert.NotNil(t, job) + + // Process the queues. Ensure the right number of jobs were processed + var wats, taws int64 + wp := NewWorkerPool(TestContext{}, 3, ns, pool) + wp.JobWithOptions("wat", JobOptions{Priority: 1, MaxFails: 1}, func(job *Job) error { + mutex.Lock() + argA := job.Args["a"].(float64) + argB := job.Args["b"].(string) + if argA == 3 { + arg3 = argB + } + if argA == 4 { + arg4 = argB + } + + wats++ + mutex.Unlock() + return nil + }) + wp.JobWithOptions("taw", JobOptions{Priority: 1, MaxFails: 1}, func(job *Job) error { + mutex.Lock() + taws++ + mutex.Unlock() + return fmt.Errorf("ohno") + }) + wp.Start() + wp.Drain() + wp.Stop() + + assert.EqualValues(t, 2, wats) + assert.EqualValues(t, 1, taws) + + // Check that arguments got updated to new value + assert.EqualValues(t, "bar", arg3) + assert.EqualValues(t, "baz", arg4) + + // Enqueue again. Ensure we can. + job, err = enqueuer.EnqueueUniqueByKey("wat", Q{"a": 1, "b": "cool"}, Q{"key": "123"}) + assert.NoError(t, err) + assert.NotNil(t, job) + + job, err = enqueuer.EnqueueUniqueByKey("wat", Q{"a": 1, "b": "coolio"}, Q{"key": "124"}) + assert.NoError(t, err) + assert.NotNil(t, job) + + // Even though taw resulted in an error, we should still be able to re-queue it. + // This could result in multiple taws enqueued at the same time in a production system. + job, err = enqueuer.EnqueueUniqueByKey("taw", nil, Q{"key": "123"}) + assert.NoError(t, err) + assert.NotNil(t, job) +} + +func TestEnqueueUniqueInByKey(t *testing.T) { + pool := newTestPool(":6379") + ns := "work" + cleanKeyspace(ns, pool) + enqueuer := NewEnqueuer(ns, pool) + + // Enqueue two unique jobs -- ensure one job sticks. 
+ job, err := enqueuer.EnqueueUniqueInByKey("wat", 300, Q{"a": 1, "b": "cool"}, Q{"key": "123"}) + assert.NoError(t, err) + if assert.NotNil(t, job) { + assert.Equal(t, "wat", job.Name) + assert.True(t, len(job.ID) > 10) // Something is in it + assert.True(t, job.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds + assert.True(t, job.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds + assert.Equal(t, "cool", job.ArgString("b")) + assert.EqualValues(t, 1, job.ArgInt64("a")) + assert.NoError(t, job.ArgError()) + assert.EqualValues(t, job.EnqueuedAt+300, job.RunAt) + } + + job, err = enqueuer.EnqueueUniqueInByKey("wat", 10, Q{"a": 1, "b": "cool"}, Q{"key": "123"}) + assert.NoError(t, err) + assert.Nil(t, job) + + // Get the job + score, j := jobOnZset(pool, redisKeyScheduled(ns)) + + assert.True(t, score > time.Now().Unix()+290) // We don't want to overwrite the time + assert.True(t, score <= time.Now().Unix()+300) + + assert.Equal(t, "wat", j.Name) + assert.True(t, len(j.ID) > 10) // Something is in it + assert.True(t, j.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds + assert.True(t, j.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds + assert.Equal(t, "cool", j.ArgString("b")) + assert.EqualValues(t, 1, j.ArgInt64("a")) + assert.NoError(t, j.ArgError()) + assert.True(t, j.Unique) +} diff --git a/job.go b/job.go index 7b3a7076..7c6f5d79 100644 --- a/job.go +++ b/job.go @@ -15,6 +15,7 @@ type Job struct { EnqueuedAt int64 `json:"t"` Args map[string]interface{} `json:"args"` Unique bool `json:"unique,omitempty"` + UniqueKey string `json:"unique_key,omitempty"` // Inputs when retrying Fails int64 `json:"fails,omitempty"` // number of times this job has failed diff --git a/redis.go b/redis.go index 849b2264..417eb481 100644 --- a/redis.go +++ b/redis.go @@ -348,10 +348,13 @@ return requeuedCount // KEYS[1] = job queue to push onto // KEYS[2] = Unique job's key. Test for existence and set if we push. 
// ARGV[1] = job +// ARGV[2] = updated job or just a 1 if arguments don't update var redisLuaEnqueueUnique = ` -if redis.call('set', KEYS[2], '1', 'NX', 'EX', '86400') then +if redis.call('set', KEYS[2], ARGV[2], 'NX', 'EX', '86400') then redis.call('lpush', KEYS[1], ARGV[1]) return 'ok' +else + redis.call('set', KEYS[2], ARGV[2], 'EX', '86400') end return 'dup' ` @@ -359,11 +362,14 @@ return 'dup' // KEYS[1] = scheduled job queue // KEYS[2] = Unique job's key. Test for existence and set if we push. // ARGV[1] = job -// ARGV[2] = epoch seconds for job to be run at +// ARGV[2] = updated job or just a 1 if arguments don't update +// ARGV[3] = epoch seconds for job to be run at var redisLuaEnqueueUniqueIn = ` -if redis.call('set', KEYS[2], '1', 'NX', 'EX', '86400') then - redis.call('zadd', KEYS[1], ARGV[2], ARGV[1]) +if redis.call('set', KEYS[2], ARGV[2], 'NX', 'EX', '86400') then + redis.call('zadd', KEYS[1], ARGV[3], ARGV[1]) return 'ok' +else + redis.call('set', KEYS[2], ARGV[2], 'EX', '86400') end return 'dup' ` diff --git a/worker.go b/worker.go index 913d5fc8..d7578334 100644 --- a/worker.go +++ b/worker.go @@ -191,7 +191,13 @@ func (w *worker) fetchJob() (*Job, error) { func (w *worker) processJob(job *Job) { if job.Unique { - w.deleteUniqueJob(job) + updatedJob := w.getAndDeleteUniqueJob(job) + // This is to support the old way of doing it, where we used the job off the queue and just deleted the unique key + // Going forward the job on the queue will always be just a placeholder, and we will be replacing it with the + // updated job extracted here + if updatedJob != nil { + job = updatedJob + } } var runErr error jt := w.jobTypes[job.Name] @@ -213,18 +219,49 @@ func (w *worker) processJob(job *Job) { w.removeJobFromInProgress(job, fate) } -func (w *worker) deleteUniqueJob(job *Job) { - uniqueKey, err := redisKeyUniqueJob(w.namespace, job.Name, job.Args) - if err != nil { - logError("worker.delete_unique_job.key", err) +func (w *worker) 
getAndDeleteUniqueJob(job *Job) *Job { + var uniqueKey string + var err error + + if job.UniqueKey != "" { + uniqueKey = job.UniqueKey + } else { // For jobs put in queue prior to this change. In the future this can be deleted as there will always be a UniqueKey + uniqueKey, err = redisKeyUniqueJob(w.namespace, job.Name, job.Args) + if err != nil { + logError("worker.delete_unique_job.key", err) + return nil + } } + conn := w.pool.Get() defer conn.Close() + rawJSON, err := redis.Bytes(conn.Do("GET", uniqueKey)) + if err != nil { + logError("worker.delete_unique_job.get", err) + return nil + } + _, err = conn.Do("DEL", uniqueKey) if err != nil { logError("worker.delete_unique_job.del", err) + return nil } + + // Previous versions did not support updated arguments and just set key to 1, so in these cases we should do nothing. + // In the future this can be deleted, as we will always be getting arguments from here + if string(rawJSON) == "1" { + return nil + } + + // The job pulled off the queue was just a placeholder with no args, so replace it + jobWithArgs, err := newJob(rawJSON, job.dequeuedFrom, job.inProgQueue) + if err != nil { + logError("worker.delete_unique_job.updated_job", err) + return nil + } + + return jobWithArgs } func (w *worker) removeJobFromInProgress(job *Job, fate terminateOp) {