Skip to content

Commit

Permalink
[YUNIKORN-500] Make sure placeholder/taskGroupName are passed back to…
Browse files Browse the repository at this point in the history
… the core.
  • Loading branch information
yangwwei committed Jan 7, 2021
1 parent 0c25c0e commit 99776be
Show file tree
Hide file tree
Showing 11 changed files with 108 additions and 43 deletions.
6 changes: 6 additions & 0 deletions pkg/appmgmt/general/general.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,14 @@ func (os *Manager) getTaskMetadata(pod *v1.Pod) (interfaces.TaskMetadata, bool)
return interfaces.TaskMetadata{}, false
}

placeholder := utils.GetPlaceholderFlagFromPodSpec(pod)
taskGroupName := utils.GetTaskGroupFromPodSpec(pod)

return interfaces.TaskMetadata{
ApplicationID: appId,
TaskID: string(pod.UID),
Pod: pod,
Placeholder: placeholder,
TaskGroupName: taskGroupName,
}, true
}
Expand Down Expand Up @@ -303,6 +305,8 @@ func (os *Manager) GetExistingAllocation(pod *v1.Pod) *si.Allocation {
// when submit a task, we use pod UID as the allocationKey,
// to keep consistent, during recovery, the pod UID is also used
// for an Allocation.
placeholder := utils.GetPlaceholderFlagFromPodSpec(pod)
taskGroupName := utils.GetTaskGroupFromPodSpec(pod)
return &si.Allocation{
AllocationKey: string(pod.UID),
AllocationTags: meta.Tags,
Expand All @@ -311,6 +315,8 @@ func (os *Manager) GetExistingAllocation(pod *v1.Pod) *si.Allocation {
QueueName: meta.QueueName,
NodeID: pod.Spec.NodeName,
ApplicationID: meta.ApplicationID,
Placeholder: placeholder,
TaskGroupName: taskGroupName,
PartitionName: constants.DefaultPartition,
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/appmgmt/interfaces/amprotocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,6 @@ type TaskMetadata struct {
ApplicationID string
TaskID string
Pod *v1.Pod
Placeholder bool
TaskGroupName string
}
13 changes: 9 additions & 4 deletions pkg/cache/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,12 +440,14 @@ func (app *Application) onReservationStateChange(event *fsm.Event) {

actualCounts := utils.NewTaskGroupInstanceCountMap()
for _, t := range app.getTasks(events.States().Task.Bound) {
actualCounts.AddOne(t.taskGroupName)
if t.placeholder {
actualCounts.AddOne(t.taskGroupName)
}
}

// min member all satisfied
if desireCounts.Equals(actualCounts) {
ev := NewRunApplicationEvent(app.GetApplicationID())
ev := NewRunApplicationEvent(app.applicationID)
dispatcher.Dispatch(ev)
}
}
Expand All @@ -463,16 +465,19 @@ func (app *Application) handleCompleteApplicationEvent(event *fsm.Event) {
func (app *Application) handleReleaseAppAllocationEvent(event *fsm.Event) {
eventArgs := make([]string, 2)
if err := events.GetEventArgsAsStrings(eventArgs, event.Args); err != nil {
log.Logger().Error("fail to paser event arg", zap.Error(err))
log.Logger().Error("fail to parse event arg", zap.Error(err))
return
}
allocUUID := eventArgs[0]
terminationTypeStr := eventArgs[1]
log.Logger().Info("try to release pod from application",
zap.String("appID", app.applicationID),
zap.String("allocationUUID", allocUUID))
zap.String("allocationUUID", allocUUID),
zap.String("terminationType", terminationTypeStr))

for _, task := range app.taskMap {
if task.allocationUUID == allocUUID {
task.setTaskTerminationType(terminationTypeStr)
err := task.DeleteTaskPod(task.pod)
if err != nil {
log.Logger().Error("failed to release allocation from application", zap.Error(err))
Expand Down
3 changes: 2 additions & 1 deletion pkg/cache/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,7 @@ func (ctx *Context) AddApplication(request *interfaces.AddApplicationRequest) in
request.Metadata.User,
request.Metadata.Tags,
ctx.apiProvider.GetAPIs().SchedulerAPI)
app.setTaskGroups(request.Metadata.TaskGroups)

// add into cache
ctx.applications[app.applicationID] = app
Expand Down Expand Up @@ -572,7 +573,7 @@ func (ctx *Context) AddTask(request *interfaces.AddTaskRequest) interfaces.Manag
if app, valid := managedApp.(*Application); valid {
existingTask, err := app.GetTask(request.Metadata.TaskID)
if err != nil {
task := NewTask(request.Metadata.TaskID, app, ctx, request.Metadata.Pod)
task := NewFromTaskMeta(request.Metadata.TaskID, app, ctx, request.Metadata)
// in recovery mode, task is considered as allocated
if request.Recovery {
// in scheduling, allocationUUID is assigned by scheduler-core
Expand Down
1 change: 1 addition & 0 deletions pkg/cache/placeholder.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func newPlaceholder(placeholderName string, app *Application, taskGroup v1alpha1
constants.LabelQueueName: app.GetQueue(),
},
Annotations: map[string]string{
constants.AnnotationPlaceholderFlag: "true",
constants.AnnotationTaskGroupName: taskGroup.Name,
},
},
Expand Down
14 changes: 13 additions & 1 deletion pkg/cache/placeholder_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,21 @@ type PlaceholderManager struct {
var placeholderMgr *PlaceholderManager
var once sync.Once

func NewPlaceholderManager(clients *client.Clients) *PlaceholderManager {
var r atomic.Value
r.Store(false)
placeholderMgr = &PlaceholderManager{
clients: clients,
running: r,
}
return placeholderMgr
}

func GetPlaceholderManager() *PlaceholderManager {
once.Do(func() {
placeholderMgr = &PlaceholderManager{}
if placeholderMgr == nil {
log.Logger().Fatal("PlaceholderManager is not initiated")
}
})
return placeholderMgr
}
Expand Down
78 changes: 48 additions & 30 deletions pkg/cache/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"sync"
"time"

"github.com/apache/incubator-yunikorn-k8shim/pkg/appmgmt/interfaces"
"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/apache/incubator-yunikorn-k8shim/pkg/common"
"github.com/apache/incubator-yunikorn-k8shim/pkg/common/events"
Expand All @@ -38,40 +38,44 @@ import (
)

type Task struct {
taskID string
alias string
applicationID string
application *Application
allocationUUID string
resource *si.Resource
pod *v1.Pod
context *Context
nodeName string
createTime time.Time
taskGroupName string
placeholder bool
sm *fsm.FSM
lock *sync.RWMutex
taskID string
alias string
applicationID string
application *Application
allocationUUID string
resource *si.Resource
pod *v1.Pod
context *Context
nodeName string
createTime time.Time
taskGroupName string
placeholder bool
terminationType string
sm *fsm.FSM
lock *sync.RWMutex
}

// TODO to be removed
func NewTask(tid string, app *Application, ctx *Context, pod *v1.Pod) *Task {
taskResource := common.GetPodResource(pod)
return createTaskInternal(tid, app, taskResource, pod, ctx)
return createTaskInternal(tid, app, taskResource, pod, false, "", ctx)
}

// test only
func CreateTaskForTest(tid string, app *Application, resource *si.Resource, ctx *Context) *Task {
// for testing purpose, the pod name is same as the taskID
taskPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: tid,
},
}
return createTaskInternal(tid, app, resource, taskPod, ctx)
func NewFromTaskMeta(tid string, app *Application, ctx *Context, metadata interfaces.TaskMetadata) *Task {
taskPod := metadata.Pod
taskResource := common.GetPodResource(taskPod)
return createTaskInternal(
tid,
app,
taskResource,
metadata.Pod,
metadata.Placeholder,
metadata.TaskGroupName,
ctx)
}

func createTaskInternal(tid string, app *Application, resource *si.Resource,
pod *v1.Pod, ctx *Context) *Task {
pod *v1.Pod, placeholder bool, taskGroupName string, ctx *Context) *Task {
task := &Task{
taskID: tid,
alias: fmt.Sprintf("%s/%s", pod.Namespace, pod.Name),
Expand All @@ -80,7 +84,8 @@ func createTaskInternal(tid string, app *Application, resource *si.Resource,
pod: pod,
resource: resource,
createTime: pod.GetCreationTimestamp().Time,
placeholder: false,
placeholder: placeholder,
taskGroupName: taskGroupName,
context: ctx,
lock: &sync.RWMutex{},
}
Expand Down Expand Up @@ -188,6 +193,12 @@ func (task *Task) setTaskGroupName(groupName string) {
task.taskGroupName = groupName
}

func (task *Task) setTaskTerminationType(terminationTyp string) {
task.lock.Lock()
defer task.lock.Unlock()
task.terminationType = terminationTyp
}

func (task *Task) getTaskGroupName() string {
task.lock.RLock()
defer task.lock.RUnlock()
Expand Down Expand Up @@ -240,7 +251,13 @@ func (task *Task) handleSubmitTaskEvent(event *fsm.Event) {
log.Logger().Debug("scheduling pod",
zap.String("podName", task.pod.Name))
// convert the request
rr := common.CreateUpdateRequestForTask(task.applicationID, task.taskID, task.resource, task.pod)
rr := common.CreateUpdateRequestForTask(
task.applicationID,
task.taskID,
task.resource,
task.placeholder,
task.taskGroupName,
task.pod)
log.Logger().Debug("send update request", zap.String("request", rr.String()))
if err := task.context.apiProvider.GetAPIs().SchedulerAPI.Update(&rr); err != nil {
log.Logger().Debug("failed to send scheduling request to scheduler", zap.Error(err))
Expand Down Expand Up @@ -378,7 +395,8 @@ func (task *Task) releaseAllocation() {
zap.String("taskID", task.taskID),
zap.String("taskAlias", task.alias),
zap.String("allocationUUID", task.allocationUUID),
zap.String("task", task.GetTaskState()))
zap.String("task", task.GetTaskState()),
zap.String("terminationType", task.terminationType))

// depends on current task state, generate requests accordingly.
// if task is already allocated, which means the scheduler core already,
Expand Down Expand Up @@ -406,7 +424,7 @@ func (task *Task) releaseAllocation() {
return
}
releaseRequest = common.CreateReleaseAllocationRequestForTask(
task.applicationID, task.allocationUUID, task.application.partition)
task.applicationID, task.allocationUUID, task.application.partition, task.terminationType)
}

if releaseRequest.Releases != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/common/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const SchedulerName = "yunikorn"

// Application crd
const AppManagerHandlerName = "yunikorn-app"
const AnnotationPlaceholderFlag = "yunikorn.apache.org/placeholder"
const AnnotationTaskGroupName = "yunikorn.apache.org/task-group-name"
const AnnotationTaskGroups = "yunikorn.apache.org/task-groups"

Expand Down
22 changes: 16 additions & 6 deletions pkg/common/si_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,15 @@ func createTagsForTask(pod *v1.Pod) map[string]string {
return tags
}

func CreateUpdateRequestForTask(appID, taskID string, resource *si.Resource, pod *v1.Pod) si.UpdateRequest {
func CreateUpdateRequestForTask(appID, taskID string, resource *si.Resource, placeholder bool, taskGroupName string, pod *v1.Pod) si.UpdateRequest {
ask := si.AllocationAsk{
AllocationKey: taskID,
ResourceAsk: resource,
ApplicationID: appID,
MaxAllocations: 1,
Tags: createTagsForTask(pod),
Placeholder: placeholder,
TaskGroupName: taskGroupName,
}

result := si.UpdateRequest{
Expand Down Expand Up @@ -84,13 +86,21 @@ func CreateReleaseAskRequestForTask(appID, taskId, partition string) si.UpdateRe
return result
}

func CreateReleaseAllocationRequestForTask(appID, allocUUID, partition string) si.UpdateRequest {
func GetTerminationTypeFromString(terminationTypeStr string) si.AllocationRelease_TerminationType{
if v, ok := si.AllocationRelease_TerminationType_value[terminationTypeStr]; ok {
return si.AllocationRelease_TerminationType(v)
}
return si.AllocationRelease_STOPPED_BY_RM
}

func CreateReleaseAllocationRequestForTask(appID, allocUUID, partition, terminationType string) si.UpdateRequest {
toReleases := make([]*si.AllocationRelease, 0)
toReleases = append(toReleases, &si.AllocationRelease{
ApplicationID: appID,
UUID: allocUUID,
PartitionName: partition,
Message: "task completed",
ApplicationID: appID,
UUID: allocUUID,
PartitionName: partition,
TerminationType: GetTerminationTypeFromString(terminationType),
Message: "task completed",
})

releaseRequest := si.AllocationReleasesRequest{
Expand Down
2 changes: 1 addition & 1 deletion pkg/common/si_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestCreateUpdateRequestForTask(t *testing.T) {
},
}

updateRequest := CreateUpdateRequestForTask("appId1", "taskId1", res, pod)
updateRequest := CreateUpdateRequestForTask("appId1", "taskId1", res, false, "", pod)
asks := updateRequest.Asks
assert.Equal(t, len(asks), 1)
allocAsk := asks[0]
Expand Down
10 changes: 10 additions & 0 deletions pkg/common/utils/gang_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package utils
import (
"encoding/json"
"fmt"
"strconv"
"sync"

v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -71,6 +72,15 @@ func GetPlaceholderResourceRequest(resources map[string]resource.Quantity) v1.Re
return resourceReq
}

func GetPlaceholderFlagFromPodSpec(pod *v1.Pod) bool {
if value, ok := pod.Annotations[constants.AnnotationPlaceholderFlag]; ok {
if v, err := strconv.ParseBool(value); err == nil {
return v
}
}
return false
}

func GetTaskGroupFromPodSpec(pod *v1.Pod) string {
if value, ok := pod.Annotations[constants.AnnotationTaskGroupName]; ok {
return value
Expand Down

0 comments on commit 99776be

Please sign in to comment.