Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Ensure node.Close and node.RemoveWorker properly stop local jobs #15

Merged
merged 1 commit into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions pool/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ Pulse uses the [Jump Consistent Hash](https://arxiv.org/abs/1406.2294) algorithm
to assign jobs to workers which provides a good balance between load balancing
and worker assignment stability.


```mermaid
%%{init: {'themeVariables': { 'edgeLabelBackground': '#7A7A7A'}}}%%
flowchart LR
Expand Down Expand Up @@ -216,12 +215,12 @@ flowchart TD
end
pr --1. DispatchJob--> no
no --2. Add Job--> js
js -.3. Job.-> ps
js --3. Job--> ps
ps --4. Add Job--> ws
ws -.5. Job.-> r
ws --5. Job--> r
r --6. Start Job--> u
r --7. Add Ack--> rs
rs -.7. Ack.-> nr
rs --7. Ack--> nr
nr --8. Ack Add Job Event--> js

classDef userCode fill:#9A6D1F, stroke:#D9B871, stroke-width:2px, color:#FFF2CC;
Expand Down
4 changes: 2 additions & 2 deletions pool/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ func (node *Node) ackWorkerEvent(ctx context.Context, ev *streaming.Event) {
}

// returnDispatchStatus returns the start job result to the caller.
func (node *Node) returnDispatchStatus(ctx context.Context, ev *streaming.Event) {
func (node *Node) returnDispatchStatus(_ context.Context, ev *streaming.Event) {
node.lock.Lock()
defer node.lock.Unlock()

Expand Down Expand Up @@ -738,7 +738,7 @@ func (node *Node) deleteWorker(ctx context.Context, id string) error {

// workerStream retrieves the stream for a worker. It caches the result in the
// workerStreams map. Caller is responsible for locking.
func (node *Node) workerStream(ctx context.Context, id string) (*streaming.Stream, error) {
func (node *Node) workerStream(_ context.Context, id string) (*streaming.Stream, error) {
stream, ok := node.workerStreams[id]
if !ok {
s, err := streaming.NewStream(workerStreamName(id), node.rdb, soptions.WithStreamLogger(node.logger))
Expand Down
27 changes: 18 additions & 9 deletions pool/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,13 @@ func TestRemoveWorkerThenShutdown(t *testing.T) {
rdb = redis.NewClient(&redis.Options{Addr: "localhost:6379", Password: redisPwd})
node = newTestNode(t, ctx, rdb, testName)
worker = newTestWorker(t, ctx, node)
handler = worker.handler.(*mockHandler)
)
defer cleanup(t, rdb, true, testName)
assert.NoError(t, node.DispatchJob(ctx, testName, []byte("payload")))
assert.Eventually(t, func() bool { return len(handler.jobs) == 1 }, max, delay)
assert.NoError(t, node.RemoveWorker(ctx, worker))
assert.Eventually(t, func() bool { return len(handler.jobs) == 0 }, max, delay)
assert.NoError(t, node.Shutdown(ctx))
}

Expand All @@ -88,9 +92,14 @@ func TestClose(t *testing.T) {
testName = strings.Replace(t.Name(), "/", "_", -1)
rdb = redis.NewClient(&redis.Options{Addr: "localhost:6379", Password: redisPwd})
node = newTestNode(t, ctx, rdb, testName)
worker = newTestWorker(t, ctx, node)
handler = worker.handler.(*mockHandler)
)
defer cleanup(t, rdb, false, testName)
assert.NoError(t, node.DispatchJob(ctx, testName, []byte("payload")))
assert.Eventually(t, func() bool { return len(handler.jobs) == 1 }, max, delay)
assert.NoError(t, node.Close(ctx))
assert.Eventually(t, func() bool { return len(handler.jobs) == 0 }, max, delay)
}

func newTestNode(t *testing.T, ctx context.Context, rdb *redis.Client, name string) *Node {
Expand All @@ -106,11 +115,11 @@ func newTestNode(t *testing.T, ctx context.Context, rdb *redis.Client, name stri

func newTestWorker(t *testing.T, ctx context.Context, node *Node) *Worker {
t.Helper()
wm := &workerMock{jobs: make(map[string]*Job)}
wm.startFunc = func(job *Job) error { wm.jobs[job.Key] = job; return nil }
wm.stopFunc = func(key string) error { delete(wm.jobs, key); return nil }
wm.notifyFunc = func(payload []byte) error { return nil }
worker, err := node.AddWorker(ctx, wm)
handler := &mockHandler{jobs: make(map[string]*Job)}
handler.startFunc = func(job *Job) error { handler.jobs[job.Key] = job; return nil }
handler.stopFunc = func(key string) error { delete(handler.jobs, key); return nil }
handler.notifyFunc = func(payload []byte) error { return nil }
worker, err := node.AddWorker(ctx, handler)
require.NoError(t, err)
return worker
}
Expand Down Expand Up @@ -150,16 +159,16 @@ func cleanup(t *testing.T, rdb *redis.Client, checkClean bool, testName string)
assert.NoError(t, rdb.FlushDB(ctx).Err())
}

type workerMock struct {
type mockHandler struct {
startFunc func(job *Job) error
stopFunc func(key string) error
notifyFunc func(payload []byte) error
jobs map[string]*Job
}

func (w *workerMock) Start(job *Job) error { return w.startFunc(job) }
func (w *workerMock) Stop(key string) error { return w.stopFunc(key) }
func (w *workerMock) Notify(p []byte) error { return w.notifyFunc(p) }
func (w *mockHandler) Start(job *Job) error { return w.startFunc(job) }
func (w *mockHandler) Stop(key string) error { return w.stopFunc(key) }
func (w *mockHandler) Notify(p []byte) error { return w.notifyFunc(p) }

// buffer is a goroutine safe bytes.Buffer
type buffer struct {
Expand Down
2 changes: 1 addition & 1 deletion pool/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (sched *scheduler) stopJobs(ctx context.Context, plan *JobPlan) error {
}

// handleStop handles the scheduler stop signal.
func (sched *scheduler) handleStop(ctx context.Context) {
func (sched *scheduler) handleStop(_ context.Context) {
ch := sched.jobMap.Subscribe()
for ev := range ch {
if ev == rmap.EventReset {
Expand Down
17 changes: 9 additions & 8 deletions pool/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@ func (w *Worker) handleEvents(c <-chan *streaming.Event) {
case evStartJob:
err = w.startJob(ctx, unmarshalJob(payload))
case evStopJob:
w.lock.Lock()
err = w.stopJob(ctx, unmarshalJobKey(payload))
w.lock.Unlock()
case evNotify:
key, payload := unmarshalNotification(payload)
err = w.notify(ctx, key, payload)
Expand Down Expand Up @@ -229,7 +231,7 @@ func (w *Worker) stopAndWait(ctx context.Context) {
}

// startJob starts a job.
func (w *Worker) startJob(ctx context.Context, job *Job) error {
func (w *Worker) startJob(_ context.Context, job *Job) error {
w.lock.Lock()
defer w.lock.Unlock()
if w.stopped {
Expand All @@ -246,12 +248,8 @@ func (w *Worker) startJob(ctx context.Context, job *Job) error {
}

// stopJob stops a job.
func (w *Worker) stopJob(ctx context.Context, key string) error {
w.lock.Lock()
defer w.lock.Unlock()
if w.stopped {
return nil
}
// worker.lock must be held when calling this method.
func (w *Worker) stopJob(_ context.Context, key string) error {
if _, ok := w.jobs[key]; !ok {
return fmt.Errorf("job %s not found", key)
}
Expand All @@ -264,7 +262,7 @@ func (w *Worker) stopJob(ctx context.Context, key string) error {
}

// notify notifies the worker with the given payload.
func (w *Worker) notify(ctx context.Context, key string, payload []byte) error {
func (w *Worker) notify(_ context.Context, key string, payload []byte) error {
w.lock.Lock()
defer w.lock.Unlock()
if w.stopped {
Expand Down Expand Up @@ -335,6 +333,9 @@ func (w *Worker) requeueJobs(ctx context.Context) {
w.lock.Lock()
defer w.lock.Unlock()
for _, job := range w.jobs {
if err := w.stopJob(ctx, job.Key); err != nil {
w.logger.Error(fmt.Errorf("failed to stop job %q: %w", job.Key, err))
}
if _, err := w.Node.poolStream.Add(ctx, evStartJob, marshalJob(job)); err != nil {
w.logger.Error(fmt.Errorf("failed to requeue job %q: %w", job.Key, err))
}
Expand Down
12 changes: 9 additions & 3 deletions scripts/test
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,15 @@ pushd ${GIT_ROOT}

staticcheck ./...

# If --force is passed, add --count=1 to the go test command
if [[ "$1" == "--force" ]]; then
shift
OPTIONS="--count=1"
fi

# Run tests one package at a time to avoid Redis race conditions
go test -race goa.design/pulse/rmap/...
go test -race goa.design/pulse/streaming/...
go test -race goa.design/pulse/pool/...
go test -race goa.design/pulse/rmap/... $OPTIONS
go test -race goa.design/pulse/streaming/... $OPTIONS
go test -race goa.design/pulse/pool/... $OPTIONS

popd
Loading