package pool
import (
"context"
"errors"
"fmt"
"hash"
"hash/crc64"
"io"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/oklog/ulid/v2"
redis "github.com/redis/go-redis/v9"
"goa.design/clue/log"
"goa.design/pulse/pulse"
"goa.design/pulse/rmap"
"goa.design/pulse/streaming"
"goa.design/pulse/streaming/options"
)
type (
// Node is a pool of workers.
Node struct {
ID string
PoolName string
poolStream *streaming.Stream // pool event stream for dispatching jobs
poolSink *streaming.Sink // pool event sink
nodeStream *streaming.Stream // node event stream for receiving worker events
nodeReader *streaming.Reader // node event reader
nodeKeepAliveMap *rmap.Map // node keep-alive timestamps indexed by ID
nodeShutdownMap *rmap.Map // key is node ID that requested shutdown
workerMap *rmap.Map // worker creation times by ID
workerKeepAliveMap *rmap.Map // worker keep-alive timestamps indexed by ID
workerCleanupMap *rmap.Map // key is stale worker ID that needs cleanup
jobMap *rmap.Map // jobs by worker ID
jobPendingMap *rmap.Map // pending jobs by job key
jobPayloadMap *rmap.Map // job payloads by job key
tickerMap *rmap.Map // ticker next tick time indexed by name
workerTTL time.Duration // Worker considered dead if keep-alive not updated after this duration
workerShutdownTTL time.Duration // Worker considered dead if not shut down after this duration
ackGracePeriod time.Duration // Wait for return status up to this duration
clientOnly bool
logger pulse.Logger
h hasher
stop chan struct{} // closed when node is stopped
closed chan struct{} // closed when node is closed
wg sync.WaitGroup // allows waiting until all goroutines exit
rdb *redis.Client
localWorkers sync.Map // workers created by this node
workerStreams sync.Map // worker streams indexed by ID
nodeStreams sync.Map // streams for worker acks indexed by ID
pendingJobChannels sync.Map // channels used to send DispatchJob results, nil if event is requeued
pendingEvents sync.Map // pending events indexed by sender and event IDs
lock sync.RWMutex
closing bool
shutdown bool
}
// hasher is the interface implemented by types that can hash keys.
hasher interface {
Hash(key string, numBuckets int64) int64
}
// jumpHash implements Jump Consistent Hash.
jumpHash struct {
h hash.Hash64
}
)
const (
// evInit is the event used to initialize a node or worker stream.
evInit string = "i"
// evStartJob is the event used to send new job to workers.
evStartJob string = "j"
// evNotify is the event used to notify a worker running a specific job.
evNotify string = "n"
// evStopJob is the event used to stop a job.
evStopJob string = "s"
// evAck is the worker event used to ack a pool event.
evAck string = "a"
// evDispatchReturn is the event used to forward the worker start return
// status to the node that dispatched the job.
evDispatchReturn string = "d"
)
// pendingEventTTL is the TTL for pending events.
var pendingEventTTL = 2 * time.Minute
// ErrJobExists is returned when attempting to dispatch a job with a key that already exists.
var ErrJobExists = errors.New("job already exists")
// AddNode adds a new node to the pool with the given name and returns it. The
// node can be used to dispatch jobs and add new workers. A node also routes
// dispatched jobs to the proper worker and acks the corresponding events once
// the worker acks the job.
//
// The WithClientOnly option can be used to create a node that can only be used
// to dispatch jobs. Such a node does not route or process jobs in the
// background.
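//
// A minimal usage sketch (assumes ctx and a connected *redis.Client named rdb;
// the exact WithClientOnly signature is assumed):
//
//	node, err := AddNode(ctx, "mypool", rdb)
//	if err != nil {
//		// handle error
//	}
//	defer node.Close(ctx)
//
//	// A dispatch-only node that never routes or processes jobs:
//	client, err := AddNode(ctx, "mypool", rdb, WithClientOnly())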
func AddNode(ctx context.Context, poolName string, rdb *redis.Client, opts ...NodeOption) (*Node, error) {
o := parseOptions(opts...)
logger := o.logger
nodeID := ulid.Make().String()
if logger == nil {
logger = pulse.NoopLogger()
} else {
logger = logger.WithPrefix("pool", poolName, "node", nodeID)
}
logger.Info("options",
"client_only", o.clientOnly,
"max_queued_jobs", o.maxQueuedJobs,
"worker_ttl", o.workerTTL,
"worker_shutdown_ttl", o.workerShutdownTTL,
"ack_grace_period", o.ackGracePeriod)
nsm, err := rmap.Join(ctx, nodeShutdownMapName(poolName), rdb, rmap.WithLogger(logger))
if err != nil {
return nil, fmt.Errorf("AddNode: failed to join shutdown replicated map %q: %w", nodeShutdownMapName(poolName), err)
}
if nsm.Len() > 0 {
return nil, fmt.Errorf("AddNode: pool %q is shutting down", poolName)
}
nkm, err := rmap.Join(ctx, nodeKeepAliveMapName(poolName), rdb, rmap.WithLogger(logger))
if err != nil {
return nil, fmt.Errorf("AddNode: failed to join node keep-alive map %q: %w", nodeKeepAliveMapName(poolName), err)
}
if _, err := nkm.Set(ctx, nodeID, strconv.FormatInt(time.Now().UnixNano(), 10)); err != nil {
return nil, fmt.Errorf("AddNode: failed to set initial node keep-alive: %w", err)
}
poolStream, err := streaming.NewStream(poolStreamName(poolName), rdb,
options.WithStreamMaxLen(o.maxQueuedJobs),
options.WithStreamLogger(logger))
if err != nil {
return nil, fmt.Errorf("AddNode: failed to create pool job stream %q: %w", poolStreamName(poolName), err)
}
var (
wm *rmap.Map
jm *rmap.Map
jpm *rmap.Map
jpem *rmap.Map
wkm *rmap.Map
tm *rmap.Map
wcm *rmap.Map
poolSink *streaming.Sink
nodeStream *streaming.Stream
nodeReader *streaming.Reader
closed chan struct{}
)
if !o.clientOnly {
wm, err = rmap.Join(ctx, workerMapName(poolName), rdb, rmap.WithLogger(logger))
if err != nil {
return nil, fmt.Errorf("AddNode: failed to join pool workers replicated map %q: %w", workerMapName(poolName), err)
}
workerIDs := wm.Keys()
logger.Info("joined", "workers", workerIDs)
jm, err = rmap.Join(ctx, jobMapName(poolName), rdb, rmap.WithLogger(logger))
if err != nil {
return nil, fmt.Errorf("AddNode: failed to join pool jobs replicated map %q: %w", jobMapName(poolName), err)
}
jpm, err = rmap.Join(ctx, jobPayloadMapName(poolName), rdb, rmap.WithLogger(logger))
if err != nil {
return nil, fmt.Errorf("AddNode: failed to join pool job payloads replicated map %q: %w", jobPayloadMapName(poolName), err)
}
wkm, err = rmap.Join(ctx, workerKeepAliveMapName(poolName), rdb, rmap.WithLogger(logger))
if err != nil {
return nil, fmt.Errorf("AddNode: failed to join worker keep-alive replicated map %q: %w", workerKeepAliveMapName(poolName), err)
}
tm, err = rmap.Join(ctx, tickerMapName(poolName), rdb, rmap.WithLogger(logger))
if err != nil {
return nil, fmt.Errorf("AddNode: failed to join pool ticker replicated map %q: %w", tickerMapName(poolName), err)
}
wcm, err = rmap.Join(ctx, workerCleanupMapName(poolName), rdb, rmap.WithLogger(logger))
if err != nil {
return nil, fmt.Errorf("AddNode: failed to join pool cleanup replicated map %q: %w", workerCleanupMapName(poolName), err)
}
// Initialize and join pending jobs map
jpem, err = rmap.Join(ctx, jobPendingMapName(poolName), rdb, rmap.WithLogger(logger))
if err != nil {
return nil, fmt.Errorf("AddNode: failed to join pending jobs replicated map %q: %w", jobPendingMapName(poolName), err)
}
poolSink, err = poolStream.NewSink(ctx, "events",
options.WithSinkBlockDuration(o.jobSinkBlockDuration),
options.WithSinkAckGracePeriod(o.ackGracePeriod))
if err != nil {
return nil, fmt.Errorf("AddNode: failed to create events sink for stream %q: %w", poolStreamName(poolName), err)
}
closed = make(chan struct{})
}
nodeStream, err = streaming.NewStream(nodeStreamName(poolName, nodeID), rdb, options.WithStreamLogger(logger))
if err != nil {
return nil, fmt.Errorf("AddNode: failed to create node event stream %q: %w", nodeStreamName(poolName, nodeID), err)
}
if _, err = nodeStream.Add(ctx, evInit, []byte(nodeID)); err != nil {
return nil, fmt.Errorf("AddNode: failed to add init event to node event stream %q: %w", nodeStreamName(poolName, nodeID), err)
}
nodeReader, err = nodeStream.NewReader(ctx, options.WithReaderBlockDuration(o.jobSinkBlockDuration), options.WithReaderStartAtOldest())
if err != nil {
return nil, fmt.Errorf("AddNode: failed to create node event reader for stream %q: %w", nodeStreamName(poolName, nodeID), err)
}
p := &Node{
ID: nodeID,
PoolName: poolName,
nodeKeepAliveMap: nkm,
nodeShutdownMap: nsm,
workerMap: wm,
workerKeepAliveMap: wkm,
workerCleanupMap: wcm,
jobMap: jm,
jobPayloadMap: jpm,
jobPendingMap: jpem,
tickerMap: tm,
workerStreams: sync.Map{},
nodeStreams: sync.Map{},
pendingJobChannels: sync.Map{},
pendingEvents: sync.Map{},
poolStream: poolStream,
poolSink: poolSink,
nodeStream: nodeStream,
nodeReader: nodeReader,
clientOnly: o.clientOnly,
workerTTL: o.workerTTL,
workerShutdownTTL: o.workerShutdownTTL,
ackGracePeriod: o.ackGracePeriod,
h: jumpHash{crc64.New(crc64.MakeTable(crc64.ECMA))},
stop: make(chan struct{}),
closed: closed,
rdb: rdb,
logger: logger,
}
nch := nodeReader.Subscribe()
if o.clientOnly {
logger.Info("client-only")
p.wg.Add(3)
pulse.Go(logger, func() { p.handleNodeEvents(nch) }) // to handle job acks
pulse.Go(logger, func() { p.processInactiveNodes() })
pulse.Go(logger, func() { p.updateNodeKeepAlive() })
return p, nil
}
// create new logger context for goroutines.
logCtx := context.Background()
logCtx = log.WithContext(logCtx, ctx)
p.wg.Add(8) // Increment for all background goroutines
pulse.Go(logger, func() { p.handlePoolEvents(poolSink.Subscribe()) })
pulse.Go(logger, func() { p.handleNodeEvents(nch) })
pulse.Go(logger, func() { p.watchWorkers(logCtx) })
pulse.Go(logger, func() { p.watchShutdown(logCtx) })
pulse.Go(logger, func() { p.processInactiveNodes() })
pulse.Go(logger, func() { p.processInactiveWorkers(logCtx) })
pulse.Go(logger, func() { p.processInactiveJobs(logCtx) })
pulse.Go(logger, func() { p.updateNodeKeepAlive() })
return p, nil
}
// AddWorker adds a new worker to the pool and returns it. The worker starts
// processing jobs immediately. handler can optionally implement the
// NotificationHandler interface to handle notifications.
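//
// A minimal sketch (assumes handler implements JobHandler):
//
//	worker, err := node.AddWorker(ctx, handler)
//	if err != nil {
//		// handle error
//	}
//	defer node.RemoveWorker(ctx, worker)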
func (node *Node) AddWorker(ctx context.Context, handler JobHandler) (*Worker, error) {
if node.IsClosed() {
return nil, fmt.Errorf("AddWorker: pool %q is closed", node.PoolName)
}
if node.clientOnly {
return nil, fmt.Errorf("AddWorker: pool %q is client-only", node.PoolName)
}
w, err := newWorker(ctx, node, handler)
if err != nil {
return nil, err
}
node.localWorkers.Store(w.ID, w)
node.workerStreams.Store(w.ID, w.stream)
return w, nil
}
// RemoveWorker stops the worker, removes it from the pool and requeues all its
// jobs.
func (node *Node) RemoveWorker(ctx context.Context, w *Worker) error {
w.stop(ctx)
if err := w.requeueJobs(ctx); err != nil {
node.logger.Error(fmt.Errorf("RemoveWorker: failed to requeue jobs for worker %q: %w", w.ID, err))
}
node.removeWorker(ctx, w.ID)
node.localWorkers.Delete(w.ID)
node.logger.Info("removed worker", "worker", w.ID)
return nil
}
// Workers returns the list of workers running in the local node.
func (node *Node) Workers() []*Worker {
var workers []*Worker
node.localWorkers.Range(func(key, value any) bool {
w := value.(*Worker)
workers = append(workers, &Worker{
ID: w.ID,
CreatedAt: w.CreatedAt,
})
return true
})
return workers
}
// PoolWorkers returns the list of workers running in the entire pool.
func (node *Node) PoolWorkers() []*Worker {
workers := node.workerMap.Map()
poolWorkers := make([]*Worker, 0, len(workers))
for id, createdAt := range workers {
cat, err := strconv.ParseInt(createdAt, 10, 64)
if err != nil {
node.logger.Error(fmt.Errorf("PoolWorkers: failed to parse createdAt %q for worker %q: %w", createdAt, id, err))
continue
}
poolWorkers = append(poolWorkers, &Worker{ID: id, CreatedAt: time.Unix(0, cat)})
}
return poolWorkers
}
// DispatchJob dispatches a job to the worker in the pool that is assigned to
// the job key using consistent hashing.
// It returns:
// - nil if the job is successfully dispatched and started by a worker
// - ErrJobExists if a job with the same key already exists in the pool
// - an error returned by the worker's start handler if the job fails to start
// - an error if the pool is closed or if there's a failure in adding the job
//
// The method blocks until one of the above conditions is met.
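//
// A minimal sketch (the job key and payload are illustrative):
//
//	err := node.DispatchJob(ctx, "invoice-42", []byte(`{"amount":100}`))
//	switch {
//	case errors.Is(err, ErrJobExists):
//		// A job with this key already exists or is being dispatched.
//	case err != nil:
//		// The start handler failed, the dispatch timed out or the pool is closed.
//	}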
func (node *Node) DispatchJob(ctx context.Context, key string, payload []byte) error {
job := marshalJob(&Job{Key: key, Payload: payload, CreatedAt: time.Now(), NodeID: node.ID})
return node.dispatchJob(ctx, key, job, false)
}
func (node *Node) dispatchJob(ctx context.Context, key string, job []byte, requeue bool) error {
if node.IsClosed() {
return fmt.Errorf("DispatchJob: pool %q is closed", node.PoolName)
}
if !requeue {
// Check if job already exists in job payloads map
if _, exists := node.jobPayloadMap.Get(key); exists {
node.logger.Info("DispatchJob: job already exists", "key", key)
return fmt.Errorf("%w: job %q", ErrJobExists, key)
}
}
// Check if there's a pending dispatch for this job
pendingTS, exists := node.jobPendingMap.Get(key)
if exists {
if node.isWithinTTL(pendingTS, 0) {
node.logger.Info("DispatchJob: job already dispatched", "key", key)
return fmt.Errorf("%w: job %q is already dispatched", ErrJobExists, key)
}
}
// Set pending timestamp using atomic operation
pendingUntil := time.Now().Add(2 * node.ackGracePeriod).UnixNano()
newTS := strconv.FormatInt(pendingUntil, 10)
if exists {
current, err := node.jobPendingMap.TestAndSet(ctx, key, pendingTS, newTS)
if err != nil {
return fmt.Errorf("DispatchJob: failed to set pending timestamp for job %q: %w", key, err)
}
if current != pendingTS {
return fmt.Errorf("%w: job %q is already being dispatched", ErrJobExists, key)
}
} else {
ok, err := node.jobPendingMap.SetIfNotExists(ctx, key, newTS)
if err != nil {
return fmt.Errorf("DispatchJob: failed to set initial pending timestamp for job %q: %w", key, err)
}
if !ok {
return fmt.Errorf("%w: job %q is already being dispatched", ErrJobExists, key)
}
}
eventID, err := node.poolStream.Add(ctx, evStartJob, job)
if err != nil {
// Clean up pending entry on failure
if _, err := node.jobPendingMap.Delete(ctx, key); err != nil {
node.logger.Error(fmt.Errorf("DispatchJob: failed to clean up pending entry for job %q: %w", key, err))
}
return fmt.Errorf("DispatchJob: failed to add job to stream %q: %w", node.poolStream.Name, err)
}
cherr := make(chan error, 1)
node.pendingJobChannels.Store(eventID, cherr)
timer := time.NewTimer(2 * node.ackGracePeriod)
defer timer.Stop()
select {
case err = <-cherr:
case <-timer.C:
err = fmt.Errorf("DispatchJob: job %q timed out, TTL: %v", key, 2*node.ackGracePeriod)
case <-ctx.Done():
err = ctx.Err()
}
node.pendingJobChannels.Delete(eventID)
close(cherr)
// Clean up pending entry
if _, err := node.jobPendingMap.Delete(ctx, key); err != nil {
node.logger.Error(fmt.Errorf("DispatchJob: failed to clean up pending entry for job %q: %w", key, err))
}
if err != nil {
node.logger.Error(fmt.Errorf("DispatchJob: failed to dispatch job: %w", err), "key", key)
return err
}
node.logger.Info("dispatched", "key", key)
return nil
}
// StopJob stops the job with the given key.
func (node *Node) StopJob(ctx context.Context, key string) error {
if node.IsClosed() {
return fmt.Errorf("StopJob: pool %q is closed", node.PoolName)
}
if _, err := node.poolStream.Add(ctx, evStopJob, marshalJobKey(key)); err != nil {
return fmt.Errorf("StopJob: failed to add stop job to stream %q: %w", node.poolStream.Name, err)
}
node.logger.Info("stop requested", "key", key)
return nil
}
// JobKeys returns the list of keys of the jobs running in the pool.
func (node *Node) JobKeys() []string {
var jobKeys []string
jobByNodes := node.jobMap.Map()
for _, jobs := range jobByNodes {
jobKeys = append(jobKeys, strings.Split(jobs, ",")...)
}
return jobKeys
}
// JobPayload returns the payload of the job with the given key.
// It returns:
// - (payload, true) if the job exists and has a payload
// - (nil, true) if the job exists but has an empty payload
// - (nil, false) if the job does not exist
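//
// For example (the job key is illustrative):
//
//	if payload, ok := node.JobPayload("invoice-42"); ok {
//		_ = payload // may be nil if the job was dispatched with an empty payload
//	}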
func (node *Node) JobPayload(key string) ([]byte, bool) {
payload, ok := node.jobPayloadMap.Get(key)
if !ok {
return nil, false
}
if payload == "" {
return nil, true
}
return []byte(payload), true
}
// NotifyWorker notifies the worker that handles the job with the given key.
func (node *Node) NotifyWorker(ctx context.Context, key string, payload []byte) error {
if node.IsClosed() {
return fmt.Errorf("NotifyWorker: pool %q is closed", node.PoolName)
}
if _, err := node.poolStream.Add(ctx, evNotify, marshalNotification(key, payload)); err != nil {
return fmt.Errorf("NotifyWorker: failed to add notification to stream %q: %w", node.poolStream.Name, err)
}
node.logger.Info("notification sent", "key", key)
return nil
}
// Shutdown stops the pool workers gracefully across all nodes. It notifies all
// workers and waits until they have completed. Shutdown prevents the pool nodes
// from creating new workers and the pool workers from accepting new jobs. After
// Shutdown returns, the node object cannot be used anymore and should be
// discarded. One of Shutdown or Close should be called before the node is
// garbage collected unless it is client-only.
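//
// A minimal sketch:
//
//	if err := node.Shutdown(ctx); err != nil {
//		// handle error
//	}
//	// All pool nodes observe the shutdown request and close themselves;
//	// node must not be reused after this point.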
func (node *Node) Shutdown(ctx context.Context) error {
if node.IsClosed() {
return nil
}
if node.clientOnly {
return fmt.Errorf("Shutdown: client-only node cannot shutdown worker pool")
}
// Signal all nodes to shutdown.
if _, err := node.nodeShutdownMap.Set(ctx, "shutdown", node.ID); err != nil {
node.logger.Error(fmt.Errorf("Shutdown: failed to set shutdown status in shutdown map: %w", err))
}
<-node.closed // Wait for this node to be closed
node.cleanupPool(ctx)
node.logger.Info("shutdown")
return nil
}
// Close stops the node workers and closes the Redis connection but does
// not stop workers running in other nodes. It requeues all the jobs run by
// workers of the node. One of Shutdown or Close should be called before the
// node is garbage collected unless it is client-only.
func (node *Node) Close(ctx context.Context) error {
return node.close(ctx, false)
}
// IsShutdown returns true if the pool is shutdown.
func (node *Node) IsShutdown() bool {
node.lock.RLock()
defer node.lock.RUnlock()
return node.shutdown
}
// IsClosed returns true if the node is closed.
func (node *Node) IsClosed() bool {
node.lock.RLock()
defer node.lock.RUnlock()
return node.closing
}
// close stops the node and its workers, optionally requeuing jobs. If shutdown
// is true, jobs are not requeued as the pool is being shut down. Otherwise, jobs
// are requeued to be picked up by other nodes. The method stops all workers,
// waits for background goroutines to complete, cleans up resources and closes
// connections. It is idempotent and can be called multiple times safely.
func (node *Node) close(ctx context.Context, shutdown bool) error {
node.lock.Lock()
if node.closing {
node.lock.Unlock()
return nil
}
node.closing = true
node.lock.Unlock()
// If we're shutting down then stop all the jobs.
if shutdown {
node.stopAllJobs(ctx)
}
// Stop all workers before waiting for goroutines
var wg sync.WaitGroup
node.localWorkers.Range(func(key, value any) bool {
worker := value.(*Worker)
wg.Add(1)
pulse.Go(node.logger, func() {
defer wg.Done()
worker.stop(ctx)
// Remove worker immediately to avoid job requeuing by other nodes
node.removeWorker(ctx, worker.ID)
})
return true
})
wg.Wait()
// Stop all goroutines
close(node.stop)
node.wg.Wait()
// Requeue jobs if not shutting down, after stopping goroutines to avoid receiving new jobs
if !shutdown {
if err := node.requeueAllJobs(ctx); err != nil {
node.logger.Error(fmt.Errorf("close: failed to requeue jobs: %w", err))
}
}
// Cleanup resources
node.cleanupNode(ctx)
// Signal that the node is closed
close(node.closed)
node.logger.Info("closed")
return nil
}
// stopAllJobs stops all jobs running on the node.
func (node *Node) stopAllJobs(ctx context.Context) {
var wg sync.WaitGroup
var total atomic.Int32
node.localWorkers.Range(func(key, value any) bool {
wg.Add(1)
worker := value.(*Worker)
pulse.Go(node.logger, func() {
defer wg.Done()
for _, job := range worker.Jobs() {
if err := worker.stopJob(ctx, job.Key); err != nil {
node.logger.Error(fmt.Errorf("Close: failed to stop job %q for worker %q: %w", job.Key, worker.ID, err))
}
total.Add(1)
}
})
return true
})
wg.Wait()
node.logger.Info("stopped all jobs", "total", total.Load())
}
// handlePoolEvents reads events from the pool job stream.
func (node *Node) handlePoolEvents(c <-chan *streaming.Event) {
defer node.wg.Done()
for {
select {
case ev := <-c:
if err := node.routeWorkerEvent(ev); err != nil {
node.logger.Error(fmt.Errorf("handlePoolEvents: failed to route event: %w", err))
}
case <-node.stop:
node.poolSink.Close(context.Background())
return
}
}
}
// routeWorkerEvent routes a dispatched event to the proper worker.
func (node *Node) routeWorkerEvent(ev *streaming.Event) error {
// Filter out stale events
if time.Since(ev.CreatedAt()) > pendingEventTTL {
node.logger.Debug("routeWorkerEvent: stale event, not routing", "event", ev.EventName, "id", ev.ID, "since", time.Since(ev.CreatedAt()), "TTL", pendingEventTTL)
// Ack the sink event so it does not get redelivered.
if err := node.poolSink.Ack(context.Background(), ev); err != nil {
node.logger.Error(fmt.Errorf("routeWorkerEvent: failed to ack event: %w", err), "event", ev.EventName, "id", ev.ID)
}
return nil
}
// Compute the worker ID that will handle the job.
key := unmarshalJobKey(ev.Payload)
activeWorkers := node.activeWorkers()
if len(activeWorkers) == 0 {
return fmt.Errorf("routeWorkerEvent: no active worker in pool %q", node.PoolName)
}
wid := activeWorkers[node.h.Hash(key, int64(len(activeWorkers)))]
// Stream the event to the worker corresponding to the key hash.
stream, err := node.getWorkerStream(wid)
if err != nil {
return err
}
eventID, err := stream.Add(context.Background(), ev.EventName, marshalEnvelope(node.ID, ev.Payload), options.WithOnlyIfStreamExists())
if err != nil {
return fmt.Errorf("routeWorkerEvent: failed to add event %s to worker stream %q: %w", ev.EventName, workerStreamName(wid), err)
}
node.logger.Debug("routed", "event", ev.EventName, "id", ev.ID, "worker", wid, "worker-event-id", eventID)
// Record the event in the pending events map for future ack.
node.pendingEvents.Store(pendingEventKey(wid, eventID), ev)
return nil
}
// handleNodeEvents reads events from the node event stream and acks the pending
// events that correspond to jobs that are now running or done.
func (node *Node) handleNodeEvents(c <-chan *streaming.Event) {
defer node.wg.Done()
for {
select {
case ev := <-c:
node.processNodeEvent(ev)
case <-node.stop:
node.nodeReader.Close()
return
}
}
}
// processNodeEvent processes a node event.
func (node *Node) processNodeEvent(ev *streaming.Event) {
switch ev.EventName {
case evInit:
// Event sent by pool node to initialize the node event stream.
node.logger.Debug("handleNodeEvents: received init node", "event", ev.EventName, "id", ev.ID)
case evAck:
// Event sent by worker to ack a dispatched job.
node.logger.Debug("handleNodeEvents: received ack", "event", ev.EventName, "id", ev.ID)
node.ackWorkerEvent(ev)
case evDispatchReturn:
// Event sent by pool node to node that originally dispatched the job.
node.logger.Debug("handleNodeEvents: received dispatch return", "event", ev.EventName, "id", ev.ID)
node.returnDispatchStatus(ev)
}
}
// ackWorkerEvent acks the pending event that corresponds to the acked job. If
// the event was a dispatched job then it sends a dispatch return event to the
// node that dispatched the job.
func (node *Node) ackWorkerEvent(ev *streaming.Event) {
workerID, payload := unmarshalEnvelope(ev.Payload)
ack := unmarshalAck(payload)
key := pendingEventKey(workerID, ack.EventID)
val, ok := node.pendingEvents.Load(key)
if !ok {
node.logger.Error(fmt.Errorf("ackWorkerEvent: received unknown event %s from worker %s", ack.EventID, workerID))
return
}
pending := val.(*streaming.Event)
ctx := context.Background()
// If a dispatched job then send a return event to the node that
// dispatched the job.
if pending.EventName == evStartJob {
_, nodeID := unmarshalJobKeyAndNodeID(pending.Payload)
stream, err := node.getNodeStream(nodeID)
if err != nil {
node.logger.Error(fmt.Errorf("ackWorkerEvent: failed to create node event stream %q: %w", nodeStreamName(node.PoolName, nodeID), err))
return
}
ack.EventID = pending.ID
if _, err := stream.Add(ctx, evDispatchReturn, marshalAck(ack), options.WithOnlyIfStreamExists()); err != nil {
node.logger.Error(fmt.Errorf("ackWorkerEvent: failed to dispatch return to stream %q: %w", nodeStreamName(node.PoolName, nodeID), err))
}
}
// Ack the sink event so it does not get redelivered.
if err := node.poolSink.Ack(ctx, pending); err != nil {
node.logger.Error(fmt.Errorf("ackWorkerEvent: failed to ack event: %w", err), "event", pending.EventName, "id", pending.ID)
}
node.pendingEvents.Delete(key)
// Garbage collect stale events.
var staleKeys []string
node.pendingEvents.Range(func(key, value any) bool {
ev := value.(*streaming.Event)
if time.Since(ev.CreatedAt()) > pendingEventTTL {
staleKeys = append(staleKeys, key.(string))
node.logger.Error(fmt.Errorf("ackWorkerEvent: stale event, removing from pending events"), "event", ev.EventName, "id", ev.ID, "since", time.Since(ev.CreatedAt()), "TTL", pendingEventTTL)
}
return true
})
for _, key := range staleKeys {
node.pendingEvents.Delete(key)
}
}
// returnDispatchStatus returns the start job result to the caller.
func (node *Node) returnDispatchStatus(ev *streaming.Event) {
ack := unmarshalAck(ev.Payload)
val, ok := node.pendingJobChannels.Load(ack.EventID)
if !ok {
node.logger.Error(fmt.Errorf("returnDispatchStatus: received dispatch return for unknown event"), "id", ack.EventID)
return
}
node.logger.Debug("dispatch return", "event", ev.EventName, "id", ev.ID, "ack-id", ack.EventID)
if val == nil {
// Event was requeued, just clean up
node.pendingJobChannels.Delete(ack.EventID)
return
}
var err error
if ack.Error != "" {
err = errors.New(ack.Error)
}
val.(chan error) <- err
}
// watchWorkers monitors the workers replicated map and triggers job rebalancing
// when workers are added or removed from the pool.
func (node *Node) watchWorkers(ctx context.Context) {
defer node.wg.Done()
for {
select {
case <-node.stop:
return
case <-node.workerMap.Subscribe():
node.logger.Debug("watchWorkers: worker map updated")
node.handleWorkerMapUpdate(ctx)
}
}
}
// handleWorkerMapUpdate is called when the worker map is updated.
func (node *Node) handleWorkerMapUpdate(ctx context.Context) {
if node.IsClosed() {
return
}
// First cleanup the local workers that are no longer active.
node.localWorkers.Range(func(key, value any) bool {
worker := value.(*Worker)
if _, ok := node.workerMap.Get(worker.ID); !ok {
// If it's not in the worker map, then it's not active and its jobs
// have already been requeued.
node.logger.Info("handleWorkerMapUpdate: removing inactive local worker", "worker", worker.ID)
if err := node.deleteWorker(worker.ID); err != nil {
node.logger.Error(fmt.Errorf("handleWorkerMapUpdate: failed to delete inactive worker %q: %w", worker.ID, err), "worker", worker.ID)
}
worker.stop(ctx)
node.localWorkers.Delete(key)
return true
}
return true
})
// Then rebalance the jobs across the remaining active workers.
activeWorkers := node.activeWorkers()
if len(activeWorkers) == 0 {
return
}
node.localWorkers.Range(func(key, value any) bool {
worker := value.(*Worker)
worker.rebalance(ctx, activeWorkers)
return true
})
}
// watchShutdown monitors the pool shutdown map and initiates node shutdown when updated.
func (node *Node) watchShutdown(ctx context.Context) {
defer node.wg.Done()
for {
select {
case <-node.stop:
return
case <-node.nodeShutdownMap.Subscribe():
node.logger.Debug("watchShutdown: shutdown map updated")
// Handle shutdown in a separate goroutine to allow this one to exit
pulse.Go(node.logger, func() { node.handleShutdown(ctx) })
}
}
}
// handleShutdown closes the node.
func (node *Node) handleShutdown(ctx context.Context) {
if node.IsClosed() {
return
}
sm := node.nodeShutdownMap.Map()
var requestingNode string
for _, n := range sm {
// There is only one value in the map.
requestingNode = n
}
node.logger.Debug("handleShutdown: shutting down", "requested-by", requestingNode)
node.close(ctx, true)
node.lock.Lock()
node.shutdown = true
node.lock.Unlock()
node.logger.Info("shutdown", "requested-by", requestingNode)
}
// processInactiveNodes periodically checks for inactive nodes and destroys their streams.
func (node *Node) processInactiveNodes() {
defer node.wg.Done()
ticker := time.NewTicker(node.workerTTL)
defer ticker.Stop()
for {
select {
case <-node.stop:
return
case <-ticker.C:
node.cleanupInactiveNodes()
}
}
}
// cleanupInactiveNodes checks for inactive nodes, destroys their streams and
// removes them from the keep-alive map.
func (node *Node) cleanupInactiveNodes() {
nodeMap := node.nodeKeepAliveMap.Map()
for nodeID, lastSeen := range nodeMap {
if nodeID == node.ID || node.isWithinTTL(lastSeen, node.workerTTL) {
continue
}
node.logger.Info("cleaning up inactive node", "node", nodeID)
// Clean up node's stream
ctx := context.Background()
stream := nodeStreamName(node.PoolName, nodeID)
if s, err := streaming.NewStream(stream, node.rdb, options.WithStreamLogger(node.logger)); err == nil {
if err := s.Destroy(ctx); err != nil {
node.logger.Error(fmt.Errorf("cleanupInactiveNodes: failed to destroy stream: %w", err))
}
}
// Remove from keep-alive map
if _, err := node.nodeKeepAliveMap.Delete(ctx, nodeID); err != nil {
node.logger.Error(fmt.Errorf("cleanupInactiveNodes: failed to delete node: %w", err))
}
}
}
// processInactiveWorkers periodically cleans up inactive workers.
func (node *Node) processInactiveWorkers(ctx context.Context) {
defer node.wg.Done()
ticker := time.NewTicker(node.workerTTL)
defer ticker.Stop()
for {
select {
case <-node.stop:
return
case <-ticker.C:
node.cleanupInactiveWorkers(ctx)
}
}
}
// cleanupInactiveWorkers ensures all jobs are assigned to active workers by performing
// two types of cleanup:
// 1. Orphaned jobs: finds and requeues jobs assigned to workers that no longer exist
// in the keep-alive map, which can happen if a worker was improperly terminated
// 2. Inactive workers: finds workers that haven't updated their keep-alive timestamp
// within workerTTL duration and requeues their jobs
//
// The cleanup process is distributed and idempotent - multiple nodes can attempt
// cleanup concurrently, but only one will succeed for each worker due to cleanup
// lock acquisition. Jobs are requeued and will be reassigned to active workers
// through consistent hashing.
func (node *Node) cleanupInactiveWorkers(ctx context.Context) {
active := node.activeWorkers()
activeMap := make(map[string]struct{})
for _, id := range active {
activeMap[id] = struct{}{}
}
// Get all workers that need cleanup (either in jobMap or workerMap)
workersToCheck := make(map[string]struct{})
for _, workerID := range node.jobMap.Keys() {
workersToCheck[workerID] = struct{}{}
}
for _, workerID := range node.workerMap.Keys() {
workersToCheck[workerID] = struct{}{}
}
// Check each worker
for workerID := range workersToCheck {
// Skip active workers
if _, ok := activeMap[workerID]; ok {
continue
}
// Skip workers being cleaned up
if cleanupTS, exists := node.workerCleanupMap.Get(workerID); exists {
if node.isWithinTTL(cleanupTS, node.workerTTL) {
node.logger.Debug("cleanupInactiveWorkers: worker already being cleaned up", "worker", workerID)
continue
}
}
// Worker needs cleanup
node.logger.Info("cleanupInactiveWorkers: found inactive worker", "worker", workerID)
node.cleanupWorker(ctx, workerID)
}
}
// cleanupWorker requeues the jobs assigned to the worker and deletes it from
// the pool.
func (node *Node) cleanupWorker(ctx context.Context, workerID string) {
// Try to acquire or clear stale cleanup lock
if !node.acquireCleanupLock(ctx, workerID) {
return
}
// Get the worker's jobs
keys, ok := node.jobMap.GetValues(workerID)
if !ok || len(keys) == 0 {
// Worker has no jobs, just delete it
if err := node.deleteWorker(workerID); err != nil {
node.logger.Error(fmt.Errorf("cleanupWorkerJobs: failed to delete worker: %w", err), "worker", workerID)
}
node.logger.Info("cleaned up worker with no jobs", "worker", workerID)
return
}
// Requeue jobs and process them
var requeued int
for _, key := range keys {
payload, ok := node.JobPayload(key)
if !ok {
node.logger.Error(fmt.Errorf("requeueWorkerJobs: failed to get job payload"), "job", key, "worker", workerID)
requeued++ // We will never be able to requeue this job
continue
}
job := &Job{Key: key, Payload: []byte(payload), CreatedAt: time.Now(), NodeID: node.ID}
if err := node.dispatchJob(ctx, job.Key, marshalJob(job), true); err != nil {
node.logger.Error(fmt.Errorf("requeueWorkerJobs: failed to requeue job: %w", err), "job", job.Key, "worker", workerID)
continue
}
requeued++
}
if len(keys) != requeued {
node.logger.Info("partially requeued stale worker jobs", "requeued", requeued, "jobs", len(keys), "worker", workerID)