-
Notifications
You must be signed in to change notification settings - Fork 168
/
handler.go
148 lines (121 loc) · 4.09 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package pod
import (
"context"
"fmt"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
"github.com/caicloud/cyclone/pkg/util/k8s"
"github.com/caicloud/cyclone/pkg/workflow/controller"
"github.com/caicloud/cyclone/pkg/workflow/controller/handlers"
)
// Handler ...
type Handler struct {
ClusterClient kubernetes.Interface
Client k8s.Interface
}
// Ensure *Handler has implemented handlers.Interface interface.
var _ handlers.Interface = (*Handler)(nil)
const (
// finalizerPod is the cyclone related finalizer key for kubernetes pod.
finalizerPod string = "pod.cyclone.dev/finalizer"
)
// NewHandler ...
func NewHandler(client k8s.Interface, clusterClient kubernetes.Interface) *Handler {
return &Handler{
Client: client,
ClusterClient: clusterClient,
}
}
// Reconcile compares the actual state with the desired, and attempts to
// converge the two.
func (h *Handler) Reconcile(obj interface{}) (res controller.Result, err error) {
// If Workflow Controller got restarted, previous started pods would be
// observed by controller with create event. We need to handle update in
// this case as well. Otherwise WorkflowRun may stuck in running state.
originPod, ok := obj.(*corev1.Pod)
if !ok {
log.WithField("obj", obj).Warning("Expect Pod, got unknown type resource")
return res, fmt.Errorf("unknown resource type")
}
pod := originPod.DeepCopy()
return res, h.onUpdate(pod)
}
func (h *Handler) onUpdate(pod *corev1.Pod) error {
log.WithField("name", pod.Name).Debug("Observed pod updated")
// Check whether it's GC pod.
if IsGCPod(pod) {
GCPodUpdated(h.ClusterClient, pod)
return nil
}
// For stage pod, create operator to handle it.
operator, err := NewOperator(h.ClusterClient, h.Client, pod)
if err != nil {
log.Error("Create operator error: ", err)
return err
}
if err := operator.OnUpdated(); err != nil {
log.WithField("pod", pod.Name).Error("process updated pod error: ", err)
return err
}
return nil
}
// finalize ...
func (h *Handler) finalize(pod *corev1.Pod) error {
operator, err := NewOperator(h.ClusterClient, h.Client, pod)
if err != nil {
log.Error("Create operator error: ", err)
return err
}
if err := operator.OnDelete(); err != nil {
log.WithField("pod", pod.Name).Error("process deleted pod error: ", err)
return err
}
return nil
}
// AddFinalizer adds a finalizer to the object and update the object to the Kubernetes.
func (h *Handler) AddFinalizer(obj interface{}) (bool, error) {
originPod, ok := obj.(*corev1.Pod)
if !ok {
log.WithField("obj", obj).Warning("Expect Pod, got unknown type resource")
return false, fmt.Errorf("unknown resource type")
}
// Check whether it's workload pod.
if !IsWorkloadPod(originPod) {
return false, nil
}
if sets.NewString(originPod.Finalizers...).Has(finalizerPod) {
return false, nil
}
log.WithField("name", originPod.Name).Debug("Start to add finalizer for pod")
pod := originPod.DeepCopy()
pod.ObjectMeta.Finalizers = append(pod.ObjectMeta.Finalizers, finalizerPod)
_, err := h.ClusterClient.CoreV1().Pods(pod.Namespace).Update(context.TODO(), pod, metav1.UpdateOptions{})
return true, err
}
// HandleFinalizer does the finalizer key representing things.
func (h *Handler) HandleFinalizer(obj interface{}) error {
originPod, ok := obj.(*corev1.Pod)
if !ok {
log.WithField("obj", obj).Warning("Expect Pod, got unknown type resource")
return fmt.Errorf("unknown resource type")
}
// Check whether it's workload pod.
if !IsWorkloadPod(originPod) {
return nil
}
if !sets.NewString(originPod.Finalizers...).Has(finalizerPod) {
return nil
}
log.WithField("name", originPod.Name).Debug("Start to process finalizer for pod")
// Handler finalizer
pod := originPod.DeepCopy()
if err := h.finalize(pod); err != nil {
return nil
}
pod.ObjectMeta.Finalizers = sets.NewString(pod.ObjectMeta.Finalizers...).Delete(finalizerPod).UnsortedList()
_, err := h.ClusterClient.CoreV1().Pods(pod.Namespace).Update(context.TODO(), pod, metav1.UpdateOptions{})
return err
}