-
Notifications
You must be signed in to change notification settings - Fork 85
/
node_info_publisher.go
133 lines (114 loc) · 4.14 KB
/
node_info_publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package routing
import (
"context"
"sync"
"time"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/pubsub"
"github.com/bacalhau-project/bacalhau/pkg/system"
"github.com/rs/zerolog/log"
)
// NodeInfoPublisherIntervalConfig describes how often node info is published, both in steady state
// and during the optional eager phase right after startup.
type NodeInfoPublisherIntervalConfig struct {
	// Interval is the steady-state period between two node info publications.
	Interval time.Duration

	// EagerPublishInterval is the (typically shorter) period used during node startup, where publishing
	// more frequently speeds up the discovery process.
	EagerPublishInterval time.Duration

	// EagerPublishDuration is how long the eager phase lasts. Once it has elapsed, the node publishes
	// node info with the standard Interval.
	EagerPublishDuration time.Duration
}

// IsZero reports whether every field of the interval config is unset (zero).
func (n NodeInfoPublisherIntervalConfig) IsZero() bool {
	// All fields are plain durations, so comparing against the zero value checks each of them for 0.
	return n == (NodeInfoPublisherIntervalConfig{})
}

// IsEagerPublishEnabled reports whether eager publishing is configured, i.e. both the eager interval
// and the eager duration are strictly positive.
func (n NodeInfoPublisherIntervalConfig) IsEagerPublishEnabled() bool {
	if n.EagerPublishInterval <= 0 {
		return false
	}
	return n.EagerPublishDuration > 0
}
// NodeInfoPublisherParams bundles the dependencies and settings consumed by NewNodeInfoPublisher.
type NodeInfoPublisherParams struct {
// PubSub is the publisher that node info is sent to.
PubSub pubsub.Publisher[models.NodeInfo]
// NodeInfoProvider supplies the node info that gets published.
NodeInfoProvider models.NodeInfoProvider
// IntervalConfig controls the intervals used by the background publish task.
IntervalConfig NodeInfoPublisherIntervalConfig
}
// NodeInfoPublisher publishes node info to a pubsub publisher, either on demand through Publish or
// periodically from the background goroutine started by NewNodeInfoPublisher.
type NodeInfoPublisher struct {
pubSub pubsub.Publisher[models.NodeInfo]
nodeInfoProvider models.NodeInfoProvider
intervalConfig NodeInfoPublisherIntervalConfig
// stopped is set by Stop and read by the eager publish task to decide whether to continue with the
// standard publish task. NOTE(review): it is accessed from different goroutines without a lock or atomic.
stopped bool
// stopChannel tells the background publish loop to exit.
stopChannel chan struct{}
// stopOnce makes sure the stop signal is issued at most once.
stopOnce sync.Once
}
// NewNodeInfoPublisher builds a NodeInfoPublisher from params and immediately starts its background
// goroutine: the eager publish task when eager publishing is enabled in the interval config,
// otherwise the standard publish task.
func NewNodeInfoPublisher(params NodeInfoPublisherParams) *NodeInfoPublisher {
	publisher := &NodeInfoPublisher{
		stopChannel:      make(chan struct{}),
		intervalConfig:   params.IntervalConfig,
		nodeInfoProvider: params.NodeInfoProvider,
		pubSub:           params.PubSub,
	}

	go func() {
		if !publisher.intervalConfig.IsEagerPublishEnabled() {
			publisher.standardPublishBackgroundTask()
			return
		}
		publisher.eagerPublishBackgroundTask()
	}()

	return publisher
}
// Publish sends the current node info to the pubsub publisher right away, independently of the
// periodic background task. The call is wrapped in a tracing span and returns the publisher's error.
func (n *NodeInfoPublisher) Publish(ctx context.Context) error {
	spanCtx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/routing.NodeInfoPublisher.publish")
	defer span.End()

	nodeInfo := n.nodeInfoProvider.GetNodeInfo(spanCtx)
	return n.pubSub.Publish(spanCtx, nodeInfo)
}
// eagerPublishBackgroundTask runs the startup phase of background publishing: it publishes every
// EagerPublishInterval until EagerPublishDuration has elapsed, then hands over to the standard
// publish task unless the publisher was stopped in the meantime.
func (n *NodeInfoPublisher) eagerPublishBackgroundTask() {
	eagerInterval := n.intervalConfig.EagerPublishInterval
	eagerDuration := n.intervalConfig.EagerPublishDuration

	// The timeout bounds the eager phase: publishBackgroundTask returns once this context is done.
	eagerCtx, cancel := context.WithTimeout(context.Background(), eagerDuration)
	log.Ctx(eagerCtx).Debug().Msgf("Starting eager publish background task with interval %v for %v",
		eagerInterval, eagerDuration)
	n.publishBackgroundTask(eagerCtx, eagerInterval)
	cancel()

	// Do not fall through to standard publishing if Stop was called during the eager phase.
	// NOTE(review): stopped is written by Stop from another goroutine without synchronization.
	if n.stopped {
		return
	}
	log.Ctx(eagerCtx).Debug().Msgf("Starting standard publish background task with interval %v", n.intervalConfig.Interval)
	n.standardPublishBackgroundTask()
}
// standardPublishBackgroundTask runs the publish loop with the configured standard Interval on a
// background context, so the loop only ends when the publisher is stopped.
func (n *NodeInfoPublisher) standardPublishBackgroundTask() {
	standardInterval := n.intervalConfig.Interval
	backgroundCtx := context.Background()
	n.publishBackgroundTask(backgroundCtx, standardInterval)
}
// publishBackgroundTask publishes the node info once per interval until either ctx is done or a stop
// signal arrives on stopChannel. Publish failures are logged and never terminate the loop.
//
// interval must be positive; time.NewTicker panics otherwise.
func (n *NodeInfoPublisher) publishBackgroundTask(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	// Release the ticker on every exit path. Previously it was only stopped on the stop-signal path,
	// so the ticker of the eager phase (which ends through ctx.Done) was never stopped.
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// Run each publication in its own function so the span is ended per tick.
			func() {
				spanCtx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/routing.NodeInfoPublisher.publishBackgroundTask")
				defer span.End()

				err := n.Publish(spanCtx)
				switch {
				case err == nil:
					// published successfully, nothing to report
				case err == context.Canceled:
					// Cancellation is expected during shutdown, so keep it at debug level.
					log.Ctx(spanCtx).Debug().Msg("publishing node info interrupted by context cancellation")
				default:
					log.Ctx(spanCtx).Err(err).Msg("failed to publish node info")
				}
			}()
		case <-n.stopChannel:
			log.Ctx(ctx).Debug().Msg("stopped publishing node info")
			return
		}
	}
}
// Stop stops the background task that publishes the node info periodically.
//
// It is safe to call on a nil receiver and to call multiple times; only the first call has an effect.
// Stop signals the background goroutine by closing stopChannel and returns without waiting for the
// goroutine to acknowledge, so it can never block — not even when the publish loop has already
// exited (for example right after the eager phase expired). ctx is currently unused.
func (n *NodeInfoPublisher) Stop(ctx context.Context) {
	if n == nil {
		return
	}
	n.stopOnce.Do(func() {
		n.stopped = true
		// Close instead of sending: a send on the unbuffered channel blocks forever when no publish
		// loop is receiving anymore, whereas a closed channel is observed by every current and future
		// receiver without blocking the caller.
		close(n.stopChannel)
	})
}