forked from zalando/postgres-operator
/
logs_and_api.go
238 lines (191 loc) · 5.64 KB
/
logs_and_api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
package controller
import (
"fmt"
"sort"
"sync/atomic"
"github.com/sirupsen/logrus"
"github.com/zalando/postgres-operator/pkg/cluster"
"github.com/zalando/postgres-operator/pkg/spec"
"github.com/zalando/postgres-operator/pkg/util"
"github.com/zalando/postgres-operator/pkg/util/config"
"k8s.io/apimachinery/pkg/types"
)
// ClusterStatus provides status of the cluster
func (c *Controller) ClusterStatus(team, namespace, cluster string) (*cluster.ClusterStatus, error) {
clusterName := spec.NamespacedName{
Namespace: namespace,
Name: team + "-" + cluster,
}
c.clustersMu.RLock()
cl, ok := c.clusters[clusterName]
c.clustersMu.RUnlock()
if !ok {
return nil, fmt.Errorf("could not find cluster")
}
status := cl.GetStatus()
status.Worker = c.clusterWorkerID(clusterName)
return status, nil
}
// ClusterDatabasesMap returns for each cluster the list of databases running there
func (c *Controller) ClusterDatabasesMap() map[string][]string {
m := make(map[string][]string)
// avoid modifying the cluster list while we are fetching each one of them.
c.clustersMu.RLock()
defer c.clustersMu.RUnlock()
for _, cluster := range c.clusters {
// GetSpec holds the specMu lock of a cluster
if spec, err := cluster.GetSpec(); err == nil {
for database := range spec.Spec.Databases {
m[cluster.Name] = append(m[cluster.Name], database)
}
sort.Strings(m[cluster.Name])
} else {
c.logger.Warningf("could not get the list of databases for cluster %q: %v", cluster.Name, err)
}
}
return m
}
// TeamClusterList returns team-clusters map
func (c *Controller) TeamClusterList() map[string][]spec.NamespacedName {
return c.teamClusters
}
// GetConfig returns controller config
func (c *Controller) GetConfig() *spec.ControllerConfig {
return &c.config
}
// GetOperatorConfig returns operator config
func (c *Controller) GetOperatorConfig() *config.Config {
return c.opConfig
}
// GetStatus dumps current config and status of the controller
func (c *Controller) GetStatus() *spec.ControllerStatus {
c.clustersMu.RLock()
clustersCnt := len(c.clusters)
c.clustersMu.RUnlock()
queueSizes := make(map[int]int, c.opConfig.Workers)
for workerID, queue := range c.clusterEventQueues {
queueSizes[workerID] = len(queue.ListKeys())
}
return &spec.ControllerStatus{
LastSyncTime: atomic.LoadInt64(&c.lastClusterSyncTime),
Clusters: clustersCnt,
WorkerQueueSize: queueSizes,
}
}
// ClusterLogs dumps cluster ring logs
func (c *Controller) ClusterLogs(team, namespace, name string) ([]*spec.LogEntry, error) {
clusterName := spec.NamespacedName{
Namespace: namespace,
Name: team + "-" + name,
}
c.clustersMu.RLock()
cl, ok := c.clusterLogs[clusterName]
c.clustersMu.RUnlock()
if !ok {
return nil, fmt.Errorf("could not find cluster")
}
res := make([]*spec.LogEntry, 0)
for _, e := range cl.Walk() {
logEntry := e.(*spec.LogEntry)
logEntry.ClusterName = nil
res = append(res, logEntry)
}
return res, nil
}
// WorkerLogs dumps logs of the worker
func (c *Controller) WorkerLogs(workerID uint32) ([]*spec.LogEntry, error) {
lg, ok := c.workerLogs[workerID]
if !ok {
return nil, fmt.Errorf("could not find worker")
}
res := make([]*spec.LogEntry, 0)
for _, e := range lg.Walk() {
logEntry := e.(*spec.LogEntry)
logEntry.Worker = nil
res = append(res, logEntry)
}
return res, nil
}
// Levels returns logrus levels for which hook must fire
func (c *Controller) Levels() []logrus.Level {
return logrus.AllLevels
}
// Fire is a logrus hook
func (c *Controller) Fire(e *logrus.Entry) error {
var clusterName spec.NamespacedName
v, ok := e.Data["cluster-name"]
if !ok {
return nil
}
clusterName = v.(spec.NamespacedName)
c.clustersMu.RLock()
clusterRingLog, ok := c.clusterLogs[clusterName]
c.clustersMu.RUnlock()
if !ok {
return nil
}
logEntry := &spec.LogEntry{
Time: e.Time,
Level: e.Level,
ClusterName: &clusterName,
Message: e.Message,
}
if v, hasWorker := e.Data["worker"]; hasWorker {
id := v.(uint32)
logEntry.Worker = &id
}
clusterRingLog.Insert(logEntry)
if logEntry.Worker == nil {
return nil
}
c.workerLogs[*logEntry.Worker].Insert(logEntry) // workerLogs map is immutable. No need to lock it
return nil
}
// ListQueue dumps cluster event queue of the provided worker
func (c *Controller) ListQueue(workerID uint32) (*spec.QueueDump, error) {
if workerID >= uint32(len(c.clusterEventQueues)) {
return nil, fmt.Errorf("could not find worker")
}
q := c.clusterEventQueues[workerID]
return &spec.QueueDump{
Keys: q.ListKeys(),
List: q.List(),
}, nil
}
// GetWorkersCnt returns number of the workers
func (c *Controller) GetWorkersCnt() uint32 {
return c.opConfig.Workers
}
//WorkerStatus provides status of the worker
func (c *Controller) WorkerStatus(workerID uint32) (*cluster.WorkerStatus, error) {
obj, ok := c.curWorkerCluster.Load(workerID)
if !ok || obj == nil {
return nil, nil
}
cl, ok := obj.(*cluster.Cluster)
if !ok {
return nil, fmt.Errorf("could not cast to Cluster struct")
}
return &cluster.WorkerStatus{
CurrentCluster: types.NamespacedName(util.NameFromMeta(cl.ObjectMeta)),
CurrentProcess: cl.GetCurrentProcess(),
}, nil
}
// ClusterHistory dumps history of cluster changes
func (c *Controller) ClusterHistory(team, namespace, name string) ([]*spec.Diff, error) {
clusterName := spec.NamespacedName{
Namespace: namespace,
Name: team + "-" + name,
}
c.clustersMu.RLock()
cl, ok := c.clusterHistory[clusterName]
c.clustersMu.RUnlock()
if !ok {
return nil, fmt.Errorf("could not find cluster")
}
res := make([]*spec.Diff, 0)
for _, e := range cl.Walk() {
res = append(res, e.(*spec.Diff))
}
return res, nil
}