monitor.go

package monitor

// Copyright (c) Microsoft Corporation.
// Licensed under the Apache License 2.0.

import (
	"context"
	"net/http"
	"sync"
	"sync/atomic"
	"time"

	"github.com/sirupsen/logrus"
	"k8s.io/client-go/rest"

	"github.com/Azure/ARO-RP/pkg/api"
	"github.com/Azure/ARO-RP/pkg/database"
	"github.com/Azure/ARO-RP/pkg/database/cosmosdb"
	"github.com/Azure/ARO-RP/pkg/env"
	"github.com/Azure/ARO-RP/pkg/metrics"
	"github.com/Azure/ARO-RP/pkg/proxy"
	"github.com/Azure/ARO-RP/pkg/util/bucket"
	"github.com/Azure/ARO-RP/pkg/util/heartbeat"
	"github.com/Azure/ARO-RP/pkg/util/liveconfig"
)

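// monitor holds the state of a single monitor process: caches of cluster and
// subscription documents populated from the database change feed, the set of
// buckets this process currently owns, timestamps used for readiness checks,
// and per-shard Hive REST configs guarded by shardMutex.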
type monitor struct {
	baseLog *logrus.Entry
	dialer  proxy.Dialer

	dbMonitors          database.Monitors
	dbOpenShiftClusters database.OpenShiftClusters
	dbSubscriptions     database.Subscriptions

	m        metrics.Emitter
	clusterm metrics.Emitter

	mu   sync.RWMutex
	docs map[string]*cacheDoc
	subs map[string]*api.SubscriptionDocument

	env env.Interface

	isMaster       bool
	bucketCount    int
	buckets        map[int]struct{}
	lastBucketlist atomic.Value //time.Time
	lastChangefeed atomic.Value //time.Time
	startTime      time.Time

	liveConfig liveconfig.Manager

	hiveShardConfigs map[int]*rest.Config
	shardMutex       sync.RWMutex
}

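// Runnable is implemented by long-running components such as the monitor.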
type Runnable interface {
	Run(context.Context) error
}

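// NewMonitor constructs a monitor with empty document caches and records the
// start time used by checkReady. A minimal caller sketch, assuming the
// dependencies (log, dialer, database clients, emitters, liveConfig, env)
// have already been wired up elsewhere:
//
//	mon := monitor.NewMonitor(log, dialer, dbMonitors, dbOpenShiftClusters,
//		dbSubscriptions, m, clusterm, liveConfig, _env)
//	if err := mon.Run(ctx); err != nil {
//		log.Fatal(err)
//	}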
func NewMonitor(log *logrus.Entry, dialer proxy.Dialer, dbMonitors database.Monitors, dbOpenShiftClusters database.OpenShiftClusters, dbSubscriptions database.Subscriptions, m, clusterm metrics.Emitter, liveConfig liveconfig.Manager, e env.Interface) Runnable {
	return &monitor{
		baseLog:             log,
		dialer:              dialer,
		dbMonitors:          dbMonitors,
		dbOpenShiftClusters: dbOpenShiftClusters,
		dbSubscriptions:     dbSubscriptions,
		m:                   m,
		clusterm:            clusterm,
		docs:                map[string]*cacheDoc{},
		subs:                map[string]*api.SubscriptionDocument{},
		env:                 e,
		bucketCount:         bucket.Buckets,
		buckets:             map[int]struct{}{},
		startTime:           time.Now(),
		liveConfig:          liveConfig,
		hiveShardConfigs:    map[int]*rest.Config{},
	}
}

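// Run starts the monitor loop: it ensures the shared "master" monitor
// document exists (tolerating a precondition failure when it already does),
// starts the change feed and heartbeat goroutines, then every 10 seconds
// heartbeats, tries to become master, and refreshes its bucket allocation.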
func (mon *monitor) Run(ctx context.Context) error {
	_, err := mon.dbMonitors.Create(ctx, &api.MonitorDocument{
		ID: "master",
	})
	if err != nil && !cosmosdb.IsErrorStatusCode(err, http.StatusPreconditionFailed) {
		return err
	}

	// fill the cache from the database change feed
	go mon.changefeed(ctx, mon.baseLog.WithField("component", "changefeed"), nil)

	t := time.NewTicker(10 * time.Second)
	defer t.Stop()

	go heartbeat.EmitHeartbeat(mon.baseLog, mon.m, "monitor.heartbeat", nil, mon.checkReady)

	for {
		// register ourselves as a monitor
		err = mon.dbMonitors.MonitorHeartbeat(ctx)
		if err != nil {
			mon.baseLog.Error(err)
		}

		// try to become master and share buckets across registered monitors
		err = mon.master(ctx)
		if err != nil {
			mon.baseLog.Error(err)
		}

		// read our bucket allocation from the master
		err = mon.listBuckets(ctx)
		if err != nil {
			mon.baseLog.Error(err)
		} else {
			mon.lastBucketlist.Store(time.Now())
		}

		<-t.C
	}
}

// checkReady checks the ready status of the monitor to make it consistent
// across the /healthz/ready endpoint and emitted metrics. We wait for 2
// minutes before indicating health. This ensures that there will be a gap in
// our health metric if we crash or restart.
func (mon *monitor) checkReady() bool {
	lastBucketTime, ok := mon.lastBucketlist.Load().(time.Time)
	if !ok {
		return false
	}
	lastChangefeedTime, ok := mon.lastChangefeed.Load().(time.Time)
	if !ok {
		return false
	}
	return (time.Since(lastBucketTime) < time.Minute) && // did we list buckets successfully recently?
		(time.Since(lastChangefeedTime) < time.Minute) && // did we process the change feed recently?
		(time.Since(mon.startTime) > 2*time.Minute) // have we been running for at least 2 minutes?
}

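// getHiveShardConfig returns the cached REST config for a Hive shard, if one
// has been set. It takes the read lock so concurrent readers do not block
// each other.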
func (mon *monitor) getHiveShardConfig(shard int) (*rest.Config, bool) {
	mon.shardMutex.RLock()
	hiveRestConfig, exists := mon.hiveShardConfigs[shard]
	mon.shardMutex.RUnlock()
	return hiveRestConfig, exists
}

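// setHiveShardConfig caches the REST config for a Hive shard under the write
// lock.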
func (mon *monitor) setHiveShardConfig(shard int, config *rest.Config) {
	mon.shardMutex.Lock()
	mon.hiveShardConfigs[shard] = config
	mon.shardMutex.Unlock()
}