Skip to content

Commit

Permalink
feat: fix infinite scheduling loop when job gets scheduled after dead…
Browse files Browse the repository at this point in the history
…line
  • Loading branch information
acaloiaro committed Mar 27, 2024
1 parent 441a392 commit b401809
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 7 deletions.
17 changes: 15 additions & 2 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,11 @@ var (
shutdownJobID = "-1" // job ID announced when triggering a shutdown
shutdownAnnouncementAllowance = 100 // ms
ErrCnxString = errors.New("invalid connecton string: see documentation for valid connection strings")
ErrConnectionStringEmpty = errors.New("connection string cannot be empty")
ErrDuplicateJob = errors.New("duplicate job")
ErrNoTransactionInContext = errors.New("context does not have a Tx set")
ErrExceededConnectionPoolTimeout = errors.New("exceeded timeout acquiring a connection from the pool")
ErrUnsupportedURIScheme = errors.New("only postgres:// and postgresql:// scheme URIs are supported, invalid connection string")
)

// PgBackend is a Postgres-based Neoq backend
Expand Down Expand Up @@ -855,6 +857,17 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string) (err error) {
err = jobs.ErrJobExceededDeadline
p.logger.Debug("job deadline is in the past, skipping", slog.String("queue", job.Queue), slog.Int64("job_id", job.ID))
err = p.updateJob(ctx, err)
if err != nil {
p.logger.Error("unable to update job status", "error", err, "job_id", job.ID)
return
}

err = tx.Commit(ctx)
if err != nil {
p.logger.Error("unable to update job status", "error", err, "job_id", job.ID)
return
}

return
}

Expand Down Expand Up @@ -1027,7 +1040,7 @@ func GetPQConnectionString(connectionString string) (string, error) {
}

if dbURI.String() == "" {
return "", fmt.Errorf("connection string cannot be empty")
return "", ErrConnectionStringEmpty
}

scheme := dbURI.Scheme
Expand All @@ -1038,7 +1051,7 @@ func GetPQConnectionString(connectionString string) (string, error) {

if scheme != "postgres" && scheme != "postgresql" {
// This isn't a postgresql URI-style string (postgres://hostname/db)
return "", fmt.Errorf("only postgres and postgresql scheme URIs are supported, invalid connection string: %s", connectionString)
return "", ErrUnsupportedURIScheme
}

sslMode := "verify-ca"
Expand Down
39 changes: 34 additions & 5 deletions backends/postgres/postgres_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,7 @@ func TestGetPQConnectionString(t *testing.T) {
{
name: "multiple hostnames",
input: "postgres://username:password@hostname1,hostname2,hostname3:5432/database",
want: "postgres://username:password@hostname1,hostname2,hostname3:5432/database?sslmode=require&x-migrations-table=neoq_schema_migrations",
want: "postgres://username:password@hostname1,hostname2,hostname3:5432/database?sslmode=require&x-migrations-table=neoq_schema_migrations", // nolint: lll
wantErr: false,
},

Expand Down Expand Up @@ -1004,8 +1004,9 @@ func TestGetPQConnectionString(t *testing.T) {
// with an error indicating that its deadline was not met
// https://github.com/acaloiaro/neoq/issues/123
func TestJobWithPastDeadline(t *testing.T) {
connString, _ := prepareAndCleanupDB(t)
connString, conn := prepareAndCleanupDB(t)
const queue = "testing"
timeoutTimer := time.After(5 * time.Second)
maxRetries := 5
done := make(chan bool)
defer close(done)
Expand All @@ -1018,7 +1019,6 @@ func TestJobWithPastDeadline(t *testing.T) {
defer nq.Shutdown(ctx)

h := handler.New(queue, func(_ context.Context) (err error) {
done <- true
return
})

Expand All @@ -1038,10 +1038,39 @@ func TestJobWithPastDeadline(t *testing.T) {
MaxRetries: &maxRetries,
})
if e != nil || jid == jobs.DuplicateJobID {
t.Error(e)
t.Error(e) // nolint: goerr113
}

if e != nil && !errors.Is(e, jobs.ErrJobExceededDeadline) {
t.Error(err)
t.Error(err) // nolint: goerr113
}

var status string
go func() {
// ensure job has failed/has the correct status
for {
err = conn.
QueryRow(context.Background(), "SELECT status FROM neoq_jobs WHERE id = $1", jid).
Scan(&status)
if err != nil {
break
}

if status == internal.JobStatusFailed {
done <- true
break
}

time.Sleep(50 * time.Millisecond)
}
}()

select {
case <-timeoutTimer:
err = jobs.ErrJobTimeout
case <-done:
}
if err != nil {
t.Errorf("job should have resulted in a status of 'failed', but its status is %s", status)
}
}

0 comments on commit b401809

Please sign in to comment.