forked from spotahome/kooper
/
main.go
134 lines (115 loc) · 3.59 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package main
import (
"context"
"flag"
"fmt"
"os"
"path/filepath"
"time"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"github.com/adevjoe/kooper/v2/controller"
"github.com/adevjoe/kooper/v2/log"
kooperlogrus "github.com/adevjoe/kooper/v2/log/logrus"
)
var (
concurrentWorkers int
sleepMS int
intervalS int
retries int
disableResync bool
)
func initFlags() error {
fg := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
fg.IntVar(&concurrentWorkers, "concurrency", 3, "The number of concurrent event handling")
fg.IntVar(&sleepMS, "sleep-ms", 25, "The number of milliseconds to sleep on each event handling")
fg.IntVar(&intervalS, "interval-s", 45, "The number of seconds to for reconciliation loop intervals")
fg.IntVar(&retries, "retries", 3, "The number of retries in case of error")
fg.BoolVar(&disableResync, "disable-resync", false, "Disables the resync")
err := fg.Parse(os.Args[1:])
if err != nil {
return err
}
return nil
}
func sleep() {
time.Sleep(time.Duration(sleepMS) * time.Millisecond)
}
func run() error {
// Initialize logger.
logger := kooperlogrus.New(logrus.NewEntry(logrus.New())).
WithKV(log.KV{"example": "controller-concurrency-handling"})
// Init flags.
if err := initFlags(); err != nil {
return fmt.Errorf("error parsing arguments: %w", err)
}
// Get k8s client.
k8scfg, err := rest.InClusterConfig()
if err != nil {
// No in cluster? letr's try locally
kubehome := filepath.Join(homedir.HomeDir(), ".kube", "config")
k8scfg, err = clientcmd.BuildConfigFromFlags("", kubehome)
if err != nil {
return fmt.Errorf("error loading kubernetes configuration: %w", err)
}
}
k8scli, err := kubernetes.NewForConfig(k8scfg)
if err != nil {
return fmt.Errorf("error creating kubernetes client: %w", err)
}
// Create our retriever so the controller knows how to get/listen for pod events.
retr := controller.MustRetrieverFromListerWatcher(&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return k8scli.CoreV1().Pods("").List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return k8scli.CoreV1().Pods("").Watch(context.TODO(), options)
},
})
// Our domain logic that will print every add/sync/update and delete event we.
hand := controller.HandlerFunc(func(_ context.Context, obj runtime.Object) error {
pod := obj.(*corev1.Pod)
sleep()
logger.Infof("Pod added: %s/%s", pod.Namespace, pod.Name)
return nil
})
// Create the controller.
cfg := &controller.Config{
Name: "controller-concurrency-handling",
Handler: hand,
Retriever: retr,
Logger: logger,
ProcessingJobRetries: retries,
ResyncInterval: time.Duration(intervalS) * time.Second,
ConcurrentWorkers: concurrentWorkers,
DisableResync: disableResync,
}
ctrl, err := controller.New(cfg)
if err != nil {
return fmt.Errorf("could not create controller: %w", err)
}
// Start our controller.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if err := ctrl.Run(ctx); err != nil {
return fmt.Errorf("error running controller: %w", err)
}
return nil
}
func main() {
err := run()
if err != nil {
fmt.Fprintf(os.Stderr, "error running app: %s", err)
os.Exit(1)
}
os.Exit(0)
}