-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathpaddle_operator.go
126 lines (104 loc) · 4.02 KB
/
paddle_operator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package main
import (
"flag"
"os"
"time"
log "github.com/inconshreveable/log15"
"github.com/wangkuiyi/candy"
"k8s.io/api/core/v1"
extcli "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
paddleclientset "github.com/baidu/paddle-on-k8s-operator/pkg/client/clientset/versioned"
"github.com/baidu/paddle-on-k8s-operator/pkg/client/clientset/versioned/scheme"
paddleinformers "github.com/baidu/paddle-on-k8s-operator/pkg/client/informers/externalversions"
paddlecontroller "github.com/baidu/paddle-on-k8s-operator/pkg/controller"
"github.com/baidu/paddle-on-k8s-operator/pkg/signals"
)
// Leader-election timing parameters: the leader's lease lasts
// leaseDuration, the leader must renew within renewDuration, and
// non-leaders retry acquiring the lock every retryPeriod.
var (
	leaseDuration = 15 * time.Second
	renewDuration = 5 * time.Second
	retryPeriod   = 3 * time.Second
)
// main starts the paddle trainingjob operator: it parses command-line
// flags, builds Kubernetes clients (in-cluster or from a kubeconfig),
// and runs the controller under leader election so that only one
// replica is active at a time.
func main() {
	masterURL := flag.String("master", "", "Address of a kube master.")
	kubeConfig := flag.String("kubeconfig", "", "Path to a kube config. Only required if out-of-cluster.")
	autoClean := flag.Bool("autoclean", false, "Auto clean pods after terminating job, default false.")
	maxLoadDesired := flag.Float64("max_load_desired", 0.97, `Keep the cluster max resource usage around
this value, jobs will scale down if total request is over this level.`)
	restartLimit := flag.Int("restartlimit", 5, "Pserver pull image error limit.")
	inCluster := flag.Bool("incluster", false, "Controller runs in cluster or out of cluster.")
	logLevel := flag.Int("loglevel", 4, "Log level of operator.")
	outter := flag.Bool("outter", false, "If this is a opensource version.")
	threadNum := flag.Int("thread", 10, "Thread num of work")
	flag.Parse()

	// stopCh closes on SIGINT/SIGTERM so all components shut down together.
	stopCh := signals.SetupSignalHandler()

	handler := log.LvlFilterHandler(log.Lvl(*logLevel), log.StdoutHandler)
	log.Root().SetHandler(handler)

	// Build the REST config from the in-cluster service account or, when
	// running out of cluster, from the master/kubeconfig flags.
	var (
		cfg *rest.Config
		err error
	)
	if *inCluster {
		cfg, err = rest.InClusterConfig()
	} else {
		cfg, err = clientcmd.BuildConfigFromFlags(*masterURL, *kubeConfig)
	}
	candy.Must(err)

	kubeClient, err := kubernetes.NewForConfig(cfg)
	candy.Must(err)
	extapiClient, err := extcli.NewForConfig(cfg)
	candy.Must(err)
	paddleClient, err := paddleclientset.NewForConfig(cfg)
	candy.Must(err)

	// The hostname doubles as this instance's leader-election identity.
	hostname, err := os.Hostname()
	candy.Must(err)

	// run is invoked once this instance wins the leader election.
	// NOTE(review): it deliberately watches the signal-handler stopCh
	// rather than the `stop` channel the leader elector passes in, so
	// the controller keeps running even after leadership is lost —
	// confirm this is intended.
	run := func(stop <-chan struct{}) {
		log.Info("I won the leader election", "hostname", hostname)
		paddleInformer := paddleinformers.NewSharedInformerFactory(paddleClient, time.Second*10)
		controller := paddlecontroller.New(kubeClient, extapiClient, paddleClient, paddleInformer, *autoClean,
			*restartLimit, *outter)
		go paddleInformer.Start(stopCh)
		// Fix: the original wrote `if controller.Run(...); err != nil`,
		// which discarded Run's return value and re-tested the stale
		// outer err (always nil past candy.Must), so controller failures
		// were silently dropped. Capture Run's own error instead.
		if err := controller.Run(*threadNum, *maxLoadDesired, stopCh); err != nil {
			log.Error("Error running paddle trainingjob controller", "error", err.Error())
		}
	}
	// stop is invoked when this instance loses leadership.
	stop := func() {
		log.Error("I lost the leader election", "hostname", hostname)
	}

	leaderElectionClient, err := kubernetes.NewForConfig(rest.AddUserAgent(cfg, "leader-election"))
	if err != nil {
		log.Error("Error building leader election clientset", "error", err.Error())
		return
	}

	// Prepare event clients so leader transitions are recorded as
	// Kubernetes events attributed to the controller.
	eventBroadcaster := record.NewBroadcaster()
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "trainingjob-controller"})

	// The lock is an Endpoints object in kube-system; whichever replica
	// holds it becomes the active controller.
	lock := &resourcelock.EndpointsLock{
		EndpointsMeta: metav1.ObjectMeta{
			Namespace: "kube-system",
			Name:      "trainingjob-controller",
		},
		Client: leaderElectionClient.CoreV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity:      hostname,
			EventRecorder: recorder,
		},
	}
	// Blocks for the life of the process; the callbacks above drive the
	// controller as leadership is gained and lost.
	leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: leaseDuration,
		RenewDeadline: renewDuration,
		RetryPeriod:   retryPeriod,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: run,
			OnStoppedLeading: stop,
		},
	})
}