Skip to content

Commit

Permalink
arch png
Browse files Browse the repository at this point in the history
  • Loading branch information
yangwwei committed Sep 18, 2020
1 parent 1056620 commit bb50cc6
Show file tree
Hide file tree
Showing 14 changed files with 320 additions and 12 deletions.
7 changes: 1 addition & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,7 @@ sched_image: scheduler
@coreSHA=$$(go list -m "github.com/apache/incubator-yunikorn-core" | cut -d "-" -f5) ; \
siSHA=$$(go list -m "github.com/apache/incubator-yunikorn-scheduler-interface" | cut -d "-" -f6) ; \
shimSHA=$$(git rev-parse --short=12 HEAD) ; \
docker build ./deployments/image/configmap -t ${REGISTRY}/yunikorn:scheduler-${VERSION} \
--label "yunikorn-core-revision=$${coreSHA}" \
--label "yunikorn-scheduler-interface-revision=$${siSHA}" \
--label "yunikorn-k8shim-revision=$${shimSHA}" \
--label "BuildTimeStamp=${DATE}" \
--label "Version=${VERSION}"
docker build ./deployments/image/configmap -t yunikorn/yunikorn-member:latest
@mv -f ./deployments/image/configmap/Dockerfile.bkp ./deployments/image/configmap/Dockerfile
@rm -f ./deployments/image/configmap/${BINARY}
@rm -rf ./deployments/image/configmap/admission-controller-init-scripts/
Expand Down
29 changes: 25 additions & 4 deletions conf/queues.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,31 @@

partitions:
- name: default
placementrules:
- name: tag
value: namespace
create: true
queues:
- name: root
submitacl: '*'
queues:
- name: advertisement
resources:
guaranteed:
memory: 500000
vcore: 50000
max:
memory: 800000
vcore: 80000
- name: search
resources:
guaranteed:
memory: 400000
vcore: 40000
max:
memory: 600000
vcore: 60000
- name: sandbox
resources:
guaranteed:
memory: 100000
vcore: 10000
max:
memory: 100000
vcore: 10000
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ go 1.12
require (
github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.0.0-20200817155620-c19d2b8660d8
github.com/apache/incubator-yunikorn-core v0.0.0-20200827055746-57d663e73cb1
github.com/apache/incubator-yunikorn-scheduler-interface v0.9.1-0.20200827013520-ec2f681ecb5b
github.com/apache/incubator-yunikorn-scheduler-interface v0.9.1-0.20200901200728-b9033558f319
github.com/google/uuid v1.1.1
github.com/looplab/fsm v0.1.0
github.com/onsi/ginkgo v1.11.0
github.com/onsi/gomega v1.7.0
go.uber.org/zap v1.13.0
google.golang.org/grpc v1.26.0
gopkg.in/yaml.v2 v2.2.8
gotest.tools v2.2.0+incompatible
k8s.io/api v0.16.13
Expand Down Expand Up @@ -61,3 +62,5 @@ replace (
k8s.io/metrics => k8s.io/metrics v0.16.13
k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.16.13
)

replace github.com/apache/incubator-yunikorn-scheduler-interface => /Users/wyang/workspace/github/wyang/incubator-yunikorn-scheduler-interface
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ github.com/apache/incubator-yunikorn-core v0.0.0-20200827055746-57d663e73cb1 h1:
github.com/apache/incubator-yunikorn-core v0.0.0-20200827055746-57d663e73cb1/go.mod h1:0fxPfo3PViD/Bd6uuczucdi+tp3uH+vjhZ9Sjymx+uc=
github.com/apache/incubator-yunikorn-scheduler-interface v0.9.1-0.20200827013520-ec2f681ecb5b h1:giSmjG7Ay/8hzql/oG1fNcij8rAsXOsLmHN5d40ZqYs=
github.com/apache/incubator-yunikorn-scheduler-interface v0.9.1-0.20200827013520-ec2f681ecb5b/go.mod h1:ObMs03XFbnmpGD81jYvdUDEVZbHvz8W6dWH5nGDCjc0=
github.com/apache/incubator-yunikorn-scheduler-interface v0.9.1-0.20200901200728-b9033558f319 h1:I12nCcXdHe6W4oysVejFHfQDy0Ix0r+ZHdfooyEoldo=
github.com/apache/incubator-yunikorn-scheduler-interface v0.9.1-0.20200901200728-b9033558f319/go.mod h1:ObMs03XFbnmpGD81jYvdUDEVZbHvz8W6dWH5nGDCjc0=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
Expand Down
6 changes: 6 additions & 0 deletions pkg/cache/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,12 @@ func (ctx *Context) GetApplication(appID string) interfaces.ManagedApp {
return nil
}

// GetNode looks up the scheduler node registered under nodeID.
//
// The lookup is delegated to ctx.nodes.getNode and is performed while the
// context read lock is held; the lock is released when the method returns.
// NOTE(review): callers in this change check the result against nil, so a
// missing node presumably yields nil — confirm against getNode.
func (ctx *Context) GetNode(nodeID string) *SchedulerNode {
	ctx.lock.RLock()
	defer ctx.lock.RUnlock()

	found := ctx.nodes.getNode(nodeID)
	return found
}

func (ctx *Context) RemoveApplication(appID string) error {
ctx.lock.Lock()
defer ctx.lock.Unlock()
Expand Down
12 changes: 12 additions & 0 deletions pkg/cache/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,3 +248,15 @@ func (n *SchedulerNode) enterState(event *fsm.Event) {
zap.String("destination", event.Dst),
zap.String("event", event.Event))
}

// GetCapacity returns the capacity resource recorded on this node.
//
// The field is read under the node's read lock. The stored pointer itself is
// handed back (no copy is made), so the returned value is shared with the node.
func (n *SchedulerNode) GetCapacity() *si.Resource {
	n.lock.RLock()
	defer n.lock.RUnlock()

	capacity := n.capacity
	return capacity
}

// GetOccupiedResources returns the occupied resource recorded on this node.
//
// The field is read under the node's read lock. The stored pointer itself is
// handed back (no copy is made), so the returned value is shared with the node.
func (n *SchedulerNode) GetOccupiedResources() *si.Resource {
	n.lock.RLock()
	defer n.lock.RUnlock()

	occupied := n.occupied
	return occupied
}
85 changes: 85 additions & 0 deletions pkg/callback/scheduler_callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ package callback
import (
"fmt"

"github.com/apache/incubator-yunikorn-k8shim/pkg/common"
"github.com/apache/incubator-yunikorn-k8shim/pkg/common/constants"
"github.com/apache/incubator-yunikorn-k8shim/pkg/conf"
"github.com/apache/incubator-yunikorn-k8shim/pkg/federation"
"go.uber.org/zap"

"github.com/apache/incubator-yunikorn-k8shim/pkg/cache"
Expand Down Expand Up @@ -53,6 +57,30 @@ func (callback *AsyncRMCallback) RecvUpdateResponse(response *si.UpdateResponse)
NodeID: node.NodeID,
Event: events.NodeAccepted,
})

