Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release v0.5.2 #219

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
## Purpose

Brief description of the purpose of this change.

## Solution

High level overview of what was done. This is the roadmap for those who are going to CR.

## Semantic Versioning (check one)

- [ ] The following were changed in a non-backward compatible way and requires a major version bump:
- _[link to the breaking change in the diff]_
- [ ] Something public was added or changed in backward compatible way, this requires a minor version bump
- [ ] No public changes nor new features (backwards-compatible refactor or bug fix), so this can be included in a patch
release

## How to QA

- [ ] run the following related examples: **(fill in)**
- [ ] Other necessary steps needed to fully exercise the solution should be added here. **(fill in)**
11 changes: 11 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
version: 2
updates:
- package-ecosystem: "gomod"
directory: "/"
schedule:
interval: "weekly"
day: sunday
commit-message:
prefix: fix
prefix-development: chore
include: scope
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
test-reports/
3 changes: 3 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# This is a Golang library; it doesn't produce
# any artifacts.
FROM scratch
10 changes: 10 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
SHELL := /bin/bash

test: tools
rm -rf test-reports
mkdir test-reports
go clean -testcache
go test -v 2>&1 `go list ./... | grep -v /vendor/` | go-junit-report -iocopy -set-exit-code -out test-reports/unit-test-report.xml

tools:
go install github.com/jstemmer/go-junit-report/v2@v2.0.0
1 change: 1 addition & 0 deletions aviary.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
version: 1
2 changes: 1 addition & 1 deletion benches/bench_work/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"sync/atomic"
"time"

"github.com/Workiva/work"
"github.com/gocraft/health"
"github.com/gocraft/work"
"github.com/gomodule/redigo/redis"
)

