Properly delete stale workers with no job #37

Merged 5 commits on Oct 18, 2024
2 changes: 1 addition & 1 deletion docker-compose/docker-compose-redis.yaml
@@ -2,6 +2,6 @@ services:
redis:
image: "redis"
container_name: "pulse-redis"
- command: redis-server --save 20 1 --loglevel warning --requirepass ${REDIS_PASSWORD}
+ command: redis-server --save "" --loglevel warning --requirepass ${REDIS_PASSWORD}
ports:
- "6379:6379"
48 changes: 31 additions & 17 deletions pool/node.go
@@ -620,6 +620,23 @@ func (node *Node) handleWorkerMapUpdate(ctx context.Context) {
if node.closing {
return
}
// First cleanup the local workers that are no longer active.
for _, worker := range node.localWorkers {
if _, ok := node.workerMap.Get(worker.ID); !ok {
// If it's not in the worker map, then it's not active and its jobs
// have already been requeued.
node.logger.Info("handleWorkerMapUpdate: removing inactive local worker", "worker", worker.ID)
worker.stopAndWait(ctx)
for i, w := range node.localWorkers {
if worker.ID == w.ID {
node.localWorkers = append(node.localWorkers[:i], node.localWorkers[i+1:]...)
break
}
}
}
}

// Then rebalance the jobs across the remaining active workers.
activeWorkers := node.activeWorkers()
if len(activeWorkers) == 0 {
return
@@ -727,10 +744,11 @@ func (node *Node) processInactiveWorkers(ctx context.Context) {
continue
}
lastSeen := time.Unix(0, lsi)
- if time.Since(lastSeen) <= node.workerTTL {
+ lsd := time.Since(lastSeen)
+ if lsd <= node.workerTTL {
continue
}
- node.logger.Debug("processInactiveWorkers: removing worker", "worker", id)
+ node.logger.Debug("processInactiveWorkers: removing worker", "worker", id, "last-seen", lsd, "ttl", node.workerTTL)

// Use optimistic locking to set the keep-alive timestamp to a value
// in the future so that another node does not also requeue the jobs.
@@ -747,21 +765,15 @@ func (node *Node) processInactiveWorkers(ctx context.Context) {

keys, ok := node.jobsMap.GetValues(id)
if !ok {
- continue // worker is already being deleted
+ // Worker has no jobs, so delete it right away.
+ if err := node.deleteWorker(ctx, id); err != nil {
+ node.logger.Error(fmt.Errorf("processInactiveWorkers: failed to delete worker %q: %w", id, err), "worker", id)
+ }
+ continue
}
- mustRequeue := len(keys)
requeued := make(map[string]chan error)
for _, key := range keys {
- payload, ok := node.jobPayloadsMap.Get(key)
- if !ok {
- node.logger.Error(fmt.Errorf("processInactiveWorkers: payload for job not found"), "job", key, "worker", id)
- // No need to keep the job around if the payload is not found.
- if _, _, err := node.jobsMap.RemoveValues(ctx, id, key); err != nil {
- node.logger.Error(fmt.Errorf("processInactiveWorkers: failed to remove job %q from jobs map: %w", key, err), "job", key, "worker", id)
- }
- mustRequeue--
- continue
- }
+ payload, _ := node.jobPayloadsMap.Get(key) // Some jobs have no payload
job := &Job{
Key: key,
Payload: []byte(payload),
@@ -776,10 +788,11 @@ func (node *Node) processInactiveWorkers(ctx context.Context) {
requeued[job.Key] = cherr
}

- if len(requeued) != mustRequeue {
- node.logger.Error(fmt.Errorf("processInactiveWorkers: failed to requeue all inactive jobs: %d/%d, will retry later", len(requeued), mustRequeue), "worker", id)
+ allRequeued := len(requeued) == len(keys)
+ if !allRequeued {
+ node.logger.Error(fmt.Errorf("processInactiveWorkers: failed to requeue all inactive jobs: %d/%d, will retry later", len(requeued), len(keys)), "worker", id)
}
- go node.processRequeuedJobs(ctx, id, requeued, len(requeued) == mustRequeue)
+ go node.processRequeuedJobs(ctx, id, requeued, allRequeued)
}
}

@@ -866,6 +879,7 @@ func (node *Node) activeWorkers() []string {

// deleteWorker removes a worker from the pool deleting the worker stream.
func (node *Node) deleteWorker(ctx context.Context, id string) error {
+ node.logger.Debug("deleteWorker: deleting worker", "worker", id)
if _, err := node.keepAliveMap.Delete(ctx, id); err != nil {
node.logger.Error(fmt.Errorf("deleteWorker: failed to delete worker %q from keep-alive map: %w", id, err))
}
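
For context on the requeue guard above: processInactiveWorkers relies on optimistic locking, bumping the stale worker's keep-alive timestamp into the future so that only one node requeues that worker's jobs and deletes it. The pool package uses its own replicated-map helpers for this; the sketch below only illustrates the same compare-and-set idea with plain go-redis WATCH/MULTI, assuming go-redis v9 and hypothetical key and value names that are not part of this repository's API:

package example

import (
	"context"
	"errors"
	"strconv"
	"time"

	"github.com/redis/go-redis/v9"
)

// claimStaleWorker tries to bump a stale worker's keep-alive timestamp from
// the value this node observed to a value in the future. The WATCH/MULTI
// transaction makes the write conditional: if another node touched the key
// first, EXEC fails and this node backs off instead of requeuing the same
// jobs twice.
func claimStaleWorker(ctx context.Context, rdb *redis.Client, key, observed string, ttl time.Duration) (bool, error) {
	claimed := false
	err := rdb.Watch(ctx, func(tx *redis.Tx) error {
		cur, err := tx.Get(ctx, key).Result()
		if err != nil {
			return err
		}
		if cur != observed {
			return nil // another node already claimed this worker
		}
		future := strconv.FormatInt(time.Now().Add(2*ttl).UnixNano(), 10)
		_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
			pipe.Set(ctx, key, future, 0)
			return nil
		})
		if err == nil {
			claimed = true
		}
		return err
	}, key)
	if errors.Is(err, redis.TxFailedErr) {
		return false, nil // lost the race to another node; not an error
	}
	return claimed, err
}
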
2 changes: 2 additions & 0 deletions pool/ticker_test.go
@@ -31,7 +31,9 @@ func TestNewTicker(t *testing.T) {
assert.WithinDuration(t, startTime.Add(tickDuration), firstTick, time.Second, "First tick should occur after approximately one tick duration")

// Verify next tick time and duration
+ ticker.lock.Lock()
nextTickTime, tickerDuration := deserialize(ticker.next)
+ ticker.lock.Unlock()
assert.WithinDuration(t, startTime.Add(tickDuration), nextTickTime, time.Second, "Next tick time should be approximately one tick duration from start")
assert.Equal(t, tickDuration, tickerDuration, "Ticker duration should match the specified duration")

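
The added lock/unlock around reading ticker.next presumably keeps the test race-free when the background goroutine updates that field concurrently; running the package under the race detector is the usual way to confirm such fixes:

go test -race ./pool/...
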
89 changes: 86 additions & 3 deletions pool/worker_test.go
@@ -2,6 +2,7 @@ package pool

import (
"context"
"strconv"
"strings"
"testing"
"time"
@@ -32,9 +33,7 @@ func TestWorkerRequeueJobs(t *testing.T) {

// Emulate the worker failing by preventing it from refreshing its keepalive
// This means we can't cleanup cleanly, hence "false" in CleanupRedis
- worker.lock.Lock()
- worker.stopped = true
- worker.lock.Unlock()
+ worker.stopAndWait(ctx)

// Create a new worker to pick up the requeued job
newWorker := newTestWorker(t, ctx, node)
@@ -50,4 +49,88 @@
require.Eventually(t, func() bool {
return len(newWorker.Jobs()) == 2
}, time.Second, delay, "job was not requeued")

// Cleanup
assert.NoError(t, node.Shutdown(ctx))
}

func TestStaleWorkerCleanupInNode(t *testing.T) {
var (
ctx = ptesting.NewTestContext(t)
testName = strings.Replace(t.Name(), "/", "_", -1)
rdb = ptesting.NewRedisClient(t)
node = newTestNode(t, ctx, rdb, testName)
)
defer ptesting.CleanupRedis(t, rdb, false, testName)

// Create one active worker
activeWorker := newTestWorker(t, ctx, node)
t.Log("active worker", activeWorker.ID)

// Create five stale workers
staleWorkers := make([]*Worker, 5)
for i := 0; i < 5; i++ {
staleWorkers[i] = newTestWorker(t, ctx, node)
staleWorkers[i].stop(ctx)
// Set the last seen time to a past time
_, err := node.keepAliveMap.Set(ctx, staleWorkers[i].ID, strconv.FormatInt(time.Now().Add(-2*node.workerTTL).UnixNano(), 10))
assert.NoError(t, err)
}

// Wait for the cleanup process to run
time.Sleep(3 * node.workerTTL)

// Check if only the active worker remains
workers := node.activeWorkers()
assert.Len(t, workers, 1, "There should be only one worker remaining")
assert.Contains(t, workers, activeWorker.ID, "The active worker should still exist")

// Cleanup
assert.NoError(t, node.Shutdown(ctx))
}

func TestStaleWorkerCleanupAcrossNodes(t *testing.T) {
var (
ctx = ptesting.NewTestContext(t)
testName = strings.Replace(t.Name(), "/", "_", -1)
rdb = ptesting.NewRedisClient(t)
node1 = newTestNode(t, ctx, rdb, testName+"_1")
node2 = newTestNode(t, ctx, rdb, testName+"_2")
)
defer ptesting.CleanupRedis(t, rdb, false, testName)

// Create one active worker on node1
activeWorker := newTestWorker(t, ctx, node1)

// Create five stale workers on node2
staleWorkers := make([]*Worker, 5)
for i := 0; i < 5; i++ {
staleWorkers[i] = newTestWorker(t, ctx, node2)
staleWorkers[i].stop(ctx)
// Set the last seen time to a past time
_, err := node2.keepAliveMap.Set(ctx, staleWorkers[i].ID, strconv.FormatInt(time.Now().Add(-2*node2.workerTTL).UnixNano(), 10))
assert.NoError(t, err)
}

// Wait for the cleanup process to run
time.Sleep(3 * node2.workerTTL)

// Check if only the active worker remains on node1
workers1 := node1.activeWorkers()
assert.Len(t, workers1, 1, "There should be only one worker remaining on node1")
assert.Contains(t, workers1, activeWorker.ID, "The active worker should still exist on node1")

// Check if all workers have been removed from node2
workers2 := node2.activeWorkers()
assert.Len(t, workers2, 0, "There should be no workers remaining on node2")

// Verify that stale workers are not in the worker map of node2
for _, worker := range staleWorkers {
_, exists := node2.workerMap.Get(worker.ID)
assert.False(t, exists, "Stale worker %s should not exist in the worker map of node2", worker.ID)
}

// Cleanup
assert.NoError(t, node1.Shutdown(ctx))
assert.NoError(t, node2.Shutdown(ctx))
}
19 changes: 19 additions & 0 deletions scripts/stop-redis
@@ -0,0 +1,19 @@
#!/usr/bin/env bash
set -e

# Get the root directory of the git repository
GIT_ROOT=$(git rev-parse --show-toplevel)

# Change to the git root directory
pushd "${GIT_ROOT}"

# Source common utilities and environment variables
# shellcheck source=utils/common.sh
source ./scripts/utils/common.sh
source .env

# Stop the Redis Docker container
docker compose -p redis -f docker-compose/docker-compose-redis.yaml down

# Return to the original directory
popd
1 change: 0 additions & 1 deletion streaming/sink.go
@@ -334,7 +334,6 @@ func (s *Sink) newConsumer(ctx context.Context, stream *Stream) (string, error)
}
return "", fmt.Errorf("failed to set sink keep-alive for new consumer %s: %w", consumer, err)
}
- s.logger.Debug("created new consumer", "consumer", consumer)
return consumer, nil
}