confirmedNode := callback.context.GetNode(node.NodeID)
if confirmedNode != nil {
log.Logger().Info("reporting new node to the head server",
zap.String("nodeID", node.NodeID))
dispatcher.Dispatch(federation.AsyncUpdateFederationEvent{
ConfirmedRequests: []*si.UpdateRequest{
{
NewSchedulableNodes: []*si.NewNodeInfo{
{
NodeID: node.NodeID,
Attributes: map[string]string{ "compute-node": "yes"},
SchedulableResource: confirmedNode.GetCapacity(),
OccupiedResource: confirmedNode.GetOccupiedResources(),
},
},
RmID: conf.GetSchedulerConf().ClusterID,
},
},
})
} else {
log.Logger().Error("node accepted in sub cluster, but not found in cache",
zap.String("nodeID", node.NodeID))
}
}

for _, node := range response.RejectedNodes {
Expand All @@ -75,6 +103,31 @@ func (callback *AsyncRMCallback) RecvUpdateResponse(response *si.UpdateResponse)
ev := cache.NewSimpleApplicationEvent(app.GetApplicationID(), events.AcceptApplication)
dispatcher.Dispatch(ev)
}

if appInfo := callback.context.GetApplication(app.ApplicationID); appInfo != nil {
log.Logger().Info("reporting new application to the head server",
zap.String("appID", app.ApplicationID))
dispatcher.Dispatch(federation.AsyncUpdateFederationEvent{
ConfirmedRequests: []*si.UpdateRequest{
{
NewApplications: []*si.AddApplicationRequest{
{
ApplicationID: app.ApplicationID,
QueueName: appInfo.GetQueue(),
PartitionName: "default",
Ugi: &si.UserGroupInformation{
User: "anonymous",
},
},
},
RmID: conf.GetSchedulerConf().ClusterID,
},
},
})
} else {
log.Logger().Error("app accepted in sub cluster, but not found in cache",
zap.String("appID", app.ApplicationID))
}
}

