/
service.go
141 lines (118 loc) · 3.33 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package joblogger
import (
"context"
"io"
"os"
"path/filepath"
"sync"
"time"
pkgerrors "github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)
// Option customises a JobLogger. New applies every supplied Option, in order,
// after setting its defaults and before validating the configuration.
type Option func(logger *JobLogger)
// JobLogger holds the configuration and per-cluster Kubernetes clients used by
// Run. Instances are expected to be created with New, which fills in defaults
// and allocates the clientsets map.
type JobLogger struct {
// out is the writer configured for output; New defaults it to os.Stdout and
// WithOutput replaces it.
out io.Writer
// clusters lists the kube context names; Run builds one clientset and starts
// one watcher per entry.
clusters []string
// interval is set to five seconds by New.
// NOTE(review): presumably the scrape period — its use is not visible in this
// section, confirm against runScraper.
interval time.Duration
// jobSetId is set by WithJobSetId; empty unless that option is supplied.
jobSetId string
// queue is set by WithQueue; empty unless that option is supplied.
queue string
// namespace is mandatory: validateConfig rejects an empty value.
namespace string
// podMap is a concurrent map.
// NOTE(review): key/value types are not visible in this section — confirm
// against the watcher and scraper code.
podMap sync.Map
// clientsets maps a kube context name to the clientset Run created for it.
clientsets map[string]*kubernetes.Clientset
}
// New builds a JobLogger for the given kube contexts and namespace.
//
// The logger starts out writing to os.Stdout with a five second interval and
// an empty clientset map sized for clusters. Each Option is then applied in
// the order given, and the resulting configuration is validated. If validation
// fails, the wrapped error is returned and no logger is produced.
func New(clusters []string, namespace string, opts ...Option) (*JobLogger, error) {
	logger := new(JobLogger)
	logger.out = os.Stdout
	logger.clusters = clusters
	logger.namespace = namespace
	logger.interval = 5 * time.Second
	logger.clientsets = make(map[string]*kubernetes.Clientset, len(clusters))

	// Options run after the defaults so that they can override them.
	for i := range opts {
		opts[i](logger)
	}

	err := logger.validateConfig()
	if err != nil {
		return nil, pkgerrors.WithMessage(err, "error validating supplied configuration")
	}
	return logger, nil
}
// WithOutput returns an Option that sets the JobLogger's output writer to
// output, replacing the os.Stdout default installed by New.
func WithOutput(output io.Writer) Option {
	return func(jl *JobLogger) { jl.out = output }
}
// WithJobSetId returns an Option that records jobSetId on the JobLogger.
func WithJobSetId(jobSetId string) Option {
	return func(jl *JobLogger) { jl.jobSetId = jobSetId }
}
// WithQueue returns an Option that records queue on the JobLogger.
func WithQueue(queue string) Option {
	return func(jl *JobLogger) { jl.queue = queue }
}
// validateConfig reports whether the logger's configuration is usable.
// The only requirement checked is that a namespace has been set; it returns
// nil when one is present and an error otherwise.
func (srv *JobLogger) validateConfig() error {
	if srv.namespace != "" {
		return nil
	}
	return pkgerrors.New("namespace must be configured")
}
// Run connects to every configured cluster and then blocks while the
// per-cluster watchers (srv.runWatcher) and the scraper (srv.runScraper) run.
//
// Setup happens in two phases. First the kubeconfig path is resolved, checked
// for existence, and one clientset is created per kube context in
// srv.clusters and stored in srv.clientsets. Only once every clientset exists
// are the goroutines started. Doing all fallible setup before launching any
// goroutine means a setup failure returns without leaving watchers running
// un-waited, and srv.clientsets is fully populated before any worker starts.
//
// The worker goroutines receive a context derived from ctx through
// errgroup.WithContext. Run returns a wrapped setup error, or otherwise the
// result of the group's Wait.
func (srv *JobLogger) Run(ctx context.Context) error {
	if len(srv.clusters) == 0 {
		return pkgerrors.New("no executor clusters configured to scrape for logs")
	}

	kubeconfig, err := getKubeConfigPath()
	if err != nil {
		return pkgerrors.WithMessage(err, "cannot find kubeconfig path")
	}
	if _, err := os.Stat(kubeconfig); err != nil {
		return pkgerrors.WithMessagef(err, "error checking does kubeconfig file exist at %s", kubeconfig)
	}

	// Guard against a JobLogger that was not built through New; writing to a
	// nil map would panic.
	if srv.clientsets == nil {
		srv.clientsets = make(map[string]*kubernetes.Clientset, len(srv.clusters))
	}

	// Phase 1: create every clientset before any goroutine exists, so an
	// error here cannot orphan already-running watchers.
	clientsets := make([]*kubernetes.Clientset, len(srv.clusters))
	for i, kubectx := range srv.clusters {
		clientset, err := newClientsetForKubectx(kubectx, kubeconfig)
		if err != nil {
			return pkgerrors.WithMessagef(err, "error creating clientset for kubectx %s", kubectx)
		}
		clientsets[i] = clientset
		srv.clientsets[kubectx] = clientset
	}

	// Phase 2: start one watcher per cluster plus the scraper.
	g, ctx := errgroup.WithContext(ctx)
	for i := range srv.clusters {
		// Per-iteration copies so each closure captures its own values
		// regardless of the Go version's loop-variable semantics.
		kubectx := srv.clusters[i]
		clientset := clientsets[i]
		g.Go(func() error {
			return srv.runWatcher(ctx, kubectx, clientset)
		})
	}
	g.Go(func() error {
		return srv.runScraper(ctx)
	})

	return g.Wait()
}
// getKubeConfigPath returns the kubeconfig file location to use.
//
// A non-empty $KUBECONFIG takes precedence and is returned verbatim. A
// variable that is set but empty is treated the same as unset, so the lookup
// falls through to <home>/.kube/config rather than returning an empty path.
// If no home directory can be determined either, an error is returned.
//
// NOTE(review): $KUBECONFIG may hold a list of paths joined by the OS list
// separator; this function does not split it, so such a value is handed to
// the caller as a single path.
func getKubeConfigPath() (string, error) {
	if path := os.Getenv("KUBECONFIG"); path != "" {
		return path, nil
	}
	if home := homedir.HomeDir(); home != "" {
		return filepath.Join(home, ".kube", "config"), nil
	}
	return "", pkgerrors.New("neither $KUBECONFIG or $HOME envvars are configured")
}
// buildConfigFromFlags produces a rest.Config by loading the kubeconfig file
// at kubeconfigPath with the current context overridden to kubectx.
// Loading is non-interactive and deferred until ClientConfig is called here.
func buildConfigFromFlags(kubectx, kubeconfigPath string) (*rest.Config, error) {
	loadingRules := &clientcmd.ClientConfigLoadingRules{ExplicitPath: kubeconfigPath}
	overrides := &clientcmd.ConfigOverrides{CurrentContext: kubectx}

	deferred := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, overrides)
	return deferred.ClientConfig()
}
// newClientsetForKubectx creates a Kubernetes clientset for the kube context
// kubectx as defined in the kubeconfig file at the given path.
// Errors from building the REST config or from constructing the clientset are
// returned wrapped with a message identifying the failing step.
func newClientsetForKubectx(kubectx, kubeconfig string) (*kubernetes.Clientset, error) {
	restCfg, cfgErr := buildConfigFromFlags(kubectx, kubeconfig)
	if cfgErr != nil {
		return nil, pkgerrors.WithMessagef(cfgErr, "error creating config for kubecontext: %s", kubectx)
	}

	cs, csErr := kubernetes.NewForConfig(restCfg)
	if csErr != nil {
		return nil, pkgerrors.WithMessage(csErr, "error creating clientset")
	}
	return cs, nil
}