Expand Down
28 changes: 14 additions & 14 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ type WorkerObservation struct {

// If IsBusy:
JobName string `json:"job_name"`
JobID string `json:"job_id"`
JobGuid string `json:"job_guid"`
StartedAt int64 `json:"started_at"`
ArgsJSON string `json:"args_json"`
Checkin string `json:"checkin"`
Expand Down Expand Up @@ -179,8 +179,8 @@ func (c *Client) WorkerObservations() ([]*WorkerObservation, error) {
var err error
if key == "job_name" {
ob.JobName = value
} else if key == "job_id" {
ob.JobID = value
} else if key == "job_guid" {
ob.JobGuid = value
} else if key == "started_at" {
ob.StartedAt, err = strconv.ParseInt(value, 10, 64)
} else if key == "args" {
Expand Down Expand Up @@ -352,8 +352,8 @@ func (c *Client) DeadJobs(page uint) ([]*DeadJob, int64, error) {
}

// DeleteDeadJob deletes a dead job from Redis.
func (c *Client) DeleteDeadJob(diedAt int64, jobID string) error {
ok, _, err := c.deleteZsetJob(redisKeyDead(c.namespace), diedAt, jobID)
func (c *Client) DeleteDeadJob(diedAt int64, jobGuid string) error {
ok, _, err := c.deleteZsetJob(redisKeyDead(c.namespace), diedAt, jobGuid)
if err != nil {
return err
}
Expand All @@ -364,7 +364,7 @@ func (c *Client) DeleteDeadJob(diedAt int64, jobID string) error {
}

// RetryDeadJob retries a dead job. The job will be re-queued on the normal work queue for eventual processing by a worker.
func (c *Client) RetryDeadJob(diedAt int64, jobID string) error {
func (c *Client) RetryDeadJob(diedAt int64, jobGuid string) error {
// Get queues for job names
queues, err := c.Queues()
if err != nil {
Expand All @@ -388,7 +388,7 @@ func (c *Client) RetryDeadJob(diedAt int64, jobID string) error {
args = append(args, redisKeyJobsPrefix(c.namespace)) // ARGV[1]
args = append(args, nowEpochSeconds())
args = append(args, diedAt)
args = append(args, jobID)
args = append(args, jobGuid)

conn := c.pool.Get()
defer conn.Close()
Expand Down Expand Up @@ -466,8 +466,8 @@ func (c *Client) DeleteAllDeadJobs() error {
}

// DeleteScheduledJob deletes a job in the scheduled queue.
func (c *Client) DeleteScheduledJob(scheduledFor int64, jobID string) error {
ok, jobBytes, err := c.deleteZsetJob(redisKeyScheduled(c.namespace), scheduledFor, jobID)
func (c *Client) DeleteScheduledJob(scheduledFor int64, jobGuid string) error {
ok, jobBytes, err := c.deleteZsetJob(redisKeyScheduled(c.namespace), scheduledFor, jobGuid)
if err != nil {
return err
}
Expand Down Expand Up @@ -504,8 +504,8 @@ func (c *Client) DeleteScheduledJob(scheduledFor int64, jobID string) error {
}

// DeleteRetryJob deletes a job in the retry queue.
func (c *Client) DeleteRetryJob(retryAt int64, jobID string) error {
ok, _, err := c.deleteZsetJob(redisKeyRetry(c.namespace), retryAt, jobID)
func (c *Client) DeleteRetryJob(retryAt int64, jobGuid string) error {
ok, _, err := c.deleteZsetJob(redisKeyRetry(c.namespace), retryAt, jobGuid)
if err != nil {
return err
}
Expand All @@ -515,14 +515,14 @@ func (c *Client) DeleteRetryJob(retryAt int64, jobID string) error {
return nil
}

// deleteZsetJob deletes the job in the specified zset (dead, retry, or scheduled queue). zsetKey is like "work:dead" or "work:scheduled". The function deletes all jobs with the given jobID with the specified zscore (there should only be one, but in theory there could be bad data). It will return if at least one job is deleted and if
func (c *Client) deleteZsetJob(zsetKey string, zscore int64, jobID string) (bool, []byte, error) {
// deleteZsetJob deletes the job in the specified zset (dead, retry, or scheduled queue). zsetKey is like "work:dead" or "work:scheduled". The function deletes all jobs with the given jobGuid with the specified zscore (there should only be one, but in theory there could be bad data). It will return if at least one job is deleted and if
func (c *Client) deleteZsetJob(zsetKey string, zscore int64, jobGuid string) (bool, []byte, error) {
script := redis.NewScript(1, redisLuaDeleteSingleCmd)

args := make([]interface{}, 0, 1+2)
args = append(args, zsetKey) // KEY[1]
args = append(args, zscore) // ARGV[1]
args = append(args, jobID) // ARGV[2]
args = append(args, jobGuid) // ARGV[2]

conn := c.pool.Get()
defer conn.Close()
Expand Down
26 changes: 13 additions & 13 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,13 @@ func TestClientWorkerObservations(t *testing.T) {
assert.True(t, ob.IsBusy)
assert.Equal(t, `{"a":3,"b":4}`, ob.ArgsJSON)
assert.True(t, (nowEpochSeconds()-ob.StartedAt) <= 3)
assert.True(t, ob.JobID != "")
assert.True(t, ob.JobGuid != "")
} else if ob.JobName == "wat" {
watCount++
assert.True(t, ob.IsBusy)
assert.Equal(t, `{"a":1,"b":2}`, ob.ArgsJSON)
assert.True(t, (nowEpochSeconds()-ob.StartedAt) <= 3)
assert.True(t, ob.JobID != "")
assert.True(t, ob.JobGuid != "")
} else {
assert.False(t, ob.IsBusy)
}
Expand Down Expand Up @@ -316,7 +316,7 @@ func TestClientDeadJobs(t *testing.T) {
assert.EqualValues(t, 1, count)

// Delete it!
err = client.DeleteDeadJob(deadJob.DiedAt, deadJob.ID)
err = client.DeleteDeadJob(deadJob.DiedAt, deadJob.Guid)
assert.NoError(t, err)

jobs, count, err = client.DeadJobs(1)
Expand Down Expand Up @@ -344,7 +344,7 @@ func TestClientDeleteDeadJob(t *testing.T) {

tot := count
for _, j := range jobs {
err = client.DeleteDeadJob(j.DiedAt, j.ID)
err = client.DeleteDeadJob(j.DiedAt, j.Guid)
assert.NoError(t, err)
_, count, err = client.DeadJobs(1)
assert.NoError(t, err)
Expand Down Expand Up @@ -373,7 +373,7 @@ func TestClientRetryDeadJob(t *testing.T) {

tot := count
for _, j := range jobs {
err = client.RetryDeadJob(j.DiedAt, j.ID)
err = client.RetryDeadJob(j.DiedAt, j.Guid)
assert.NoError(t, err)
_, count, err = client.DeadJobs(1)
assert.NoError(t, err)
Expand Down Expand Up @@ -421,7 +421,7 @@ func TestClientRetryDeadJobWithArgs(t *testing.T) {
failAt := int64(12347)
job := &Job{
Name: name,
ID: makeIdentifier(),
Guid: makeIdentifier(),
EnqueuedAt: encAt,
Args: map[string]interface{}{"a": "wat"},
Fails: 3,
Expand All @@ -443,7 +443,7 @@ func TestClientRetryDeadJobWithArgs(t *testing.T) {
}

client := NewClient(ns, pool)
err = client.RetryDeadJob(failAt, job.ID)
err = client.RetryDeadJob(failAt, job.Guid)
assert.NoError(t, err)

job1 := getQueuedJob(ns, pool, name)
Expand Down Expand Up @@ -552,7 +552,7 @@ func TestClientRetryAllDeadJobsBig(t *testing.T) {
for i := 0; i < 10000; i++ {
job := &Job{
Name: "wat1",
ID: makeIdentifier(),
Guid: makeIdentifier(),
EnqueuedAt: 12345,
Args: nil,
Fails: 3,
Expand All @@ -573,7 +573,7 @@ func TestClientRetryAllDeadJobsBig(t *testing.T) {
// Add a dead job with a non-existent queue:
job := &Job{
Name: "dontexist",
ID: makeIdentifier(),
Guid: makeIdentifier(),
EnqueuedAt: 12345,
Args: nil,
Fails: 3,
Expand Down Expand Up @@ -623,7 +623,7 @@ func TestClientDeleteScheduledJob(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, j)

err = client.DeleteScheduledJob(j.RunAt, j.ID)
err = client.DeleteScheduledJob(j.RunAt, j.Guid)
assert.NoError(t, err)
assert.EqualValues(t, 0, zsetSize(pool, redisKeyScheduled(ns)))
}
Expand All @@ -640,7 +640,7 @@ func TestClientDeleteScheduledUniqueJob(t *testing.T) {
assert.NotNil(t, j)

client := NewClient(ns, pool)
err = client.DeleteScheduledJob(j.RunAt, j.ID)
err = client.DeleteScheduledJob(j.RunAt, j.Guid)
assert.NoError(t, err)
assert.EqualValues(t, 0, zsetSize(pool, redisKeyScheduled(ns)))

Expand Down Expand Up @@ -677,7 +677,7 @@ func TestClientDeleteRetryJob(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 1, len(jobs))
if assert.EqualValues(t, 1, count) {
err = client.DeleteRetryJob(jobs[0].RetryAt, job.ID)
err = client.DeleteRetryJob(jobs[0].RetryAt, job.Guid)
assert.NoError(t, err)
assert.EqualValues(t, 0, zsetSize(pool, redisKeyRetry(ns)))
}
Expand All @@ -686,7 +686,7 @@ func TestClientDeleteRetryJob(t *testing.T) {
func insertDeadJob(ns string, pool *redis.Pool, name string, encAt, failAt int64) *Job {
job := &Job{
Name: name,
ID: makeIdentifier(),
Guid: makeIdentifier(),
EnqueuedAt: encAt,
Args: nil,
Fails: 3,
Expand Down
2 changes: 1 addition & 1 deletion cmd/workenqueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"os"
"time"

"github.com/gocraft/work"
"github.com/Workiva/work"
"github.com/gomodule/redigo/redis"
)

Expand Down
2 changes: 1 addition & 1 deletion cmd/workfakedata/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"math/rand"
"time"

"github.com/gocraft/work"
"github.com/Workiva/work"
"github.com/gomodule/redigo/redis"
)

Expand Down
2 changes: 1 addition & 1 deletion cmd/workwebui/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"strconv"
"time"

"github.com/gocraft/work/webui"
"github.com/Workiva/work/webui"
"github.com/gomodule/redigo/redis"
)

Expand Down
2 changes: 1 addition & 1 deletion dead_pool_reaper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ func TestDeadPoolReaperCleanStaleLocks(t *testing.T) {
// both locks should be at 0
assert.EqualValues(t, 0, getInt64(pool, lock1))
assert.EqualValues(t, 0, getInt64(pool, lock2))
// worker pool ID 2 removed from both lock info hashes
// worker pool Guid 2 removed from both lock info hashes
v, err = conn.Do("HGET", lockInfo1, workerPoolID2)
assert.Nil(t, v)
v, err = conn.Do("HGET", lockInfo2, workerPoolID2)
Expand Down
28 changes: 22 additions & 6 deletions enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ type Enqueuer struct {
mtx sync.RWMutex
}

type BeforeEnqueueFunc func(job *Job) error

var noOpBeforeEnqueue = func(*Job) error {
return nil
}

// NewEnqueuer creates a new enqueuer with the specified Redis namespace and Redis pool.
func NewEnqueuer(namespace string, pool *redis.Pool) *Enqueuer {
if pool == nil {
Expand All @@ -35,12 +41,12 @@ func NewEnqueuer(namespace string, pool *redis.Pool) *Enqueuer {
}
}

// Enqueue will enqueue the specified job name and arguments. The args param can be nil if no args ar needed.
// Example: e.Enqueue("send_email", work.Q{"addr": "test@example.com"})
func (e *Enqueuer) Enqueue(jobName string, args map[string]interface{}) (*Job, error) {
// CheckAndEnqueue will enqueue the specified job name and arguments if and only if beforeEnqueue does not return an error. The args param can be nil if no args are needed.
// Example: e.Enqueue("send_email", work.Q{"addr": "test@example.com"}, func(j *Job) error { return db.Save(job) } )
func (e *Enqueuer) CheckAndEnqueue(jobName string, args map[string]interface{}, beforeEnqueue BeforeEnqueueFunc) (*Job, error) {
job := &Job{
Name: jobName,
ID: makeIdentifier(),
Guid: makeIdentifier(),
EnqueuedAt: nowEpochSeconds(),
Args: args,
}
Expand All @@ -50,6 +56,10 @@ func (e *Enqueuer) Enqueue(jobName string, args map[string]interface{}) (*Job, e
return nil, err
}

if err := beforeEnqueue(job); err != nil {
return nil, err
}

conn := e.Pool.Get()
defer conn.Close()

Expand All @@ -64,11 +74,17 @@ func (e *Enqueuer) Enqueue(jobName string, args map[string]interface{}) (*Job, e
return job, nil
}

// Enqueue will enqueue the specified job name and arguments. The args param can be nil if no args are needed.
// Example: e.Enqueue("send_email", work.Q{"addr": "test@example.com"})
func (e *Enqueuer) Enqueue(jobName string, args map[string]interface{}) (*Job, error) {
return e.CheckAndEnqueue(jobName, args, noOpBeforeEnqueue)
}

// EnqueueIn enqueues a job in the scheduled job queue for execution in secondsFromNow seconds.
func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, args map[string]interface{}) (*ScheduledJob, error) {
job := &Job{
Name: jobName,
ID: makeIdentifier(),
Guid: makeIdentifier(),
EnqueuedAt: nowEpochSeconds(),
Args: args,
}
Expand Down Expand Up @@ -195,7 +211,7 @@ func (e *Enqueuer) uniqueJobHelper(jobName string, args map[string]interface{},

job := &Job{
Name: jobName,
ID: makeIdentifier(),
Guid: makeIdentifier(),
EnqueuedAt: nowEpochSeconds(),
Args: args,
Unique: true,
Expand Down
Loading