diff --git a/pool/README.md b/pool/README.md
index 885f1be..3716bfb 100644
--- a/pool/README.md
+++ b/pool/README.md
@@ -20,7 +20,6 @@
 Pulse uses the [Jump Consistent Hash](https://arxiv.org/abs/1406.2294)
 algorithm to assign jobs to workers which provides a good balance between load
 balancing and worker assignment stability.
-
 ```mermaid
 %%{init: {'themeVariables': { 'edgeLabelBackground': '#7A7A7A'}}}%%
 flowchart LR
@@ -216,12 +215,12 @@ flowchart TD
     end
     pr --1. DispatchJob--> no
     no --2. Add Job--> js
-    js -.3. Job.-> ps
+    js --3. Job--> ps
     ps --4. Add Job--> ws
-    ws -.5. Job.-> r
+    ws --5. Job--> r
     r --6. Start Job--> u
     r --7. Add Ack--> rs
-    rs -.7. Ack.-> nr
+    rs --7. Ack--> nr
     nr --8. Ack Add Job Event--> js
 
     classDef userCode fill:#9A6D1F, stroke:#D9B871, stroke-width:2px, color:#FFF2CC;
diff --git a/pool/node.go b/pool/node.go
index fad9bbd..2004990 100644
--- a/pool/node.go
+++ b/pool/node.go
@@ -526,7 +526,7 @@ func (node *Node) ackWorkerEvent(ctx context.Context, ev *streaming.Event) {
 }
 
 // returnDispatchStatus returns the start job result to the caller.
-func (node *Node) returnDispatchStatus(ctx context.Context, ev *streaming.Event) {
+func (node *Node) returnDispatchStatus(_ context.Context, ev *streaming.Event) {
 	node.lock.Lock()
 	defer node.lock.Unlock()
 
@@ -738,7 +738,7 @@ func (node *Node) deleteWorker(ctx context.Context, id string) error {
 
 // workerStream retrieves the stream for a worker. It caches the result in the
 // workerStreams map. Caller is responsible for locking.
-func (node *Node) workerStream(ctx context.Context, id string) (*streaming.Stream, error) {
+func (node *Node) workerStream(_ context.Context, id string) (*streaming.Stream, error) {
 	stream, ok := node.workerStreams[id]
 	if !ok {
 		s, err := streaming.NewStream(workerStreamName(id), node.rdb, soptions.WithStreamLogger(node.logger))
diff --git a/pool/node_test.go b/pool/node_test.go
index ccc2605..24a8877 100644
--- a/pool/node_test.go
+++ b/pool/node_test.go
@@ -76,9 +76,13 @@ func TestRemoveWorkerThenShutdown(t *testing.T) {
 		rdb      = redis.NewClient(&redis.Options{Addr: "localhost:6379", Password: redisPwd})
 		node     = newTestNode(t, ctx, rdb, testName)
 		worker   = newTestWorker(t, ctx, node)
+		handler  = worker.handler.(*mockHandler)
 	)
 	defer cleanup(t, rdb, true, testName)
+	assert.NoError(t, node.DispatchJob(ctx, testName, []byte("payload")))
+	assert.Eventually(t, func() bool { return len(handler.jobs) == 1 }, max, delay)
 	assert.NoError(t, node.RemoveWorker(ctx, worker))
+	assert.Eventually(t, func() bool { return len(handler.jobs) == 0 }, max, delay)
 	assert.NoError(t, node.Shutdown(ctx))
 }
 
@@ -88,9 +92,14 @@ func TestClose(t *testing.T) {
 		testName = strings.Replace(t.Name(), "/", "_", -1)
 		rdb      = redis.NewClient(&redis.Options{Addr: "localhost:6379", Password: redisPwd})
 		node     = newTestNode(t, ctx, rdb, testName)
+		worker   = newTestWorker(t, ctx, node)
+		handler  = worker.handler.(*mockHandler)
 	)
 	defer cleanup(t, rdb, false, testName)
+	assert.NoError(t, node.DispatchJob(ctx, testName, []byte("payload")))
+	assert.Eventually(t, func() bool { return len(handler.jobs) == 1 }, max, delay)
 	assert.NoError(t, node.Close(ctx))
+	assert.Eventually(t, func() bool { return len(handler.jobs) == 0 }, max, delay)
 }
 
 func newTestNode(t *testing.T, ctx context.Context, rdb *redis.Client, name string) *Node {
@@ -106,11 +115,11 @@ func newTestWorker(t *testing.T, ctx context.Context, node *Node) *Worker {
 	t.Helper()
-	wm := &workerMock{jobs: make(map[string]*Job)}
-	wm.startFunc = func(job *Job) error { wm.jobs[job.Key] = job; return nil }
-	wm.stopFunc = func(key string) error { delete(wm.jobs, key); return nil }
-	wm.notifyFunc = func(payload []byte) error { return nil }
-	worker, err := node.AddWorker(ctx, wm)
+	handler := &mockHandler{jobs: make(map[string]*Job)}
+	handler.startFunc = func(job *Job) error { handler.jobs[job.Key] = job; return nil }
+	handler.stopFunc = func(key string) error { delete(handler.jobs, key); return nil }
+	handler.notifyFunc = func(payload []byte) error { return nil }
+	worker, err := node.AddWorker(ctx, handler)
 	require.NoError(t, err)
 	return worker
 }
@@ -150,16 +159,16 @@ func cleanup(t *testing.T, rdb *redis.Client, checkClean bool, testName string)
 	assert.NoError(t, rdb.FlushDB(ctx).Err())
 }
 
-type workerMock struct {
+type mockHandler struct {
 	startFunc  func(job *Job) error
 	stopFunc   func(key string) error
 	notifyFunc func(payload []byte) error
 	jobs       map[string]*Job
 }
 
-func (w *workerMock) Start(job *Job) error { return w.startFunc(job) }
-func (w *workerMock) Stop(key string) error { return w.stopFunc(key) }
-func (w *workerMock) Notify(p []byte) error { return w.notifyFunc(p) }
+func (w *mockHandler) Start(job *Job) error { return w.startFunc(job) }
+func (w *mockHandler) Stop(key string) error { return w.stopFunc(key) }
+func (w *mockHandler) Notify(p []byte) error { return w.notifyFunc(p) }
 
 // buffer is a goroutine safe bytes.Buffer
 type buffer struct {
diff --git a/pool/scheduler.go b/pool/scheduler.go
index 8617fa5..6ff192c 100644
--- a/pool/scheduler.go
+++ b/pool/scheduler.go
@@ -170,7 +170,7 @@ func (sched *scheduler) stopJobs(ctx context.Context, plan *JobPlan) error {
 }
 
 // handleStop handles the scheduler stop signal.
-func (sched *scheduler) handleStop(ctx context.Context) {
+func (sched *scheduler) handleStop(_ context.Context) {
 	ch := sched.jobMap.Subscribe()
 	for ev := range ch {
 		if ev == rmap.EventReset {
diff --git a/pool/worker.go b/pool/worker.go
index 1c956e1..cab52ea 100644
--- a/pool/worker.go
+++ b/pool/worker.go
@@ -163,7 +163,9 @@ func (w *Worker) handleEvents(c <-chan *streaming.Event) {
 		case evStartJob:
 			err = w.startJob(ctx, unmarshalJob(payload))
 		case evStopJob:
+			w.lock.Lock()
 			err = w.stopJob(ctx, unmarshalJobKey(payload))
+			w.lock.Unlock()
 		case evNotify:
 			key, payload := unmarshalNotification(payload)
 			err = w.notify(ctx, key, payload)
@@ -229,7 +231,7 @@ func (w *Worker) stopAndWait(ctx context.Context) {
 }
 
 // startJob starts a job.
-func (w *Worker) startJob(ctx context.Context, job *Job) error {
+func (w *Worker) startJob(_ context.Context, job *Job) error {
 	w.lock.Lock()
 	defer w.lock.Unlock()
 	if w.stopped {
@@ -246,12 +248,8 @@ func (w *Worker) startJob(ctx context.Context, job *Job) error {
 }
 
 // stopJob stops a job.
-func (w *Worker) stopJob(ctx context.Context, key string) error {
-	w.lock.Lock()
-	defer w.lock.Unlock()
-	if w.stopped {
-		return nil
-	}
+// worker.lock must be held when calling this method.
+func (w *Worker) stopJob(_ context.Context, key string) error {
 	if _, ok := w.jobs[key]; !ok {
 		return fmt.Errorf("job %s not found", key)
 	}
@@ -264,7 +262,7 @@ func (w *Worker) stopJob(ctx context.Context, key string) error {
 }
 
 // notify notifies the worker with the given payload.
-func (w *Worker) notify(ctx context.Context, key string, payload []byte) error {
+func (w *Worker) notify(_ context.Context, key string, payload []byte) error {
 	w.lock.Lock()
 	defer w.lock.Unlock()
 	if w.stopped {
@@ -335,6 +333,9 @@ func (w *Worker) requeueJobs(ctx context.Context) {
 	w.lock.Lock()
 	defer w.lock.Unlock()
 	for _, job := range w.jobs {
+		if err := w.stopJob(ctx, job.Key); err != nil {
+			w.logger.Error(fmt.Errorf("failed to stop job %q: %w", job.Key, err))
+		}
 		if _, err := w.Node.poolStream.Add(ctx, evStartJob, marshalJob(job)); err != nil {
 			w.logger.Error(fmt.Errorf("failed to requeue job %q: %w", job.Key, err))
 		}
diff --git a/scripts/test b/scripts/test
index 6bf64fa..2b271c9 100755
--- a/scripts/test
+++ b/scripts/test
@@ -9,9 +9,15 @@ pushd ${GIT_ROOT}
 
 staticcheck ./...
 
+# If --force is passed, add --count=1 to the go test command
+if [[ "$1" == "--force" ]]; then
+  shift
+  OPTIONS="--count=1"
+fi
+
 # Run tests one package at a time to avoid Redis race conditions
-go test -race goa.design/pulse/rmap/...
-go test -race goa.design/pulse/streaming/...
-go test -race goa.design/pulse/pool/...
+go test -race goa.design/pulse/rmap/... $OPTIONS
+go test -race goa.design/pulse/streaming/... $OPTIONS
+go test -race goa.design/pulse/pool/... $OPTIONS
 
 popd
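A note on the locking change in pool/worker.go, with an illustrative sketch: stopJob no longer acquires w.lock itself; its callers are now expected to hold the lock (handleEvents locks around the call, and requeueJobs already holds the lock for its whole loop). Since Go's sync.Mutex is not re-entrant, this caller-holds-lock contract is presumably what lets requeueJobs stop each job before requeueing it without deadlocking. The sketch below uses simplified stand-in types and names (worker, handleStopEvent), not the actual pool.Worker API, to show the pattern:

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a stripped-down stand-in for pool.Worker, kept only to
// illustrate the caller-holds-lock contract introduced by this change.
type worker struct {
	lock sync.Mutex
	jobs map[string]string // job key -> payload
}

// stopJob assumes w.lock is already held by the caller, mirroring the
// new contract on Worker.stopJob.
func (w *worker) stopJob(key string) error {
	if _, ok := w.jobs[key]; !ok {
		return fmt.Errorf("job %s not found", key)
	}
	delete(w.jobs, key)
	return nil
}

// handleStopEvent locks around the call, as handleEvents now does for
// evStopJob events.
func (w *worker) handleStopEvent(key string) error {
	w.lock.Lock()
	defer w.lock.Unlock()
	return w.stopJob(key)
}

// requeueJobs holds the lock for the whole loop, so it can call stopJob
// directly without re-locking (and without deadlocking).
func (w *worker) requeueJobs() {
	w.lock.Lock()
	defer w.lock.Unlock()
	for key := range w.jobs {
		if err := w.stopJob(key); err != nil {
			fmt.Println("stop failed:", err)
		}
		// ...requeue the job on the pool stream here...
	}
}

func main() {
	w := &worker{jobs: map[string]string{"job1": "payload"}}
	w.requeueJobs()
	fmt.Println("remaining jobs:", len(w.jobs))
}
```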