for _, app := range response.RejectedApplications {
Expand All @@ -100,6 +153,38 @@ func (callback *AsyncRMCallback) RecvUpdateResponse(response *si.UpdateResponse)
if app := callback.context.GetApplication(alloc.ApplicationID); app != nil {
ev := cache.NewAllocateTaskEvent(app.GetApplicationID(), alloc.AllocationKey, alloc.UUID, alloc.NodeID)
dispatcher.Dispatch(ev)

if updatedNode := callback.context.GetNode(alloc.NodeID); updatedNode != nil {
if task, err := app.GetTask(alloc.AllocationKey); err == nil {
dispatcher.Dispatch(federation.AsyncUpdateFederationEvent{
ConfirmedRequests: []*si.UpdateRequest{
{
UpdatedNodes: []*si.UpdateNodeInfo{
{
NodeID: alloc.NodeID,
Attributes: map[string]string{"compute-node": "yes"},
SchedulableResource: updatedNode.GetCapacity(),
OccupiedResource: updatedNode.GetOccupiedResources(),
ExistingAllocations: []*si.Allocation{
{
AllocationKey: string(task.GetTaskPod().UID),
UUID: string(task.GetTaskPod().UID),
ResourcePerAlloc: common.GetPodResource(task.GetTaskPod()),
QueueName: app.GetQueue(),
NodeID: alloc.NodeID,
ApplicationID: app.GetApplicationID(),
PartitionName: constants.DefaultPartition,
},
},
Action: si.UpdateNodeInfo_UPDATE,
},
},
RmID: conf.GetSchedulerConf().ClusterID,
},
},
})
}
}
}
}

Expand Down
17 changes: 17 additions & 0 deletions pkg/common/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package events

import "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si"

const EnterState = "enter_state"

//----------------------------------------------
Expand Down Expand Up @@ -148,3 +150,18 @@ type SchedulerNodeEvent interface {
// state machines' callbacks when doing state transition
GetArgs() []interface{}
}

// FederationEventType is the string-typed identifier of a federation event.
type FederationEventType string

// Update is the federation event type declared by this package.
const Update FederationEventType = "Update"

// FederationEvent describes an event that carries scheduler-interface update
// requests.
type FederationEvent interface {
	// GetEvent returns the type of this event.
	GetEvent() FederationEventType

	// GetConfirmedUpdates returns the update requests attached to this event.
	GetConfirmedUpdates() []*si.UpdateRequest

	// GetArgs returns any additional arguments attached to this event.
	GetArgs() []interface{}
}
2 changes: 1 addition & 1 deletion pkg/conf/schedulerconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func initConfigs() {
predicateList := flag.String("predicates", "",
fmt.Sprintf("comma-separated list of predicates, valid predicates are: %s, "+
"the program will exit if any invalid predicates exist.", predicates.Ordering()))
operatorPluginList := flag.String("operatorPlugins", "general,"+constants.AppManagerHandlerName,
operatorPluginList := flag.String("operatorPlugins", "general",
"comma-separated list of operator plugin names, currently, only \"spark-operator-service\""+
"and"+constants.AppManagerHandlerName+"is supported.")

Expand Down
3 changes: 3 additions & 0 deletions pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (
EventTypeNode
EventTypeScheduler
EventTypeAppStatus
EventTypeFederation
)

var (
Expand Down Expand Up @@ -186,6 +187,8 @@ func Start() {
select {
case event := <-getDispatcher().eventChan:
switch v := event.(type) {
case events.FederationEvent:
getEventHandler(EventTypeFederation)(v)
case events.ApplicationStatusEvent:
getEventHandler(EventTypeAppStatus)(v)
case events.ApplicationEvent:
Expand Down
22 changes: 22 additions & 0 deletions pkg/federation/federation_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package federation

import (
"github.com/apache/incubator-yunikorn-k8shim/pkg/common/events"
"github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si"
)

// AsyncUpdateFederationEvent is a federation event that wraps a list of
// scheduler-interface update requests.
type AsyncUpdateFederationEvent struct {
	// ConfirmedRequests holds the update requests returned by
	// GetConfirmedUpdates.
	ConfirmedRequests []*si.UpdateRequest
}

// GetEvent always reports events.Update for this event.
func (e AsyncUpdateFederationEvent) GetEvent() events.FederationEventType {
	return events.Update
}

// GetConfirmedUpdates returns the ConfirmedRequests slice as stored on the
// event (the slice is not copied).
func (e AsyncUpdateFederationEvent) GetConfirmedUpdates() []*si.UpdateRequest {
	return e.ConfirmedRequests
}

// GetArgs returns nil; this event carries no additional arguments.
func (e AsyncUpdateFederationEvent) GetArgs() []interface{} {
	return nil
}
Loading

0 comments on commit bb50cc6

Please sign in to comment.