From bb50cc6ab9cedcb711f765aecdfb837cf53096ef Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Thu, 17 Sep 2020 18:16:26 -0700 Subject: [PATCH 01/14] arch png --- Makefile | 7 +- conf/queues.yaml | 29 +++++- go.mod | 5 +- go.sum | 2 + pkg/cache/context.go | 6 ++ pkg/cache/node.go | 12 +++ pkg/callback/scheduler_callback.go | 85 ++++++++++++++++++ pkg/common/events/events.go | 17 ++++ pkg/conf/schedulerconf.go | 2 +- pkg/dispatcher/dispatcher.go | 3 + pkg/federation/federation_events.go | 22 +++++ pkg/federation/member.go | 132 ++++++++++++++++++++++++++++ pkg/shim/main.go | 8 ++ pkg/shim/scheduler.go | 2 + 14 files changed, 320 insertions(+), 12 deletions(-) create mode 100644 pkg/federation/federation_events.go create mode 100644 pkg/federation/member.go diff --git a/Makefile b/Makefile index d8e6160a6..f81bff5d5 100644 --- a/Makefile +++ b/Makefile @@ -125,12 +125,7 @@ sched_image: scheduler @coreSHA=$$(go list -m "github.com/apache/incubator-yunikorn-core" | cut -d "-" -f5) ; \ siSHA=$$(go list -m "github.com/apache/incubator-yunikorn-scheduler-interface" | cut -d "-" -f6) ; \ shimSHA=$$(git rev-parse --short=12 HEAD) ; \ - docker build ./deployments/image/configmap -t ${REGISTRY}/yunikorn:scheduler-${VERSION} \ - --label "yunikorn-core-revision=$${coreSHA}" \ - --label "yunikorn-scheduler-interface-revision=$${siSHA}" \ - --label "yunikorn-k8shim-revision=$${shimSHA}" \ - --label "BuildTimeStamp=${DATE}" \ - --label "Version=${VERSION}" + docker build ./deployments/image/configmap -t yunikorn/yunikorn-member:latest @mv -f ./deployments/image/configmap/Dockerfile.bkp ./deployments/image/configmap/Dockerfile @rm -f ./deployments/image/configmap/${BINARY} @rm -rf ./deployments/image/configmap/admission-controller-init-scripts/ diff --git a/conf/queues.yaml b/conf/queues.yaml index 2fd1213a3..aef126e70 100644 --- a/conf/queues.yaml +++ b/conf/queues.yaml @@ -17,10 +17,31 @@ partitions: - name: default - placementrules: - - name: tag - value: namespace - create: true queues: - name: root submitacl: '*' + queues: + - name: advertisement + resources: + guaranteed: + memory: 500000 + vcore: 50000 + max: + memory: 800000 + vcore: 80000 + - name: search + resources: + guaranteed: + memory: 400000 + vcore: 40000 + max: + memory: 600000 + vcore: 60000 + - name: sandbox + resources: + guaranteed: + memory: 100000 + vcore: 10000 + max: + memory: 100000 + vcore: 10000 diff --git a/go.mod b/go.mod index 4ae3fcbc2..b2c918abd 100644 --- a/go.mod +++ b/go.mod @@ -23,12 +23,13 @@ go 1.12 require ( github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.0.0-20200817155620-c19d2b8660d8 github.com/apache/incubator-yunikorn-core v0.0.0-20200827055746-57d663e73cb1 - github.com/apache/incubator-yunikorn-scheduler-interface v0.9.1-0.20200827013520-ec2f681ecb5b + github.com/apache/incubator-yunikorn-scheduler-interface v0.9.1-0.20200901200728-b9033558f319 github.com/google/uuid v1.1.1 github.com/looplab/fsm v0.1.0 github.com/onsi/ginkgo v1.11.0 github.com/onsi/gomega v1.7.0 go.uber.org/zap v1.13.0 + google.golang.org/grpc v1.26.0 gopkg.in/yaml.v2 v2.2.8 gotest.tools v2.2.0+incompatible k8s.io/api v0.16.13 @@ -61,3 +62,5 @@ replace ( k8s.io/metrics => k8s.io/metrics v0.16.13 k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.16.13 ) + +replace github.com/apache/incubator-yunikorn-scheduler-interface => /Users/wyang/workspace/github/wyang/incubator-yunikorn-scheduler-interface diff --git a/go.sum b/go.sum index aa2171b7e..c042ab8c8 100644 --- a/go.sum +++ b/go.sum @@ -46,6 +46,8 @@ github.com/apache/incubator-yunikorn-core v0.0.0-20200827055746-57d663e73cb1 h1: github.com/apache/incubator-yunikorn-core v0.0.0-20200827055746-57d663e73cb1/go.mod h1:0fxPfo3PViD/Bd6uuczucdi+tp3uH+vjhZ9Sjymx+uc= github.com/apache/incubator-yunikorn-scheduler-interface v0.9.1-0.20200827013520-ec2f681ecb5b h1:giSmjG7Ay/8hzql/oG1fNcij8rAsXOsLmHN5d40ZqYs= github.com/apache/incubator-yunikorn-scheduler-interface v0.9.1-0.20200827013520-ec2f681ecb5b/go.mod h1:ObMs03XFbnmpGD81jYvdUDEVZbHvz8W6dWH5nGDCjc0= +github.com/apache/incubator-yunikorn-scheduler-interface v0.9.1-0.20200901200728-b9033558f319 h1:I12nCcXdHe6W4oysVejFHfQDy0Ix0r+ZHdfooyEoldo= +github.com/apache/incubator-yunikorn-scheduler-interface v0.9.1-0.20200901200728-b9033558f319/go.mod h1:ObMs03XFbnmpGD81jYvdUDEVZbHvz8W6dWH5nGDCjc0= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= diff --git a/pkg/cache/context.go b/pkg/cache/context.go index 6035e9733..a7d256f6c 100644 --- a/pkg/cache/context.go +++ b/pkg/cache/context.go @@ -508,6 +508,12 @@ func (ctx *Context) GetApplication(appID string) interfaces.ManagedApp { return nil } +func (ctx *Context) GetNode(nodeID string) *SchedulerNode { + ctx.lock.RLock() + defer ctx.lock.RUnlock() + return ctx.nodes.getNode(nodeID) +} + func (ctx *Context) RemoveApplication(appID string) error { ctx.lock.Lock() defer ctx.lock.Unlock() diff --git a/pkg/cache/node.go b/pkg/cache/node.go index d399d282e..692c4f8a6 100644 --- a/pkg/cache/node.go +++ b/pkg/cache/node.go @@ -248,3 +248,15 @@ func (n *SchedulerNode) enterState(event *fsm.Event) { zap.String("destination", event.Dst), zap.String("event", event.Event)) } + +func (n *SchedulerNode) GetCapacity() *si.Resource { + n.lock.RLock() + defer n.lock.RUnlock() + return n.capacity +} + +func (n *SchedulerNode) GetOccupiedResources() *si.Resource { + n.lock.RLock() + defer n.lock.RUnlock() + return n.occupied +} diff --git a/pkg/callback/scheduler_callback.go b/pkg/callback/scheduler_callback.go index d0dc27716..05df0771a 100644 --- a/pkg/callback/scheduler_callback.go +++ b/pkg/callback/scheduler_callback.go @@ -21,6 +21,10 @@ package callback import ( "fmt" + "github.com/apache/incubator-yunikorn-k8shim/pkg/common" + "github.com/apache/incubator-yunikorn-k8shim/pkg/common/constants" + "github.com/apache/incubator-yunikorn-k8shim/pkg/conf" + "github.com/apache/incubator-yunikorn-k8shim/pkg/federation" "go.uber.org/zap" "github.com/apache/incubator-yunikorn-k8shim/pkg/cache" @@ -53,6 +57,30 @@ func (callback *AsyncRMCallback) RecvUpdateResponse(response *si.UpdateResponse) NodeID: node.NodeID, Event: events.NodeAccepted, }) + + confirmedNode := callback.context.GetNode(node.NodeID) + if confirmedNode != nil { + log.Logger().Info("reporting new node to the head server", + zap.String("nodeID", node.NodeID)) + dispatcher.Dispatch(federation.AsyncUpdateFederationEvent{ + ConfirmedRequests: []*si.UpdateRequest{ + { + NewSchedulableNodes: []*si.NewNodeInfo{ + { + NodeID: node.NodeID, + Attributes: map[string]string{ "compute-node": "yes"}, + SchedulableResource: confirmedNode.GetCapacity(), + OccupiedResource: confirmedNode.GetOccupiedResources(), + }, + }, + RmID: conf.GetSchedulerConf().ClusterID, + }, + }, + }) + } else { + log.Logger().Error("node accepted in sub cluster, but not found in cache", + zap.String("nodeID", node.NodeID)) + } } for _, node := range response.RejectedNodes { @@ -75,6 +103,31 @@ func (callback *AsyncRMCallback) RecvUpdateResponse(response *si.UpdateResponse) ev := cache.NewSimpleApplicationEvent(app.GetApplicationID(), events.AcceptApplication) dispatcher.Dispatch(ev) } + + if appInfo := callback.context.GetApplication(app.ApplicationID); appInfo != nil { + log.Logger().Info("reporting new application to the head server", + zap.String("appID", app.ApplicationID)) + dispatcher.Dispatch(federation.AsyncUpdateFederationEvent{ + ConfirmedRequests: []*si.UpdateRequest{ + { + NewApplications: []*si.AddApplicationRequest{ + { + ApplicationID: app.ApplicationID, + QueueName: appInfo.GetQueue(), + PartitionName: "default", + Ugi: &si.UserGroupInformation{ + User: "anonymous", + }, + }, + }, + RmID: conf.GetSchedulerConf().ClusterID, + }, + }, + }) + } else { + log.Logger().Error("app accepted in sub cluster, but not found in cache", + zap.String("appID", app.ApplicationID)) + } } for _, app := range response.RejectedApplications { @@ -100,6 +153,38 @@ func (callback *AsyncRMCallback) RecvUpdateResponse(response *si.UpdateResponse) if app := callback.context.GetApplication(alloc.ApplicationID); app != nil { ev := cache.NewAllocateTaskEvent(app.GetApplicationID(), alloc.AllocationKey, alloc.UUID, alloc.NodeID) dispatcher.Dispatch(ev) + + if updatedNode := callback.context.GetNode(alloc.NodeID); updatedNode != nil { + if task, err := app.GetTask(alloc.AllocationKey); err == nil { + dispatcher.Dispatch(federation.AsyncUpdateFederationEvent{ + ConfirmedRequests: []*si.UpdateRequest{ + { + UpdatedNodes: []*si.UpdateNodeInfo{ + { + NodeID: alloc.NodeID, + Attributes: map[string]string{"compute-node": "yes"}, + SchedulableResource: updatedNode.GetCapacity(), + OccupiedResource: updatedNode.GetOccupiedResources(), + ExistingAllocations: []*si.Allocation{ + { + AllocationKey: string(task.GetTaskPod().UID), + UUID: string(task.GetTaskPod().UID), + ResourcePerAlloc: common.GetPodResource(task.GetTaskPod()), + QueueName: app.GetQueue(), + NodeID: alloc.NodeID, + ApplicationID: app.GetApplicationID(), + PartitionName: constants.DefaultPartition, + }, + }, + Action: si.UpdateNodeInfo_UPDATE, + }, + }, + RmID: conf.GetSchedulerConf().ClusterID, + }, + }, + }) + } + } } } diff --git a/pkg/common/events/events.go b/pkg/common/events/events.go index 9b85b53c6..3a5ef5137 100644 --- a/pkg/common/events/events.go +++ b/pkg/common/events/events.go @@ -18,6 +18,8 @@ package events +import "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si" + const EnterState = "enter_state" //---------------------------------------------- @@ -148,3 +150,18 @@ type SchedulerNodeEvent interface { // state machines' callbacks when doing state transition GetArgs() []interface{} } + +type FederationEventType string + +const ( + Update FederationEventType = "Update" +) + +type FederationEvent interface { + // the type of this event + GetEvent() FederationEventType + + GetConfirmedUpdates() []*si.UpdateRequest + + GetArgs() []interface{} +} diff --git a/pkg/conf/schedulerconf.go b/pkg/conf/schedulerconf.go index cdc52f14b..91df37992 100644 --- a/pkg/conf/schedulerconf.go +++ b/pkg/conf/schedulerconf.go @@ -135,7 +135,7 @@ func initConfigs() { predicateList := flag.String("predicates", "", fmt.Sprintf("comma-separated list of predicates, valid predicates are: %s, "+ "the program will exit if any invalid predicates exist.", predicates.Ordering())) - operatorPluginList := flag.String("operatorPlugins", "general,"+constants.AppManagerHandlerName, + operatorPluginList := flag.String("operatorPlugins", "general", "comma-separated list of operator plugin names, currently, only \"spark-operator-service\""+ "and"+constants.AppManagerHandlerName+"is supported.") diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go index a22755c99..c8c13181c 100644 --- a/pkg/dispatcher/dispatcher.go +++ b/pkg/dispatcher/dispatcher.go @@ -42,6 +42,7 @@ const ( EventTypeNode EventTypeScheduler EventTypeAppStatus + EventTypeFederation ) var ( @@ -186,6 +187,8 @@ func Start() { select { case event := <-getDispatcher().eventChan: switch v := event.(type) { + case events.FederationEvent: + getEventHandler(EventTypeFederation)(v) case events.ApplicationStatusEvent: getEventHandler(EventTypeAppStatus)(v) case events.ApplicationEvent: diff --git a/pkg/federation/federation_events.go b/pkg/federation/federation_events.go new file mode 100644 index 000000000..d258bd96a --- /dev/null +++ b/pkg/federation/federation_events.go @@ -0,0 +1,22 @@ +package federation + +import ( + "github.com/apache/incubator-yunikorn-k8shim/pkg/common/events" + "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si" +) + +type AsyncUpdateFederationEvent struct { + ConfirmedRequests []*si.UpdateRequest +} + +func (a AsyncUpdateFederationEvent) GetEvent() events.FederationEventType { + return events.Update +} + +func (a AsyncUpdateFederationEvent) GetArgs() []interface{} { + return nil +} + +func (a AsyncUpdateFederationEvent) GetConfirmedUpdates() []*si.UpdateRequest { + return a.ConfirmedRequests +} diff --git a/pkg/federation/member.go b/pkg/federation/member.go new file mode 100644 index 000000000..380f92b89 --- /dev/null +++ b/pkg/federation/member.go @@ -0,0 +1,132 @@ +package federation + +import ( + "context" + "os" + "sync" + "time" + + "github.com/apache/incubator-yunikorn-k8shim/pkg/conf" + "github.com/apache/incubator-yunikorn-k8shim/pkg/log" + "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +var once sync.Once +var member *MembershipService + +type MembershipService struct { + headAddress string + client si.SchedulerClient + reportingChan chan *si.UpdateRequest +} + + +func GetMembershipService() *MembershipService { + once.Do(func() { + headAddr := os.Getenv("YUNIKORN_HEAD_ADDRESS") + // Set up a connection to the server. + conn, err := grpc.Dial(headAddr, grpc.WithInsecure()) + if err != nil { + log.Logger().Fatal("failed to connect to the server", + zap.Error(err)) + } + + member = &MembershipService { + headAddress: headAddr, + client: si.NewSchedulerClient(conn), + reportingChan: make(chan *si.UpdateRequest, 1024*1024), + } + }) + return member +} + +// start membership service +func (m *MembershipService) Start() { + // start a go routine that sends updates to the head in gRPC stream + go m.asyncUpdate() +} + +func (m *MembershipService) ReconcileMembership() error { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // register the RM first + if _, err := m.client.RegisterResourceManager(ctx, &si.RegisterResourceManagerRequest{ + RmID: conf.GetSchedulerConf().ClusterID, + Version: "v1", + PolicyGroup: "queues", + }); err != nil { + log.Logger().Error("could not register to the head", + zap.Error(err)) + return err + } + + log.Logger().Info("membership registered", + zap.String("headAddress", m.headAddress)) + return nil +} + +// func (m *MembershipService) asyncUpdate() { +// ctx, cancel := context.WithCancel(context.Background()) +// cleanupFn := func() { +// log.Logger().Warn("release TCP connection") +// cancel() +// } +// defer cleanupFn() +// stream, err := m.client.Update(ctx) +// if err != nil { +// log.Logger().Fatal("failed to get the stream", +// zap.Error(err)) +// } +// +// for { +// select { +// case report :=<- m.reportingChan: +// if err := stream.Send(report); err != nil { +// log.Logger().Error("failed to send UpdateRequest to the server head", +// zap.Any("request", report), +// zap.Error(err)) +// +// } else { +// log.Logger().Info("reporting updates to the head server") +// } +// } +// } +// } + +func (m *MembershipService) asyncUpdate() { + for { + select { + case report :=<- m.reportingChan: + { + ctx := context.Background() + stream, err := m.client.Update(ctx) + if err != nil { + log.Logger().Fatal("failed to get the stream", + zap.Error(err)) + } + if err := stream.Send(report); err != nil { + log.Logger().Error("failed to send UpdateRequest to the server head", + zap.Any("request", report), + zap.Error(err)) + + } else { + log.Logger().Info("reporting updates to the head server") + } + } + } + } +} + +func (m *MembershipService) FederationEventHandler() func(obj interface{}) { + return func(obj interface{}) { + if event, ok := obj.(AsyncUpdateFederationEvent); ok { + for _, update := range event.GetConfirmedUpdates() { + log.Logger().Info("Enqueue the update event in the reporting channel") + m.reportingChan <- update + } + } + } +} \ No newline at end of file diff --git a/pkg/shim/main.go b/pkg/shim/main.go index 3123e154b..96a345f7b 100644 --- a/pkg/shim/main.go +++ b/pkg/shim/main.go @@ -23,6 +23,7 @@ import ( "os/signal" "syscall" + "github.com/apache/incubator-yunikorn-k8shim/pkg/federation" "go.uber.org/zap" "github.com/apache/incubator-yunikorn-core/pkg/api" @@ -44,6 +45,13 @@ func main() { serviceContext := entrypoint.StartAllServices() + // federation + member := federation.GetMembershipService() + // register itself to the head server + if err := member.ReconcileMembership(); err == nil { + member.Start() + } + if sa, ok := serviceContext.RMProxy.(api.SchedulerAPI); ok { ss := newShimScheduler(sa, conf.GetSchedulerConf()) ss.run() diff --git a/pkg/shim/scheduler.go b/pkg/shim/scheduler.go index 63f8565ba..dfbc02cbd 100644 --- a/pkg/shim/scheduler.go +++ b/pkg/shim/scheduler.go @@ -22,6 +22,7 @@ import ( "sync" "time" + "github.com/apache/incubator-yunikorn-k8shim/pkg/federation" "github.com/looplab/fsm" "go.uber.org/zap" "k8s.io/apimachinery/pkg/util/wait" @@ -111,6 +112,7 @@ func newShimSchedulerInternal(ctx *cache.Context, apiFactory client.APIProvider, dispatcher.RegisterEventHandler(dispatcher.EventTypeNode, ctx.SchedulerNodeEventHandler()) dispatcher.RegisterEventHandler(dispatcher.EventTypeScheduler, ss.SchedulerEventHandler()) dispatcher.RegisterEventHandler(dispatcher.EventTypeAppStatus, am.ApplicationStateUpdateEventHandler()) + dispatcher.RegisterEventHandler(dispatcher.EventTypeFederation, federation.GetMembershipService().FederationEventHandler()) return ss } From 6c51d43da50acbf3532aff89d5b65c0b572c4a5e Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Thu, 17 Sep 2020 18:17:35 -0700 Subject: [PATCH 02/14] Revert "arch png" This reverts commit bb50cc6ab9cedcb711f765aecdfb837cf53096ef. --- Makefile | 7 +- conf/queues.yaml | 29 +----- go.mod | 5 +- go.sum | 2 - pkg/cache/context.go | 6 -- pkg/cache/node.go | 12 --- pkg/callback/scheduler_callback.go | 85 ------------------ pkg/common/events/events.go | 17 ---- pkg/conf/schedulerconf.go | 2 +- pkg/dispatcher/dispatcher.go | 3 - pkg/federation/federation_events.go | 22 ----- pkg/federation/member.go | 132 ---------------------------- pkg/shim/main.go | 8 -- pkg/shim/scheduler.go | 2 - 14 files changed, 12 insertions(+), 320 deletions(-) delete mode 100644 pkg/federation/federation_events.go delete mode 100644 pkg/federation/member.go diff --git a/Makefile b/Makefile index f81bff5d5..d8e6160a6 100644 --- a/Makefile +++ b/Makefile @@ -125,7 +125,12 @@ sched_image: scheduler @coreSHA=$$(go list -m "github.com/apache/incubator-yunikorn-core" | cut -d "-" -f5) ; \ siSHA=$$(go list -m "github.com/apache/incubator-yunikorn-scheduler-interface" | cut -d "-" -f6) ; \ shimSHA=$$(git rev-parse --short=12 HEAD) ; \ - docker build ./deployments/image/configmap -t yunikorn/yunikorn-member:latest + docker build ./deployments/image/configmap -t ${REGISTRY}/yunikorn:scheduler-${VERSION} \ + --label "yunikorn-core-revision=$${coreSHA}" \ + --label "yunikorn-scheduler-interface-revision=$${siSHA}" \ + --label "yunikorn-k8shim-revision=$${shimSHA}" \ + --label "BuildTimeStamp=${DATE}" \ + --label "Version=${VERSION}" @mv -f ./deployments/image/configmap/Dockerfile.bkp ./deployments/image/configmap/Dockerfile @rm -f ./deployments/image/configmap/${BINARY} @rm -rf ./deployments/image/configmap/admission-controller-init-scripts/ diff --git a/conf/queues.yaml b/conf/queues.yaml index aef126e70..2fd1213a3 100644 --- a/conf/queues.yaml +++ b/conf/queues.yaml @@ -17,31 +17,10 @@ partitions: - name: default + placementrules: + - name: tag + value: namespace + create: true queues: - name: root submitacl: '*' - queues: - - name: advertisement - resources: - guaranteed: - memory: 500000 - vcore: 50000 - max: - memory: 800000 - vcore: 80000 - - name: search - resources: - guaranteed: - memory: 400000 - vcore: 40000 - max: - memory: 600000 - vcore: 60000 - - name: sandbox - resources: - guaranteed: - memory: 100000 - vcore: 10000 - max: - memory: 100000 - vcore: 10000 diff --git a/go.mod b/go.mod index b2c918abd..4ae3fcbc2 100644 --- a/go.mod +++ b/go.mod @@ -23,13 +23,12 @@ go 1.12 require ( github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.0.0-20200817155620-c19d2b8660d8 github.com/apache/incubator-yunikorn-core v0.0.0-20200827055746-57d663e73cb1 - github.com/apache/incubator-yunikorn-scheduler-interface v0.9.1-0.20200901200728-b9033558f319 + github.com/apache/incubator-yunikorn-scheduler-interface v0.9.1-0.20200827013520-ec2f681ecb5b github.com/google/uuid v1.1.1 github.com/looplab/fsm v0.1.0 github.com/onsi/ginkgo v1.11.0 github.com/onsi/gomega v1.7.0 go.uber.org/zap v1.13.0 - google.golang.org/grpc v1.26.0 gopkg.in/yaml.v2 v2.2.8 gotest.tools v2.2.0+incompatible k8s.io/api v0.16.13 @@ -62,5 +61,3 @@ replace ( k8s.io/metrics => k8s.io/metrics v0.16.13 k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.16.13 ) - -replace github.com/apache/incubator-yunikorn-scheduler-interface => /Users/wyang/workspace/github/wyang/incubator-yunikorn-scheduler-interface diff --git a/go.sum b/go.sum index c042ab8c8..aa2171b7e 100644 --- a/go.sum +++ b/go.sum @@ -46,8 +46,6 @@ github.com/apache/incubator-yunikorn-core v0.0.0-20200827055746-57d663e73cb1 h1: github.com/apache/incubator-yunikorn-core v0.0.0-20200827055746-57d663e73cb1/go.mod h1:0fxPfo3PViD/Bd6uuczucdi+tp3uH+vjhZ9Sjymx+uc= github.com/apache/incubator-yunikorn-scheduler-interface v0.9.1-0.20200827013520-ec2f681ecb5b h1:giSmjG7Ay/8hzql/oG1fNcij8rAsXOsLmHN5d40ZqYs= github.com/apache/incubator-yunikorn-scheduler-interface v0.9.1-0.20200827013520-ec2f681ecb5b/go.mod h1:ObMs03XFbnmpGD81jYvdUDEVZbHvz8W6dWH5nGDCjc0= -github.com/apache/incubator-yunikorn-scheduler-interface v0.9.1-0.20200901200728-b9033558f319 h1:I12nCcXdHe6W4oysVejFHfQDy0Ix0r+ZHdfooyEoldo= -github.com/apache/incubator-yunikorn-scheduler-interface v0.9.1-0.20200901200728-b9033558f319/go.mod h1:ObMs03XFbnmpGD81jYvdUDEVZbHvz8W6dWH5nGDCjc0= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= diff --git a/pkg/cache/context.go b/pkg/cache/context.go index a7d256f6c..6035e9733 100644 --- a/pkg/cache/context.go +++ b/pkg/cache/context.go @@ -508,12 +508,6 @@ func (ctx *Context) GetApplication(appID string) interfaces.ManagedApp { return nil } -func (ctx *Context) GetNode(nodeID string) *SchedulerNode { - ctx.lock.RLock() - defer ctx.lock.RUnlock() - return ctx.nodes.getNode(nodeID) -} - func (ctx *Context) RemoveApplication(appID string) error { ctx.lock.Lock() defer ctx.lock.Unlock() diff --git a/pkg/cache/node.go b/pkg/cache/node.go index 692c4f8a6..d399d282e 100644 --- a/pkg/cache/node.go +++ b/pkg/cache/node.go @@ -248,15 +248,3 @@ func (n *SchedulerNode) enterState(event *fsm.Event) { zap.String("destination", event.Dst), zap.String("event", event.Event)) } - -func (n *SchedulerNode) GetCapacity() *si.Resource { - n.lock.RLock() - defer n.lock.RUnlock() - return n.capacity -} - -func (n *SchedulerNode) GetOccupiedResources() *si.Resource { - n.lock.RLock() - defer n.lock.RUnlock() - return n.occupied -} diff --git a/pkg/callback/scheduler_callback.go b/pkg/callback/scheduler_callback.go index 05df0771a..d0dc27716 100644 --- a/pkg/callback/scheduler_callback.go +++ b/pkg/callback/scheduler_callback.go @@ -21,10 +21,6 @@ package callback import ( "fmt" - "github.com/apache/incubator-yunikorn-k8shim/pkg/common" - "github.com/apache/incubator-yunikorn-k8shim/pkg/common/constants" - "github.com/apache/incubator-yunikorn-k8shim/pkg/conf" - "github.com/apache/incubator-yunikorn-k8shim/pkg/federation" "go.uber.org/zap" "github.com/apache/incubator-yunikorn-k8shim/pkg/cache" @@ -57,30 +53,6 @@ func (callback *AsyncRMCallback) RecvUpdateResponse(response *si.UpdateResponse) NodeID: node.NodeID, Event: events.NodeAccepted, }) - - confirmedNode := callback.context.GetNode(node.NodeID) - if confirmedNode != nil { - log.Logger().Info("reporting new node to the head server", - zap.String("nodeID", node.NodeID)) - dispatcher.Dispatch(federation.AsyncUpdateFederationEvent{ - ConfirmedRequests: []*si.UpdateRequest{ - { - NewSchedulableNodes: []*si.NewNodeInfo{ - { - NodeID: node.NodeID, - Attributes: map[string]string{ "compute-node": "yes"}, - SchedulableResource: confirmedNode.GetCapacity(), - OccupiedResource: confirmedNode.GetOccupiedResources(), - }, - }, - RmID: conf.GetSchedulerConf().ClusterID, - }, - }, - }) - } else { - log.Logger().Error("node accepted in sub cluster, but not found in cache", - zap.String("nodeID", node.NodeID)) - } } for _, node := range response.RejectedNodes { @@ -103,31 +75,6 @@ func (callback *AsyncRMCallback) RecvUpdateResponse(response *si.UpdateResponse) ev := cache.NewSimpleApplicationEvent(app.GetApplicationID(), events.AcceptApplication) dispatcher.Dispatch(ev) } - - if appInfo := callback.context.GetApplication(app.ApplicationID); appInfo != nil { - log.Logger().Info("reporting new application to the head server", - zap.String("appID", app.ApplicationID)) - dispatcher.Dispatch(federation.AsyncUpdateFederationEvent{ - ConfirmedRequests: []*si.UpdateRequest{ - { - NewApplications: []*si.AddApplicationRequest{ - { - ApplicationID: app.ApplicationID, - QueueName: appInfo.GetQueue(), - PartitionName: "default", - Ugi: &si.UserGroupInformation{ - User: "anonymous", - }, - }, - }, - RmID: conf.GetSchedulerConf().ClusterID, - }, - }, - }) - } else { - log.Logger().Error("app accepted in sub cluster, but not found in cache", - zap.String("appID", app.ApplicationID)) - } } for _, app := range response.RejectedApplications { @@ -153,38 +100,6 @@ func (callback *AsyncRMCallback) RecvUpdateResponse(response *si.UpdateResponse) if app := callback.context.GetApplication(alloc.ApplicationID); app != nil { ev := cache.NewAllocateTaskEvent(app.GetApplicationID(), alloc.AllocationKey, alloc.UUID, alloc.NodeID) dispatcher.Dispatch(ev) - - if updatedNode := callback.context.GetNode(alloc.NodeID); updatedNode != nil { - if task, err := app.GetTask(alloc.AllocationKey); err == nil { - dispatcher.Dispatch(federation.AsyncUpdateFederationEvent{ - ConfirmedRequests: []*si.UpdateRequest{ - { - UpdatedNodes: []*si.UpdateNodeInfo{ - { - NodeID: alloc.NodeID, - Attributes: map[string]string{"compute-node": "yes"}, - SchedulableResource: updatedNode.GetCapacity(), - OccupiedResource: updatedNode.GetOccupiedResources(), - ExistingAllocations: []*si.Allocation{ - { - AllocationKey: string(task.GetTaskPod().UID), - UUID: string(task.GetTaskPod().UID), - ResourcePerAlloc: common.GetPodResource(task.GetTaskPod()), - QueueName: app.GetQueue(), - NodeID: alloc.NodeID, - ApplicationID: app.GetApplicationID(), - PartitionName: constants.DefaultPartition, - }, - }, - Action: si.UpdateNodeInfo_UPDATE, - }, - }, - RmID: conf.GetSchedulerConf().ClusterID, - }, - }, - }) - } - } } } diff --git a/pkg/common/events/events.go b/pkg/common/events/events.go index 3a5ef5137..9b85b53c6 100644 --- a/pkg/common/events/events.go +++ b/pkg/common/events/events.go @@ -18,8 +18,6 @@ package events -import "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si" - const EnterState = "enter_state" //---------------------------------------------- @@ -150,18 +148,3 @@ type SchedulerNodeEvent interface { // state machines' callbacks when doing state transition GetArgs() []interface{} } - -type FederationEventType string - -const ( - Update FederationEventType = "Update" -) - -type FederationEvent interface { - // the type of this event - GetEvent() FederationEventType - - GetConfirmedUpdates() []*si.UpdateRequest - - GetArgs() []interface{} -} diff --git a/pkg/conf/schedulerconf.go b/pkg/conf/schedulerconf.go index 91df37992..cdc52f14b 100644 --- a/pkg/conf/schedulerconf.go +++ b/pkg/conf/schedulerconf.go @@ -135,7 +135,7 @@ func initConfigs() { predicateList := flag.String("predicates", "", fmt.Sprintf("comma-separated list of predicates, valid predicates are: %s, "+ "the program will exit if any invalid predicates exist.", predicates.Ordering())) - operatorPluginList := flag.String("operatorPlugins", "general", + operatorPluginList := flag.String("operatorPlugins", "general,"+constants.AppManagerHandlerName, "comma-separated list of operator plugin names, currently, only \"spark-operator-service\""+ "and"+constants.AppManagerHandlerName+"is supported.") diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go index c8c13181c..a22755c99 100644 --- a/pkg/dispatcher/dispatcher.go +++ b/pkg/dispatcher/dispatcher.go @@ -42,7 +42,6 @@ const ( EventTypeNode EventTypeScheduler EventTypeAppStatus - EventTypeFederation ) var ( @@ -187,8 +186,6 @@ func Start() { select { case event := <-getDispatcher().eventChan: switch v := event.(type) { - case events.FederationEvent: - getEventHandler(EventTypeFederation)(v) case events.ApplicationStatusEvent: getEventHandler(EventTypeAppStatus)(v) case events.ApplicationEvent: diff --git a/pkg/federation/federation_events.go b/pkg/federation/federation_events.go deleted file mode 100644 index d258bd96a..000000000 --- a/pkg/federation/federation_events.go +++ /dev/null @@ -1,22 +0,0 @@ -package federation - -import ( - "github.com/apache/incubator-yunikorn-k8shim/pkg/common/events" - "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si" -) - -type AsyncUpdateFederationEvent struct { - ConfirmedRequests []*si.UpdateRequest -} - -func (a AsyncUpdateFederationEvent) GetEvent() events.FederationEventType { - return events.Update -} - -func (a AsyncUpdateFederationEvent) GetArgs() []interface{} { - return nil -} - -func (a AsyncUpdateFederationEvent) GetConfirmedUpdates() []*si.UpdateRequest { - return a.ConfirmedRequests -} diff --git a/pkg/federation/member.go b/pkg/federation/member.go deleted file mode 100644 index 380f92b89..000000000 --- a/pkg/federation/member.go +++ /dev/null @@ -1,132 +0,0 @@ -package federation - -import ( - "context" - "os" - "sync" - "time" - - "github.com/apache/incubator-yunikorn-k8shim/pkg/conf" - "github.com/apache/incubator-yunikorn-k8shim/pkg/log" - "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si" - "go.uber.org/zap" - "google.golang.org/grpc" -) - -var once sync.Once -var member *MembershipService - -type MembershipService struct { - headAddress string - client si.SchedulerClient - reportingChan chan *si.UpdateRequest -} - - -func GetMembershipService() *MembershipService { - once.Do(func() { - headAddr := os.Getenv("YUNIKORN_HEAD_ADDRESS") - // Set up a connection to the server. - conn, err := grpc.Dial(headAddr, grpc.WithInsecure()) - if err != nil { - log.Logger().Fatal("failed to connect to the server", - zap.Error(err)) - } - - member = &MembershipService { - headAddress: headAddr, - client: si.NewSchedulerClient(conn), - reportingChan: make(chan *si.UpdateRequest, 1024*1024), - } - }) - return member -} - -// start membership service -func (m *MembershipService) Start() { - // start a go routine that sends updates to the head in gRPC stream - go m.asyncUpdate() -} - -func (m *MembershipService) ReconcileMembership() error { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - // register the RM first - if _, err := m.client.RegisterResourceManager(ctx, &si.RegisterResourceManagerRequest{ - RmID: conf.GetSchedulerConf().ClusterID, - Version: "v1", - PolicyGroup: "queues", - }); err != nil { - log.Logger().Error("could not register to the head", - zap.Error(err)) - return err - } - - log.Logger().Info("membership registered", - zap.String("headAddress", m.headAddress)) - return nil -} - -// func (m *MembershipService) asyncUpdate() { -// ctx, cancel := context.WithCancel(context.Background()) -// cleanupFn := func() { -// log.Logger().Warn("release TCP connection") -// cancel() -// } -// defer cleanupFn() -// stream, err := m.client.Update(ctx) -// if err != nil { -// log.Logger().Fatal("failed to get the stream", -// zap.Error(err)) -// } -// -// for { -// select { -// case report :=<- m.reportingChan: -// if err := stream.Send(report); err != nil { -// log.Logger().Error("failed to send UpdateRequest to the server head", -// zap.Any("request", report), -// zap.Error(err)) -// -// } else { -// log.Logger().Info("reporting updates to the head server") -// } -// } -// } -// } - -func (m *MembershipService) asyncUpdate() { - for { - select { - case report :=<- m.reportingChan: - { - ctx := context.Background() - stream, err := m.client.Update(ctx) - if err != nil { - log.Logger().Fatal("failed to get the stream", - zap.Error(err)) - } - if err := stream.Send(report); err != nil { - log.Logger().Error("failed to send UpdateRequest to the server head", - zap.Any("request", report), - zap.Error(err)) - - } else { - log.Logger().Info("reporting updates to the head server") - } - } - } - } -} - -func (m *MembershipService) FederationEventHandler() func(obj interface{}) { - return func(obj interface{}) { - if event, ok := obj.(AsyncUpdateFederationEvent); ok { - for _, update := range event.GetConfirmedUpdates() { - log.Logger().Info("Enqueue the update event in the reporting channel") - m.reportingChan <- update - } - } - } -} \ No newline at end of file diff --git a/pkg/shim/main.go b/pkg/shim/main.go index 96a345f7b..3123e154b 100644 --- a/pkg/shim/main.go +++ b/pkg/shim/main.go @@ -23,7 +23,6 @@ import ( "os/signal" "syscall" - "github.com/apache/incubator-yunikorn-k8shim/pkg/federation" "go.uber.org/zap" "github.com/apache/incubator-yunikorn-core/pkg/api" @@ -45,13 +44,6 @@ func main() { serviceContext := entrypoint.StartAllServices() - // federation - member := federation.GetMembershipService() - // register itself to the head server - if err := member.ReconcileMembership(); err == nil { - member.Start() - } - if sa, ok := serviceContext.RMProxy.(api.SchedulerAPI); ok { ss := newShimScheduler(sa, conf.GetSchedulerConf()) ss.run() diff --git a/pkg/shim/scheduler.go b/pkg/shim/scheduler.go index dfbc02cbd..63f8565ba 100644 --- a/pkg/shim/scheduler.go +++ b/pkg/shim/scheduler.go @@ -22,7 +22,6 @@ import ( "sync" "time" - "github.com/apache/incubator-yunikorn-k8shim/pkg/federation" "github.com/looplab/fsm" "go.uber.org/zap" "k8s.io/apimachinery/pkg/util/wait" @@ -112,7 +111,6 @@ func newShimSchedulerInternal(ctx *cache.Context, apiFactory client.APIProvider, dispatcher.RegisterEventHandler(dispatcher.EventTypeNode, ctx.SchedulerNodeEventHandler()) dispatcher.RegisterEventHandler(dispatcher.EventTypeScheduler, ss.SchedulerEventHandler()) dispatcher.RegisterEventHandler(dispatcher.EventTypeAppStatus, am.ApplicationStateUpdateEventHandler()) - dispatcher.RegisterEventHandler(dispatcher.EventTypeFederation, federation.GetMembershipService().FederationEventHandler()) return ss } From db6a4a00c8ed467845a2d7f2805a98a47d989413 Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Thu, 22 Oct 2020 14:14:04 -0700 Subject: [PATCH 03/14] [YUNIKORN-421] Define app gang scheduling info in API package. --- pkg/apis/yunikorn.apache.org/v1alpha1/type.go | 37 +++++++----- .../v1alpha1/zz_generated.deepcopy.go | 56 +++++++++++++++---- 2 files changed, 66 insertions(+), 27 deletions(-) diff --git a/pkg/apis/yunikorn.apache.org/v1alpha1/type.go b/pkg/apis/yunikorn.apache.org/v1alpha1/type.go index bd20f8b32..ce638b7a9 100644 --- a/pkg/apis/yunikorn.apache.org/v1alpha1/type.go +++ b/pkg/apis/yunikorn.apache.org/v1alpha1/type.go @@ -19,6 +19,7 @@ package v1alpha1 import ( + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -37,29 +38,35 @@ type Application struct { // Spec part type ApplicationSpec struct { - Policy SchedulePolicy `json:"schedulingPolicy"` - Queue string `json:"queue"` - TaskGroup []Task `json:"taskGroups"` + Queue string `json:"queue"` + TaskGroups []TaskGroup `json:"taskGroups"` } -type SchedulePolicy struct { - Policy SchedulingPolicy `json:"name"` - Parameters map[string]string `json:"parameters,omitempty"` +type SchedulingPolicy struct { + Type SchedulingPolicyType `json:"type"` + Parameters map[string]string `json:"parameters,omitempty"` } -type SchedulingPolicy string +type SchedulingPolicyType string const ( - TryOnce SchedulingPolicy = "TryOnce" - MaxRetry SchedulingPolicy = "MaxRetry" - TryReserve SchedulingPolicy = "TryReserve" - TryPreempt SchedulingPolicy = "TryPreempt" + TryOnce SchedulingPolicyType = "TryOnce" + MaxRetry SchedulingPolicyType = "MaxRetry" + TryReserve SchedulingPolicyType = "TryReserve" + TryPreempt SchedulingPolicyType = "TryPreempt" ) -type Task struct { - GroupName string `json:"groupName"` - MinMember int32 `json:"minMember"` - MinResource map[string]resource.Quantity `json:"minResource"` +type TaskGroups struct { + SchedulingPolicy SchedulingPolicy `json:"schedulingPolicy"` + TaskGroups []TaskGroup `json:"taskGroups"` +} + +type TaskGroup struct { + Name string `json:"name"` + MinMember int32 `json:"minMember"` + MinResource map[string]resource.Quantity `json:"minResource"` + NodeSelector metav1.LabelSelector `json:"nodeSelector,omitempty"` + Tolerations []v1.Toleration `json:"tolerations,omitempty"` } // Status part diff --git a/pkg/apis/yunikorn.apache.org/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/yunikorn.apache.org/v1alpha1/zz_generated.deepcopy.go index 25f88eb17..02ebd1ce4 100644 --- a/pkg/apis/yunikorn.apache.org/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/yunikorn.apache.org/v1alpha1/zz_generated.deepcopy.go @@ -22,6 +22,7 @@ package v1alpha1 import ( + v1 "k8s.io/api/core/v1" resource "k8s.io/apimachinery/pkg/api/resource" runtime "k8s.io/apimachinery/pkg/runtime" ) @@ -90,10 +91,9 @@ func (in *ApplicationList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ApplicationSpec) DeepCopyInto(out *ApplicationSpec) { *out = *in - in.Policy.DeepCopyInto(&out.Policy) - if in.TaskGroup != nil { - in, out := &in.TaskGroup, &out.TaskGroup - *out = make([]Task, len(*in)) + if in.TaskGroups != nil { + in, out := &in.TaskGroups, &out.TaskGroups + *out = make([]TaskGroup, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -129,7 +129,7 @@ func (in *ApplicationStatus) DeepCopy() *ApplicationStatus { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *SchedulePolicy) DeepCopyInto(out *SchedulePolicy) { +func (in *SchedulingPolicy) DeepCopyInto(out *SchedulingPolicy) { *out = *in if in.Parameters != nil { in, out := &in.Parameters, &out.Parameters @@ -141,18 +141,18 @@ func (in *SchedulePolicy) DeepCopyInto(out *SchedulePolicy) { return } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SchedulePolicy. -func (in *SchedulePolicy) DeepCopy() *SchedulePolicy { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SchedulingPolicy. +func (in *SchedulingPolicy) DeepCopy() *SchedulingPolicy { if in == nil { return nil } - out := new(SchedulePolicy) + out := new(SchedulingPolicy) in.DeepCopyInto(out) return out } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *Task) DeepCopyInto(out *Task) { +func (in *TaskGroup) DeepCopyInto(out *TaskGroup) { *out = *in if in.MinResource != nil { in, out := &in.MinResource, &out.MinResource @@ -161,15 +161,47 @@ func (in *Task) DeepCopyInto(out *Task) { (*out)[key] = val.DeepCopy() } } + in.NodeSelector.DeepCopyInto(&out.NodeSelector) + if in.Tolerations != nil { + in, out := &in.Tolerations, &out.Tolerations + *out = make([]v1.Toleration, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TaskGroup. +func (in *TaskGroup) DeepCopy() *TaskGroup { + if in == nil { + return nil + } + out := new(TaskGroup) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TaskGroups) DeepCopyInto(out *TaskGroups) { + *out = *in + in.SchedulingPolicy.DeepCopyInto(&out.SchedulingPolicy) + if in.TaskGroups != nil { + in, out := &in.TaskGroups, &out.TaskGroups + *out = make([]TaskGroup, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } return } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Task. -func (in *Task) DeepCopy() *Task { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TaskGroups. +func (in *TaskGroups) DeepCopy() *TaskGroups { if in == nil { return nil } - out := new(Task) + out := new(TaskGroups) in.DeepCopyInto(out) return out } From 3086bc5470cbf1e9cd69e4abb7c3568069badac7 Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Thu, 22 Oct 2020 17:10:50 -0700 Subject: [PATCH 04/14] Revert "[YUNIKORN-421] Define app gang scheduling info in API package." This reverts commit db6a4a00c8ed467845a2d7f2805a98a47d989413. --- pkg/apis/yunikorn.apache.org/v1alpha1/type.go | 37 +++++------- .../v1alpha1/zz_generated.deepcopy.go | 56 ++++--------------- 2 files changed, 27 insertions(+), 66 deletions(-) diff --git a/pkg/apis/yunikorn.apache.org/v1alpha1/type.go b/pkg/apis/yunikorn.apache.org/v1alpha1/type.go index ce638b7a9..bd20f8b32 100644 --- a/pkg/apis/yunikorn.apache.org/v1alpha1/type.go +++ b/pkg/apis/yunikorn.apache.org/v1alpha1/type.go @@ -19,7 +19,6 @@ package v1alpha1 import ( - v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -38,35 +37,29 @@ type Application struct { // Spec part type ApplicationSpec struct { - Queue string `json:"queue"` - TaskGroups []TaskGroup `json:"taskGroups"` + Policy SchedulePolicy `json:"schedulingPolicy"` + Queue string `json:"queue"` + TaskGroup []Task `json:"taskGroups"` } -type SchedulingPolicy struct { - Type SchedulingPolicyType `json:"type"` - Parameters map[string]string `json:"parameters,omitempty"` +type SchedulePolicy struct { + Policy SchedulingPolicy `json:"name"` + Parameters map[string]string `json:"parameters,omitempty"` } -type SchedulingPolicyType string +type SchedulingPolicy string const ( - TryOnce SchedulingPolicyType = "TryOnce" - MaxRetry SchedulingPolicyType = "MaxRetry" - TryReserve SchedulingPolicyType = "TryReserve" - TryPreempt SchedulingPolicyType = "TryPreempt" + TryOnce SchedulingPolicy = "TryOnce" + MaxRetry SchedulingPolicy = "MaxRetry" + TryReserve SchedulingPolicy = "TryReserve" + TryPreempt SchedulingPolicy = "TryPreempt" ) -type TaskGroups struct { - SchedulingPolicy SchedulingPolicy `json:"schedulingPolicy"` - TaskGroups []TaskGroup `json:"taskGroups"` -} - -type TaskGroup struct { - Name string `json:"name"` - MinMember int32 `json:"minMember"` - MinResource map[string]resource.Quantity `json:"minResource"` - NodeSelector metav1.LabelSelector `json:"nodeSelector,omitempty"` - Tolerations []v1.Toleration `json:"tolerations,omitempty"` +type Task struct { + GroupName string `json:"groupName"` + MinMember int32 `json:"minMember"` + MinResource map[string]resource.Quantity `json:"minResource"` } // Status part diff --git a/pkg/apis/yunikorn.apache.org/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/yunikorn.apache.org/v1alpha1/zz_generated.deepcopy.go index 02ebd1ce4..25f88eb17 100644 --- a/pkg/apis/yunikorn.apache.org/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/yunikorn.apache.org/v1alpha1/zz_generated.deepcopy.go @@ -22,7 +22,6 @@ package v1alpha1 import ( - v1 "k8s.io/api/core/v1" resource "k8s.io/apimachinery/pkg/api/resource" runtime "k8s.io/apimachinery/pkg/runtime" ) @@ -91,9 +90,10 @@ func (in *ApplicationList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ApplicationSpec) DeepCopyInto(out *ApplicationSpec) { *out = *in - if in.TaskGroups != nil { - in, out := &in.TaskGroups, &out.TaskGroups - *out = make([]TaskGroup, len(*in)) + in.Policy.DeepCopyInto(&out.Policy) + if in.TaskGroup != nil { + in, out := &in.TaskGroup, &out.TaskGroup + *out = make([]Task, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -129,7 +129,7 @@ func (in *ApplicationStatus) DeepCopy() *ApplicationStatus { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *SchedulingPolicy) DeepCopyInto(out *SchedulingPolicy) { +func (in *SchedulePolicy) DeepCopyInto(out *SchedulePolicy) { *out = *in if in.Parameters != nil { in, out := &in.Parameters, &out.Parameters @@ -141,18 +141,18 @@ func (in *SchedulingPolicy) DeepCopyInto(out *SchedulingPolicy) { return } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SchedulingPolicy. -func (in *SchedulingPolicy) DeepCopy() *SchedulingPolicy { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SchedulePolicy. +func (in *SchedulePolicy) DeepCopy() *SchedulePolicy { if in == nil { return nil } - out := new(SchedulingPolicy) + out := new(SchedulePolicy) in.DeepCopyInto(out) return out } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *TaskGroup) DeepCopyInto(out *TaskGroup) { +func (in *Task) DeepCopyInto(out *Task) { *out = *in if in.MinResource != nil { in, out := &in.MinResource, &out.MinResource @@ -161,47 +161,15 @@ func (in *TaskGroup) DeepCopyInto(out *TaskGroup) { (*out)[key] = val.DeepCopy() } } - in.NodeSelector.DeepCopyInto(&out.NodeSelector) - if in.Tolerations != nil { - in, out := &in.Tolerations, &out.Tolerations - *out = make([]v1.Toleration, len(*in)) - for i := range *in { - (*in)[i].DeepCopyInto(&(*out)[i]) - } - } - return -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TaskGroup. -func (in *TaskGroup) DeepCopy() *TaskGroup { - if in == nil { - return nil - } - out := new(TaskGroup) - in.DeepCopyInto(out) - return out -} - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *TaskGroups) DeepCopyInto(out *TaskGroups) { - *out = *in - in.SchedulingPolicy.DeepCopyInto(&out.SchedulingPolicy) - if in.TaskGroups != nil { - in, out := &in.TaskGroups, &out.TaskGroups - *out = make([]TaskGroup, len(*in)) - for i := range *in { - (*in)[i].DeepCopyInto(&(*out)[i]) - } - } return } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TaskGroups. -func (in *TaskGroups) DeepCopy() *TaskGroups { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Task. +func (in *Task) DeepCopy() *Task { if in == nil { return nil } - out := new(TaskGroups) + out := new(Task) in.DeepCopyInto(out) return out } From 0046327571b8e375fd3c041b316ff54fd8a3d4bf Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Thu, 22 Oct 2020 17:19:48 -0700 Subject: [PATCH 05/14] fix compile issue --- pkg/apis/yunikorn.apache.org/v1alpha1/type.go | 37 +++++++----- .../v1alpha1/zz_generated.deepcopy.go | 58 ++++++++++++++----- .../application/app_controller_test.go | 21 ++++--- 3 files changed, 77 insertions(+), 39 deletions(-) diff --git a/pkg/apis/yunikorn.apache.org/v1alpha1/type.go b/pkg/apis/yunikorn.apache.org/v1alpha1/type.go index bd20f8b32..ade923550 100644 --- a/pkg/apis/yunikorn.apache.org/v1alpha1/type.go +++ b/pkg/apis/yunikorn.apache.org/v1alpha1/type.go @@ -19,6 +19,7 @@ package v1alpha1 import ( + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -37,29 +38,35 @@ type Application struct { // Spec part type ApplicationSpec struct { - Policy SchedulePolicy `json:"schedulingPolicy"` - Queue string `json:"queue"` - TaskGroup []Task `json:"taskGroups"` + Queue string `json:"queue"` + TaskGroups TaskGroups `json:"taskGroups"` } -type SchedulePolicy struct { - Policy SchedulingPolicy `json:"name"` - Parameters map[string]string `json:"parameters,omitempty"` +type SchedulingPolicy struct { + Type SchedulingPolicyType `json:"type"` + Parameters map[string]string `json:"parameters,omitempty"` } -type SchedulingPolicy string +type SchedulingPolicyType string const ( - TryOnce SchedulingPolicy = "TryOnce" - MaxRetry SchedulingPolicy = "MaxRetry" - TryReserve SchedulingPolicy = "TryReserve" - TryPreempt SchedulingPolicy = "TryPreempt" + TryOnce SchedulingPolicyType = "TryOnce" + MaxRetry SchedulingPolicyType = "MaxRetry" + TryReserve SchedulingPolicyType = "TryReserve" + TryPreempt SchedulingPolicyType = "TryPreempt" ) -type Task struct { - GroupName string `json:"groupName"` - MinMember int32 `json:"minMember"` - MinResource map[string]resource.Quantity `json:"minResource"` +type TaskGroups struct { + SchedulingPolicy SchedulingPolicy `json:"schedulingPolicy"` + Groups []TaskGroup `json:"groups"` +} + +type TaskGroup struct { + Name string `json:"name"` + MinMember int32 `json:"minMember"` + MinResource map[string]resource.Quantity `json:"minResource"` + NodeSelector metav1.LabelSelector `json:"nodeSelector,omitempty"` + Tolerations []v1.Toleration `json:"tolerations,omitempty"` } // Status part diff --git a/pkg/apis/yunikorn.apache.org/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/yunikorn.apache.org/v1alpha1/zz_generated.deepcopy.go index 25f88eb17..b4eeea417 100644 --- a/pkg/apis/yunikorn.apache.org/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/yunikorn.apache.org/v1alpha1/zz_generated.deepcopy.go @@ -22,6 +22,7 @@ package v1alpha1 import ( + v1 "k8s.io/api/core/v1" resource "k8s.io/apimachinery/pkg/api/resource" runtime "k8s.io/apimachinery/pkg/runtime" ) @@ -90,14 +91,7 @@ func (in *ApplicationList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ApplicationSpec) DeepCopyInto(out *ApplicationSpec) { *out = *in - in.Policy.DeepCopyInto(&out.Policy) - if in.TaskGroup != nil { - in, out := &in.TaskGroup, &out.TaskGroup - *out = make([]Task, len(*in)) - for i := range *in { - (*in)[i].DeepCopyInto(&(*out)[i]) - } - } + in.TaskGroups.DeepCopyInto(&out.TaskGroups) return } @@ -129,7 +123,7 @@ func (in *ApplicationStatus) DeepCopy() *ApplicationStatus { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *SchedulePolicy) DeepCopyInto(out *SchedulePolicy) { +func (in *SchedulingPolicy) DeepCopyInto(out *SchedulingPolicy) { *out = *in if in.Parameters != nil { in, out := &in.Parameters, &out.Parameters @@ -141,18 +135,18 @@ func (in *SchedulePolicy) DeepCopyInto(out *SchedulePolicy) { return } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SchedulePolicy. -func (in *SchedulePolicy) DeepCopy() *SchedulePolicy { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SchedulingPolicy. +func (in *SchedulingPolicy) DeepCopy() *SchedulingPolicy { if in == nil { return nil } - out := new(SchedulePolicy) + out := new(SchedulingPolicy) in.DeepCopyInto(out) return out } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *Task) DeepCopyInto(out *Task) { +func (in *TaskGroup) DeepCopyInto(out *TaskGroup) { *out = *in if in.MinResource != nil { in, out := &in.MinResource, &out.MinResource @@ -161,15 +155,47 @@ func (in *Task) DeepCopyInto(out *Task) { (*out)[key] = val.DeepCopy() } } + in.NodeSelector.DeepCopyInto(&out.NodeSelector) + if in.Tolerations != nil { + in, out := &in.Tolerations, &out.Tolerations + *out = make([]v1.Toleration, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TaskGroup. +func (in *TaskGroup) DeepCopy() *TaskGroup { + if in == nil { + return nil + } + out := new(TaskGroup) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TaskGroups) DeepCopyInto(out *TaskGroups) { + *out = *in + in.SchedulingPolicy.DeepCopyInto(&out.SchedulingPolicy) + if in.Groups != nil { + in, out := &in.Groups, &out.Groups + *out = make([]TaskGroup, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } return } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Task. -func (in *Task) DeepCopy() *Task { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TaskGroups. +func (in *TaskGroups) DeepCopy() *TaskGroups { if in == nil { return nil } - out := new(Task) + out := new(TaskGroups) in.DeepCopyInto(out) return out } diff --git a/pkg/controller/application/app_controller_test.go b/pkg/controller/application/app_controller_test.go index 5b84da579..c23bd4820 100644 --- a/pkg/controller/application/app_controller_test.go +++ b/pkg/controller/application/app_controller_test.go @@ -165,14 +165,19 @@ func createApp(name string, namespace string, queue string) appv1.Application { UID: "UID-APP-00001", }, Spec: appv1.ApplicationSpec{ - Policy: appv1.SchedulePolicy{ - Policy: "TryOnce", - }, - Queue: queue, - TaskGroup: []appv1.Task{ - { - GroupName: "test-task-001", - MinMember: 0, + Queue: queue, + TaskGroups: appv1.TaskGroups{ + SchedulingPolicy: appv1.SchedulingPolicy{ + Type: "TryReserve", + Parameters: map[string]string{ + "timeout" : "2h", + }, + }, + Groups: []appv1.TaskGroup{ + { + Name: "test-task-001", + MinMember: 0, + }, }, }, }, From db60621949c2c860b1f6566f0fcd5ef56c38185f Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Thu, 22 Oct 2020 21:12:08 -0700 Subject: [PATCH 06/14] fix e2e tests --- test/e2e/app/app_test.go | 12 ++++++------ test/e2e/testdata/application.yaml | 12 +++++++----- test/e2e/testdata/application_error.yaml | 12 +++++++----- 3 files changed, 20 insertions(+), 16 deletions(-) diff --git a/test/e2e/app/app_test.go b/test/e2e/app/app_test.go index 6d4ff994d..1b0d742bc 100644 --- a/test/e2e/app/app_test.go +++ b/test/e2e/app/app_test.go @@ -74,14 +74,14 @@ var _ = ginkgo.Describe("App", func() { gomega.Ω(appCRD.Spec.Queue).To(gomega.Equal("root.default")) gomega.Ω(appCRD.ObjectMeta.Name).To(gomega.Equal("example")) gomega.Ω(appCRD.ObjectMeta.Namespace).To(gomega.Equal(dev)) - policy := appCRD.Spec.Policy.Policy - gomega.Ω(string(policy)).To(gomega.Equal("TryOnce")) - gomega.Ω(appCRD.Spec.TaskGroup[0].GroupName).To(gomega.Equal("test-task-0001")) - gomega.Ω(appCRD.Spec.TaskGroup[0].MinMember).To(gomega.Equal(int32(1))) + policy := appCRD.Spec.TaskGroups.SchedulingPolicy + gomega.Ω(policy.Type).To(gomega.Equal("TryReserve")) + gomega.Ω(appCRD.Spec.TaskGroups.Groups[0].Name).To(gomega.Equal("test-task-0001")) + gomega.Ω(appCRD.Spec.TaskGroups.Groups[0].MinMember).To(gomega.Equal(int32(1))) anscpu := resource.MustParse("300m") ansmem := resource.MustParse("128Mi") - gomega.Ω(appCRD.Spec.TaskGroup[0].MinResource["cpu"]).To(gomega.Equal(anscpu)) - gomega.Ω(appCRD.Spec.TaskGroup[0].MinResource["memory"]).To(gomega.Equal(ansmem)) + gomega.Ω(appCRD.Spec.TaskGroups.Groups[0].MinResource["cpu"]).To(gomega.Equal(anscpu)) + gomega.Ω(appCRD.Spec.TaskGroups.Groups[0].MinResource["memory"]).To(gomega.Equal(ansmem)) }) ginkgo.AfterSuite(func() { diff --git a/test/e2e/testdata/application.yaml b/test/e2e/testdata/application.yaml index dcdc26ca1..63fde978d 100644 --- a/test/e2e/testdata/application.yaml +++ b/test/e2e/testdata/application.yaml @@ -24,9 +24,11 @@ spec: name: TryOnce queue: root.default taskGroups: - - groupName: "test-task-0001" - minMember: 1 - minResource: - cpu: "300m" - memory: "128Mi" + - schedulingPolicy: "TryReserve" + - groups: + - name: "test-task-0001" + minMember: 1 + minResource: + cpu: "300m" + memory: "128Mi" \ No newline at end of file diff --git a/test/e2e/testdata/application_error.yaml b/test/e2e/testdata/application_error.yaml index 32da826a4..59a6b4854 100644 --- a/test/e2e/testdata/application_error.yaml +++ b/test/e2e/testdata/application_error.yaml @@ -25,9 +25,11 @@ spec: schedulingPolicy: name: errorName taskGroups: - - groupName: 123 - minMember: "string" - minResource: - cpu: "ABC" - memory: "ABC" + - schedulingPolicy: "TryReserve" + - groups: + - name: "test-task-0001" + minMember: 1 + minResource: + cpu: "300m" + memory: "128Mi" \ No newline at end of file From 0892be0b94454718c2b22556f0350ec00502364f Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Fri, 23 Oct 2020 13:07:11 -0700 Subject: [PATCH 07/14] fix lint errors --- pkg/apis/yunikorn.apache.org/v1alpha1/type.go | 4 ++-- pkg/controller/application/app_controller_test.go | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/apis/yunikorn.apache.org/v1alpha1/type.go b/pkg/apis/yunikorn.apache.org/v1alpha1/type.go index ade923550..7fb2498cb 100644 --- a/pkg/apis/yunikorn.apache.org/v1alpha1/type.go +++ b/pkg/apis/yunikorn.apache.org/v1alpha1/type.go @@ -38,8 +38,8 @@ type Application struct { // Spec part type ApplicationSpec struct { - Queue string `json:"queue"` - TaskGroups TaskGroups `json:"taskGroups"` + Queue string `json:"queue"` + TaskGroups TaskGroups `json:"taskGroups"` } type SchedulingPolicy struct { diff --git a/pkg/controller/application/app_controller_test.go b/pkg/controller/application/app_controller_test.go index c23bd4820..6a92759aa 100644 --- a/pkg/controller/application/app_controller_test.go +++ b/pkg/controller/application/app_controller_test.go @@ -165,18 +165,18 @@ func createApp(name string, namespace string, queue string) appv1.Application { UID: "UID-APP-00001", }, Spec: appv1.ApplicationSpec{ - Queue: queue, + Queue: queue, TaskGroups: appv1.TaskGroups{ SchedulingPolicy: appv1.SchedulingPolicy{ - Type: "TryReserve", + Type: "TryReserve", Parameters: map[string]string{ - "timeout" : "2h", + "timeout": "2h", }, }, Groups: []appv1.TaskGroup{ { - Name: "test-task-001", - MinMember: 0, + Name: "test-task-001", + MinMember: 0, }, }, }, From 92f7c2c64d3f6846e66e73bd974e830bec2291ae Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Mon, 26 Oct 2020 14:44:27 -0700 Subject: [PATCH 08/14] use flat structs --- pkg/apis/yunikorn.apache.org/v1alpha1/type.go | 10 ++---- .../v1alpha1/zz_generated.deepcopy.go | 33 +++++-------------- test/e2e/app/app_test.go | 12 +++---- test/e2e/testdata/application.yaml | 12 +++---- test/e2e/testdata/application_error.yaml | 12 +++---- 5 files changed, 27 insertions(+), 52 deletions(-) diff --git a/pkg/apis/yunikorn.apache.org/v1alpha1/type.go b/pkg/apis/yunikorn.apache.org/v1alpha1/type.go index 7fb2498cb..47fd4dafc 100644 --- a/pkg/apis/yunikorn.apache.org/v1alpha1/type.go +++ b/pkg/apis/yunikorn.apache.org/v1alpha1/type.go @@ -38,8 +38,9 @@ type Application struct { // Spec part type ApplicationSpec struct { - Queue string `json:"queue"` - TaskGroups TaskGroups `json:"taskGroups"` + SchedulingPolicy SchedulingPolicy `json:"SchedulingPolicy"` + Queue string `json:"queue"` + TaskGroups []TaskGroup `json:"taskGroups"` } type SchedulingPolicy struct { @@ -56,11 +57,6 @@ const ( TryPreempt SchedulingPolicyType = "TryPreempt" ) -type TaskGroups struct { - SchedulingPolicy SchedulingPolicy `json:"schedulingPolicy"` - Groups []TaskGroup `json:"groups"` -} - type TaskGroup struct { Name string `json:"name"` MinMember int32 `json:"minMember"` diff --git a/pkg/apis/yunikorn.apache.org/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/yunikorn.apache.org/v1alpha1/zz_generated.deepcopy.go index b4eeea417..bebfec861 100644 --- a/pkg/apis/yunikorn.apache.org/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/yunikorn.apache.org/v1alpha1/zz_generated.deepcopy.go @@ -91,7 +91,14 @@ func (in *ApplicationList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ApplicationSpec) DeepCopyInto(out *ApplicationSpec) { *out = *in - in.TaskGroups.DeepCopyInto(&out.TaskGroups) + in.SchedulingPolicy.DeepCopyInto(&out.SchedulingPolicy) + if in.TaskGroups != nil { + in, out := &in.TaskGroups, &out.TaskGroups + *out = make([]TaskGroup, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } return } @@ -175,27 +182,3 @@ func (in *TaskGroup) DeepCopy() *TaskGroup { in.DeepCopyInto(out) return out } - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *TaskGroups) DeepCopyInto(out *TaskGroups) { - *out = *in - in.SchedulingPolicy.DeepCopyInto(&out.SchedulingPolicy) - if in.Groups != nil { - in, out := &in.Groups, &out.Groups - *out = make([]TaskGroup, len(*in)) - for i := range *in { - (*in)[i].DeepCopyInto(&(*out)[i]) - } - } - return -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TaskGroups. -func (in *TaskGroups) DeepCopy() *TaskGroups { - if in == nil { - return nil - } - out := new(TaskGroups) - in.DeepCopyInto(out) - return out -} diff --git a/test/e2e/app/app_test.go b/test/e2e/app/app_test.go index 1b0d742bc..f3b7b8075 100644 --- a/test/e2e/app/app_test.go +++ b/test/e2e/app/app_test.go @@ -74,14 +74,14 @@ var _ = ginkgo.Describe("App", func() { gomega.Ω(appCRD.Spec.Queue).To(gomega.Equal("root.default")) gomega.Ω(appCRD.ObjectMeta.Name).To(gomega.Equal("example")) gomega.Ω(appCRD.ObjectMeta.Namespace).To(gomega.Equal(dev)) - policy := appCRD.Spec.TaskGroups.SchedulingPolicy - gomega.Ω(policy.Type).To(gomega.Equal("TryReserve")) - gomega.Ω(appCRD.Spec.TaskGroups.Groups[0].Name).To(gomega.Equal("test-task-0001")) - gomega.Ω(appCRD.Spec.TaskGroups.Groups[0].MinMember).To(gomega.Equal(int32(1))) + policy := appCRD.Spec.SchedulingPolicy + gomega.Ω(policy.Type).To(gomega.Equal(v1alpha1.TryOnce)) + gomega.Ω(appCRD.Spec.TaskGroups[0].Name).To(gomega.Equal("test-task-0001")) + gomega.Ω(appCRD.Spec.TaskGroups[0].MinMember).To(gomega.Equal(int32(1))) anscpu := resource.MustParse("300m") ansmem := resource.MustParse("128Mi") - gomega.Ω(appCRD.Spec.TaskGroups.Groups[0].MinResource["cpu"]).To(gomega.Equal(anscpu)) - gomega.Ω(appCRD.Spec.TaskGroups.Groups[0].MinResource["memory"]).To(gomega.Equal(ansmem)) + gomega.Ω(appCRD.Spec.TaskGroups[0].MinResource["cpu"]).To(gomega.Equal(anscpu)) + gomega.Ω(appCRD.Spec.TaskGroups[0].MinResource["memory"]).To(gomega.Equal(ansmem)) }) ginkgo.AfterSuite(func() { diff --git a/test/e2e/testdata/application.yaml b/test/e2e/testdata/application.yaml index 63fde978d..80f84cd9a 100644 --- a/test/e2e/testdata/application.yaml +++ b/test/e2e/testdata/application.yaml @@ -24,11 +24,9 @@ spec: name: TryOnce queue: root.default taskGroups: - - schedulingPolicy: "TryReserve" - - groups: - - name: "test-task-0001" - minMember: 1 - minResource: - cpu: "300m" - memory: "128Mi" + - name: "test-task-0001" + minMember: 1 + minResource: + cpu: "300m" + memory: "128Mi" \ No newline at end of file diff --git a/test/e2e/testdata/application_error.yaml b/test/e2e/testdata/application_error.yaml index 59a6b4854..5ee95c21d 100644 --- a/test/e2e/testdata/application_error.yaml +++ b/test/e2e/testdata/application_error.yaml @@ -25,11 +25,9 @@ spec: schedulingPolicy: name: errorName taskGroups: - - schedulingPolicy: "TryReserve" - - groups: - - name: "test-task-0001" - minMember: 1 - minResource: - cpu: "300m" - memory: "128Mi" + - name: "test-task-0001" + minMember: 1 + minResource: + cpu: "300m" + memory: "128Mi" \ No newline at end of file From 3bfbd3cc24e99ec3f756614171270ccb1f723793 Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Mon, 26 Oct 2020 15:42:52 -0700 Subject: [PATCH 09/14] fix test failures --- .../application/app_controller_test.go | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/pkg/controller/application/app_controller_test.go b/pkg/controller/application/app_controller_test.go index 6a92759aa..49aa1a791 100644 --- a/pkg/controller/application/app_controller_test.go +++ b/pkg/controller/application/app_controller_test.go @@ -166,18 +166,13 @@ func createApp(name string, namespace string, queue string) appv1.Application { }, Spec: appv1.ApplicationSpec{ Queue: queue, - TaskGroups: appv1.TaskGroups{ - SchedulingPolicy: appv1.SchedulingPolicy{ - Type: "TryReserve", - Parameters: map[string]string{ - "timeout": "2h", - }, - }, - Groups: []appv1.TaskGroup{ - { - Name: "test-task-001", - MinMember: 0, - }, + SchedulingPolicy: appv1.SchedulingPolicy{ + Type: appv1.TryReserve, + }, + TaskGroups: []appv1.TaskGroup{ + { + Name: "test-task-001", + MinMember: 0, }, }, }, From 85a5de282e832ffe4459461e079f4e7347e7c4df Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Mon, 26 Oct 2020 16:59:51 -0700 Subject: [PATCH 10/14] fix lint issue --- pkg/apis/yunikorn.apache.org/v1alpha1/type.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/apis/yunikorn.apache.org/v1alpha1/type.go b/pkg/apis/yunikorn.apache.org/v1alpha1/type.go index 47fd4dafc..8b904dc51 100644 --- a/pkg/apis/yunikorn.apache.org/v1alpha1/type.go +++ b/pkg/apis/yunikorn.apache.org/v1alpha1/type.go @@ -38,9 +38,9 @@ type Application struct { // Spec part type ApplicationSpec struct { - SchedulingPolicy SchedulingPolicy `json:"SchedulingPolicy"` - Queue string `json:"queue"` - TaskGroups []TaskGroup `json:"taskGroups"` + SchedulingPolicy SchedulingPolicy `json:"SchedulingPolicy"` + Queue string `json:"queue"` + TaskGroups []TaskGroup `json:"taskGroups"` } type SchedulingPolicy struct { From b51239ab9dae81c988926c95189ef25b34675645 Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Mon, 26 Oct 2020 22:03:59 -0700 Subject: [PATCH 11/14] fix e2e test --- test/e2e/testdata/application_error.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/e2e/testdata/application_error.yaml b/test/e2e/testdata/application_error.yaml index 5ee95c21d..7c2b091dd 100644 --- a/test/e2e/testdata/application_error.yaml +++ b/test/e2e/testdata/application_error.yaml @@ -19,7 +19,7 @@ apiVersion: "yunikorn.apache.org/v1alpha1" kind: Application metadata: - name: example_test + name: example.test spec: queue: ////// schedulingPolicy: From 8e8ce3c50742730d00a8d1daf9e0a20ccc17d729 Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Tue, 27 Oct 2020 00:47:00 -0700 Subject: [PATCH 12/14] fix e2e test --- test/e2e/testdata/application_error.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/e2e/testdata/application_error.yaml b/test/e2e/testdata/application_error.yaml index 7c2b091dd..5be8d6e83 100644 --- a/test/e2e/testdata/application_error.yaml +++ b/test/e2e/testdata/application_error.yaml @@ -19,14 +19,14 @@ apiVersion: "yunikorn.apache.org/v1alpha1" kind: Application metadata: - name: example.test + name: example_test spec: queue: ////// schedulingPolicy: - name: errorName + type: TryReserve taskGroups: - name: "test-task-0001" - minMember: 1 + minMember: "string" minResource: cpu: "300m" memory: "128Mi" From c7103e464e4b50848746f8b6373d561dddd7e6b5 Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Tue, 27 Oct 2020 09:27:29 -0700 Subject: [PATCH 13/14] fix e2e test --- test/e2e/testdata/application.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/e2e/testdata/application.yaml b/test/e2e/testdata/application.yaml index 80f84cd9a..ef6345517 100644 --- a/test/e2e/testdata/application.yaml +++ b/test/e2e/testdata/application.yaml @@ -21,7 +21,7 @@ metadata: name: example spec: schedulingPolicy: - name: TryOnce + type: TryOnce queue: root.default taskGroups: - name: "test-task-0001" From ea9c087d66b71f56158d225c1fef1e36f2e7c15d Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Tue, 27 Oct 2020 09:29:24 -0700 Subject: [PATCH 14/14] fix schedulingPolicy name --- pkg/apis/yunikorn.apache.org/v1alpha1/type.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/apis/yunikorn.apache.org/v1alpha1/type.go b/pkg/apis/yunikorn.apache.org/v1alpha1/type.go index 8b904dc51..a21497823 100644 --- a/pkg/apis/yunikorn.apache.org/v1alpha1/type.go +++ b/pkg/apis/yunikorn.apache.org/v1alpha1/type.go @@ -38,7 +38,7 @@ type Application struct { // Spec part type ApplicationSpec struct { - SchedulingPolicy SchedulingPolicy `json:"SchedulingPolicy"` + SchedulingPolicy SchedulingPolicy `json:"schedulingPolicy"` Queue string `json:"queue"` TaskGroups []TaskGroup `json:"taskGroups"` }