-
Notifications
You must be signed in to change notification settings - Fork 260
/
cluster_logs.go
135 lines (117 loc) · 3.53 KB
/
cluster_logs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
/*
Copyright The CloudNativePG Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package logs
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
"github.com/cloudnative-pg/cloudnative-pg/internal/cmd/plugin"
"github.com/cloudnative-pg/cloudnative-pg/pkg/utils/logs"
)
// clusterLogs contains the options and context to retrieve cluster logs
type clusterLogs struct {
ctx context.Context
clusterName string
namespace string
timestamp bool
tailLines int64
outputFile string
follow bool
client kubernetes.Interface
}
func getCluster(cl clusterLogs) (*cnpgv1.Cluster, error) {
var cluster cnpgv1.Cluster
err := plugin.Client.Get(cl.ctx,
types.NamespacedName{Namespace: cl.namespace, Name: cl.clusterName},
&cluster)
return &cluster, err
}
func getStreamClusterLogs(cluster *cnpgv1.Cluster, cl clusterLogs) logs.ClusterStreamingRequest {
var sinceTime *metav1.Time
var tail *int64
if cl.timestamp {
sinceTime = &metav1.Time{Time: time.Now().UTC()}
}
if cl.tailLines >= 0 {
tail = &cl.tailLines
}
return logs.ClusterStreamingRequest{
Cluster: cluster,
Options: &corev1.PodLogOptions{
Timestamps: cl.timestamp,
Follow: cl.follow,
SinceTime: sinceTime,
TailLines: tail,
},
Client: cl.client,
}
}
// followCluster will tail all pods in the cluster, and will watch for any
// new pods
//
// It will write lines to standard-out, and will only return when there are
// no pods left, or it is interrupted by the user
func followCluster(cl clusterLogs) error {
cluster, err := getCluster(cl)
if err != nil {
return fmt.Errorf("could not get cluster: %w", err)
}
streamClusterLogs := getStreamClusterLogs(cluster, cl)
return streamClusterLogs.SingleStream(cl.ctx, os.Stdout)
}
// saveClusterLogs will tail all pods in the cluster, and read their logs
// until the present time, then exit.
//
// It will write lines to standard-out, or to a file if the `file` argument
// is provided.
func saveClusterLogs(cl clusterLogs) error {
cluster, err := getCluster(cl)
if err != nil {
return fmt.Errorf("could not get cluster: %w", err)
}
var output io.Writer = os.Stdout
if cl.outputFile != "" {
outputFile, err := os.Create(filepath.Clean(cl.outputFile))
if err != nil {
return fmt.Errorf("could not create file: %w", err)
}
output = outputFile
defer func() {
errF := outputFile.Sync()
if errF != nil && err == nil {
err = fmt.Errorf("could not flush file: %w", errF)
}
errF = outputFile.Close()
if errF != nil && err == nil {
err = fmt.Errorf("could not close file: %w", errF)
}
}()
}
streamClusterLogs := getStreamClusterLogs(cluster, cl)
err = streamClusterLogs.SingleStream(cl.ctx, output)
if err != nil {
return fmt.Errorf("could not stream the logs: %w", err)
}
if cl.outputFile != "" {
fmt.Printf("Successfully written logs to \"%s\"\n", cl.outputFile)
}
return nil
}