From 2a74656643093ccdd8eb9b84b41555def561a70d Mon Sep 17 00:00:00 2001 From: qizheng09 Date: Thu, 1 Mar 2018 18:51:42 +0800 Subject: [PATCH 1/2] add updater struct --- go/updater/trainingJobUpdater.go | 81 ++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 go/updater/trainingJobUpdater.go diff --git a/go/updater/trainingJobUpdater.go b/go/updater/trainingJobUpdater.go new file mode 100644 index 00000000..86b559fa --- /dev/null +++ b/go/updater/trainingJobUpdater.go @@ -0,0 +1,81 @@ +package updater + +import ( + padv1 "github.com/PaddlePaddle/cloud/go/apis/paddlepaddle/v1" + trainingJobClient "github.com/PaddlePaddle/cloud/go/client/clientset/versioned" + log "github.com/golang/glog" + "k8s.io/client-go/kubernetes" + "time" +) + +const ( + retryTimes = 5 + convertedTimerTicker = 10 * time.Second + confirmResourceTicker = 5 * time.Second +) + +type trainingJobEventType string + +const ( + trainingJobEventDelete trainingJobEventType = "Delete" + trainingJobEventModify trainingJobEventType = "Modify" +) + +type trainingJobEvent struct { + // pet is the TrainingJobEventType of TrainingJob + pet trainingJobEventType + // The job transfer the information fo job + job *padv1.TrainingJob +} + +// TrainingJobUpdater is to manager a specific TrainingJob +type TrainingJobUpdater struct { + // Job is the job the TrainingJob manager. + job *padv1.TrainingJob + + // kubeCli is standard kubernetes client. + kubeCli kubernetes.Interface + + // TrainingJobClient is the client of TrainingJob. + trainingJobClient trainingJobClient.Interface + + // Status is the status in memory, update when TrainingJob status changed and update the CRD resource status. + status padv1.TrainingJobStatus + + // EventCh is the channel received by Controller, include Modify and Delete. + // When trainingJobEvent is Delete it will delete all resources + // The maximum is 1000. + eventCh chan *trainingJobEvent +} + +func initUpdater(job *padv1.TrainingJob, kubeCli kubernetes.Interface, trainingJobClient trainingJobClient.Interface) (*TrainingJobUpdater, + error) { + jobber := &TrainingJobUpdater{ + job: job, + kubeCli: kubeCli, + trainingJobClient: trainingJobClient, + status: job.Status, + eventCh: make(chan *trainingJobEvent, 1000), + } + return jobber, nil +} + +// NewUpdater return a trainingJobUpdater for controller. +func NewUpdater(job *padv1.TrainingJob, kubeCli kubernetes.Interface, trainingJobClient trainingJobClient.Interface) (*TrainingJobUpdater, + error) { + log.Infof("NewJobber namespace=%v name=%v", job.Namespace, job.Name) + jobber, err := initUpdater(job, kubeCli, trainingJobClient) + if err != nil { + return nil, err + } + + go jobber.start() + return jobber, nil +} + +// Start is the main process of life cycle of a TrainingJob, including create resources, event process handle and +// status convert. +func (updater *TrainingJobUpdater) start() { + // TODO(zhengqi): this will commit in the next pr + +} From 1e45a623223ec41bf8df9de671de218f7329e146 Mon Sep 17 00:00:00 2001 From: qizheng09 Date: Fri, 2 Mar 2018 14:07:16 +0800 Subject: [PATCH 2/2] update comments --- go/updater/trainingJobUpdater.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/go/updater/trainingJobUpdater.go b/go/updater/trainingJobUpdater.go index 86b559fa..dd2a5d4f 100644 --- a/go/updater/trainingJobUpdater.go +++ b/go/updater/trainingJobUpdater.go @@ -28,13 +28,13 @@ type trainingJobEvent struct { job *padv1.TrainingJob } -// TrainingJobUpdater is to manager a specific TrainingJob +// TrainingJobUpdater is used to manage a specific TrainingJob type TrainingJobUpdater struct { // Job is the job the TrainingJob manager. job *padv1.TrainingJob - // kubeCli is standard kubernetes client. - kubeCli kubernetes.Interface + // kubeClient is standard kubernetes client. + kubeClient kubernetes.Interface // TrainingJobClient is the client of TrainingJob. trainingJobClient trainingJobClient.Interface @@ -42,17 +42,17 @@ type TrainingJobUpdater struct { // Status is the status in memory, update when TrainingJob status changed and update the CRD resource status. status padv1.TrainingJobStatus - // EventCh is the channel received by Controller, include Modify and Delete. + // EventCh receives events from the controller, include Modify and Delete. // When trainingJobEvent is Delete it will delete all resources - // The maximum is 1000. + // The capacity is 1000. eventCh chan *trainingJobEvent } -func initUpdater(job *padv1.TrainingJob, kubeCli kubernetes.Interface, trainingJobClient trainingJobClient.Interface) (*TrainingJobUpdater, +func initUpdater(job *padv1.TrainingJob, kubeClient kubernetes.Interface, trainingJobClient trainingJobClient.Interface) (*TrainingJobUpdater, error) { jobber := &TrainingJobUpdater{ job: job, - kubeCli: kubeCli, + kubeClient: kubeClient, trainingJobClient: trainingJobClient, status: job.Status, eventCh: make(chan *trainingJobEvent, 1000), @@ -61,10 +61,10 @@ func initUpdater(job *padv1.TrainingJob, kubeCli kubernetes.Interface, trainingJ } // NewUpdater return a trainingJobUpdater for controller. -func NewUpdater(job *padv1.TrainingJob, kubeCli kubernetes.Interface, trainingJobClient trainingJobClient.Interface) (*TrainingJobUpdater, +func NewUpdater(job *padv1.TrainingJob, kubeClient kubernetes.Interface, trainingJobClient trainingJobClient.Interface) (*TrainingJobUpdater, error) { log.Infof("NewJobber namespace=%v name=%v", job.Namespace, job.Name) - jobber, err := initUpdater(job, kubeCli, trainingJobClient) + jobber, err := initUpdater(job, kubeClient, trainingJobClient) if err != nil { return nil, err }