-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
99 lines (88 loc) · 3.19 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package main
import (
//"fmt"
"time"
"sync"
"bytes"
//"strings"
// "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
watch "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
log "github.com/sirupsen/logrus"
"net/http"
)
func main() {
config, err := rest.InClusterConfig()
if err != nil {
panic(err.Error())
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
pods, err := clientset.CoreV1().Pods("default").List(metav1.ListOptions{ LabelSelector: "watch=need" }) // get all pods need to be watched
if err != nil {
panic(err.Error())
}
log.Infof("%d pods need to be watched\n", len(pods.Items))
pods_to_watch := []*v1.Pod{}
for _, item := range pods.Items {
pods_to_watch = append(pods_to_watch, &item)
}
event_listener, err := clientset.CoreV1().Pods("default").Watch(metav1.ListOptions{ LabelSelector: "watch=need" })
if err != nil {
panic(err.Error())
}
var mtx sync.Mutex
go func() {
for e := range event_listener.ResultChan() {
if e.Type == watch.Deleted { // remove pod from watch list
pod := e.Object.(*v1.Pod)
for i, item := range pods_to_watch {
if item.GetName() == pod.GetName() {
mtx.Lock()
pods_to_watch = append(pods_to_watch[:i], pods_to_watch[i+1:]...)
log.Infof("remove pod %s from watch queue", e.Object.(*v1.Pod).GetName())
mtx.Unlock()
break
}
}
}
}
}()
log.Info("watching starts...")
for {
mtx.Lock()
for _, pod := range pods_to_watch {
if pod.Status.Phase == "Running" {
var buffer bytes.Buffer
buffer.WriteString("http://")
buffer.WriteString(pod.Status.PodIP)
buffer.WriteString(":")
buffer.WriteString(pod.Labels["port"])
buffer.WriteString("/?ping=")
buffer.WriteString(pod.Labels["ping"]) // read endpoint for ping purpose
ping_url := buffer.String()
log.Infof("ping url: %s", ping_url)
_, err := http.Get(ping_url) // ping, then reset the pod on failure, will set a configurabe threshold
if err != nil {
// pod.Reset()
panic(err.Error())
log.Infof("reset pod %s due to ping failture", pod.GetName())
} else {
log.Infof("ping passed, pod %s is fine.", pod.GetName())
}
} else if pod.Status.Phase == "Failed" {
//pod.Reset()
log.Infof("reset pod %s due to stauts failture", pod.GetName())
} else {
log.Infof("pod: %s, phase: %s", pod.GetName(), pod.Status.Phase)
}
}
mtx.Unlock()
time.Sleep(30 * time.Second) // will make interval configurable
}
}