Skip to content

Commit

Permalink
Add LockJobByID function
Browse files Browse the repository at this point in the history
  • Loading branch information
Delores Diei committed Sep 9, 2021
1 parent e726647 commit be53e70
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 0 deletions.
38 changes: 38 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,44 @@ LIMIT 1 FOR UPDATE SKIP LOCKED`, queue, now).Scan(
return nil, fmt.Errorf("could not lock a job (rollback result: %v): %w", rbErr, err)
}

// LockJobByID attempts to retrieve a specific Job from the database.
// If the job is found, it will be locked on the transactional level, so other workers
// will be skipping it. If the job is not found, an error will be returned
//
// Because Gue uses transaction-level locks, we have to hold the
// same transaction throughout the process of getting the job, working it,
// deleting it, and releasing the lock.
//
// After the Job has been worked, you must call either Done() or Error() on it
// in order to commit transaction to persist Job changes (remove or update it).
func (c *Client) LockJobByID(ctx context.Context, id int64) (*Job, error) {
tx, err := c.pool.Begin(ctx)
if err != nil {
return nil, err
}

j := Job{pool: c.pool, tx: tx, backoff: c.backoff}

err = tx.QueryRow(ctx, `SELECT job_id, queue, priority, run_at, job_type, args, error_count
FROM gue_jobs
WHERE job_id = $1 FOR UPDATE SKIP LOCKED`, id).Scan(
&j.ID,
&j.Queue,
&j.Priority,
&j.RunAt,
&j.Type,
(*json.RawMessage)(&j.Args),
&j.ErrorCount,
)
if err == nil {
return &j, nil
}

rbErr := tx.Rollback(ctx)

return nil, fmt.Errorf("could not lock the job (rollback result: %v): %w", rbErr, err)
}

func newID() string {
hasher := md5.New()
// nolint:errcheck
Expand Down
111 changes: 111 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,117 @@ func testLockJobCustomQueue(t *testing.T, connPool adapter.ConnPool) {
require.NoError(t, err)
}

func TestLockJobByID(t *testing.T) {
t.Run("pgx/v3", func(t *testing.T) {
testLockJobByID(t, adapterTesting.OpenTestPoolPGXv3(t))
})
t.Run("pgx/v4", func(t *testing.T) {
testLockJobByID(t, adapterTesting.OpenTestPoolPGXv4(t))
})
t.Run("lib/pq", func(t *testing.T) {
testLockJobByID(t, adapterTesting.OpenTestPoolLibPQ(t))
})
t.Run("go-pg/v10", func(t *testing.T) {
testLockJobByID(t, adapterTesting.OpenTestPoolGoPGv10(t))
})
}

func testLockJobByID(t *testing.T, connPool adapter.ConnPool) {
c := NewClient(connPool)
ctx := context.Background()

newJob := &Job{
Type: "MyJob",
}
err := c.Enqueue(ctx, newJob)
require.NoError(t, err)
require.Greater(t, newJob.ID, int64(0))

j, err := c.LockJobByID(ctx, newJob.ID)
require.NoError(t, err)

require.NotNil(t, j.tx)
require.NotNil(t, j.pool)
defer func() {
err := j.Done(ctx)
assert.NoError(t, err)
}()

// check values of returned Job
assert.Equal(t, newJob.ID, j.ID)
assert.Equal(t, defaultQueueName, j.Queue)
assert.Equal(t, int16(0), j.Priority)
assert.False(t, j.RunAt.IsZero())
assert.Equal(t, newJob.Type, j.Type)
assert.Equal(t, []byte(`[]`), j.Args)
assert.Equal(t, int32(0), j.ErrorCount)
assert.NotEqual(t, pgtype.Present, j.LastError.Status)
}

func TestLockJobByIDAlreadyLocked(t *testing.T) {
t.Run("pgx/v3", func(t *testing.T) {
testLockJobByIDAlreadyLocked(t, adapterTesting.OpenTestPoolPGXv3(t))
})
t.Run("pgx/v4", func(t *testing.T) {
testLockJobByIDAlreadyLocked(t, adapterTesting.OpenTestPoolPGXv4(t))
})
t.Run("lib/pq", func(t *testing.T) {
testLockJobByIDAlreadyLocked(t, adapterTesting.OpenTestPoolLibPQ(t))
})
t.Run("go-pg/v10", func(t *testing.T) {
testLockJobByIDAlreadyLocked(t, adapterTesting.OpenTestPoolGoPGv10(t))
})
}

func testLockJobByIDAlreadyLocked(t *testing.T, connPool adapter.ConnPool) {
c := NewClient(connPool)
ctx := context.Background()

newJob := &Job{
Type: "MyJob",
}

err := c.Enqueue(ctx, newJob)
require.NoError(t, err)

j, err := c.LockJobByID(ctx, newJob.ID)
require.NoError(t, err)

defer func() {
err := j.Done(ctx)
assert.NoError(t, err)
}()
require.NotNil(t, j)

j2, err := c.LockJobByID(ctx, newJob.ID)
require.Error(t, err)
require.Nil(t, j2)
}

func TestLockJobByIDNoJob(t *testing.T) {
t.Run("pgx/v3", func(t *testing.T) {
testLockJobByIDNoJob(t, adapterTesting.OpenTestPoolPGXv3(t))
})
t.Run("pgx/v4", func(t *testing.T) {
testLockJobByIDNoJob(t, adapterTesting.OpenTestPoolPGXv4(t))
})
t.Run("lib/pq", func(t *testing.T) {
testLockJobByIDNoJob(t, adapterTesting.OpenTestPoolLibPQ(t))
})
t.Run("go-pg/v10", func(t *testing.T) {
testLockJobByIDNoJob(t, adapterTesting.OpenTestPoolGoPGv10(t))
})
}

func testLockJobByIDNoJob(t *testing.T, connPool adapter.ConnPool) {
c := NewClient(connPool)
ctx := context.Background()

j, err := c.LockJobByID(ctx, 0)
require.Error(t, err)
require.Nil(t, j)
}

func TestJobTx(t *testing.T) {
t.Run("pgx/v3", func(t *testing.T) {
testJobTx(t, adapterTesting.OpenTestPoolPGXv3(t))
Expand Down

0 comments on commit be53e70

Please sign in to comment.