Skip to content

Commit

Permalink
fix: Read open dir mark correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
radeksimko committed Jun 28, 2022
1 parent 50ef638 commit ea86b87
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 6 deletions.
16 changes: 10 additions & 6 deletions internal/state/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,20 @@ func (js *JobStore) EnqueueJob(newJob job.Job) (job.ID, error) {

newJobID := job.ID(fmt.Sprintf("%d", atomic.AddUint64(&js.lastJobId, 1)))

err = txn.Insert(js.tableName, &ScheduledJob{
sJob := &ScheduledJob{
ID: newJobID,
Job: newJob,
IsDirOpen: isDirOpen(txn, newJob.Dir),
State: StateQueued,
})
}

err = txn.Insert(js.tableName, sJob)
if err != nil {
return "", err
}

js.logger.Printf("JOBS: Enqueueing new job %q: %q for %q", newJobID, newJob.Type, newJob.Dir)
js.logger.Printf("JOBS: Enqueueing new job %q: %q for %q (IsDirOpen: %t)",
sJob.ID, sJob.Type, sJob.Dir, sJob.IsDirOpen)

txn.Commit()

Expand Down Expand Up @@ -204,7 +207,7 @@ func (js *JobStore) AwaitNextJob(ctx context.Context, priority job.JobPriority)
defer js.nextJobLowPrioMu.Unlock()
default:
// This should never happen
return "", job.Job{}, fmt.Errorf("unexpected priority: %#v", priority)
panic(fmt.Sprintf("unexpected priority: %#v", priority))
}

return js.awaitNextJob(ctx, priority)
Expand Down Expand Up @@ -245,12 +248,13 @@ func (js *JobStore) awaitNextJob(ctx context.Context, priority job.JobPriority)
return "", job.Job{}, err
}

js.logger.Printf("JOBS: Dispatching next job %q: %q for %q", sJob.ID, sJob.Type, sJob.Dir)
js.logger.Printf("JOBS: Dispatching next job %q (scheduler prio: %d, job prio: %d, isDirOpen: %t): %q for %q",
sJob.ID, priority, sJob.Priority, sJob.IsDirOpen, sJob.Type, sJob.Dir)
return sJob.ID, sJob.Job, nil
}

func isDirOpen(txn *memdb.Txn, dirHandle document.DirHandle) bool {
docObj, err := txn.First(documentsTableName, "id", dirHandle)
docObj, err := txn.First(documentsTableName, "dir", dirHandle)
if err != nil {
return false
}
Expand Down
46 changes: 46 additions & 0 deletions internal/state/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,52 @@ func TestJobStore_EnqueueJob(t *testing.T) {
}
}

func TestJobStore_EnqueueJob_openDir(t *testing.T) {
ss, err := NewStateStore()
if err != nil {
t.Fatal(err)
}

dirHandle := document.DirHandleFromPath("/test-1")

err = ss.DocumentStore.OpenDocument(document.Handle{Dir: dirHandle, Filename: "test.tf"}, "test", 0, []byte{})
if err != nil {
t.Fatal(err)
}

id, err := ss.JobStore.EnqueueJob(job.Job{
Func: func(ctx context.Context) error {
return nil
},
Dir: dirHandle,
Type: "test-type",
})
if err != nil {
t.Fatal(err)
}

// verify that job for open dir comes is treated as high priority
ctx := context.Background()
ctx, cancelFunc := context.WithTimeout(ctx, 250*time.Millisecond)
t.Cleanup(cancelFunc)
nextId, j, err := ss.JobStore.AwaitNextJob(ctx, job.HighPriority)
if err != nil {
t.Fatal(err)
}

if nextId != id {
t.Fatalf("expected next job ID %q, given: %q", id, nextId)
}

if j.Dir != dirHandle {
t.Fatalf("expected next job dir %q, given: %q", dirHandle, j.Dir)
}

if j.Type != "test-type" {
t.Fatalf("expected next job dir %q, given: %q", "test-type", j.Type)
}
}

func BenchmarkJobStore_EnqueueJob_basic(b *testing.B) {
ss, err := NewStateStore()
if err != nil {
Expand Down

0 comments on commit ea86b87

Please sign in to comment.